jasono
02/10/2024, 4:30 AM
@asset
def asset1(context):
    filename = context.op_config["filename"]
    # materialize with the filename

@asset
def asset2(asset1):
    # materialize 2nd asset based on asset1
    ...

asset_job = define_asset_job("asset_job", "*")

@sensor(job=asset_job)
def materializes_asset_sensor():
    yield RunRequest(
        run_key=filename,
        run_config={
            "ops": {"import_file": {"config": {"filename": filename}}}
        },
    )
Scout
02/10/2024, 4:30 AM
An asset sensor checks for new AssetMaterialization events for a particular asset key and can be used to kick off a job that computes downstream assets.
Here's a simplified example based on the documentation that demonstrates how to define an asset sensor that triggers a job and passes the filename from an asset materialization:
from dagster import AssetKey, EventLogEntry, RunRequest, SensorEvaluationContext, asset_sensor

@asset_sensor(asset_key=AssetKey("my_table"), job=my_job)
def my_asset_sensor(context: SensorEvaluationContext, asset_event: EventLogEntry):
    assert asset_event.dagster_event and asset_event.dagster_event.asset_key
    # Assumes the upstream asset attached a "filename" metadata entry.
    materialization = asset_event.dagster_event.event_specific_data.materialization
    filename = materialization.metadata["filename"].value
    yield RunRequest(
        run_key=context.cursor,
        run_config={
            "ops": {
                "import_file": {
                    "config": {"filename": filename}
                }
            }
        },
    )
In this example, my_asset_sensor is an asset sensor that listens for materializations of the asset with key my_table. When a materialization occurs, it extracts the filename from the metadata of the materialization event and uses it to configure the import_file op within the job my_job.
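The run config the sensor passes along is a plain nested dictionary, so it can be built and checked without Dagster at all. Below is a minimal sketch; the helper name build_import_run_config and its op_name parameter are hypothetical and only illustrate the shape used in the example above.

```python
from typing import Any, Dict


def build_import_run_config(filename: str, op_name: str = "import_file") -> Dict[str, Any]:
    """Return run config that passes `filename` to the op named `op_name`.

    Hypothetical helper: the name and the default op name are assumptions
    taken from the example in this thread.
    """
    return {"ops": {op_name: {"config": {"filename": filename}}}}


# Config for the op named in the example above.
import_config = build_import_run_config("data.csv")

# Config keyed by a different op name, e.g. an asset called "asset1".
asset_config = build_import_run_config("data.csv", op_name="asset1")
```

The key under "ops" has to match the name of the op or asset that actually reads the config, so for the assets in the original question it would likely be "asset1" rather than "import_file".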
Please note that the actual implementation will depend on how you've structured your assets and jobs, and how you're storing the filename in the materialization metadata. The example above assumes that the filename is stored in the metadata of the AssetMaterialization event under the key "filename".
If you need to test your asset sensor, the documentation also provides guidance on constructing a context object using a test instance that can log AssetMaterialization events, and then directly invoking your asset sensor with that context to evaluate the results.
Remember to replace my_job with the actual job you want to trigger and adjust the metadata access according to how you store the filename in the materialization event.
jasono
02/10/2024, 4:33 AM