Félix Tremblay
12/12/2022, 9:35 PM
SensorEvaluationContext, and I'm confused about the following descriptions:
• cursor (Optional[str]): The cursor, passed back from the last sensor evaluation via the cursor attribute of SkipReason and RunRequest
• last_completion_time (float): DEPRECATED The last time that the sensor was evaluated (UTC).
• last_run_key (str): DEPRECATED The run key of the RunRequest most recently created by this sensor. Use the preferred cursor attribute instead.
I would like to understand a few things:
Question 1: If last_completion_time is deprecated, then what is the correct way to get this value? Also, I find the last_completion_time and last_run_key methods very intuitive, and it seems odd that they are being deprecated.
Question 2: Using the SensorEvaluationContext, is there a way to inspect the runs that were triggered (e.g. fetch their status)? I need to know, for example, whether other runs are completed or still running.
prha
12/12/2022, 11:22 PM
last_completion_time and last_run_key work well with the sensor machinery in general, but they are not sufficient for every case we want sensors to handle.
Abstractly, sensors detect some external state change (e.g. via an API or some external event) and may need some persistent state across evaluations to track progress against that external state. That state will not always be time-based. Also, last_completion_time can suffer from timing issues caused by clock skew across machines.
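To make the clock-skew point concrete, here is a small framework-free model (not Dagster internals). It assumes the external system stamps events with its own clock and also assigns a monotonically increasing event ID, while the sensor host's clock runs 5 seconds ahead. Event, new_events_by_time and new_events_by_id are hypothetical names used only for illustration.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Event:
    event_id: int     # assumed: increasing ID assigned by the external system
    source_ts: float  # timestamp taken from the external system's clock


def new_events_by_time(events: List[Event], last_completion_time: float) -> List[Event]:
    # Time-based: compares the source's timestamps against the sensor host's clock.
    return [e for e in events if e.source_ts > last_completion_time]


def new_events_by_id(events: List[Event], last_seen_id: int) -> List[Event]:
    # Cursor-based: compares IDs that all come from a single source of truth.
    return [e for e in events if e.event_id > last_seen_id]


SKEW = 5.0  # hypothetical: the sensor host's clock is 5 seconds ahead
true_time_of_tick = 100.0
last_completion_time = true_time_of_tick + SKEW  # recorded as 105.0 by the skewed host
last_seen_id = 1  # the tick at true time 100 saw event 1

# Event 2 really happens after the tick (true time 102), stamped 102 by the source.
events = [Event(1, 95.0), Event(2, 102.0)]

print([e.event_id for e in new_events_by_time(events, last_completion_time)])  # []
print([e.event_id for e in new_events_by_id(events, last_seen_id)])            # [2]
```

The time-based check silently drops event 2 because 102 is compared against a timestamp from a different, skewed clock; the ID-based cursor does not depend on any machine's clock.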
With sensors and schedules, run_key is mostly used for idempotence, to prevent duplicate runs. In practice, we found people manipulating the run_key to keep track of external state, so introducing a generic cursor seemed like a better general solution. Within the body of the sensor, you can call context.update_cursor(new_cursor_value), and the new value will be available as context.cursor on the next sensor evaluation.
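A minimal sketch of the cursor round trip, written as a plain function so it runs without Dagster. It assumes the external system returns events as dicts with an increasing integer "id" (a hypothetical data shape). Because the cursor is a string, structured state is stored as JSON; evaluate_once is a hypothetical helper name.

```python
import json
from typing import Dict, List, Optional, Tuple


def evaluate_once(cursor: Optional[str], events: List[Dict]) -> Tuple[List[str], str]:
    """One sensor tick: return run keys for unseen events plus the next cursor."""
    # No cursor yet (first evaluation): start from the beginning.
    state = json.loads(cursor) if cursor else {"last_id": 0}
    new = sorted((e for e in events if e["id"] > state["last_id"]), key=lambda e: e["id"])
    # One run key per event keeps RunRequests idempotent as well.
    run_keys = [f"event-{e['id']}" for e in new]
    if new:
        state["last_id"] = new[-1]["id"]
    return run_keys, json.dumps(state)


# Simulate two evaluations: the cursor from tick 1 is fed back in on tick 2.
events = [{"id": 1}, {"id": 2}]
keys1, cur1 = evaluate_once(None, events)
events.append({"id": 3})
keys2, cur2 = evaluate_once(cur1, events)
print(keys1, cur1)  # ['event-1', 'event-2'] {"last_id": 2}
print(keys2, cur2)  # ['event-3'] {"last_id": 3}
```

Inside a real sensor, the same logic would be wired up roughly as run_keys, next_cursor = evaluate_once(context.cursor, fetch_events()), followed by context.update_cursor(next_cursor) and one RunRequest(run_key=k) per key, where fetch_events is a hypothetical function that queries the external system.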
For #2, I think you could call some of the instance methods on context.instance to fetch run status:
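```python
from dagster import DagsterRunStatus, RunRequest, RunsFilter, SkipReason, sensor


# Assumes my_job is a job defined elsewhere in the same code location.
@sensor(job=my_job)
def my_sensor(context):
    # Look for runs of my_job launched by this sensor that have not finished yet.
    latest_in_progress_runs = context.instance.get_run_records(
        filters=RunsFilter(
            job_name="my_job",
            statuses=[
                DagsterRunStatus.QUEUED,
                DagsterRunStatus.NOT_STARTED,
                DagsterRunStatus.STARTING,
                DagsterRunStatus.STARTED,
            ],
            tags={"dagster/sensor_name": "my_sensor"},
        ),
        limit=1,
    )
    if latest_in_progress_runs:
        # A run record wraps the run itself; the run ID lives on dagster_run.
        latest_run_id = latest_in_progress_runs[0].dagster_run.run_id
        return SkipReason(f"run in flight: {latest_run_id}")

    # check some external state
    ...
    # new_value: the state derived from the external check above
    context.update_cursor(new_value)
    return RunRequest(run_key=...)
```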
Félix Tremblay
12/14/2022, 4:00 PM
context.instance.get_run_records(...) to query the run records for runs that were triggered by another sensor?
prha
12/14/2022, 4:03 PM
dagster/sensor_name tag