
Arslan Aziz

10/03/2022, 7:58 PM
Can an asset_sensor trigger a run per materialized partition of an asset? I have a pipeline that dynamically discovers new partitions and emits an AssetMaterialization for each partition to the same asset_key (but with unique partitions). My asset_sensor (which is configured on the same asset_key) only triggers a run for the latest partition. My expectation would be that the asset sensor triggers a run for each asset partition.
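Below is a simplified pure-Python model of the behavior described. It assumes the sensor looks only at the newest materialization recorded after its cursor on each tick, and then moves the cursor past it. `Materialization` and `latest_only_tick` are hypothetical stand-ins, not Dagster APIs. If three partitions land between two ticks, only the last one produces a run:

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class Materialization:
    # Hypothetical stand-in for an event log record; storage_id only increases.
    storage_id: int
    partition: str


def latest_only_tick(
    events: List[Materialization], cursor: int
) -> Tuple[Optional[str], int]:
    """Simplified model: consider only the newest event after the cursor."""
    new_events = [e for e in events if e.storage_id > cursor]
    if not new_events:
        return None, cursor
    newest = max(new_events, key=lambda e: e.storage_id)
    # The cursor jumps past every new event, so older partitions are never revisited.
    return newest.partition, newest.storage_id


events = [
    Materialization(1, "2022-10-01"),
    Materialization(2, "2022-10-02"),
    Materialization(3, "2022-10-03"),
]
partition, cursor = latest_only_tick(events, cursor=0)
print(partition, cursor)  # 2022-10-03 3
```

Under this model, partitions "2022-10-01" and "2022-10-02" never get a run. That is why the replies below switch to grouping materializations by partition.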

sandy

10/03/2022, 8:47 PM
@claire

claire

10/04/2022, 12:20 AM
Hi Arslan. You can use the multi-asset sensor to achieve this. Something like:
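from dagster import AssetKey, multi_asset_sensor

# downstream_job is your partitioned job, defined elsewhere
@multi_asset_sensor(asset_keys=[AssetKey("upstream")], job=downstream_job)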
def my_sensor(context):
    run_requests_by_partition = {}
    for partition, materialization in context.latest_materialization_records_by_partition(
        AssetKey("upstream")
    ).items():
        run_requests_by_partition[partition] = downstream_job.run_request_for_partition(partition)
        context.advance_cursor({AssetKey("upstream"): materialization})
    return list(run_requests_by_partition.values())
❤️ 1

Arslan Aziz

10/04/2022, 1:24 PM
This is exactly what I need. Thank you!
@claire Is there a way I can access the AssetMaterialization metadata within the sensor? And can I provide run_config to the job from within the call to run_request_for_partition? I can't find examples of either of these, so I'd appreciate your help. My intent is to use metadata from the asset materialization to set the run config for each partition.

claire

10/04/2022, 4:52 PM
Hi Arslan. You can access the metadata by reading it off of the event log:
event_log_record.event_log_entry.dagster_event.event_specific_data.materialization.metadata_entries
We are currently missing the ability to add run config in run_request_for_partition. I can file a PR and hopefully get it out in the next release.
Actually, for now I will just file an issue for tracking purposes. I'm not sure when this feature will make it out, but I will look into it.
@Dagster Bot provide run config to run_request_for_partition
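The goal here is to turn materialization metadata into per-partition run config. A minimal sketch of that step follows, with some assumptions. It assumes you have already flattened the record's metadata_entries into a plain {label: value} dict. How each entry's value is unwrapped depends on its metadata type and your Dagster version. It also assumes the downstream job has a hypothetical op named `process_partition` whose config schema accepts those keys. `run_config_from_metadata` is a hypothetical helper, not part of Dagster:

```python
from typing import Any, Dict


def run_config_from_metadata(
    metadata: Dict[str, Any], op_name: str = "process_partition"
) -> Dict[str, Any]:
    """Build a run_config dict for one partition from materialization metadata.

    `metadata` is assumed to be a plain {label: value} dict already unwrapped
    from the record's metadata_entries; `op_name` is a hypothetical op in the
    downstream job whose config schema accepts these keys.
    """
    # Copy the metadata so later mutation of the input does not leak into the config.
    return {"ops": {op_name: {"config": dict(metadata)}}}


config = run_config_from_metadata({"my_metadata_key": "abc", "row_count": 42})
print(config)
```

The resulting dict could be passed as run_config to a RunRequest, or to run_request_for_partition in Dagster versions that accept a run config argument there.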

Dagster Bot

10/04/2022, 5:04 PM
Invalid command. Did you mean to create an issue or a discussion? Try
@dagster_bot <issue|docs|discussion> <title>

claire

10/04/2022, 5:04 PM
@Dagster Bot issue provide run config to run_request_for_partition

Dagster Bot

10/04/2022, 5:04 PM

Lucia Ambrogi

11/15/2022, 2:15 PM
Hi @claire, I am in a similar situation: I want my asset_sensor to yield a RunRequest when an AssetMaterialization event takes place. My AssetMaterialization is emitted from an op as
context.log_event(
    AssetMaterialization(
        asset_key=AssetKey('my_asset'),
        partition=my_partition_str,
        metadata={
            'my_metadata_key': my_metadata_value
        },
    )
)
and for the asset_sensor I am implementing your multi_asset_sensor suggestion above, except that I replace run_requests_by_partition with a RunRequest because I want to pass a run_config to the job. The AssetMaterialization op and the multi_asset_sensor are in two different repos. My problem: the multi_asset_sensor is failing with the following error:
dagster._core.errors.DagsterInvariantViolationError: Cannot get latest materialization by partition for assets with no partitions
Is my approach of passing the partition to the AssetMaterialization incorrect? Many thanks!

claire

11/15/2022, 5:41 PM
As of 1.0.16, run_request_for_partition accepts a run config argument!

Lucia Ambrogi

11/16/2022, 4:05 PM
Thanks @claire! I'm now passing the run config via run_request_for_partition. However, I am still facing the same error
dagster._core.errors.DagsterInvariantViolationError: Cannot get latest materialization by partition for assets with no partitions
raised as soon as I enable the sensor. Is there a way to check whether or not an asset is properly defined with a partition? As mentioned in my prev message, this is how I materialize my asset
context.log_event(
    AssetMaterialization(
        asset_key=AssetKey('my_asset'),
        partition=my_partition_str,
        metadata={
            'my_metadata_key': my_metadata_value
        },
    )
)
so I'd expect my asset to have a partition.

claire

11/17/2022, 7:03 PM
Ah... I see the issue. latest_materialization_records_by_partition expects the asset to be a software-defined asset with a partitions definition, so that Dagster knows the full set of partitions to retrieve. Instead, I think you can use the context.materialization_records_for_key method, which should fetch all the unconsumed materializations. You can then read the partition off each event record.
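The records returned by materialization_records_for_key still need post-processing. A pure-Python sketch of one approach follows. It keeps the newest record for each partition, skips records that carry no partition, and tracks the highest storage_id seen so you know how far to advance the cursor. `Record` and `latest_record_per_partition` are hypothetical stand-ins. With real records you would read `record.storage_id` and pull the partition from `record.event_log_entry.dagster_event`; check the exact accessor for your Dagster version.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple


@dataclass
class Record:
    # Hypothetical stand-in for an event log record.
    storage_id: int
    partition: Optional[str]
    metadata: Dict[str, Any] = field(default_factory=dict)


def latest_record_per_partition(
    records: List[Record],
) -> Tuple[Dict[str, Record], Optional[int]]:
    """Keep the newest record per partition and report the max storage_id.

    Records without a partition are skipped (a materialization logged without
    `partition=` has nothing to key on) but still count toward the cursor.
    """
    latest: Dict[str, Record] = {}
    max_storage_id: Optional[int] = None
    for record in records:
        if max_storage_id is None or record.storage_id > max_storage_id:
            max_storage_id = record.storage_id
        if record.partition is None:
            continue
        current = latest.get(record.partition)
        if current is None or record.storage_id > current.storage_id:
            latest[record.partition] = record
    return latest, max_storage_id


records = [
    Record(1, "2022-11-01", {"my_metadata_key": "v1"}),
    Record(2, "2022-11-02"),
    Record(3, "2022-11-01", {"my_metadata_key": "v3"}),
    Record(4, None),
]
latest, newest_storage_id = latest_record_per_partition(records)
```

Each entry in `latest` could then become one run request, for example using its metadata to build run config. The record with `newest_storage_id` marks how far the sensor's cursor can move.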