#ask-ai

Tara Larrue

02/15/2024, 9:17 PM
@Scout can I create a sensor from one code location that triggers the materialization of an asset based on upstream materialization of an asset in another code location?

Scout

02/15/2024, 9:17 PM
Yes, you can create a sensor that triggers the materialization of an asset based on the upstream materialization of an asset, even if they are defined in different code locations. An asset sensor checks for new `AssetMaterialization` events for a particular asset key and can be used to kick off a job that computes downstream assets. This pattern enables cross-job and even cross-code-location dependencies, as each job run instigated by an asset sensor is agnostic to the job that caused it.

For example, you can use the `@multi_asset_sensor` decorator to define custom sensors that trigger based on the materializations of multiple assets. This allows you to model cross-team dependencies with asset-based sensors, where you can explicitly encode your pipelines' dependencies on assets created by other teams and trigger your pipelines only when those assets are updated. Here is an example from the documentation that monitors two upstream daily-partitioned assets and kicks off materializations of the corresponding partition in the downstream daily-partitioned asset:
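```python
from dagster import AssetKey, RunRequest, multi_asset_sensor

# `downstream_daily_job` is assumed to be defined elsewhere in this code location.


@multi_asset_sensor(
    monitored_assets=[
        AssetKey("upstream_daily_1"),
        AssetKey("upstream_daily_2"),
    ],
    job=downstream_daily_job,
)
def trigger_daily_asset_when_any_upstream_partitions_have_new_materializations(context):
    run_requests = []
    # Walk the partitions that have unconsumed materializations in any monitored asset.
    for (
        partition,
        materializations_by_asset,
    ) in context.latest_materialization_records_by_partition_and_asset().items():
        # Request a run only once every monitored asset has materialized this partition.
        if all(
            [
                context.all_partitions_materialized(asset_key, [partition])
                for asset_key in context.asset_keys
            ]
        ):
            run_requests.append(RunRequest(partition_key=partition))
            # Advance the cursor so these materializations are not processed again.
            for asset_key, materialization in materializations_by_asset.items():
                if asset_key in context.asset_keys:
                    context.advance_cursor({asset_key: materialization})
    return run_requests
```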
This sensor will yield a run request for the downstream asset whenever either upstream asset has a new materialization for a partition, provided that the same partition has already been materialized in both upstream assets. The `context.latest_materialization_records_by_partition_and_asset()` method is used to check the latest materializations for each partition, and `context.all_partitions_materialized()` is used to verify that all required upstream partitions have been materialized before triggering the downstream job.

Remember that the upstream assets can be defined in a different code location: the sensor only needs their asset keys to monitor them. The job it launches is typically defined alongside the sensor, in the downstream code location.
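To make the gating rule in the sensor above easier to reason about, here is a minimal plain-Python model of it: react to any new upstream materialization, but only request a partition once every upstream has it. This is a sketch and not Dagster code. The function `partitions_to_request`, the `latest_events` mapping, and the per-partition `cursor` are all hypothetical stand-ins, and Dagster's real cursor handling is more involved.

```python
from __future__ import annotations


def partitions_to_request(
    latest_events: dict[str, dict[str, int]],
    cursor: dict[tuple[str, str], int],
) -> tuple[list[str], dict[tuple[str, str], int]]:
    """Return (partitions to request, updated cursor).

    latest_events maps asset key -> partition -> id of the latest materialization
    (ids are assumed to be positive and increasing).
    cursor maps (asset key, partition) -> id of the last event already consumed.
    Both structures are hypothetical stand-ins, not Dagster objects.
    """
    # "Any new": partitions where at least one upstream has an unconsumed event.
    candidates = {
        partition
        for asset_key, by_partition in latest_events.items()
        for partition, event_id in by_partition.items()
        if event_id > cursor.get((asset_key, partition), 0)
    }

    requests = []
    new_cursor = dict(cursor)
    for partition in sorted(candidates):
        # "All materialized": every monitored upstream must have this partition.
        if all(partition in by_partition for by_partition in latest_events.values()):
            requests.append(partition)
            # Consume the events that fed this request so they do not fire again.
            for asset_key, by_partition in latest_events.items():
                new_cursor[(asset_key, partition)] = by_partition[partition]
    return requests, new_cursor


latest = {
    "upstream_daily_1": {"2024-02-14": 1, "2024-02-15": 2},
    "upstream_daily_2": {"2024-02-14": 3},
}
requests, cursor = partitions_to_request(latest, {})
print(requests)  # ['2024-02-14']: 2024-02-15 is still missing from upstream_daily_2
```

Calling the function again with the returned cursor yields no requests until one of the upstreams records a newer event, which mirrors why the sensor advances its cursor only for materializations it has acted on.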