Keshav
02/14/2024, 6:34 AMScout
02/14/2024, 6:34 AMrun_key
parameter when yielding RunRequest
objects from a sensor. The run_key
is a unique identifier for a run request, and Dagster guarantees that for a given sensor, at most one run is created for each unique run_key
. If a sensor yields a new run request with a previously used run_key
, Dagster skips processing the new run request. This can be used to avoid triggering a new run if a run with the same key is already in progress or has been completed recently.
Here's a rough example of how you might use run_key
in a sensor to prevent triggering a new run if a similar run is already queued or in progress:
python
from dagster import sensor, RunRequest, SkipReason, DagsterRunStatus, RunsFilter
@sensor(job=my_pipeline)
def my_sensor(context):
run_key = "unique_run_key_based_on_some_logic"
# Check if a run with this run_key is already queued or in progress
existing_runs = context.instance.get_runs(
limit=1,
filters=RunsFilter(
pipeline_name=my_pipeline.name,
statuses=[DagsterRunStatus.QUEUED, DagsterRunStatus.STARTED],
tags={"run_key": run_key}
),
)
if existing_runs:
# If a run with the run_key exists, skip triggering a new run
return SkipReason(f"A run with run_key {run_key} is already queued or in progress.")
# Otherwise, trigger a new run with the run_key
return RunRequest(run_key=run_key, run_config={})
In this example, the sensor checks for existing runs with the same run_key
before yielding a RunRequest
. If a run is already queued or in progress, it returns a SkipReason
instead, effectively ignoring the scheduled run.
Please note that you should be cautious with querying the run storage within a sensor, as it can impact performance if not used judiciously. The example provided is a simplified illustration and should be adapted to your specific use case and performance considerations.
For more complex scenarios, you might need to implement additional custom logic to manage run concurrency based on your specific requirements. If you have further questions or need assistance with a particular use case, feel free to ask! Handling Backlogged RunRequests Triggere... button job concurrency button How to restart a run status sensor witho... button Dagster 0.12.0: Into the Groove | Dagste... button Thank you for your feedback!