Arslan Aziz
10/03/2022, 7:58 PMsandy
10/03/2022, 8:47 PMclaire
10/04/2022, 12:20 AM@multi_asset_sensor(asset_keys=[AssetKey("upstream")], job=downstream_job)
def my_sensor(context):
run_requests_by_partition = {}
for partition, materialization in context.latest_materialization_records_by_partition(
AssetKey("upstream")
).items():
run_requests_by_partition[partition] = downstream_job.run_request_for_partition(partition)
context.advance_cursor({AssetKey("upstream"): materialization})
return list(run_requests_by_partition.values())
Arslan Aziz
10/04/2022, 1:24 PMrun_request_for_partition
? I can't seem to find examples of either of these so I appreciate your help. My intent is to use metatadata from the asset materialization to set the run config for each partition.claire
10/04/2022, 4:52 PMevent_log_record.event_log_entry.dagster_event.event_specific_data.materialization.metadata_entries
We are currently missing the ability to add run config in run_request_for_partition
, I can file a PR and hopefully make it out in the next releaseDagster Bot
10/04/2022, 5:04 PM@dagster_bot <issue|docs|discussion> <title>
claire
10/04/2022, 5:04 PMDagster Bot
10/04/2022, 5:04 PMLucia Ambrogi
11/15/2022, 2:15 PMcontext.log_event(
AssetMaterialization(
asset_key=AssetKey('my_asset'),
partition=my_partition_str,
metadata={
'my_metadata_key': my_metadata_value
},
)
)
and for the asset_sensor I am implementing your multi_asset_sensor
suggestion above, where I replace run_requests_by_partition
with a RunRequest
as I want to pass a run_config to the job. The AssetMaterialization op and the multi_asset_sensor are in two different repos. My problem: the multi_asset_sensor is failing with the following error
dagster._core.errors.DagsterInvariantViolationError: Cannot get latest materialization by partition for assets with no partitions
Is my approach to pass the partition to the AssetMaterialization incorrect? Many thanks!claire
11/15/2022, 5:41 PMrun_request_for_partition
now accepts a run config argument!Lucia Ambrogi
11/16/2022, 4:05 PMdagster._core.errors.DagsterInvariantViolationError: Cannot get latest materialization by partition for assets with no partitions
raised as soon as I enable the sensor. Is there a way to check whether or not an asset is properly defined with a partition? As mentioned in my prev message, this is how I materialize my asset
context.log_event(
AssetMaterialization(
asset_key=AssetKey('my_asset'),
partition=my_partition_str,
metadata={
'my_metadata_key': my_metadata_value
},
)
)
so I d expect my asset to have a partitionclaire
11/17/2022, 7:03 PMlatest_materialization_by_partition
expects the asset to be a software-defined asset with a partitions definition, so dagster knows the full set of partitions to retrieve. I think what you can use instead is the context.materialization_records_for_key
method, which should fetch all the unconsumed materializations. You'll be able to fetch the partition off the event record