Luke Bogacz
03/04/2024, 8:19 PM

Scout
03/04/2024, 8:19 PM
…`DynamicOut` type. This operation will yield `DynamicOutput` objects, each representing a piece of data to be processed serially.
2. Configure the executor in your job to limit concurrency. You can do this by setting the `max_concurrent` parameter of the `multiprocess_executor` to `1` in the executor config, which ensures that only one op runs at a time.
3. Map the dynamic output to a graph or a series of operations that will process each piece of data. Since the concurrency is limited, these operations will be executed one after another, in serial order.
Here is an example of how you might define the job and configure the executor to enforce serial execution:
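```python
from dagster import DynamicOut, DynamicOutput, graph, job, multiprocess_executor, op

@op(out=DynamicOut())
def generate_dynamic_outputs():
    # Your logic to yield dynamic outputs
    for i in range(10):
        yield DynamicOutput(value=i, mapping_key=str(i))

@op
def process_item(input_data):
    # Placeholder op (hypothetical name): your per-item logic goes here
    return input_data

@graph
def process_graph(input_data):
    # Your graph logic here; a graph body must invoke at least one op
    return process_item(input_data)

# max_concurrent=1 lets the multiprocess executor run only one step at a time
@job(executor_def=multiprocess_executor.configured({"max_concurrent": 1}))
def my_serial_job():
    dynamic_results = generate_dynamic_outputs()
    dynamic_results.map(process_graph)
```

In this example, `generate_dynamic_outputs` is an op that yields dynamic outputs. `process_graph` is a graph that processes each piece of data; here it calls a placeholder op, `process_item`, standing in for your own logic. The `my_serial_job` job uses the `multiprocess_executor` with the `max_concurrent` parameter set to `1`, so each mapped `process_graph` invocation runs one at a time. (The `in_process_executor` does not accept `max_concurrent`; it already executes ops one at a time in a single process.)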
Please note that the actual syntax and configuration may vary depending on the version of Dagster you are using and the specific details of your use case. Always refer to the latest Dagster documentation for the most accurate and up-to-date information.

Luke Bogacz
03/04/2024, 8:24 PM

Scout
03/04/2024, 8:24 PM
…`map` method on the dynamic output to apply an op to each element individually. Each dynamic output is then processed in its own step; whether those steps run one at a time depends on the executor. Here's an example based on the documentation:

```python
from dagster import DynamicOut, DynamicOutput, job, op

@op(out=DynamicOut())
def generate_dynamic_outputs():
    # Your logic to yield dynamic outputs
    for i in range(10):
        yield DynamicOutput(value=i, mapping_key=str(i))

@op
def process_individual_output(input_data):
    # Your logic to process each individual output
    pass

@job
def my_serial_job():
    dynamic_results = generate_dynamic_outputs()
    dynamic_results.map(process_individual_output)
```
In this example, `generate_dynamic_outputs` yields dynamic outputs one by one. The `process_individual_output` op is mapped over those outputs, so it is invoked separately for each one. Unlike `collect`, which gathers all the outputs into a single list for one downstream op, `map` gives each output its own step. By using `map`, you are effectively fanning out the processing of each dynamic output, but `map` alone does not make that processing serial. A job declared with a plain `@job` uses Dagster's default executor, which runs steps in multiple processes when the job is launched (for example, from the Dagster UI), so mapped steps can run in parallel. To process the outputs one at a time, set `executor_def=in_process_executor` on the job, which runs each step in a single process, or configure the `multiprocess_executor` with `max_concurrent` set to `1`, as mentioned in the previous message. Remember to check the documentation for the version of Dagster you are using to ensure that the syntax and behavior are consistent with the current implementation.