Keshav
02/14/2024, 6:31 AMScout
02/14/2024, 6:31 AMSkipReason
to skip the current tick.
Here's a rough example of how you might implement this logic:
python
from dagster import sensor, SkipReason
from dagster.core.storage.pipeline_run import RunsFilter
from dagster.core.definitions.run_status_sensor_definition import RunStatusSensorContext
@sensor(job=my_pipeline)
def my_sensor(context: RunStatusSensorContext):
recent_runs = context.instance.get_runs(
limit=1,
filters=RunsFilter(
pipeline_name=my_pipeline.name,
tags={"dagster/schedule_name": "my_schedule"},
),
)
if recent_runs:
# Check if the recent run was triggered within your defined "recent" time frame
if is_recent(recent_runs[0]):
return SkipReason(f"A run for {my_pipeline.name} was recently triggered by a schedule.")
# If no recent scheduled run, proceed with the sensor logic
# ...
In this example, is_recent
would be a custom function you define to determine if the run is recent enough to warrant skipping the sensor tick. You would need to implement the logic to check the timestamp of the last run against the current time.
Remember that this is a conceptual example and you would need to adapt it to fit your specific use case, including handling the run timestamps and defining what constitutes a "recent" run.
Please note that running queries inside a sensor, as mentioned in one of the GitHub issues, can impact Dagster's performance as the number of sensors and jobs increases. It's important to consider the performance implications of any custom logic you add to your sensors. Handling Backlogged RunRequests Triggere... button job concurrency button How to restart a run status sensor witho... button Dagster 0.12.0: Into the Groove | Dagste... button Thank you for your feedback!