https://dagster.io/ logo
#ask-community
Title
# ask-community
w

Wei Keng Lee

08/24/2023, 9:48 AM
Hi may I know how to skip the next scheduled run if the previous scheduled run overran ?
🎉 1
j

jamie

08/24/2023, 2:22 PM
by overran, do you mean that the first scheduled run was still executing by the time the second scheduled run started?
one option is to set a limit on the job so that dagster only ever runs one of those jobs at a time https://github.com/dagster-io/dagster/discussions/9921
🌈 1
w

Wei Keng Lee

08/25/2023, 1:36 AM
Hi @jamie yeah my issue was 2nd schedule started while 1st schedule still running. Thanks, its working by making the process into QUEUE which prevented multiple run in a same period. However, would like to ask is there a way we can completely SKIP the run instead of QUEUE it because the queue might get congested if there is consecutive run overran.
j

jamie

08/25/2023, 1:29 PM
to do that, you would need to query the DagsterInstance for currently executing runs for the job in question and then return a SkipReason if there is one executing. let me see if i can find an example
ok i didn’t find an exact example for this, so i’m writing this one from scratch. it may have a couple typos or syntax issues, so if you run into anything you can’t figure out let me know and i’ll help debug
Copy code
from dagster import RunsFilter, SkipReason, RunRequest, schedule
from dagster._core.storage.pipeline_run import IN_PROGRESS_RUN_STATUSES

@schedule(
    job=my_job,
)
def my_schedule(context):
    latest_run_id_for_job = context.instance.get_run_ids(filters=RunsFilter(job_name="my_job"), limit=1)
    in_progress = context.instance.get_runs(
                        filters=RunsFilter(
                            run_ids=[
                                latest_run_id_for_job
                            ],
                            statuses=IN_PROGRESS_RUN_STATUSES,
                        )
                    )

    if len(in_progress) > 0:
        return SkipReason("An execution of my_job is currently in progress")

    return RunRequest()
note that IN_PROGRESS_RUN_STATUSES is not in the public API and is subject to change (but this change is unlikely imo)
w

Wei Keng Lee

09/25/2023, 6:38 AM
hi @jamie thanks for the suggestion! daggy love context.instance.get_run_ids seems not exist, i made some amendment on this to produce something similar... if there is a better way or better practice in doing this, do share 🌈
Copy code
@schedule(
    name="every_05_minutes",
    job=every_05_minutes_job,
    cron_schedule="0/5 * * * *",
)
def every_05_minutes_schedule(context):
    latest_run_id_for_job = context.instance.get_runs(
        filters=RunsFilter(
            job_name="every_05_minutes", statuses=IN_PROGRESS_RUN_STATUSES
        )
    )

    if len(latest_run_id_for_job) > 0:
        return SkipReason(
            "An execution of every_05_minutes_job is currently in progress"
        )
    return RunRequest()
j

jamie

09/25/2023, 4:27 PM
hey @Wei Keng Lee - glad you got it working!
instance.get_run_ids
is in the
DagsterInstance
class but was only added a little over a month ago. if you’re on an older version of dagster, that’s why you might not see it
daggy love 1