Andreas
03/07/2024, 10:06 AM
Scout
03/07/2024, 10:06 AM
`RunRequest`
for every materialization of the asset key `my_table`:

```python
from dagster import (
    AssetKey,
    EventLogEntry,
    RunConfig,
    RunRequest,
    SensorEvaluationContext,
    asset_sensor,
)

# `my_job` and `ReadMaterializationConfig` are assumed to be defined
# elsewhere in your project.


@asset_sensor(asset_key=AssetKey("my_table"), job=my_job)
def my_asset_sensor(context: SensorEvaluationContext, asset_event: EventLogEntry):
    # Only proceed for events that actually carry an asset key.
    assert asset_event.dagster_event and asset_event.dagster_event.asset_key
    yield RunRequest(
        run_key=context.cursor,
        run_config=RunConfig(
            ops={
                # Pass the materialized asset's key path to the op's config.
                "read_materialization": ReadMaterializationConfig(
                    asset_key=list(asset_event.dagster_event.asset_key.path)
                )
            }
        ),
    )
```
In this example, `my_job` is the job that you want to execute when the sensor fires. The `@asset_sensor` decorator defines the sensor; it takes the `asset_key` of the asset to monitor and the `job` to execute when the sensor triggers. When the sensor detects a new materialization event for the specified asset key, it yields a `RunRequest` that includes a `run_key` and a `run_config`. The `run_config` can be used to pass specific configuration to the job when it runs.

To use this sensor, define it in your Dagster repository alongside your assets and jobs. Once the sensor is defined and the Dagster instance is running, the sensor will monitor for materializations of the specified asset and trigger the associated job accordingly. Remember to replace `my_job` with the actual job you want to trigger and to configure the `run_config` as needed for your specific use case.