Jori Geysen — 07/13/2022, 10:02 AM
yuhan — 07/13/2022, 10:03 PM
Dagster Bot — 07/13/2022, 10:03 PM
yuhan — 07/13/2022, 10:18 PM
from dagster import DynamicOut, DynamicOutput, job, op

@op(out=DynamicOut())
def generate_things():
    for i in range(3):
        yield DynamicOutput(i, mapping_key=str(i))

@op(tags={"dagster/priority": 1})
def downstream_one(i):
    print("1", i)
    return i

@op(tags={"dagster/priority": 2})
def downstream_two(i):
    print("2", i)
    return i

@job
def my_job():
    generate_things().map(lambda x: downstream_two(downstream_one(x)))
yuhan — 07/13/2022, 10:19 PM
Dagster Bot — 07/13/2022, 10:19 PM
Nick Renninger — 07/20/2022, 1:39 AM
dask or ray handle the parallelization / distribution.
By default, it seems things run breadth-first (BF). This means that if the first op in your dynamic subgraph fetches a large file for processing, you will run out of storage / memory long before concurrency ever becomes the bottleneck.
If things were to run depth-first (DF), there are no issues: each large file has its sub-graph run to completion (parallelized at the op level where possible) before collect happens.
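The BF/DF distinction can be sketched in plain Python (no Dagster involved; op and branch names mirror the example job above, and the scheduling lists are illustrative only):

```python
# Each dynamic branch i runs downstream_one(i) then downstream_two(i).
branches = range(3)
chain = ["downstream_one", "downstream_two"]

def breadth_first():
    # Every copy of an op finishes before the next op starts, so all
    # large intermediates for all branches are alive at the same time.
    return [(op, i) for op in chain for i in branches]

def depth_first():
    # Each branch runs to completion before the next begins, so at most
    # one branch's intermediates are alive at a time.
    return [(op, i) for i in branches for op in chain]

# breadth_first() -> [('downstream_one', 0), ('downstream_one', 1),
#                     ('downstream_one', 2), ('downstream_two', 0),
#                     ('downstream_two', 1), ('downstream_two', 2)]
# depth_first()   -> [('downstream_one', 0), ('downstream_two', 0),
#                     ('downstream_one', 1), ('downstream_two', 1),
#                     ('downstream_one', 2), ('downstream_two', 2)]
```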
In DF, if there is enough concurrency, several of the sub-graphs could run at once, up to the op-concurrency limit. It is still confusing what the maximum concurrency actually is and how to control it: is it max_concurrent_runs? (That doesn't seem right.) You would still need to tune this maximum concurrency to prevent an out-of-memory / out-of-storage situation in the worst case.
Perhaps a configuration option for dynamic graphs could be whether things run DF or BF? Or is the priority tag the intended API for this?
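For what it's worth, my understanding (an assumption, not verified against the docs) is that max_concurrent_runs caps queued runs across the whole deployment, while op concurrency within a single run is set on the executor. A run-config sketch for the default multiprocess executor:

```yaml
# Assumed shape of the executor config: max_concurrent here limits
# how many ops execute at once within one run, not how many runs launch.
execution:
  config:
    multiprocess:
      max_concurrent: 4
```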