Hi, we use `QueuedRunCoordinator` with custom tags...
# ask-community
m
Hi, we use
QueuedRunCoordinator
with custom tags in order to set limits on the number of runs that can be executed at once. Some of the jobs should be queued and wait for the lock to be freed, but others in some occasions make the queue to fill up pretty quickly. Is there a way to skip the sending of specific jobs to the queue, if the queue (of the specific tag) is full?
c
Hi May. Unfortunately no built-in setting to be able to do something like this. One workaround would be to write your own custom run coordinator: https://docs.dagster.io/deployment/run-coordinator#custom-run-coordinators
r
Thanks @claire! (I work with @May Kogan 🙂) Another question - I understand we need to implement
submit_run
. How can I query the relevant queue size with the
SubmitRunContext
context I have? (if there’s a better way from using the regular run query functionality and ordering the queue runs by tags - a way to check the internal “counter” of the queue)
c
One way I can think of is to query the instance for the number of queued runs with a certain tag. Here's a bit of untested code:
Copy code
instance.get_runs_count(RunsFilter(statuses=[DagsterRunStatus.QUEUED], tags={"key": "value"}))
The
get_runs_count
method is currently private, which means that it is used internally and subject to change. If you want a stable API, you can use
get_runs
instead, but this method may be slower/more expensive since it deserializes the runs rather than just the counts. If you decide to go with using
get_runs_count
, you can file a feature request to petition this method to be made public.
r
Thanks @claire, I’ll check it out! Is the
RunCoordinator
being handled concurrently on the submitting side? Is it possible to have some sort of race condition on there? As I’m checking on every
submit_run
the relevant queue size, and don’t have a clear locking mechanism there - and another run submitting request might be in process. Another case is another run might just start (dequeued from the daemon) during my check - that’s sort of avoidable by checking if the queue is empty. It’s not that bad as the dequeued mechanism will take care of that anyway (but the job will still enter the queue). What would be your recommendation? Thanks!
c
Hi Roei. Unfortunately I'm not well versed on run coordinators and the possibilities of race conditions, so not sure I am the best person to answer your question. Would you mind making another post in dagster-support so that the question can get directed to someone with more knowledge on this?
r
Sure, thanks 🙂