Terry Lines
02/13/2024, 6:24 PM
Scout
02/13/2024, 6:24 PM
```python
from dagster import DailyPartitionsDefinition, asset

@asset(partitions_def=DailyPartitionsDefinition(start_date="2023-01-01"))
def daily_data():
    ...

@asset
def aggregated_report(context, daily_data):
    # The default IO manager returns a dictionary of outputs mapped by
    # partition key: {"2023-01-01": ..., "2023-01-02": ..., ...}
    context.log.info(daily_data)
    ...
```
In this example, `daily_data` is a partitioned asset, and `aggregated_report` is an unpartitioned asset that depends on all partitions of `daily_data`. Under the hood, Dagster applies `AllPartitionMapping` to the `daily_data` input, which specifies that all partitions of `daily_data` are input partitions. If you want to define this explicitly, you can use `AssetIn` and `AllPartitionMapping` as shown below:
```python
from dagster import AllPartitionMapping, AssetIn, asset

@asset(ins={"daily_data": AssetIn(partition_mapping=AllPartitionMapping())})
def aggregated_report(context, daily_data):
    ...
```
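To make the dictionary-shaped input more concrete, here is a minimal sketch of how the body of `aggregated_report` might combine the per-partition values. It is plain Python with no Dagster imports. The helper name `summarize_partitions` and the assumption that each partition holds a list of numbers are hypothetical, chosen only for illustration, so adapt it to whatever `daily_data` actually returns in your project.

```python
from typing import Dict, List


def summarize_partitions(daily_data: Dict[str, List[float]]) -> Dict[str, float]:
    """Collapse a {partition_key: values} mapping into one summary.

    Hypothetical helper: assumes each partition's value is a list of numbers.
    """
    # ISO-formatted date keys sort chronologically as plain strings.
    ordered_keys = sorted(daily_data)
    # Flatten the per-partition lists in partition order.
    all_values = [v for key in ordered_keys for v in daily_data[key]]
    return {
        "partition_count": len(ordered_keys),
        "row_count": len(all_values),
        "total": sum(all_values),
    }


# Stand-in for what the downstream asset would receive as `daily_data`.
example_input = {
    "2023-01-02": [3.0, 4.0],
    "2023-01-01": [1.0, 2.0],
}
report = summarize_partitions(example_input)
print(report)
```

Inside the asset you would call something like `summarize_partitions(daily_data)` and return the result. Keeping the aggregation in a plain function also makes it easy to unit test without running Dagster.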
This setup lets the downstream asset include data from all partitions of the upstream asset. When a new partition is added to the upstream asset, you can materialize just that new partition and then refresh the downstream asset, without rerunning the old upstream partitions.
Terry Lines
02/13/2024, 6:31 PM
Scout
02/13/2024, 6:31 PM