# ask-community
v
Hello! How can I get a list of In Progress jobs programmatically? I know that I can send a query using GraphQL; is there another way to do that?
r
The API is based on GraphQL. You have the following: https://docs.dagster.io/concepts/dagit/graphql-client with a limited set of functions available. If you want to do that from within a Dagster job (as an op), you can use the op context like this:
from dagster import op
from dagster.core.storage.pipeline_run import PipelineRunsFilter, PipelineRunStatus

@op
def my_op(context):
    # Query the instance's run storage for all runs currently in the STARTED state
    runs = context.instance.get_runs(
        filters=PipelineRunsFilter(statuses=[PipelineRunStatus.STARTED])
    )
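Outside of an op, the same information can be fetched over raw GraphQL against the Dagit endpoint. A minimal sketch, assuming Dagit is reachable on localhost:3000 and that your Dagster version exposes a `runsOrError` field (the exact field name varies between versions, so check your deployment's GraphQL schema in the Dagit playground first):

```python
import json
from urllib.request import Request, urlopen

# Assumed Dagit endpoint; adjust host/port for your deployment.
DAGIT_GRAPHQL_URL = "http://localhost:3000/graphql"

# Query runs filtered by status. The field name (runsOrError) is an
# assumption based on newer Dagster schemas; older versions may use
# pipelineRunsOrError instead.
QUERY = """
query InProgressRuns {
  runsOrError(filter: { statuses: [STARTED] }) {
    ... on Runs {
      results {
        runId
        status
      }
    }
  }
}
"""

def fetch_in_progress_runs():
    """POST the query to Dagit and return the list of matching runs."""
    payload = json.dumps({"query": QUERY}).encode("utf-8")
    req = Request(
        DAGIT_GRAPHQL_URL,
        data=payload,
        headers={"Content-Type": "application/json"},
    )
    with urlopen(req) as resp:
        body = json.load(resp)
    return body["data"]["runsOrError"]["results"]
```

This avoids the GraphQL client's limited function set by posting the query directly, at the cost of having to handle schema differences yourself.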
v
Thanks
Another question on the same topic. I have two types of capacity units, CPU and GPU, each with 30 capacity units. I have four jobs:
A - 30 GPU and 1 CPU
B - 5 GPU and 2 CPU
C - 6 CPU
D - 2 CPU
How can I run jobs without exceeding the maximum capacity of GPU and CPU?
p
Hi Vlad. You might want to check out customizing your instance’s run coordinator to inspect the set of ongoing runs, their required resources, and the requirements of the runs on the queue in order to manage your capacity constraints: https://docs.dagster.io/deployment/run-coordinator#limiting-run-concurrency
v
@prha Hi. I know about these options. Here I can set a limit on the number of runs for a certain tag, but that doesn't solve the capacity problem. For example, if Job A (30 GPU and 1 CPU) is running, Job B (5 GPU and 2 CPU) can't start, because Job A uses the maximum GPU capacity; but with per-tag run limits I could only say that the maximum is 1 run of Job A and 6 runs of Job B, which would allow both at once. There is a collision. How can I solve this?
p
Hi Vlad. The queued run coordinator by itself won’t be able to enforce the behavior you’re looking for. I think you’d probably have to implement your own run coordinator (perhaps subclassing the queued run coordinator), which looks at some job tags (indicating cpu/gpu) and does the math to calculate available capacity for the next job to dequeue.
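A sketch of the capacity math such a coordinator could do. The tag names (`capacity/cpu`, `capacity/gpu`) are an assumption for illustration, not a Dagster convention; the point is reading requirements from run tags and checking them against what the running runs already consume:

```python
# Total capacity from the question: 30 CPU units and 30 GPU units.
CPU_CAPACITY = 30
GPU_CAPACITY = 30

def required(tags):
    """Extract (cpu, gpu) requirements from a run's tags.

    Tag names are hypothetical; missing tags mean the run needs none
    of that resource.
    """
    return int(tags.get("capacity/cpu", 0)), int(tags.get("capacity/gpu", 0))

def can_dequeue(queued_tags, running_tag_sets):
    """Return True if the queued run fits within the remaining capacity."""
    used_cpu = sum(required(t)[0] for t in running_tag_sets)
    used_gpu = sum(required(t)[1] for t in running_tag_sets)
    need_cpu, need_gpu = required(queued_tags)
    return (used_cpu + need_cpu <= CPU_CAPACITY
            and used_gpu + need_gpu <= GPU_CAPACITY)
```

With Job A (1 CPU, 30 GPU) running, `can_dequeue` would reject Job B (2 CPU, 5 GPU) because 30 + 5 exceeds the GPU capacity, but would accept Job C (6 CPU, 0 GPU), which is exactly the behavior the per-tag run limits can't express.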
v
How can I implement my own run coordinator? What function does the daemon run before starting any job?
p
Apologies, Vlad. I thought the dequeuing logic was happening in the run coordinator (which is pluggable), but it’s actually happening in the queued run coordinator daemon. You’d have to fork the dagster daemon to implement your own logic to override this here: https://github.com/dagster-io/dagster/blob/e26293dcf7aaf5ac7a7a2aef6832ef743073c95[…]dagster/daemon/run_coordinator/queued_run_coordinator_daemon.py
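The selection logic a forked daemon would need at its dequeue step can be sketched in plain Python, independent of the daemon's actual API. This is a greedy pass over the queue under the stated assumptions (runs represented as `(name, cpu, gpu)` tuples, 30/30 capacity), not the real `QueuedRunCoordinatorDaemon` interface:

```python
def select_runs_to_launch(queued, running, cpu_cap=30, gpu_cap=30):
    """Greedily pick queued runs that fit the capacity left over by
    the running runs. Each run is a (name, cpu, gpu) tuple; queue
    order is preserved and runs that don't fit are skipped.
    """
    used_cpu = sum(cpu for _, cpu, _ in running)
    used_gpu = sum(gpu for _, _, gpu in running)
    launched = []
    for name, cpu, gpu in queued:
        if used_cpu + cpu <= cpu_cap and used_gpu + gpu <= gpu_cap:
            launched.append(name)
            used_cpu += cpu  # reserve the capacity for this run
            used_gpu += gpu
    return launched
```

For the jobs in the question, with A (1 CPU, 30 GPU) running and B, C, D queued, this skips B (GPU would hit 35) but launches C and D, since together they only need 8 CPU and no GPU. Note that a greedy skip-and-continue policy can starve large jobs like A; a production fork might instead block the queue until the head run fits.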