Nathan Calandra
03/31/2022, 4:02 PM
@run_failure_sensor()
def failed_job_handler(context: RunFailureSensorContext):
    """Notify Slack about a failed run, announcing a retry on a first failure.

    A run whose ``previous_run_id`` is falsy is announced as being retried;
    any other run is reported as failed.

    Args:
        context: Run-failure sensor context. Only ``context.dagster_run``
            (its ``previous_run_id`` and ``run_id``) is read here.
    """
    run = context.dagster_run
    if not run.previous_run_id:
        # Send message to slack
        slack.send_message(f"retrying job: {run.run_id}")
        # TODO: create and submit the new run. This step was left as an
        # open placeholder in the original snippet and is still missing.
    else:
        # NOTE(review): presumably a run that already has a previous run is
        # itself a retry, so it is only reported -- confirm.
        slack.send_message(f"Job failed!: {run.run_id}")
owen
03/31/2022, 4:45 PM
Stephen Bailey
03/31/2022, 4:56 PM
Nathan Calandra
03/31/2022, 5:00 PM
Users will typically call this API when testing pipeline reexecution, or running standalone scripts.
yuhan
03/31/2022, 5:46 PM
@sensor(job=my_job)
def multi_asset_sensor(context):
    """Yield a run request when the instance has a run-failure event record.

    Queries ``context.instance`` for at most one ``RUN_FAILURE`` event record
    in descending order and yields nothing when the query comes back empty.

    Args:
        context: Sensor context; only ``context.instance`` is used here.

    Yields:
        A ``RunRequest`` whose arguments are still a placeholder (``...``).
    """
    # or get_run_records where the run status is failure
    run_failure_records = context.instance.get_event_records(
        EventRecordsFilter(
            event_type=DagsterEventType.RUN_FAILURE,
        ),
        ascending=False,
        limit=1,
    )
    if not run_failure_records:
        # Nothing has failed: end the generator without requesting a run.
        return
    # Placeholder from the original snippet: fill in the real arguments.
    yield RunRequest(...)
Nathan Calandra
03/31/2022, 5:59 PM
yuhan
03/31/2022, 6:01 PM
Nathan Calandra
03/31/2022, 6:09 PM
yuhan
03/31/2022, 6:15 PM
@sensor(jobs=[job_1, job_2])
def multi_job_sensor(context):
    """Sketch of a sensor that targets one specific job out of several.

    Args:
        context: Sensor context (unused in this sketch).

    Yields:
        A ``RunRequest`` whose ``job_name`` selects ``job_1``.
    """
    ...  # placeholder for the sensor's own logic
    # Specify the targeted job name in the RunRequest. The leading `...` is a
    # placeholder for the remaining arguments; it has to come before the
    # keyword argument, because a positional argument after a keyword
    # argument is a SyntaxError.
    yield RunRequest(..., job_name=job_1.name)
Nathan Calandra
03/31/2022, 6:15 PM