Wei Keng Lee
08/24/2023, 9:48 AMjamie
08/24/2023, 2:22 PMjamie
08/24/2023, 2:28 PMWei Keng Lee
08/25/2023, 1:36 AMjamie
08/25/2023, 1:29 PMjamie
08/25/2023, 1:47 PMfrom dagster import RunsFilter, SkipReason, RunRequest, schedule
from dagster._core.storage.pipeline_run import IN_PROGRESS_RUN_STATUSES
# NOTE(review): this example passes no cron_schedule to @schedule; confirm
# against the dagster @schedule signature before using it as-is.
@schedule(
    job=my_job,
)
def my_schedule(context):
    """Request a run of ``my_job`` unless its latest run is still in progress.

    Looks up the most recent run id recorded for the job named ``"my_job"``
    and checks whether that run has one of ``IN_PROGRESS_RUN_STATUSES``.

    Args:
        context: The schedule evaluation context; only ``context.instance``
            is used, to query run storage.

    Returns:
        ``SkipReason`` when the latest run is still in progress, otherwise
        a ``RunRequest``.
    """
    # get_run_ids returns a *list* of run ids (at most one because of limit=1).
    # Presumably runs are returned newest-first, so this is the latest run --
    # verify against the DagsterInstance documentation.
    latest_run_ids = context.instance.get_run_ids(
        filters=RunsFilter(job_name="my_job"),
        limit=1,
    )
    if not latest_run_ids:
        # The job has never run, so nothing can be in progress. Returning
        # early also avoids building a RunsFilter with an empty run_ids list.
        return RunRequest()

    in_progress = context.instance.get_runs(
        filters=RunsFilter(
            # Pass the list itself; wrapping it in another list would hand
            # the filter a nested list instead of run ids.
            run_ids=latest_run_ids,
            statuses=IN_PROGRESS_RUN_STATUSES,
        )
    )
    if in_progress:
        return SkipReason("An execution of my_job is currently in progress")
    return RunRequest()
Note that `IN_PROGRESS_RUN_STATUSES` is not part of the public API and is subject to change (though such a change is unlikely, in my opinion).
Wei Keng Lee
09/25/2023, 6:38 AM@schedule(
name="every_05_minutes",
job=every_05_minutes_job,
cron_schedule="0/5 * * * *",
)
def every_05_minutes_schedule(context):
    """Skip the tick while a matching run is in progress; otherwise request one.

    Queries the instance for runs whose job name is ``"every_05_minutes"``
    and whose status is one of ``IN_PROGRESS_RUN_STATUSES``.

    Args:
        context: The schedule evaluation context; only ``context.instance``
            is used.

    Returns:
        ``SkipReason`` if at least one such run exists, else ``RunRequest``.
    """
    # NOTE(review): the filter uses the string "every_05_minutes"; confirm
    # this matches the name of the job this schedule is meant to guard.
    active_filter = RunsFilter(
        job_name="every_05_minutes",
        statuses=IN_PROGRESS_RUN_STATUSES,
    )
    active_runs = context.instance.get_runs(filters=active_filter)

    if not active_runs:
        return RunRequest()
    return SkipReason(
        "An execution of every_05_minutes_job is currently in progress"
    )
jamie
09/25/2023, 4:27 PM
`instance.get_run_ids` is in the `DagsterInstance` class, but it was only added a little over a month ago. If you're on an older version of dagster, that's why you might not see it.