Dmytro Tsyliuryk

12/14/2021, 10:39 AM
hello dagster community, I have one quick question: I have a job + a schedule that runs it every minute; is there some way to check whether this job is currently running and not start a new one if it is (i.e., just skip the next scheduled run)? thank you in advance )

yuhan

12/14/2021, 5:12 PM
Hi @Dmytro Tsyliuryk did you mean a programmatic way to tell if a job is running?

Dmytro Tsyliuryk

12/14/2021, 5:14 PM
I'm not sure about a way to do that, but the equivalent in Airflow is max_active_runs, which lets you set the number of runs of a job that can execute in parallel
but in this case it would also be great to just skip a scheduled run if one is already running

yuhan

12/14/2021, 5:35 PM
just to clarify, Airflow’s max_active_runs applies to the number of the same job or the number of concurrent runs for all jobs?
if that’s the former, I don’t think we have out-of-the-box support for that yet. But you can get the run records via
context.instance.get_run_records(
    filters=PipelineRunsFilter(pipeline_name="your_job_name")
)
and run records give you info about the run status which indicates if a run is running or has succeeded/failed.
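That status check can be sketched without Dagster installed. Here `FakeRunRecord` and `any_run_in_progress` are hypothetical stand-ins (not the real Dagster API) for a run record with a status and a helper that tells whether any matching run is still in flight:

```python
from dataclasses import dataclass

# Hypothetical stand-in for a Dagster run record -- the name and
# shape are illustrative, not the real API.
@dataclass
class FakeRunRecord:
    status: str  # e.g. "STARTED", "SUCCESS", "FAILURE"

def any_run_in_progress(records):
    """Return True if any record reports an in-progress status."""
    in_progress = {"STARTED", "STARTING", "QUEUED"}
    return any(r.status in in_progress for r in records)
```

With real records you would read the status off whatever the get_run_records call returns instead of the fake dataclass.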

Dmytro Tsyliuryk

12/15/2021, 7:16 AM
> just to clarify, Airflow’s max_active_runs applies to the number of the same job or the number of concurrent runs for all jobs?
the same job
I've found something like this https://docs.dagster.io/_apidocs/execution#dagster.multiprocess_executor but I'm not sure how it works yet
@yuhan so do you have any other suggestions?

yuhan

12/20/2021, 8:39 PM
Hi @Dmytro Tsyliuryk sorry for the delayed response (I was out later last week).
• The multiprocess_executor config max_concurrent is used to set the number of concurrently running ops inside one job run.
• We also have a way to set max concurrent runs (across all jobs) in the run coordinator: https://docs.dagster.io/deployment/run-coordinator#run-limits
Unfortunately, what you are looking for (setting max concurrent runs for the same job) isn’t supported out-of-the-box at the moment. I’d suggest writing custom logic in your schedule, something like:
@schedule(...)
def my_schedule(context):
    run_records = context.instance.get_run_records(
        PipelineRunsFilter(
            pipeline_name="your_job_name",
            statuses=[PipelineRunStatus.STARTED],
        )
    )
    if len(run_records) == 0:
        yield RunRequest(...)
you can tune the filter to better fit your needs. Here’s the get_run_records API: https://docs.dagster.io/_apidocs/internals#dagster.DagsterInstance.get_run_records
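The skip decision in that schedule can be sketched as a plain function, runnable without Dagster; `should_launch` is a hypothetical helper mirroring the `len(run_records) == 0` check, with plain strings standing in for PipelineRunStatus values:

```python
def should_launch(active_statuses):
    """Decide whether a schedule tick should request a new run.

    active_statuses: statuses of existing runs of the same job that
    matched the filter (strings here stand in for PipelineRunStatus).
    Mirrors the schedule above: only launch when no matching run exists.
    """
    return len(active_statuses) == 0

# Simulated schedule ticks:
# tick 1: nothing running            -> launch
# tick 2: previous run still STARTED -> skip
# tick 3: previous run finished      -> launch again
```

Widening the filter to also include queued/starting statuses would make the skip stricter, at the cost of skipping ticks while a run is still waiting to start.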

Dmytro Tsyliuryk

12/21/2021, 2:36 PM
cool, I see now. thank you, that should work )

yuhan

06/15/2022, 7:24 PM
@Dagster Bot discussion How to not schedule a new run if a same job run is running

Dagster Bot

06/15/2022, 7:24 PM
Question in the thread has been surfaced to GitHub Discussions for future discoverability: https://github.com/dagster-io/dagster/discussions/8414