#ask-ai

Keshav

02/14/2024, 6:34 AM
Is there a way to ignore a scheduled run for a pipeline if a run from a sensor for the same pipeline was triggered recently ?

Scout

02/14/2024, 6:34 AM
Dagster provides features that help you manage the concurrency of pipeline runs and avoid unnecessary duplicate runs. Specifically, you can use sensors to control when new runs are triggered and to prevent overlapping runs. For instance, you can use the `run_key` parameter when yielding `RunRequest` objects from a sensor. The `run_key` is a unique identifier for a run request, and Dagster guarantees that for a given sensor, at most one run is created for each unique `run_key`. If a sensor yields a new run request with a previously used `run_key`, Dagster skips processing the new run request. This avoids triggering a second run when that sensor has already requested a run with the same key. Because the guarantee is scoped to a single sensor, it does not by itself stop a schedule from launching a run of the same job.

Here's a rough example of how you might check for existing runs in a sensor, so that no new run is triggered while a similar run is already queued or in progress:
```python
from dagster import (
    DagsterRunStatus,
    RunRequest,
    RunsFilter,
    SkipReason,
    job,
    op,
    sensor,
)


@op
def do_work():
    """Placeholder op so the example job is complete."""


@job
def my_pipeline():
    do_work()


@sensor(job=my_pipeline)
def my_sensor(context):
    run_key = "unique_run_key_based_on_some_logic"

    # Check if a run with this run_key is already queued or in progress.
    # Dagster records a request's run_key on the run as the "dagster/run_key" tag.
    existing_runs = context.instance.get_runs(
        limit=1,
        filters=RunsFilter(
            job_name=my_pipeline.name,
            statuses=[DagsterRunStatus.QUEUED, DagsterRunStatus.STARTED],
            tags={"dagster/run_key": run_key},
        ),
    )

    if existing_runs:
        # If a run with the run_key exists, skip triggering a new run
        return SkipReason(f"A run with run_key {run_key} is already queued or in progress.")

    # Otherwise, trigger a new run with the run_key
    return RunRequest(run_key=run_key, run_config={})
```
In this example, the sensor checks for queued or in-progress runs with the same `run_key` before returning a `RunRequest`. If such a run exists, it returns a `SkipReason` instead, so no new run is requested.

Be cautious about querying the run storage within a sensor, as it can impact performance when the sensor is evaluated frequently. The example is a simplified illustration and should be adapted to your specific use case and performance considerations. For more complex scenarios, you might need to implement additional custom logic to manage run concurrency. If you have further questions or need assistance with a particular use case, feel free to ask!
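Since the question is about skipping a scheduled run, the same kind of check could also live in the schedule's evaluation function, which can return a `SkipReason` just as a sensor can. The sketch below covers only the time-window decision, in plain Python. The function name `should_skip_scheduled_run`, its parameters, and the one-hour window are hypothetical and not part of Dagster; looking up when the sensor last launched a run (for example by querying the instance's run storage) is assumed to happen elsewhere.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def should_skip_scheduled_run(
    last_sensor_run_at: Optional[datetime],
    now: datetime,
    window: timedelta,
) -> bool:
    """Return True if a sensor-launched run started within `window` of `now`.

    Hypothetical helper: `last_sensor_run_at` is assumed to be the timezone-aware
    start time of the most recent sensor-launched run, or None if there is none.
    """
    if last_sensor_run_at is None:
        # The sensor has never launched a run, so the schedule should proceed.
        return False
    return now - last_sensor_run_at < window


# Example: the sensor launched a run 10 minutes before the schedule tick.
tick_time = datetime(2024, 2, 14, 6, 34, tzinfo=timezone.utc)
skip = should_skip_scheduled_run(
    last_sensor_run_at=tick_time - timedelta(minutes=10),
    now=tick_time,
    window=timedelta(hours=1),
)
```

Inside a schedule, a `True` result would map to returning a `SkipReason`, and `False` to returning a `RunRequest`. Keeping the decision in a small pure function makes the window logic easy to unit test without a Dagster instance.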