Amit Taller
08/28/2023, 4:23 PMRUN_SUCCESS
status. This job required a list of all the assets that were materialized by the job that initiated the sensor.
When I've tested this sensor while manually initiate a temp_job, I was able to grab the materialized assets from asset_selection
attribute of dagster_run
, but when this temp_job
was initiated by a schedule - the asset_selection
attribute was empty (None).
Is there a way to get all these assets from inside that sensor?
Thanks!
temp_job = define_asset_job(
name="temp_job",
selection=AssetSelection.keys("prospect_lists")
)
temp_job_schedule = ScheduleDefinition(
job=temp_job,
cron_schedule="* * * * *",
)
@run_status_sensor(
run_status=DagsterRunStatus.SUCCESS,
request_job=watchdog_job,
minimum_interval_seconds=15,
)
def success_sensor(
context: RunStatusSensorContext
):
assets_keys = [asset.to_user_string() for asset in context.dagster_run.asset_selection]
sandy
08/28/2023, 6:10 PMAmit Taller
08/28/2023, 6:27 PM