https://dagster.io/ logo
#ask-community
Title
# ask-community
i

Iain Kennedy

07/19/2023, 4:36 PM
I have a pretty straightforward
@run_failure_sensor
... we had some failures on production and it appeared to work fine, except it started repeating the same message over and over, every time it runs (for the same run ID). In the UI the ticks are all there and have the following message:
Copy code
dagster._core.errors.SensorExecutionError: Error occurred during the execution of evaluation_fn for sensor slack_on_run_failure
...
...
...
The above exception was caused by the following exception:
dagster._check.CheckError: Failure condition: Expected a single SkipReason or one or more DagsterRunReaction: received both DagsterRunReaction and SkipReason
The failure sensor only ever does a
return SkipReason
before it outputs the message. I've disabled the sensor for 24 hours, but as soon as I turn it back on, it starts sending the same message for the same run ID as before (which is well past it's retry limit and is not actually running, of course).
I can see that the
@run_failure_sensor
decorator wraps
@run_status_sensor
and then a
RunStatusSensorDefinition
- I just can't figure out why it's finding a
DagsterRunReaction
- I'm not yielding one from my sensor at any point - and I can't repeat the error locally when running it through tests using
execute_in_process
with various mocked contexts - the only results I can get are: •
None
SkipReason(skip_message='FooBar')
Is that I need to clear the sensor's cursor on the dagster UI so it doesn't keep trying to process this same run failure over and over again and assume it was some anomaly on that particular run?
c

claire

07/19/2023, 11:37 PM
Hi Iain. I looked into this and found one spot where we are yielding a run reaction via the run failure sensor. I'm not entirely sure why we're doing this, but it's only possible to hit this code path with the error you're receiving if some error is occurring in your sensor after you've yielded the skip reason. For example, the following sensor would hit this error if a failed run exists:
Copy code
@run_failure_sensor
def exception_sensor(context: RunFailureSensorContext):
    yield SkipReason("skipping")
    raise Exception("blah blah blah")
Here's a code snippet you can use to debug your sensor and see what exception is occurring:
Copy code
if __name__ == "__main__":

    @op
    def fails():
        raise Exception("alksdjksald")

    @job
    def my_job_2():
        fails()

    instance = DagsterInstance.ephemeral()
    result = my_job_2.execute_in_process(instance=instance, raise_on_error=False)

    dagster_run = result.dagster_run
    dagster_event = result.get_job_failure_event()

    with build_run_status_sensor_context(
        sensor_name="exception_sensor",
        dagster_instance=instance,
        dagster_run=dagster_run,
        dagster_event=dagster_event,
    ) as context:
        list(exception_sensor(context))
An alternative would just be to stop yielding the skip reason, which should display the real error in the UI
i

Iain Kennedy

07/20/2023, 8:21 AM
That would make sense, but this sensor does a
return SkipReason()
not a
yield
so execution would stop at that point and there's no code before that might be raising an exception:
Copy code
@run_failure_sensor(request_job=document_pipeline)
def slack_on_run_failure(context: RunFailureSensorContext):
    tags = context.dagster_run.tags
    try:
        max_retries = tags["dagster/max_retries"]
    except KeyError:
        max_retries = None
    try:
        retry_number = tags["dagster/retry_number"]
    except KeyError:
        retry_number = None

    # Avoid message spam if run retries are enabled                                                                                                                                                                                    
    if max_retries is not None:
        if retry_number is None or int(retry_number) < int(max_retries):
            return SkipReason(
                f"Not sending message because retry_number {retry_number} is < max_retries {max_retries}"
            )
    
    # Send message and don't yield or return anything
    ...
When the sensor runs, it does send a message and the logic from the tags does not reach the
return SkipReason()
I'm using a similar approach to test (adding the relevant tags for the logic in the sensor), but can't reproduce the problem in the dev environment:
Copy code
def test__run_failure():

    @op(retry_policy=RetryPolicy(max_retries=3))
    def fails():
        raise Exception("Run failed!")

    @job(tags={"dagster/max_retries": 3, "dagster/retry_number": 3})
    def this_job_fails():
        fails()

    # Execute the job                                                                                                                                                                                                                  
    instance = DagsterInstance.ephemeral()
    result = this_job_fails.execute_in_process(instance=instance, raise_on_error=False)

    dagster_run = result.dagster_run

    dagster_event = result.get_job_failure_event()

    # create the context                                                                                                                                                                                                               
    run_status_sensor_context = build_run_status_sensor_context(
        sensor_name="slack_on_run_failure",
        dagster_instance=instance,
        dagster_run=dagster_run,
        dagster_event=dagster_event,
    )
    # run the sensor                                                                                                                                                                                                                   
    result = slack_on_run_failure(run_status_sensor_context)
c

claire

07/21/2023, 10:41 PM
Hmm this is odd... I'm unable to repro, I'd expect the sensor and the test to work without issue. What dagster versions are you running on production/locally? I wonder if this is an issue that existed in a prior version of dagster
i

Iain Kennedy

07/24/2023, 8:32 AM
Not entirely sure how to get the version from dagster.cloud - the agent version is
v1.3.6
Locally I've got
v1.3.14
I'm not able to reproduce locally either - if this is likely just a glitch, I'd like to clear the sensor so I can switch it back on without spamming our support folks with the same failed run... do I just wipe all values from the cursor 'edit' on the Sensor page, or do I update the record_id:
{"__class__": "RunStatusSensorCursor", "record_id": 5499826882, "update_timestamp": "2023-07-18T16:59:12.181043+00:00"}
We could return a
SkipReason("Sent message")
after the message send rather than not returning anything.
2 Views