Alan Bohannon
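03/12/2024, 11:29 PM
Scout
03/12/2024, 11:29 PM
```python
from dagster import AssetsDefinition, GraphIn, GraphOut, graph

# filter_for_date and num_users are ops assumed to be defined elsewhere
@graph(
    ins={"new_user_signups": GraphIn()},
    out={"signups_today": GraphOut(), "num_signups_today": GraphOut()},
)
def users_filtered_by_date(new_user_signups):
    signups_today = filter_for_date(new_user_signups)
    # A graph with multiple outputs returns a dict keyed by output name
    return {
        "signups_today": signups_today,
        "num_signups_today": num_users(signups_today),
    }

asset_def = AssetsDefinition.from_graph(users_filtered_by_date)
```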
In this example, the `users_filtered_by_date` graph accepts `new_user_signups` as its input asset and outputs two assets, `signups_today` and `num_signups_today`. `AssetsDefinition.from_graph` turns this graph into a single asset definition that produces both assets; by default, the graph input is matched to the upstream asset with the same key (`new_user_signups`), and each output becomes an asset named after the output. Graph assets let you split an asset's computation into several ops while still tracking the results as assets, which keeps larger data computations clear and maintainable.
Alan Bohannon
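The example never defines `filter_for_date` or `num_users`. As a minimal plain-Python sketch of the data flow the graph wires together (no Dagster involved), assuming each signup is a dict with a `signup_date` field (a hypothetical schema), the two steps and the graph's two named outputs could look like this. Unlike the original op, this `filter_for_date` takes the target `day` as an explicit parameter so the result is deterministic.

```python
from datetime import date

# Hypothetical record shape: {"user_id": int, "signup_date": date}
def filter_for_date(signups: list[dict], day: date) -> list[dict]:
    """Keep only the signups that happened on `day`."""
    return [s for s in signups if s["signup_date"] == day]

def num_users(signups: list[dict]) -> int:
    """Count the filtered signups."""
    return len(signups)

def users_filtered_by_date(new_user_signups: list[dict], day: date) -> dict:
    """Mirror the graph: one input, two named outputs."""
    signups_today = filter_for_date(new_user_signups, day)
    return {
        "signups_today": signups_today,
        "num_signups_today": num_users(signups_today),
    }

signups = [
    {"user_id": 1, "signup_date": date(2024, 3, 12)},
    {"user_id": 2, "signup_date": date(2024, 3, 11)},
    {"user_id": 3, "signup_date": date(2024, 3, 12)},
]
result = users_filtered_by_date(signups, date(2024, 3, 12))
print(result["num_signups_today"])  # 2
```

In the Dagster version, each of these functions would be an `@op`, and the dict keys are what `from_graph` turns into asset names.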
03/12/2024, 11:31 PM
Scout
03/12/2024, 11:31 PM
`DynamicOutput` objects, each representing a batch of data to be processed. Subsequent ops can then map over these dynamic outputs to process each batch in parallel. Here's a simplified example from the documentation that illustrates the concept:
```python
from dagster import DynamicOut, DynamicOutput, graph, op

@op(out=DynamicOut())
def return_dynamic():
    # Fan out: yield one DynamicOutput per batch; mapping_key must be unique
    for idx in range(10):
        yield DynamicOutput(value=idx, mapping_key=str(idx))

@op
def process_batch(batch):
    # Process a single batch; runs once per DynamicOutput
    ...

@graph
def my_graph():
    dynamic_result = return_dynamic()
    dynamic_result.map(process_batch)
```
In this example, `return_dynamic` yields multiple dynamic outputs that represent different batches of data, each identified by a unique `mapping_key`. The `process_batch` op is then mapped over these dynamic outputs, so one `process_batch` step runs per batch. Whether those steps actually run at the same time depends on the executor configured for your job. With the multiprocess executor, Dagster can run each mapped step in a separate process, achieving parallel execution; with the in-process executor, the steps run one after another. Keep in mind that while graph assets can facilitate parallel execution, the degree of concurrency is controlled by the job's executor (for example, the multiprocess executor's `max_concurrent` setting) and by any concurrency limits you set in your Dagster configuration.
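To make the executor point concrete, here is a plain-Python analogy (not Dagster code): the fan-out step yields keyed batches, and a hypothetical `run_mapped` helper applies `process_batch` to each one through a thread pool whose `max_workers` plays the role of a concurrency limit. Dagster's multiprocess executor uses separate processes rather than threads; threads are used here only to keep the sketch short and self-contained, and the per-batch work is a placeholder.

```python
from concurrent.futures import ThreadPoolExecutor

def return_dynamic(num_batches: int):
    """Fan out: yield (mapping_key, batch) pairs; keys must be unique."""
    for idx in range(num_batches):
        yield str(idx), idx

def process_batch(batch: int) -> int:
    """Placeholder per-batch work (hypothetical: square the value)."""
    return batch * batch

def run_mapped(batches, max_concurrent: int) -> dict:
    """Apply process_batch to every batch, at most max_concurrent at a time.

    max_concurrent=1 behaves like sequential, in-process execution.
    """
    items = list(batches)
    keys = [key for key, _ in items]
    if len(keys) != len(set(keys)):
        raise ValueError("mapping keys must be unique")
    with ThreadPoolExecutor(max_workers=max_concurrent) as pool:
        # pool.map preserves input order, so results line up with keys
        results = pool.map(process_batch, [batch for _, batch in items])
        return dict(zip(keys, results))

sequential = run_mapped(return_dynamic(10), max_concurrent=1)
parallel = run_mapped(return_dynamic(10), max_concurrent=4)
print(sequential == parallel)  # True
```

The results are identical either way; only how many batches run at once changes, which is why concurrency is a property of the executor rather than of the graph itself.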