Heiner Hippke
08/17/2023, 1:51 PMrun_coordinator:
module: dagster._core.run_coordinator
class: QueuedRunCoordinator
config:
max_concurrent_runs: 10
tag_concurrency_limits:
- key: database
limit: 3
value: exasol
I have built a job to test this limit:
@job(
tags=EXASOL_TAG,
)
def test_job_concurrency_inner():
get_sleep_op(0)()
@job
def test_job_level_concurrency():
@op(
name=f"test_job_level_concurrency_{1}"
)
def start_part1(context):
execute_job(
reconstructable(test_job_concurrency_inner),
instance=context.instance,
tags=EXASOL_TAG,
)
@op(
name=f"test_job_level_concurrency_{2}"
)
def start_part2(context):
execute_job(
reconstructable(test_job_concurrency_inner),
instance=context.instance,
tags=EXASOL_TAG,
)
@op(
name=f"test_job_level_concurrency_{3}"
)
def start_part3(context):
execute_job(
reconstructable(test_job_concurrency_inner),
instance=context.instance,
tags=EXASOL_TAG,
)
@op(
name=f"test_job_level_concurrency_{4}"
)
def start_part4(context):
execute_job(
reconstructable(test_job_concurrency_inner),
instance=context.instance,
tags=EXASOL_TAG,
)
@op(
name=f"test_job_level_concurrency_{5}"
)
def start_part5(context):
execute_job(
reconstructable(test_job_concurrency_inner),
instance=context.instance,
tags=EXASOL_TAG,
)
start_part1()
start_part2()
start_part3()
start_part4()
start_part5()
The sleep op sleeps for 60 seconds by default.
I would expect that at most 3 of the 5 parts start concurrently. But sadly all 5 are running at the same time (please see attached screenshot). I have also checked, that the tags of the runs are correct (see second screenshot).
Maybe the limits do not apply when starting jobs by execute_job?
Thank you very much in advance
Heineralex
08/17/2023, 4:08 PMexecute_job
directly and immediately executes the job, bypassing the queue. You will need to submit
the run instead if you want it to go through the queue and have the limits respectedalex
08/17/2023, 4:09 PMHeiner Hippke
08/25/2023, 11:27 AM