geoHeil
04/14/2022, 4:07 AM

geoHeil
04/14/2022, 4:08 AM

geoHeil
04/14/2022, 6:40 AM

geoHeil
04/14/2022, 6:40 AM

sandy
04/14/2022, 3:18 PM

geoHeil
04/15/2022, 8:28 AM

geoHeil
04/15/2022, 8:36 AM

geoHeil
04/15/2022, 8:52 AM

sandy
04/15/2022, 9:16 PM
In the sensor for `a1_cleaned`, put a tag on the run with the partition for `a1`:
```
yield RunRequest(tags={"latest_partition": ...})
```
Then, in your IOManager, grab that tag and add it as metadata that will show up on the materialization for `a1_cleaned`:
```
partition_tag_value = context.step_context._plan_data.pipeline_run.tags["latest_partition"]
context.add_output_metadata({"latest_partition": partition_tag_value})
```
Then, in the sensor for `combined`, you can access the metadata on `a1_cleaned` with something like the following:
```
materialization = record.event_log_entry.dagster_event.event_specific_data.materialization
latest_partition = [me for me in materialization.metadata_entries if me.label == "latest_partition"][0].entry_data.text
```
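(Pieced together, the three snippets above might look roughly like this. This is only a sketch against the 0.14-era APIs; `a1_cleaned_job`, the sensor name, and the IOManager class are placeholders, not from this thread. "Step 3" is the metadata lookup sandy shows just above.)
```
from dagster import (
    AssetKey,
    DagsterEventType,
    EventRecordsFilter,
    IOManager,
    RunRequest,
    sensor,
)

@sensor(job=a1_cleaned_job)  # placeholder: some job that materializes a1_cleaned
def a1_cleaned_sensor(context):
    # Look up the newest materialization of the partitioned upstream asset a1
    # (records come back newest-first by default).
    records = context.instance.get_event_records(
        EventRecordsFilter(
            event_type=DagsterEventType.ASSET_MATERIALIZATION,
            asset_key=AssetKey("a1"),
        ),
        limit=1,
    )
    if not records:
        return
    latest_partition = (
        records[0]
        .event_log_entry.dagster_event.step_materialization_data.materialization.partition
    )
    # Step 1: tag the run with the a1 partition it was triggered for.
    yield RunRequest(
        run_key=latest_partition,
        tags={"latest_partition": latest_partition},
    )

class PartitionTaggingIOManager(IOManager):
    def handle_output(self, context, obj):
        # Step 2: copy the run tag onto the output as metadata so it shows up
        # on the AssetMaterialization for a1_cleaned (_plan_data is private API,
        # as in sandy's snippet above).
        tag = context.step_context._plan_data.pipeline_run.tags["latest_partition"]
        context.add_output_metadata({"latest_partition": tag})
        ...  # persist obj however this IOManager normally would

    def load_input(self, context):
        ...  # load the persisted value
```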
geoHeil
04/16/2022, 6:42 AM

geoHeil
04/17/2022, 7:03 PM
`tags={"latest_partition": ...}` is ill-defined: `a1` is partitioned, `a1_cleaned` is not partitioned, and the run sensor that listens for changes has a hard time getting the partition of `a1`. So where is the latest materialization for `a1` available (to listen/subscribe to)?
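(One built-in way to subscribe to materializations of `a1` is an `@asset_sensor`, which hands the sensor each new materialization event with its partition attached. A minimal sketch; `a1_cleaned_job` is again a placeholder:)
```
from dagster import AssetKey, RunRequest, asset_sensor

@asset_sensor(asset_key=AssetKey("a1"), job=a1_cleaned_job)  # placeholder job
def on_a1_materialized(context, asset_event):
    # asset_event is the event log entry for the new a1 materialization;
    # the partition rides along on the materialization itself.
    partition = (
        asset_event.dagster_event.step_materialization_data.materialization.partition
    )
    yield RunRequest(run_key=None, tags={"latest_partition": partition})
```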
geoHeil
04/17/2022, 8:37 PM

geoHeil
04/18/2022, 11:05 AM

geoHeil
04/19/2022, 2:21 PM

geoHeil
04/19/2022, 7:49 PM
```
from dagster import asset, AssetGroup, repository, DailyPartitionsDefinition

my_partitions_def = DailyPartitionsDefinition(start_date="2022-04-10")

@asset(partitions_def=my_partitions_def)
def a1():
    return 1

@asset
def a1_cleaned(a1):
    return 11

@asset(partitions_def=my_partitions_def)
def a2():
    return 2

@asset
def a2_cleaned(a2):
    return 22

@asset
def a1_a2_combined(a1_cleaned, a2_cleaned):
    return a1_cleaned + a2_cleaned

group_dummy = AssetGroup([a1, a2, a1_cleaned, a2_cleaned, a1_a2_combined])
asset_job_dummy = group_dummy.build_job("dummy_assets")

@repository
def my_job():
    return [asset_job_dummy]
```
geoHeil
04/19/2022, 7:57 PM
`group_dummy = AssetGroup([a1, a2, a1_cleaned, a2_cleaned, a1_a2_combined])` contains all the assets (and in the launchpad the full run configuration would be triggered for a global refresh of all the assets in the group). Could I get around the per-asset sensors and jobs in this case, and thus no longer need this quite complicated handling/propagation of the partitioned metadata?
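(For what it's worth, `AssetGroup.build_job` also accepts a `selection`, so subset jobs can be carved out of the same group without extra asset definitions. A sketch with illustrative job names; partitioned and unpartitioned assets are kept in separate jobs, since a single job generally can't mix assets with different partitions definitions:)
```
# Illustrative subset jobs carved out of the same AssetGroup.
partitioned_job = group_dummy.build_job("partitioned_assets", selection=["a1", "a2"])
downstream_job = group_dummy.build_job(
    "downstream_assets", selection=["a1_cleaned", "a2_cleaned", "a1_a2_combined"]
)
```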
geoHeil
04/20/2022, 6:44 AM

claire
04/20/2022, 5:39 PM
```
from dagster import AssetKey, DagsterEventType, EventRecordsFilter

records = context.instance.get_event_records(
    EventRecordsFilter(
        event_type=DagsterEventType.ASSET_MATERIALIZATION,
        asset_key=AssetKey("upstream_daily_partitioned_asset_1"),
    )
)
materialized_partitions = {
    record.event_log_entry.dagster_event.step_materialization_data.materialization.partition
    for record in records
}
```
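(If only the most recent materialization is needed, the same call takes a `limit`, and records are returned newest-first by default. A sketch under those assumptions:)
```
latest_records = context.instance.get_event_records(
    EventRecordsFilter(
        event_type=DagsterEventType.ASSET_MATERIALIZATION,
        asset_key=AssetKey("upstream_daily_partitioned_asset_1"),
    ),
    limit=1,  # newest record only
)
if latest_records:
    latest_partition = (
        latest_records[0]
        .event_log_entry.dagster_event.step_materialization_data.materialization.partition
    )
```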
geoHeil
04/20/2022, 9:07 PM

claire
04/20/2022, 11:47 PM

claire
04/20/2022, 11:54 PM
`a1` in the sensor for `a1_cleaned`?

geoHeil
04/21/2022, 3:41 AM

geoHeil
04/21/2022, 3:44 AM

claire
04/21/2022, 4:49 PM
I'd use `get_event_records` on the instance rather than the GraphQL API, because GraphQL API calls are generally just another layer of code that makes calls to the instance.
I think that when backfilling, if the metadata tags are out of order, you may just have to query for all materializations of `a1`, see from the event log entries which partitions have been materialized, and compare those partitions against the metadata on the `a1_cleaned` materializations.
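(A rough sketch of that comparison, with a hypothetical helper name, assuming the `latest_partition` metadata scheme from earlier in the thread and a handle on the instance, e.g. inside a sensor:)
```
from dagster import AssetKey, DagsterEventType, EventRecordsFilter

def partitions_pending_for_a1_cleaned(instance):
    # All partitions of a1 that have ever been materialized.
    a1_records = instance.get_event_records(
        EventRecordsFilter(
            event_type=DagsterEventType.ASSET_MATERIALIZATION,
            asset_key=AssetKey("a1"),
        )
    )
    a1_partitions = {
        r.event_log_entry.dagster_event.step_materialization_data.materialization.partition
        for r in a1_records
    }

    # Partitions recorded as "latest_partition" metadata on a1_cleaned materializations.
    cleaned_records = instance.get_event_records(
        EventRecordsFilter(
            event_type=DagsterEventType.ASSET_MATERIALIZATION,
            asset_key=AssetKey("a1_cleaned"),
        )
    )
    seen = set()
    for r in cleaned_records:
        mat = r.event_log_entry.dagster_event.step_materialization_data.materialization
        for entry in mat.metadata_entries:
            if entry.label == "latest_partition":
                seen.add(entry.entry_data.text)

    # Partitions of a1 not yet reflected in a1_cleaned.
    return a1_partitions - seen
```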