#ask-ai

Luke Bogacz

03/04/2024, 8:19 PM
@Scout how can i process a dynamic out through a graph one at a time?

Scout

03/04/2024, 8:19 PM
To process dynamic outputs through a graph one at a time, you can control the concurrency of the operations within the graph. The documentation provides guidance on how to execute dynamic outputs in serial order instead of in parallel by configuring the executor to enforce concurrency limits. Here's a step-by-step approach based on the documentation:
1. Define your dynamic output operation using the `DynamicOut` type. This operation will yield `DynamicOutput` objects, each representing a piece of data to be processed serially.
2. Configure the executor in your job to limit concurrency. You can do this by setting the `max_concurrent` parameter to `1` in the multiprocess executor config, which ensures that only one operation runs at a time.
3. Map the dynamic output to a graph or a series of operations that will process each piece of data. Since the concurrency is limited, these operations will be executed one after another, in serial order.
Here is an example of how you might define the job and configure the executor to enforce serial execution:
python
from dagster import DynamicOut, DynamicOutput, graph, job, multiprocess_executor, op

@op(out=DynamicOut())
def generate_dynamic_outputs():
    # Your logic to yield dynamic outputs
    for i in range(10):
        yield DynamicOutput(value=i, mapping_key=str(i))

@op
def process_item(input_data):
    # Placeholder op: replace with your processing logic
    return input_data

@graph
def process_graph(input_data):
    # Your graph logic here
    return process_item(input_data)

# max_concurrent is a multiprocess_executor option; 1 allows one step at a time
@job(executor_def=multiprocess_executor.configured({"max_concurrent": 1}))
def my_serial_job():
    dynamic_results = generate_dynamic_outputs()
    dynamic_results.map(process_graph)
In this example, `generate_dynamic_outputs` is an operation that yields dynamic outputs. `process_graph` is a graph that processes each piece of data; the `process_item` op inside it is only a placeholder. The `my_serial_job` job uses the `multiprocess_executor` with the `max_concurrent` parameter set to `1`, so only one step runs at a time. The `in_process_executor` has no `max_concurrent` setting because it already runs steps serially, so passing `executor_def=in_process_executor` is an alternative. Please note that the actual syntax and configuration may vary depending on the version of Dagster you are using and the specific details of your use case. Always refer to the latest Dagster documentation for the most accurate and up-to-date information.
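If the job keeps Dagster's default executor instead of setting `executor_def`, the same limit can be supplied as run config. The following is a minimal sketch under that assumption; the variable name `serial_run_config` is made up for illustration, and the nesting should be checked against the executor schema of your Dagster version.

```python
# Run config that limits the default multiprocess executor to one step at a time.
# Assumption: the job uses Dagster's default executor, whose settings are
# nested under execution -> config -> multiprocess.
serial_run_config = {
    "execution": {
        "config": {
            "multiprocess": {
                "max_concurrent": 1,  # at most one step running at any moment
            }
        }
    }
}
```

A dictionary like this could be passed as `config=serial_run_config` to `@job`, or entered as the equivalent YAML when launching a run.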

Luke Bogacz

03/04/2024, 8:24 PM
@Scout i want to process the dynamic outputs one at a time - not collect them all, then fanout. is there a way to do this?

Scout

03/04/2024, 8:24 PM
Yes, you can process each dynamic output individually without collecting them. Use the `map` method on the dynamic output to apply an op to each element. Each mapped invocation becomes its own step, and nothing is gathered back into a list unless you call `collect`. Here's an example based on the documentation:
python
from dagster import job, op, DynamicOut, DynamicOutput

@op(out=DynamicOut())
def generate_dynamic_outputs():
    # Your logic to yield dynamic outputs
    for i in range(10):
        yield DynamicOutput(value=i, mapping_key=str(i))

@op
def process_individual_output(input_data):
    # Your logic to process each individual output
    pass

@job
def my_serial_job():
    dynamic_results = generate_dynamic_outputs()
    dynamic_results.map(process_individual_output)
In this example, `generate_dynamic_outputs` yields dynamic outputs one by one. The `process_individual_output` operation is mapped over them, which means it is invoked once per output, as a separate step. Be aware that `map` does not stream: Dagster creates the mapped steps after `generate_dynamic_outputs` has finished, so processing starts once all outputs have been yielded. Whether the mapped steps then run one at a time depends on the executor. The `in_process_executor` runs steps serially, but a `@job` with no `executor_def`, like the one above, uses the default executor, which runs steps in parallel across processes. To enforce serial execution explicitly, set `executor_def=in_process_executor` or configure the executor with a concurrency limit as mentioned in the previous message. Remember to check the documentation for the version of Dagster you are using to ensure that the syntax and behavior are consistent with the current implementation.
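To make the ordering in the question concrete, here is a plain-Python sketch that contrasts "collect everything, then fan out" with "handle each item as it is yielded". It is not Dagster code and says nothing about how Dagster schedules mapped steps; all function names are hypothetical and exist only for illustration.

```python
from typing import Iterator, List


def generate_items(n: int, log: List[str]) -> Iterator[int]:
    """Yield n items, recording when each one is produced."""
    for i in range(n):
        log.append(f"yield {i}")
        yield i


def process(item: int, log: List[str]) -> int:
    """Stand-in for per-item work, recording when it runs."""
    log.append(f"process {item}")
    return item * 2


def collect_then_fan_out(n: int, log: List[str]) -> List[int]:
    # Materialize every item first, then process them.
    items = list(generate_items(n, log))
    return [process(item, log) for item in items]


def one_at_a_time(n: int, log: List[str]) -> List[int]:
    # Pull one item from the generator, process it, then pull the next.
    return [process(item, log) for item in generate_items(n, log)]


collected_log: List[str] = []
collect_then_fan_out(2, collected_log)

streamed_log: List[str] = []
one_at_a_time(2, streamed_log)
```

After this runs, `collected_log` shows both yields before any processing, while `streamed_log` alternates between yielding and processing.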