# ask-community
g
I have an asset-job that may very occasionally take longer to process than its scheduled frequency. I know I can limit execution concurrency if using a QueuedRunCoordinator. However, is it possible to build a schedule that simply skips if there's a current instance of the job being executed?
s
Hi Giovanni - there's no built-in option for this, but you could implement this logic inside the decorated function for your schedule. It might require a little code spelunking, but you could basically use `context.instance.get_run_records` and apply a `RunsFilter` to filter to runs that are active.
g
oh, thanks for the pointers! i'll do some digging
I was thinking maybe a combination of concurrency-limiting and should_run but I wasn't sure when should_run would be evaluated
(specifically to drop runs that had been queued for longer than some specified amount of time)
I think I managed to get it to work as expected. Thank you, Sandy! here's a snippet for anyone stumbling into this in the future
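```python
from dagster import DagsterRunStatus, RunRequest, RunsFilter, SkipReason
from dagster import ScheduleEvaluationContext, schedule

# my_job is assumed to be defined elsewhere
@schedule(job=my_job, cron_schedule="* * * * *")
def my_schedule(context: ScheduleEvaluationContext):
    # look for runs of this job that are currently executing
    f = RunsFilter(job_name="my_job", statuses=[DagsterRunStatus.STARTED])
    runs = context.instance.get_run_records(filters=f)
    if len(runs):
        return SkipReason(f"skipping because there are {len(runs)} started runs")
    return RunRequest(run_key=None)
```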
s
awesome
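A note for anyone adapting the snippet above: it only checks for `STARTED` runs, so a run that is still queued or starting up would not trigger a skip. Below is a minimal sketch of a broader "is anything still active" check, kept free of Dagster imports so it can be tested on its own. The names `ACTIVE_STATUS_NAMES` and `skip_message` are hypothetical, and the set of status names is an assumption about which `DagsterRunStatus` members count as unfinished; check it against the Dagster version in use.

```python
from typing import Iterable, Optional

# Assumption: DagsterRunStatus names for runs that have not finished yet.
ACTIVE_STATUS_NAMES = frozenset(
    {"QUEUED", "NOT_STARTED", "STARTING", "STARTED", "CANCELING"}
)


def skip_message(status_names: Iterable[str]) -> Optional[str]:
    """Return a skip message if any run is still active, otherwise None."""
    active = [name for name in status_names if name in ACTIVE_STATUS_NAMES]
    if active:
        return f"skipping because there are {len(active)} active runs"
    return None
```

Inside the schedule, one option would be to build the filter's `statuses` list from these names (for example with `DagsterRunStatus[name]`), so the query itself returns only unfinished runs.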