Sandeep Aggarwal
09/01/2022, 7:47 AM
import dagster


@dagster.op(out=dagster.DynamicOut())
def generate_even_numbers():
    for i in range(10):
        if i % 2 == 0:
            print("even number: %s" % i)
            yield dagster.DynamicOutput(i, mapping_key=str(i))


@dagster.op
def square(number):
    squared = number * number
    print("Squared: %s" % squared)
    return squared


@dagster.job
def square_generator():
    evens = generate_even_numbers()
    evens.map(square)


square_generator.execute_in_process(run_config={
    "loggers": {"console": {"config": {"log_level": "INFO"}}}
})
The output I get after running the above is:
even number: 0
even number: 2
even number: 4
even number: 6
even number: 8
Squared: 0
Squared: 4
Squared: 16
Squared: 36
Squared: 64
Shouldn't it be the following instead?
even number: 0
Squared: 0
even number: 2
Squared: 4
even number: 4
Squared: 16
even number: 6
Squared: 36
even number: 8
Squared: 64
Is Dynamic Output expected to work like this, or am I missing something? And if Dagster is behaving as expected, is there any way to achieve the order above?
Saul Burgos
09/01/2022, 1:22 PM
jamie
09/01/2022, 6:19 PM
alex
09/01/2022, 6:26 PM
> it seems like the generator returned by Dynamic Op is processed fully, before passing on result to subsequent Op
This is true: we wait for the op to complete successfully before starting any downstream work. As long as you are not using an in-memory IO manager, the results should not be held in memory. You can use the dagster/priority tag on your ops to force depth-first DAG traversal by setting a higher priority on the downstream ops. You will still see batches of N of the “first-tier” ops execute, based on max_concurrency, when using the multiprocess executor, as there are no ready “second-tier” ops when the executor fills the concurrency slots.
Sandeep Aggarwal
09/02/2022, 8:40 AM
alex
09/06/2022, 2:28 PM
> I am using in-process executor, all this won’t apply. right?
No, it should work. If you set the dagster/priority tags in the way described, you should get the execution order you seek.
> Is it preserved till whole pipeline execution is complete
It is.
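
The scheduling behaviour alex describes can be illustrated with a toy model in plain Python. This is only a sketch under stated assumptions, not Dagster's actual executor: the function `run_toy_schedule` and the second-tier `report` step are hypothetical names introduced here, and the "scheduler" simply runs whichever ready step has the highest priority. It shows two things from the thread: the dynamic op always finishes before any mapped step starts, and giving the downstream step a higher priority turns the traversal from breadth-first into depth-first. In a real job the priority would presumably be set through the op's tags, for example `@dagster.op(tags={"dagster/priority": "1"})`.

```python
import heapq


def run_toy_schedule(priorities):
    """Toy model (not Dagster code): run the ready step with the highest priority.

    `priorities` maps a step name ("square" or the hypothetical "report")
    to an integer priority; higher values run first.
    """
    order = []

    # The dynamic op runs to completion before any downstream step starts.
    evens = []
    for i in range(10):
        if i % 2 == 0:
            order.append("even number: %s" % i)
            evens.append(i)

    # All mapped "square" steps become ready at the same time.
    # Heap entries are (-priority, sequence, step name, value); the sequence
    # number keeps equal-priority steps in first-in, first-out order.
    ready = []
    seq = 0
    for value in evens:
        heapq.heappush(ready, (-priorities["square"], seq, "square", value))
        seq += 1

    while ready:
        _, _, step, value = heapq.heappop(ready)
        if step == "square":
            squared = value * value
            order.append("Squared: %s" % squared)
            # Finishing "square" makes the matching downstream step ready.
            heapq.heappush(ready, (-priorities["report"], seq, "report", squared))
            seq += 1
        else:
            order.append("Reported: %s" % value)
    return order


if __name__ == "__main__":
    # Equal priorities: every "square" runs before any "report".
    print("\n".join(run_toy_schedule({"square": 0, "report": 0})))
    # Higher downstream priority: each "report" follows its "square".
    print("\n".join(run_toy_schedule({"square": 0, "report": 1})))
```

Note that in both runs the five "even number" lines come first, so the fully interleaved output asked for in the original question is not reachable by priorities alone; the interleaving only applies between tiers downstream of the dynamic op.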