https://dagster.io/ logo
#dagster-support
Title
# dagster-support
f

Félix Tremblay

12/12/2022, 9:35 PM
Hi, I'm looking at the definition of the class
SensorEvaluationContext
, and I'm confused about the following descriptions: • cursor (Optional[str]): The cursor, passed back from the last sensor evaluation via the cursor attribute of SkipReason and RunRequest • last_completion_time (float): DEPRECATED The last time that the sensor was evaluated (UTC). • last_run_key (str): DEPRECATED The run key of the RunRequest most recently created by this sensor. Use the preferred
cursor
attribute instead. I would like to understand a few things: Question 1: If
last_completion_time
is deprecated, then was is the correct way to get this value? Also, I find that _last_completion_time_ & _last_run_key_ methods are very intuitive and it seems odd that they are being deprecated Question 2: Using the SensorEvaluationContext, is there a way to inspect (e.g. fetch the status) of the runs that were triggered. I need to know, for example, if other runs are completed or still running.
p

prha

12/12/2022, 11:22 PM
Hi Félix. I think
last_completion_time
and
last_run_key
are generally good with regards to sensor machinery, but are not sufficient for all cases that we want sensors to handle. Abstractly, sensors are detecting some external state change (e.g. via an API or some external event) and may need some persistent state to keep track of across evaluations to track progress against this external state. It makes sense that that might not always be time-based. Also,
last_completion_time
can suffer from some timing issues due to clock skew across machines. With sensors and schedules,
run_key
is mostly used for idempotence, to prevent duplicate runs. In practice, we found people manipulating the run_key to keep track of external state. Introducing a generic
cursor
state seemed to be a better general solution. Within the body of the sensor, you can call
context.update_cursor(new_cursor_value)
and you should get the new value in
context.cursor
for the next sensor evaluation. For #2, I think you cloud call some of the instance methods off of the
context
in order to fetch run status.
Copy code
@sensor(job=my_job)
def my_sensor(context):
    latest_in_progress_runs = context.instance.get_run_records(
        filters=RunsFilter(
            job_name="my_job",
            statuses=[DagsterRunStatus.QUEUED, DagsterRunStatus.NOT_STARTED, DagsterRunStatus.STARTING, DagsterRunStatus.STARTED],
            tags={"dagster/sensor_name": "my_sensor"}
        ),
        limit=1
    )
    if len(latest_in_progress_runs) > 0:
        latest_run_id = latest_in_progress_runs[0].run_id
        return SkipReason(f'run in flight: {latest_run_id}')

    # check some external state
    ...

    context.update_cursor(new_value)
    return RunRequest(run_key=...)
f

Félix Tremblay

12/14/2022, 4:00 PM
Thank you, @prha that's very helpful and solves my issue. I would also very much like to know if it is possible to use
context.instance.get_run_records(...)
to query the run records for runs that were triggered by another sensor?
My goal is to have a sensor that checks if jobs (which use an RDS resource) are running or planned, and if not, shut down the RDS resource
p

prha

12/14/2022, 4:03 PM
Yes, you could change the value of for the
dagster/sensor_name
tag
👍 1
2 Views