Iain Kennedy
07/19/2023, 4:36 PM
@run_failure_sensor ... we had some failures in production and it appeared to work fine, except that it now repeats the same message every time the sensor runs (for the same run ID).
In the UI the ticks are all there and have the following message:
dagster._core.errors.SensorExecutionError: Error occurred during the execution of evaluation_fn for sensor slack_on_run_failure
...
...
...
The above exception was caused by the following exception:
dagster._check.CheckError: Failure condition: Expected a single SkipReason or one or more DagsterRunReaction: received both DagsterRunReaction and SkipReason
The failure sensor only ever does a return SkipReason before it outputs the message.
I've disabled the sensor for 24 hours, but as soon as I turn it back on, it starts sending the same message for the same run ID as before (which is well past its retry limit and is not actually running, of course).
Iain Kennedy
07/19/2023, 8:57 PM
The @run_failure_sensor decorator wraps @run_status_sensor and then a RunStatusSensorDefinition. I just can't figure out why it's finding a DagsterRunReaction: I'm not yielding one from my sensor at any point, and I can't reproduce the error locally when running it through tests using execute_in_process with various mocked contexts. The only results I can get are:
• None
• SkipReason(skip_message='FooBar')
Iain Kennedy
07/19/2023, 9:19 PM
claire
07/19/2023, 11:37 PM
from dagster import RunFailureSensorContext, SkipReason, run_failure_sensor

@run_failure_sensor
def exception_sensor(context: RunFailureSensorContext):
    yield SkipReason("skipping")
    raise Exception("blah blah blah")
claire
07/19/2023, 11:43 PM
from dagster import DagsterInstance, build_run_status_sensor_context, job, op

if __name__ == "__main__":

    @op
    def fails():
        raise Exception("alksdjksald")

    @job
    def my_job_2():
        fails()

    # Run the failing job against a throwaway instance
    instance = DagsterInstance.ephemeral()
    result = my_job_2.execute_in_process(instance=instance, raise_on_error=False)
    dagster_run = result.dagster_run
    dagster_event = result.get_job_failure_event()

    # Feed the failed run to the sensor
    with build_run_status_sensor_context(
        sensor_name="exception_sensor",
        dagster_instance=instance,
        dagster_run=dagster_run,
        dagster_event=dagster_event,
    ) as context:
        list(exception_sensor(context))
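The repro above turns on how yield and return differ in a Python function. A minimal sketch using only the standard library (the function names are hypothetical and no Dagster API is involved): a generator hands over its yielded value before a later raise fires, whereas a return ends the function on the spot.

```python
def yields_then_raises():
    # Generator: the value is handed to the consumer first...
    yield "skipping"
    # ...and this line only runs when the consumer asks for the next item.
    raise RuntimeError("blah blah blah")


def returns_early():
    # Plain function: nothing after the return ever runs.
    return "skipping"
    raise RuntimeError("never reached")


def drain(gen):
    """Collect what a generator yields, plus the exception (if any) that ends it."""
    seen, error = [], None
    try:
        for item in gen:
            seen.append(item)
    except Exception as exc:
        error = exc
    return seen, error
```

Draining yields_then_raises() gives back both the yielded value and the exception, which is the "received both" shape; returns_early() can only ever produce the one value.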
claire
07/19/2023, 11:43 PM
Iain Kennedy
07/20/2023, 8:21 AM
return SkipReason() not a yield, so execution would stop at that point and there's no code before that might be raising an exception:
from dagster import RunFailureSensorContext, SkipReason, run_failure_sensor

@run_failure_sensor(request_job=document_pipeline)
def slack_on_run_failure(context: RunFailureSensorContext):
    tags = context.dagster_run.tags
    try:
        max_retries = tags["dagster/max_retries"]
    except KeyError:
        max_retries = None
    try:
        retry_number = tags["dagster/retry_number"]
    except KeyError:
        retry_number = None
    # Avoid message spam if run retries are enabled
    if max_retries is not None:
        if retry_number is None or int(retry_number) < int(max_retries):
            return SkipReason(
                f"Not sending message because retry_number {retry_number} is < max_retries {max_retries}"
            )
    # Send message and don't yield or return anything
    ...
When the sensor runs, it does send a message and the logic from the tags does not reach the return SkipReason()
I'm using a similar approach to test (adding the relevant tags for the logic in the sensor), but can't reproduce the problem in the dev environment:
from dagster import DagsterInstance, RetryPolicy, build_run_status_sensor_context, job, op

def test__run_failure():
    @op(retry_policy=RetryPolicy(max_retries=3))
    def fails():
        raise Exception("Run failed!")

    @job(tags={"dagster/max_retries": 3, "dagster/retry_number": 3})
    def this_job_fails():
        fails()

    # Execute the job
    instance = DagsterInstance.ephemeral()
    result = this_job_fails.execute_in_process(instance=instance, raise_on_error=False)
    dagster_run = result.dagster_run
    dagster_event = result.get_job_failure_event()

    # create the context
    run_status_sensor_context = build_run_status_sensor_context(
        sensor_name="slack_on_run_failure",
        dagster_instance=instance,
        dagster_run=dagster_run,
        dagster_event=dagster_event,
    )

    # run the sensor
    result = slack_on_run_failure(run_status_sensor_context)
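Since the tag handling is the only branching in slack_on_run_failure, it can also be pulled into a plain function and tested without building a sensor context. This is a sketch only: should_skip_notification is a hypothetical name, and it assumes tag values arrive as strings or ints, as in the code above.

```python
from typing import Any, Mapping, Optional


def should_skip_notification(tags: Mapping[str, Any]) -> Optional[str]:
    """Return a skip message while retries remain, else None (send the alert)."""
    max_retries = tags.get("dagster/max_retries")
    retry_number = tags.get("dagster/retry_number")

    # No retry limit on the run: always notify.
    if max_retries is None:
        return None

    # Retries enabled but not yet exhausted: stay quiet.
    if retry_number is None or int(retry_number) < int(max_retries):
        return (
            f"Not sending message because retry_number {retry_number} "
            f"is < max_retries {max_retries}"
        )

    # Retry limit reached: notify.
    return None
```

Checking this function against tag combinations such as no tags, a limit with no retry number, and a retry number equal to the limit covers the same cases as the sensor test, minus the Dagster machinery.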
claire
07/21/2023, 10:41 PM
Iain Kennedy
07/24/2023, 8:32 AM
v1.3.6
Locally I've got v1.3.14
I'm not able to reproduce locally either - if this is likely just a glitch, I'd like to clear the sensor so I can switch it back on without spamming our support folks with the same failed run... do I just wipe all values from the cursor 'edit' on the Sensor page, or do I update the record_id:
{"__class__": "RunStatusSensorCursor", "record_id": 5499826882, "update_timestamp": "2023-07-18T16:59:12.181043+00:00"}
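The cursor shown above is plain JSON, so it can be inspected or edited with the standard library. The sketch below only illustrates the "update the record_id" option; with_record_id is a hypothetical helper, and this thread does not confirm whether changing record_id or wiping the cursor is the right way to stop the repeats.

```python
import json

# Cursor value copied from the Sensor page, as quoted above.
cursor_text = (
    '{"__class__": "RunStatusSensorCursor", "record_id": 5499826882, '
    '"update_timestamp": "2023-07-18T16:59:12.181043+00:00"}'
)


def with_record_id(raw: str, new_record_id: int) -> str:
    """Return the cursor JSON with only record_id changed (hypothetical helper)."""
    cursor = json.loads(raw)
    cursor["record_id"] = new_record_id
    return json.dumps(cursor)
```

Leaving __class__ and update_timestamp untouched keeps the edited value in the same shape as the original cursor.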
Iain Kennedy
07/24/2023, 1:16 PM
SkipReason("Sent message") after the message send rather than not returning anything.
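That change amounts to giving every code path an explicit result. A minimal sketch using only the standard library: SkipReason here is a stand-in dataclass rather than Dagster's class, and notify and send are hypothetical names.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class SkipReason:
    # Stand-in for dagster.SkipReason so the sketch runs without Dagster.
    skip_message: str


def notify(retries_remaining: bool, send: Callable[[str], None]) -> SkipReason:
    """Every branch returns a SkipReason; nothing falls through to None."""
    if retries_remaining:
        return SkipReason("Not sending message: retries remaining")
    send("Run failed")
    return SkipReason("Sent message")
```

With a list's append method passed as send, the first branch leaves the list empty and the second records exactly one message, so both outcomes are easy to assert on.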