I have an upstream asset with dynamic partitions. ...
# ask-ai
t
I have an upstream asset with dynamic partitions. I want a downstream asset which includes the data from all partitions. Can I append the data from a new partition without rerunning old partitions?
s
Yes, you can define a downstream asset that depends on all partitions of an upstream asset with dynamic partitions, and you can append data from a new partition without rerunning old partitions. The documentation provides an example of how to define an unpartitioned asset that depends on all upstream partitions. When an unpartitioned asset is downstream of a partitioned asset, Dagster automatically infers that the unpartitioned asset depends on all upstream partitions. Here is an example from the documentation on how to define such a relationship:
```python
from dagster import DailyPartitionsDefinition, asset


@asset(partitions_def=DailyPartitionsDefinition("2023-01-01"))
def daily_data():
    ...


@asset
def aggregated_report(context, daily_data):
    # The default IO manager returns a dictionary of outputs mapped by
    # partition key: {"2023-01-01": ..., "2023-01-02": ..., ...}
    context.log.info(daily_data)
    ...
```
In this example, `daily_data` is a partitioned asset, and `aggregated_report` is an unpartitioned asset that depends on all partitions of `daily_data`. Under the hood, Dagster applies the `AllPartitionMapping` to the `daily_data` input, which defines that all partitions of `daily_data` are input partitions. If you want to define this explicitly, you can use `AssetIn` with `AllPartitionMapping`, as shown below:
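```python
from dagster import AllPartitionMapping, AssetIn, asset


# Explicitly declare that the input covers every upstream partition.
@asset(ins={"daily_data": AssetIn(partition_mapping=AllPartitionMapping())})
def aggregated_report(context, daily_data):
    ...
```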
This setup allows the downstream asset to include data from all partitions of the upstream asset, and when a new partition is added to the upstream asset, you can process just the new partition's data and append it to the downstream asset without the need to rerun old partitions.
t
@Scout I think this will materialize the downstream asset by accessing the data from all partitions, whereas I want to just append the new partition.
s
Apologies for the confusion. If you want to append data from a new partition without rerunning old partitions, you would typically use a partitioned asset downstream as well, where each partition of the downstream asset corresponds to a partition of the upstream asset. This way, when a new partition is available upstream, you can materialize the corresponding downstream partition, which effectively appends the new data.

However, if you want to maintain a single downstream asset that accumulates data from all upstream partitions, you would need to manage the appending logic within the asset computation itself. This would involve checking if the partition has already been processed and then appending only the new data. The documentation does not provide a direct example of this use case, but you can achieve this by implementing custom logic in your asset computation function. Here's a conceptual approach:

1. Store metadata about which partitions have been processed in the downstream asset.
2. When the downstream asset computation is triggered, check the metadata to determine which partitions are new.
3. Load the data from the new partitions and append it to the existing downstream asset data.
4. Update the metadata to mark the new partitions as processed.

This approach requires careful management of state and idempotency to ensure that partitions are not processed more than once and that the append operation does not duplicate data. Please note that this is a conceptual approach and the actual implementation details would depend on your specific data storage and processing requirements. If you need further guidance on implementing this pattern, you may want to explore Dagster's community channels or consider reaching out for more specific support.
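The four bookkeeping steps can be sketched in plain Python, independent of Dagster. This is a minimal sketch under stated assumptions, not a Dagster API: the function name `append_new_partitions`, the `load_partition` callback, and the JSON-lines and state files are all hypothetical choices made for illustration. The sketch only shows the "track processed partitions, append only new ones" logic that an asset body would need to call.

```python
import json
from pathlib import Path
from typing import Callable, Iterable, List


def append_new_partitions(
    partition_keys: Iterable[str],
    load_partition: Callable[[str], List[dict]],  # hypothetical loader for one partition
    data_path: Path,   # hypothetical accumulated output (JSON lines)
    state_path: Path,  # hypothetical record of already-processed partition keys
) -> List[str]:
    """Append rows from partitions not yet recorded; return the keys appended."""
    # Step 1: read the metadata listing partitions that were already processed.
    processed = set(json.loads(state_path.read_text())) if state_path.exists() else set()

    # Step 2: determine which partitions are new (deduplicated, order preserved).
    new_keys = [k for k in dict.fromkeys(partition_keys) if k not in processed]

    # Step 3: load only the new partitions and append their rows.
    with data_path.open("a") as out:
        for key in new_keys:
            for row in load_partition(key):
                out.write(json.dumps({"partition": key, **row}) + "\n")

    # Step 4: mark the new partitions as processed.
    # Note: a crash between steps 3 and 4 would cause duplicates on retry;
    # a real implementation would need a transactional or deduplicating store.
    state_path.write_text(json.dumps(sorted(processed | set(new_keys))))
    return new_keys
```

Inside an asset, the list of partition keys might come from something like `context.instance.get_dynamic_partitions(...)`, and `load_partition` would read a single upstream partition from wherever it is stored. Both depend on your setup and are assumptions here rather than part of the answer above.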