# ask-community

Nathan Calandra

03/31/2022, 4:02 PM
Hi all, I’m trying to write a run failure sensor that will re-execute a job if it fails, but I’m running into issues submitting the new run. I can’t quite find the right logic to re-execute the job. Is there any way to do that from inside the sensor? I’ve tried a few different things but haven’t been able to get it to work. Thanks!
@run_failure_sensor()
def failed_job_handler(context: RunFailureSensorContext):
    if not context.dagster_run.previous_run_id:
        # First failure: notify Slack, then retry the run
        slack.send_message(f"retrying job: {context.dagster_run.run_id}")
        # Create and submit the new run
        # Something goes here:...
    else:
        # This run was already a retry: report the final failure
        slack.send_message(f"Job failed!: {context.dagster_run.run_id}")
👋 1

owen

03/31/2022, 4:45 PM
cc @yuhan

Nathan Calandra

03/31/2022, 5:00 PM
I looked into that, but it’s part of the legacy API, and this comment makes me think it’s not the right fit:
Users will typically call this API when testing pipeline reexecution, or running standalone scripts.
👍 1

yuhan

03/31/2022, 5:46 PM
Hi there! Run failure sensors aren’t fully integrated with all the sensor functionality; for example, they don’t support yielding run requests at the moment. Here is an issue tracking a similar feature request: https://github.com/dagster-io/dagster/issues/4964 Meanwhile, as a workaround, if you’d like to react to failures and kick off a run, you can write your own regular sensor, something like:
@sensor(job=my_job)
def run_failure_retry_sensor(context):
    # or use get_run_records, filtering on runs whose status is FAILURE
    run_failure_records = context.instance.get_event_records(
        EventRecordsFilter(
            event_type=DagsterEventType.RUN_FAILURE,
        ),
        ascending=False,
        limit=1,
    )

    if not run_failure_records:
        return

    yield RunRequest(...)

Nathan Calandra

03/31/2022, 5:59 PM
@yuhan Thanks for the help! I’ll give that a try.

yuhan

03/31/2022, 6:01 PM
The question in this thread has been surfaced to GitHub Discussions for future discoverability: https://github.com/dagster-io/dagster/discussions/7257

Nathan Calandra

03/31/2022, 6:09 PM
Actually, another question: do I need a separate sensor for each job, or can I yield run requests for different jobs?

yuhan

03/31/2022, 6:15 PM
You can yield run requests for different jobs like so:
@sensor(jobs=[job_1, job_2])
def multi_job_sensor(context):
    ...
    # specify the targeted job name in the RunRequest
    yield RunRequest(job_name=job_1.name, ...)

Nathan Calandra

03/31/2022, 6:15 PM
perfect, thanks!