May Bohadana
02/06/2023, 8:01 AM
Zach
02/06/2023, 6:31 PM
from dagster import sensor, SensorExecutionContext, EventRecordsFilter, DagsterEventType
@sensor(job=send_datadog_metric)
def canceling_sensor(context: SensorExecutionContext):
    """Request a ``send_datadog_metric`` run when runs were recently canceling.

    Reads ``PIPELINE_CANCELING`` event records from the Dagster instance and
    keeps the run ids of those logged within the last 30 minutes.

    Args:
        context: Sensor evaluation context; its ``instance`` is queried for
            event records.

    Yields:
        A ``RunRequest`` when at least one recent canceling event exists,
        otherwise a ``SkipReason`` explaining why no run was requested.
    """
    # Imported locally so the sensor does not depend on names missing from
    # the module-level import line.
    import time

    from dagster import RunRequest, SkipReason

    lookback_seconds = 30 * 60
    cutoff = time.time() - lookback_seconds

    canceling_events = context.instance.get_event_records(
        EventRecordsFilter(event_type=DagsterEventType.PIPELINE_CANCELING)
    )
    canceling_ids = [e.run_id for e in canceling_events if e.timestamp > cutoff]

    if not canceling_ids:
        # SkipReason is a sensor result, not an exception: it must be yielded
        # (or returned), never raised.
        yield SkipReason("No canceling jobs detected")
        return

    # NOTE(review): no run_key is set, so every tick that still sees a
    # canceling event inside the lookback window requests another run.
    # Confirm that is intended, or derive a run_key from canceling_ids.
    # TODO: the op config was elided in the original snippet; fill in the
    # "ops" section expected by send_datadog_metric.
    yield RunRequest(run_config={"ops": {}})
May Bohadana
02/09/2023, 4:23 PM