Gil Tinde
02/16/2024, 11:47 AM
Scout
02/16/2024, 11:47 AM
Gil Tinde
02/16/2024, 11:49 AM
Scout
02/16/2024, 11:50 AM
…`PartitionMapping`. A `PartitionMapping` defines the relationship between partitions of different assets or between different partitions of the same asset.
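For sequential dependencies, where each partition should depend on the previous one, the mapping has to express that "previous partition" relationship. If your partitions are time windows (for example, a `DailyPartitionsDefinition`), Dagster's built-in `TimeWindowPartitionMapping(start_offset=-1, end_offset=-1)` already maps each partition to the one before it. For other partition schemes you would need a custom `PartitionMapping`. The documentation does not provide a direct example of such a custom mapping, but it does describe overriding the default partition dependency rules by supplying a `PartitionMapping`.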
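Before involving Dagster at all, the "previous partition" relationship can be modeled in plain Python. This is a minimal sketch, assuming partitions with a fixed, known order (such as a static list of keys); the helpers `upstream_range_for` and `downstream_range_for` are hypothetical names, not Dagster APIs. It covers both questions a mapping has to answer: which upstream keys a downstream range reads, and which downstream keys an upstream range affects.

```python
from typing import List, Optional, Tuple

# A partition key range as (first_key, last_key), both inclusive.
KeyRange = Tuple[str, str]


def upstream_range_for(keys: List[str], downstream: KeyRange) -> Optional[KeyRange]:
    """Return the upstream range that a downstream range depends on.

    Each partition depends on the one immediately before it in `keys`, so the
    range is shifted back by one position. The first partition has no
    predecessor, so a range consisting only of the first key maps to None.
    """
    start_idx = keys.index(downstream[0])
    end_idx = keys.index(downstream[1])
    if end_idx == 0:
        return None
    # Clamp at 0: if the first key is included, it contributes no upstream key.
    return keys[max(start_idx - 1, 0)], keys[end_idx - 1]


def downstream_range_for(keys: List[str], upstream: KeyRange) -> Optional[KeyRange]:
    """Return the downstream range affected by an upstream range.

    This is the inverse shift: each upstream partition affects the partition
    right after it. The last partition has no successor.
    """
    start_idx = keys.index(upstream[0])
    end_idx = keys.index(upstream[1])
    if start_idx == len(keys) - 1:
        return None
    # Clamp at the last key: the final upstream key (if included) affects nothing.
    return keys[start_idx + 1], keys[min(end_idx + 1, len(keys) - 1)]


keys = ["step_1", "step_2", "step_3", "step_4"]
print(upstream_range_for(keys, ("step_2", "step_3")))  # ('step_1', 'step_2')
print(downstream_range_for(keys, ("step_1", "step_1")))  # ('step_2', 'step_2')
```

The two functions are exact inverses except at the edges, which is where a real mapping needs an explicit decision about the first and last partitions.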
Here's a conceptual example based on the documentation's guidance on partition mappings:
```python
from dagster import AssetIn, PartitionKeyRange, PartitionMapping, asset


class SequentialPartitionMapping(PartitionMapping):
    def get_upstream_partitions_for_partition_range(
        self,
        downstream_partition_key_range: PartitionKeyRange,
        downstream_partitions_def,
        upstream_partitions_def,
    ) -> PartitionKeyRange:
        # Assumes downstream_partition_key_range contains a single partition key
        downstream_partition_key = downstream_partition_key_range.start
        # calculate_previous_partition_key is a placeholder you implement for
        # your partition scheme; decide separately what the first partition
        # (which has no predecessor) should depend on
        previous_partition_key = calculate_previous_partition_key(downstream_partition_key)
        return PartitionKeyRange(previous_partition_key, previous_partition_key)

    def get_downstream_partitions_for_partition_range(
        self,
        upstream_partition_key_range: PartitionKeyRange,
        downstream_partitions_def,
        upstream_partitions_def,
    ) -> PartitionKeyRange:
        # Defines which downstream partitions are affected by an upstream range.
        # Left unimplemented here, but Dagster may call it when working out what
        # an upstream change affects, so a complete mapping would return the
        # next partition(s) instead of raising.
        raise NotImplementedError()


# Use the custom partition mapping in your asset definition
# (my_partitions_def is a placeholder for your PartitionsDefinition)
@asset(
    partitions_def=my_partitions_def,
    ins={"my_input": AssetIn(partition_mapping=SequentialPartitionMapping())},
)
def my_asset(context, my_input):
    # Asset computation logic here
    pass
```
In this example, `calculate_previous_partition_key` is a placeholder for a function you would write to determine the previous partition key based on your partitioning scheme, and `my_partitions_def` stands for your own partitions definition.
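One possible `calculate_previous_partition_key` is sketched below, assuming daily partitions whose keys are ISO dates ("YYYY-MM-DD", the default key format of `DailyPartitionsDefinition`). The `start_date` parameter is hypothetical and names the first partition, which has no predecessor.

```python
from datetime import date, timedelta
from typing import Optional


def calculate_previous_partition_key(
    partition_key: str, start_date: str = "2024-01-01"
) -> Optional[str]:
    """Return the daily partition key before `partition_key`, or None for the first one.

    Assumes keys are ISO dates ("YYYY-MM-DD"); `start_date` is a hypothetical
    parameter naming the first partition, which has no predecessor.
    """
    current = date.fromisoformat(partition_key)
    if current <= date.fromisoformat(start_date):
        return None
    # Subtracting a timedelta handles month ends and leap years correctly.
    return (current - timedelta(days=1)).isoformat()


print(calculate_previous_partition_key("2024-03-01"))  # 2024-02-29 (leap year)
print(calculate_previous_partition_key("2024-01-01"))  # None
```

Returning None for the first partition leaves the caller to decide what that partition depends on (for example, nothing); for time-window partitions like these, the built-in `TimeWindowPartitionMapping` is usually the simpler choice.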
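Please note that this is a conceptual example to illustrate the approach. The methods it overrides follow an older version of the `PartitionMapping` interface; the class's own docstring states that overriding `PartitionMapping` outside of Dagster is not supported and that its abstract methods may change at any time, so check the interface in your installed Dagster version. The actual implementation will depend on your specific partitioning scheme and on how you define and calculate partition keys, so you may need to adjust the logic to fit your partitions.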
If you need further guidance on implementing a custom `PartitionMapping` for your specific use case, you can refer to the Dagster documentation or seek help from the Dagster…
Scout
02/16/2024, 11:51 AM