# ask-community
s
Hello Dagsters, I have a simple op that fetches a bunch of records from a DB, processes them, and sends them back as a response. The job works fine, but when the number of records fetched from the DB is huge, the current logic loads all the records into memory at once and leads to an OOM. So I thought I'd explore Dagster's Dynamic Graph, which seems to allow breaking an op's output into chunks and processing them individually. However, to my surprise, it seems the generator returned by the dynamic op is consumed fully before any results are passed to the subsequent op. Below is a very basic example to demonstrate this:
import dagster


@dagster.op(out=dagster.DynamicOut())
def generate_even_numbers():
    for i in range(10):
        if i % 2 == 0:
            print("even number: %s" % i)
            yield dagster.DynamicOutput(i, mapping_key=str(i))

@dagster.op
def square(number):
    squared = number * number
    print("Squared: %s" % squared)
    return squared


@dagster.job
def square_generator():
    evens = generate_even_numbers()
    evens.map(square)


square_generator.execute_in_process(run_config={
    "loggers": {"console": {"config": {"log_level": "INFO"}}}
})
The output that I get after running above is:
even number: 0
even number: 2
even number: 4
even number: 6
even number: 8
Squared: 0
Squared: 4
Squared: 16
Squared: 36
Squared: 64
Shouldn't this be below instead?
even number: 0
Squared: 0
even number: 2
Squared: 4
even number: 4
Squared: 16
even number: 6
Squared: 36
even number: 8
Squared: 64
Is Dynamic Output expected to work like this, or am I missing something? And if Dagster is behaving as expected, is there any way to achieve the interleaved ordering above?
s
🤔, yeah.... I guess you are right. Let's wait for someone from the team to answer
j
@alex
a
it seems like the generator returned by Dynamic Op is processed fully, before passing on result to subsequent Op
This is true: we wait for the op to complete successfully before starting any downstream work. As long as you are not using an in-memory IO manager, the results should not be held in memory. You can use the dagster/priority tag on your op to force depth-first DAG traversal by setting a higher priority on the downstream ops. You will still see batches of N of the "first-tier" ops execute based on the max_concurrency when using the multiprocess executor, since there are no ready "second-tier" ops when the executor fills the concurrency slots.
s
Thanks @alex for your response. Sadly, since I am using the in-process executor, none of this applies, right?
Another follow-up question: for how long does Dagster's in-memory IO manager store op outputs? Are they preserved until the whole pipeline execution is complete, or is there a pruning strategy that removes intermediate outputs that are no longer referenced?
a
I am using in-process executor, all this won't apply. right?
No, it should still work. If you set the dagster/priority tags in the way described, you should get the execution order you seek.
Is it preserved till whole pipeline execution is complete
It is