# ask-community
m
Hi all, I'm trying to schedule some jobs to run hourly. Some jobs may take longer than that to complete, and we want to skip a scheduled run for a job if it is already running. I see that limiting concurrent runs is an option, but won't that queue the job instead of skipping it? Please advise. Thanks!
d
If you make this a sensor, you can use the cursor attribute to do this. https://docs.dagster.io/_apidocs/schedules-sensors#dagster.SensorEvaluationContext
m
Thanks Daniel. I'm not sure I quite understand. Can you provide some sample code? Would the sensor be checking jobs in the queue?
p
Hi My Mai. I think the simplest way to achieve this is to use a schedule that checks the run storage for in-flight runs and skips if any exist. Here’s some untested sample code:
from dagster import DagsterRunStatus, RunRequest, RunsFilter, SkipReason, schedule

# "my_job" is a placeholder; the schedule needs a target job to launch.
@schedule(cron_schedule="0 * * * *", job_name="my_job")
def my_schedule(context):
    in_flight_runs_count = context.instance.get_runs_count(
        RunsFilter(job_name="my_job", statuses=[DagsterRunStatus.STARTED])
    )
    if in_flight_runs_count > 0:
        return SkipReason("in flight run")
    else:
        return RunRequest(run_config={}, tags={})
m
@prha this works! Thank you.