# ask-community
Hi Dagster team, I'm running a pipeline in which a subgraph is called dynamically. The subgraph, which is run for 4 dynamic outputs in the following screenshots, executes breadth-first, i.e. the first op of every subgraph finishes before any second op starts, and so on. I'm looking to control the concurrency of these subgraphs. • Is there a way to run the different subgraphs depth-first (order of the red boxes in the first screenshot)? • Is there a way to control the concurrency of the subgraphs running breadth-first, i.e. run the first two subgraphs concurrently and, once those are finished, run the second batch (second screenshot)? Thanks! 🙂
@Dagster Bot issue Control breadth-first vs depth-first dynamic graph execution
from dagster import DynamicOut, DynamicOutput, job, op


@op(out=DynamicOut())
def generate_things():
    for i in range(3):
        yield DynamicOutput(i, mapping_key=str(i))


# "dagster/priority": when several steps are ready, higher-priority steps run first.
@op(tags={"dagster/priority": 1})
def downstream_one(i):
    print("1", i)
    return i


# Giving the deeper op a higher priority makes each dynamic branch run to
# completion before the next branch starts, i.e. depth-first execution.
@op(tags={"dagster/priority": 2})
def downstream_two(i):
    print("2", i)
    return i


@job
def my_job():
    generate_things().map(lambda x: downstream_two(downstream_one(x)))
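
A quick way to sanity-check the ordering (a minimal sketch; it assumes the in-process executor, which runs one step at a time and should pick the ready step with the highest priority):

if __name__ == "__main__":
    # Expected depth-first output: 1 0, 2 0, 1 1, 2 1, 1 2, 2 2
    # (breadth-first would print all the "1" lines before any "2" line)
    result = my_job.execute_in_process()
    assert result.success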
@Dagster Bot discussion How to control breadth-first vs depth-first in dynamic graph execution
Question in the thread has been surfaced to GitHub Discussions for future discoverability: https://github.com/dagster-io/dagster/discussions/8871
This BF vs DF issue can definitely be confusing when running a concurrent dynamically generated subgraph that involves grabbing a large file for processing. I'm running experiments to see if I can achieve what I need in dagster or if I will have to let something like dask or ray handle the parallelization / distribution. By default, things seem to run BF. This means that if the first op in your dynamic subgraph fetches a large file for processing, you will rapidly run out of storage / memory long before the concurrency limit becomes the bottleneck. If things ran DF, there would be no issue: each large file's subgraph runs to completion (parallelized at the op level where possible) before collect happens. In DF, given enough concurrency, several subgraphs could run at once, up to the op-concurrency limit. It is still confusing what the maximum concurrency actually is and how to control it: is it max_concurrent_runs (this doesn't seem right)? You would still need to tune that maximum concurrency to prevent an OOM / out-of-storage situation in the worst case. Perhaps dynamic graphs could take a configuration for whether they run DF or BF? Or are the priority tags the intended API for this?
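For what it's worth, max_concurrent_runs is a QueuedRunCoordinator setting: it caps how many runs the daemon launches at once, so it won't bound the ops inside a single run. The in-run knob is the executor's max_concurrent. A minimal sketch, assuming the default multiprocess executor and reusing the ops from the snippet above; note this caps concurrent ops, not whole subgraphs, so pairing it with the dagster/priority tags is what approximates "N branches at a time":

from dagster import job, multiprocess_executor

# At most 2 ops execute simultaneously within one run; combined with
# depth-first priorities this keeps roughly 2 large files in flight.
bounded = multiprocess_executor.configured({"max_concurrent": 2})

@job(executor_def=bounded)
def my_bounded_job():
    generate_things().map(lambda x: downstream_two(downstream_one(x)))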