May Kogan
07/17/2023, 5:24 PM
We're using the QueuedRunCoordinator with custom tags in order to set limits on the number of runs that can be executed at once. Some of the jobs should be queued and wait for the lock to be freed, but others occasionally fill the queue up pretty quickly.
Is there a way to skip sending specific jobs to the queue if the queue (for that specific tag) is full?
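For context, limits of this kind are usually configured on the QueuedRunCoordinator via tag_concurrency_limits in dagster.yaml, and the limiting tag is attached to the jobs themselves. A minimal sketch, with a made-up tag and job name:

```python
from dagster import job, op


@op
def export_table():
    ...


# Hypothetical tag: if the QueuedRunCoordinator config in dagster.yaml has a
# tag_concurrency_limits entry for {"database": "redshift"}, only that many
# runs carrying this tag are dequeued at once; the rest stay QUEUED.
@job(tags={"database": "redshift"})
def export_job():
    export_table()
```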
claire
07/17/2023, 6:50 PM

Roei Jacobovich
07/17/2023, 6:59 PM
We're overriding submit_run. How can I query the relevant queue size with the SubmitRunContext context I have? (If there's a better way than using the regular run query functionality and ordering the queued runs by tags, I'd like a way to check the internal "counter" of the queue.)
claire
07/18/2023, 12:00 AM
```python
instance.get_runs_count(RunsFilter(statuses=[DagsterRunStatus.QUEUED], tags={"key": "value"}))
```
The get_runs_count method is currently private, which means it is used internally and is subject to change. If you want a stable API, you can use get_runs instead, but that method may be slower and more expensive since it deserializes the runs rather than just counting them.
If you decide to go with get_runs_count, you can file a feature request to petition for this method to be made public.
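Putting that suggestion into a custom coordinator might look roughly like the sketch below. It is only illustrative: the imports go through private dagster._core modules and may differ between Dagster versions, MAX_QUEUED and the tag are placeholders, and how a rejected run should be handled (failed, canceled, or just logged) is left open.

```python
from dagster import DagsterRunStatus, RunsFilter
from dagster._core.run_coordinator import QueuedRunCoordinator, SubmitRunContext
from dagster._core.storage.dagster_run import DagsterRun

MAX_QUEUED = 10  # placeholder limit, for illustration only


class BoundedQueueRunCoordinator(QueuedRunCoordinator):
    def submit_run(self, context: SubmitRunContext) -> DagsterRun:
        run = context.dagster_run

        # Count runs already waiting in the queue with the same tag.
        # get_runs_count is the private-but-cheap option discussed above;
        # len(self._instance.get_runs(...)) is the stable but slower alternative.
        queued = self._instance.get_runs_count(
            RunsFilter(
                statuses=[DagsterRunStatus.QUEUED],
                tags={"database": "redshift"},  # placeholder tag
            )
        )

        if queued >= MAX_QUEUED:
            # Queue is "full" for this tag: skip enqueuing. Here we only log;
            # you might instead report the run as failed or canceled so it
            # does not sit in NOT_STARTED forever.
            self._instance.report_engine_event(
                f"Skipping enqueue: {queued} runs already queued for this tag.",
                run,
            )
            return run

        # Otherwise fall back to the normal queuing behavior.
        return super().submit_run(context)
```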
Roei Jacobovich
07/18/2023, 3:58 PM
Is the RunCoordinator handled concurrently on the submitting side? Is it possible to have some sort of race condition there? On every submit_run I check the relevant queue size, and I don't have a clear locking mechanism, so another run submission request might be in progress at the same time. Another case: another run might be dequeued by the daemon and start during my check; that's mostly avoidable by checking whether the queue is empty.
It's not that bad, since the dequeue mechanism will take care of it anyway (though the job will still enter the queue). What would be your recommendation?
Thanks!
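One way to narrow (though not close) the window described here is to count in-flight runs along with queued ones, so a run the daemon has just dequeued still counts toward the limit. A small variation on the earlier filter; the statuses and tag are assumptions about what you want to bound:

```python
from dagster import DagsterRunStatus, RunsFilter

# Counting STARTING/STARTED runs alongside QUEUED ones narrows the race where
# the daemon dequeues a run between the check and the enqueue; it does not
# remove it, since submit_run calls are not serialized against each other.
in_flight_filter = RunsFilter(
    statuses=[
        DagsterRunStatus.QUEUED,
        DagsterRunStatus.STARTING,
        DagsterRunStatus.STARTED,
    ],
    tags={"database": "redshift"},  # placeholder tag
)
# usage inside submit_run: queued_or_running = self._instance.get_runs_count(in_flight_filter)
```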
claire
07/21/2023, 11:48 PM

Roei Jacobovich
07/22/2023, 8:27 AM