Ismael Rodrigues
02/29/2024, 1:31 PM
Scout
02/29/2024, 1:31 PM
`IdentityPartitionMapping`, `AllPartitionMapping`, and `LastPartitionMapping`. These mappings define how partitions in downstream assets relate to partitions in upstream assets. If your dbt assets depend on the output of the main asset, you need to ensure that the partition mapping is set up correctly so that the dbt assets understand which partitions of the main asset they depend on.
2. Dependencies: Dependencies between assets are defined using the `ins` parameter in the `@asset` decorator. If your dbt assets depend on the main asset, you need to specify this dependency explicitly.
3. Partition Alignment: While it's not strictly necessary for all assets to use the same partitioning scheme, the dependencies between them need to be clearly defined. If your dbt assets are meant to run after the main asset, the partition mapping and dependencies should reflect that relationship.
4. Scheduling and Materialization Policies: The timing of asset materialization can also be influenced by scheduling and materialization policies. For instance, `AutoMaterializePolicy` can be used to define rules for when assets should be materialized.
Without more specific details about your asset definitions and how you've configured the dependencies and partition mappings, it's challenging to pinpoint the exact reason why your dbt assets did not start after your main assets. However, ensuring that your partition mappings and dependencies are correctly set up is a critical step. If you continue to face issues, you might want to review the partitioning and dependency configurations in your asset definitions, and ensure that the partition mappings align with the intended execution order of your assets. If necessary, you can also consult the documentation or community discussions for examples and guidance on setting up complex partitioning schemes and dependencies between assets.
Ismael Rodrigues
02/29/2024, 3:29 PM
Scout
02/29/2024, 3:29 PM
`PartitionMapping` and implementing the `get_downstream_partitions_for_partition_range` and `get_upstream_partitions_for_partition_range` methods to define the relationship between partitions.
Here's a hypothetical example to illustrate how you might set up a custom partition mapping:
python
from typing import Optional

from dagster import PartitionKeyRange, PartitionMapping, PartitionsDefinition


class CustomPartitionMapping(PartitionMapping):
    def get_downstream_partitions_for_partition_range(
        self,
        upstream_partition_key_range: PartitionKeyRange,
        downstream_partitions_def: Optional[PartitionsDefinition],
        upstream_partitions_def: PartitionsDefinition,
    ) -> PartitionKeyRange:
        # Custom logic to map upstream partitions to downstream partitions
        pass

    def get_upstream_partitions_for_partition_range(
        self,
        downstream_partition_key_range: Optional[PartitionKeyRange],
        downstream_partitions_def: Optional[PartitionsDefinition],
        upstream_partitions_def: PartitionsDefinition,
    ) -> PartitionKeyRange:
        # Custom logic to map downstream partitions to upstream partitions
        pass
In this example, `CustomPartitionMapping` is a subclass of `PartitionMapping` where you would implement the logic to map between upstream and downstream partitions based on your specific requirements.
Please note that the above code is a template to show where you would insert your custom logic. The actual implementation would depend on how you want to map your partitions.
For your specific case, where you have a multidimensional partition with a date and a list of entities in your main asset, and daily partitions in your dbt assets, you would need to define a parti
Scout
02/29/2024, 3:30 PM
Ismael Rodrigues
02/29/2024, 3:32 PM
Scout
02/29/2024, 3:32 PM
python
from dagster import (
    AssetIn,
    DailyPartitionsDefinition,
    MultiPartitionsDefinition,
    PartitionKeyRange,
    PartitionMapping,
    StaticPartitionsDefinition,
    asset,
)


# Define your custom partition mapping
class MyCustomPartitionMapping(PartitionMapping):
    def get_downstream_partitions_for_partition_range(
        self,
        upstream_partition_key_range: PartitionKeyRange,
        downstream_partitions_def,
        upstream_partitions_def,
    ):
        # Implement logic to map from upstream to downstream partitions
        # For example, you might extract the date from the multidimensional key
        # and map it to the corresponding daily partition in the downstream asset
        pass

    def get_upstream_partitions_for_partition_range(
        self,
        downstream_partition_key_range: PartitionKeyRange,
        downstream_partitions_def,
        upstream_partitions_def,
    ):
        # Implement logic to map from downstream to upstream partitions
        pass


# Define your upstream asset with multidimensional partitions
@asset(partitions_def=MultiPartitionsDefinition({
    "date": DailyPartitionsDefinition(start_date="2022-01-01"),
    "color": StaticPartitionsDefinition(["red", "yellow", "blue"]),
}))
def upstream_asset():
    # Your computation logic here
    pass


# Define your downstream asset with daily partitions
@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2022-01-01"),
    ins={"upstream_asset": AssetIn(partition_mapping=MyCustomPartitionMapping())},
)
def downstream_asset(upstream_asset):
    # Your computation logic here, which depends on the upstream_asset
    pass
In this example, `MyCustomPartitionMapping` is a subclass of `PartitionMapping` that you would need to implement to define the logic for mapping between the multidimensional partitions of the `upstream_asset` and the daily partitions of the `downstream_asset`. The `ins` parameter in the `@asset` decorator for `downstream_asset` specifies that it depends on `upstream_ass
Scout
02/29/2024, 3:33 PM
`MyCustomPartitionMapping` to determine which partitions of `upstream_asset` it depends on. Please note that this example is illustrative and does not contain the actual implementation logic for the partition mapping, which you would need to define based on your specific use case. The partition mapping logic will depend on how you want to relate the multidimensional partition keys to the daily partition keys.
Ismael Rodrigues
02/29/2024, 3:35 PM
job_fidc_idtrust_reports_partitions_def = MultiPartitionsDefinition(
    {
        "entity": StaticPartitionsDefinition(ID_TRUST_ENTITIES),
        "date": DailyPartitionsDefinition(start_date='2023-12-01', timezone='America/Fortaleza')
    }
)
And then extracts only the DailyPartitionsDefinition from it
Scout
02/29/2024, 3:35 PM
`DailyPartitionsDefinition` from a `MultiPartitionsDefinition`, you can subclass `PartitionMapping` and implement the logic to map the multidimensional partition keys to the daily partition keys.
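Before wiring anything into Dagster, it can help to pin down the intended relationship in plain Python. The sketch below is only a model of the mapping described above, not Dagster's API: keys are represented as plain dictionaries, and `ENTITIES`, `downstream_key_for`, and `upstream_keys_for` are hypothetical names introduced for illustration.

```python
from typing import Dict, List, Sequence

# Hypothetical entity list standing in for ID_TRUST_ENTITIES.
ENTITIES: Sequence[str] = ("entity1", "entity2", "entity3")


def downstream_key_for(upstream_key: Dict[str, str]) -> str:
    """Project an (entity, date) key onto its date dimension."""
    return upstream_key["date"]


def upstream_keys_for(
    date_key: str, entities: Sequence[str] = ENTITIES
) -> List[Dict[str, str]]:
    """Fan a daily key out to one upstream key per entity."""
    return [{"entity": entity, "date": date_key} for entity in entities]
```

With three entities, a single daily partition depends on three upstream partitions, one per entity, all sharing the same date. Whatever mapping class you end up using should reproduce this relationship.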
Here's an example of how you might define such a custom partition mapping:
python
from dagster import (
    PartitionMapping,
    PartitionKeyRange,
    MultiPartitionsDefinition,
    StaticPartitionsDefinition,
    DailyPartitionsDefinition,
)

# Assuming ID_TRUST_ENTITIES is defined somewhere
ID_TRUST_ENTITIES = ["entity1", "entity2", "entity3"]  # Example entities

# Define the MultiPartitionsDefinition
job_fidc_idtrust_reports_partitions_def = MultiPartitionsDefinition(
    {
        "entity": StaticPartitionsDefinition(ID_TRUST_ENTITIES),
        "date": DailyPartitionsDefinition(start_date='2023-12-01', timezone='America/Fortaleza')
    }
)


# Custom partition mapping to extract only the DailyPartitionsDefinition
class ExtractDailyPartitionMapping(PartitionMapping):
    def get_downstream_partitions_for_partition_range(
        self,
        upstream_partition_key_range: PartitionKeyRange,
        downstream_partitions_def,
        upstream_partitions_def: MultiPartitionsDefinition,
    ):
        # Assuming that the downstream asset is partitioned daily
        # Extract the date component from the upstream partition keys
        date_partition_keys = [
            key["date"] for key in upstream_partition_key_range.get_partition_keys()
        ]
        return PartitionKeyRange(date_partition_keys)

    def get_upstream_partitions_for_partition_range(
        self,
        downstream_partition_key_range: PartitionKeyRange,
        downstream_partitions_def,
        upstream_partitions_def: MultiPartitionsDefinition,
    ):
        # Map each downstream daily partition key to a range of upstream partition keys
        # that have the same date but different entities
        upstream_partition_keys = []
        for downstream_key in downstream_partition_key_range.get_partition_keys():
            for entity in ID_TRUST_ENTITIES:
                upstream_partition_keys.append(
                    {"entity": entity, "date": downstream_key}
                )
        return PartitionKeyRange(upstream_partition_keys)

# Use the custom partition mapping in your asset definitions
In this example, `ExtractDailyPartitionMapping` is a custom partition mapping that you would use to map between the multidimensional partition keys of an upstream asset and the daily partition keys of a downstream asset. The `get_downstream_partitions_for_partition_range` method extracts the date component from the upstream partition keys, while the `get_upstream_partitions_for_partition_range` method maps each dow
Scout
02/29/2024, 3:36 PM
Ismael Rodrigues
02/29/2024, 3:39 PM
Scout
02/29/2024, 3:39 PM
`MultiPartitionsDefinition` that includes both an entity and a date, and a downstream asset with only a `DailyPartitionsDefinition`, the custom `PartitionMapping` I described should allow you to define the relationship between these assets. The mapping ensures that the downstream asset's daily partitions correspond to the appropriate date partitions of the upstream asset, regardless of the entity. Here's how you might use the `ExtractDailyPartitionMapping` in your asset definitions:
python
from dagster import AssetIn, DailyPartitionsDefinition, asset, repository

# Assuming the custom ExtractDailyPartitionMapping and
# job_fidc_idtrust_reports_partitions_def are defined as shown previously


# Define your upstream asset with the MultiPartitionsDefinition
@asset(partitions_def=job_fidc_idtrust_reports_partitions_def)
def upstream_asset():
    # Your computation logic here
    pass


# Define your downstream asset with DailyPartitionsDefinition
@asset(
    partitions_def=DailyPartitionsDefinition(start_date='2023-12-01', timezone='America/Fortaleza'),
    ins={"upstream_asset": AssetIn(partition_mapping=ExtractDailyPartitionMapping())},
)
def downstream_asset(upstream_asset):
    # Your computation logic here, which depends on the upstream_asset
    pass


# Collect assets into a repository
@repository
def my_repository():
    return [upstream_asset, downstream_asset]
In this setup, `downstream_asset` is defined to depend on `upstream_asset` through the `ins` parameter, which specifies the custom `ExtractDailyPartitionMapping`. This mapping tells Dagster how to resolve the dependencies between the partitions of the two assets. When you materialize `downstream_asset`, Dagster will use the `ExtractDailyPartitionMapping` to determine which partitions of `upstream_asset` are required. The `get_downstream_partitions_for_partition_range` method will be used to map a range of upstream partition keys to the corresponding downstream partition keys, and vice versa for `get_upstream_partitions_for_partition_range`. This should work fine as long as the partition mapping is implemented correctly and aligns with the actual partitioning logic of your assets. If you encounter any issues, you may need to adjust the partition mapping logic to match the specific requirements of your data and dependencies.
Ismael Rodrigues
02/29/2024, 3:42 PM
Scout
02/29/2024, 3:42 PM
`PartitionKeyRange` object does not have a `get_partition_keys()` method. Instead, it has `start` and `end` attributes that represent the range of partition keys. To correct the example, you would need to use these attributes to determine the appropriate partition keys for the downstream asset.
Here's a revised example of how you might define the `ExtractDailyPartitionMapping`:
python
from dagster import PartitionMapping, PartitionKeyRange


class ExtractDailyPartitionMapping(PartitionMapping):
    def get_downstream_partitions_for_partition_range(
        self,
        upstream_partition_key_range: PartitionKeyRange,
        downstream_partitions_def,
        upstream_partitions_def,
    ):
        # Assuming that the downstream asset is partitioned daily
        # Extract the date component from the upstream partition keys
        # Here we assume that the upstream_partition_key_range.start and .end are tuples
        # where the first element is the entity and the second element is the date
        start_date = upstream_partition_key_range.start[1]
        end_date = upstream_partition_key_range.end[1]
        return PartitionKeyRange(start_date, end_date)

    def get_upstream_partitions_for_partition_range(
        self,
        downstream_partition_key_range: PartitionKeyRange,
        downstream_partitions_def,
        upstream_partitions_def,
    ):
        # Map each downstream daily partition key to a range of upstream partition keys
        # that have the same date but different entities
        # Here we assume that the downstream_partition_key_range.start and .end are strings representing dates
        start_date = downstream_partition_key_range.start
        end_date = downstream_partition_key_range.end
        # Assuming ID_TRUST_ENTITIES is a list of entities
        upstream_partition_keys = [
            (entity, date)
            for entity in ID_TRUST_ENTITIES
            for date in [start_date, end_date]  # Replace with a range if necessary
        ]
        return PartitionKeyRange(upstream_partition_keys[0], upstream_partition_keys[-1])
In this revised example, the `get_downstream_partitions_for_partition_range` method extracts the date component from the start and end of the `upstream_partition_key_range` and creates a new `PartitionKeyRange` for the downstream partitions. The `get_upstream_partitions_for_partition_range` method does the opposite: it takes the start and end dates from the `downstream_partition_key_range` and creates a range of upstream partition keys that include all entities for those dates.
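Iterating over `[start_date, end_date]` only visits the two endpoints, so any days in between are skipped. A minimal stdlib sketch of expanding a start/end pair into every daily key is shown below. It assumes the daily keys are ISO `YYYY-MM-DD` strings, and `daily_keys_between` is a hypothetical helper name, not part of Dagster.

```python
from datetime import date, timedelta
from typing import List


def daily_keys_between(start_key: str, end_key: str) -> List[str]:
    """Return every YYYY-MM-DD key from start_key to end_key, inclusive."""
    start = date.fromisoformat(start_key)
    end = date.fromisoformat(end_key)
    if end < start:
        raise ValueError("end_key must not be earlier than start_key")
    span_in_days = (end - start).days
    # range(span_in_days + 1) makes the end date inclusive.
    return [
        (start + timedelta(days=offset)).isoformat()
        for offset in range(span_in_days + 1)
    ]
```

The result can be substituted for the two-element list in the comprehension, so that each entity is paired with every date in the range rather than only the first and last.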
Please note that this example assumes that the partition keys are tuples where the first element is the entity and the second element is the date for the upstream asset, and st
Scout
02/29/2024, 3:43 PM
Ismael Rodrigues
02/29/2024, 3:44 PM
Scout
02/29/2024, 3:44 PM
`PartitionKeyRange` object expects `start` and `end` to be strings, not tuples. If you're working with a `MultiPartitionsDefinition` where the partition keys are tuples, you'll need to convert these tuples into a string representation that uniquely identifies each partition. Here's how you might adjust the `get_upstream_partitions_for_partition_range` method to return a range of string partition keys:
python
from dagster import PartitionMapping, PartitionKeyRange


class ExtractDailyPartitionMapping(PartitionMapping):
    # ... (other methods remain the same)
    def get_upstream_partitions_for_partition_range(
        self,
        downstream_partition_key_range: PartitionKeyRange,
        downstream_partitions_def,
        upstream_partitions_def,
    ):
        # Map each downstream daily partition key to a range of upstream partition keys
        # that have the same date but different entities
        start_date = downstream_partition_key_range.start
        end_date = downstream_partition_key_range.end
        # Generate a list of partition keys for each entity and date
        upstream_partition_keys = [
            f"{entity}-{date}"  # This format assumes entity and date are strings
            for entity in ID_TRUST_ENTITIES
            for date in [start_date, end_date]  # Replace with a range if necessary
        ]
        # Return a PartitionKeyRange with the start and end keys as strings
        return PartitionKeyRange(
            start=upstream_partition_keys[0],
            end=upstream_partition_keys[-1],
        )
In this example, the `get_upstream_partitions_for_partition_range` method generates a list of partition keys as strings by concatenating the entity and date with a delimiter (in this case, a hyphen). It then creates a `PartitionKeyRange` with the first and last keys in this list. Please ensure that the format used to generate the string partition keys matches the format expected by your upstream asset's partitioning scheme. If your upstream asset expects a different format or delimiter, you should adjust the string concatenation accordingly.
Ismael Rodrigues
02/29/2024, 3:49 PM
Scout
02/29/2024, 3:49 PM
`ExtractDailyPartitionMapping` class is missing implementations for required abstract methods from the `PartitionMapping` base class. To resolve this error, you need to implement all the abstract methods defined in the `PartitionMapping` class.
As of my knowledge cutoff date, the `PartitionMapping` class in Dagster requires you to implement the following methods:
- `get_downstream_partitions_for_partitions`
- `get_upstream_partitions_for_partitions`
- `get_upstream_mapped_partitions_result_for_partitions`
- `description`
Here's an example of how you might implement these methods in your `ExtractDailyPartitionMapping` class:
python
# Note: MappedPartition is assumed here; check that your installed Dagster
# version actually exports it before relying on this import.
from dagster import PartitionMapping, PartitionsDefinition, MappedPartition


class ExtractDailyPartitionMapping(PartitionMapping):
    def get_downstream_partitions_for_partitions(self, upstream_partitions, downstream_partitions_def):
        # Implement logic to map from upstream to downstream partitions
        # This should return a list of partition keys for the downstream asset
        # Split only on the first hyphen, because the date itself contains hyphens
        return [upstream_partition.split('-', 1)[1] for upstream_partition in upstream_partitions]

    def get_upstream_partitions_for_partitions(self, downstream_partitions, upstream_partitions_def):
        # Implement logic to map from downstream to upstream partitions
        # This should return a list of partition keys for the upstream asset
        return [
            f"{entity}-{downstream_partition}"
            for entity in ID_TRUST_ENTITIES
            for downstream_partition in downstream_partitions
        ]

    def get_upstream_mapped_partitions_result_for_partitions(self, downstream_partitions, upstream_partitions_def):
        # Implement logic to map from downstream to upstream partitions
        # This should return a list of MappedPartition objects
        return [
            MappedPartition(
                upstream_partition_key=f"{entity}-{downstream_partition}",
                downstream_partition_key=downstream_partition,
            )
            for entity in ID_TRUST_ENTITIES
            for downstream_partition in downstream_partitions
        ]

    def description(self):
        # Return a string description of the partition mapping
        return "Extracts the daily partition from a multidimensional partition."
In this example, the `get_downstream_partitions_for_partitions` method assumes that the upstream partition keys are strings in the format "entity-date" and extracts the date part to create the downstream partition keys. The `get_upstream_partitions_for_partitions` method does the opposite, combining each entity with the downstream date partition to create the upstream partition keys.
The `get_upstream_mapped_partitions_result_for_partitions` method returns a list of `MappedPartitio
Scout
02/29/2024, 3:49 PM
`description` method returns a human-readable description of the partition mapping. Please note that the actual implementation of these methods will depend on the specific structure of your partition keys and how you want to map between the upstream and downstream partitions. Adjust the logic in these methods to fit your use case.
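One detail worth checking in any "entity-date" string format: ISO dates contain hyphens themselves, so splitting on every hyphen loses most of the date. For instance, `"entity1-2023-12-01".split("-")[1]` evaluates to `"2023"`. The sketch below shows one way to keep the round trip unambiguous. The `|` delimiter and the helpers `join_key` and `split_key` are assumptions made for illustration; Dagster's own string form for multi-dimensional keys may differ, so check it before hard-coding a format.

```python
from typing import Tuple

# Assumed delimiter: any character that cannot occur in an entity name or a date.
DELIMITER = "|"


def join_key(entity: str, date_key: str) -> str:
    """Combine an entity and a date into a single string key."""
    if DELIMITER in entity or DELIMITER in date_key:
        raise ValueError(f"components must not contain {DELIMITER!r}")
    return f"{entity}{DELIMITER}{date_key}"


def split_key(key: str) -> Tuple[str, str]:
    """Recover (entity, date) from a key produced by join_key."""
    entity, separator, date_key = key.partition(DELIMITER)
    if not separator:
        raise ValueError(f"key {key!r} does not contain {DELIMITER!r}")
    return entity, date_key
```

Because the delimiter is rejected inside either component, `split_key(join_key(entity, date))` always returns the original pair, including for entity names that contain hyphens.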