Martin Preusse
03/09/2023, 10:50 PM
… `asset` (or `op`) for a list of items. While assets are executed in parallel if they are independent, I did not find a way to include iteration.
Simple example: Iterate over a list of files and perform a task for each file. The tasks are independent.
```python
from pathlib import Path

from dagster import asset


@asset
def iterate_files():
    path = Path("/some/path")
    for file in path.iterdir():
        # here I would like to start parallel ops
        ...
```
If I pass a list to a downstream `asset`, the operation is performed sequentially:
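```python
from pathlib import Path

from dagster import asset


@asset
def list_of_files():
    path = Path("/some/path")
    return list(path.iterdir())


@asset
def process_files(list_of_files):
    # a single step loops over every file, so files are handled one by one
    for file in list_of_files:
        with open(file, "rt") as f:
            for line in f:
                # write to a DB or other operation
                ...
```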
Is it possible to run the downstream task in parallel with Dagster?
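(cross-posted from https://stackoverflow.com/questions/75681976/call-downstream-assets-or-ops-in-parallel-with-dagster)

chris
03/10/2023, 1:03 AM
```python
from dagster import DynamicOut, DynamicOutput, graph_asset, op


@op(out=DynamicOut())
def iterate_files():
    for file in whatever:  # placeholder: the files to iterate over
        yield DynamicOutput(file, mapping_key=some_mapping_key)


@op
def process_file(file):
    ...  # yada yada: the per-file work


@graph_asset
def processed_files():
    # fan out: one process_file step per dynamic output
    iterate_files().map(process_file)
```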
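A note on `some_mapping_key`: as far as I know, Dagster requires every `mapping_key` to match `^[A-Za-z0-9_]+$` and to be unique among the dynamic outputs of one step, so raw file names such as `report-2023.csv` cannot be used directly. Below is a minimal stdlib-only sketch of a hypothetical helper, `to_mapping_keys` (not part of Dagster), that replaces invalid characters with underscores and de-duplicates collisions with a numeric suffix.

```python
import re
from typing import Iterable, List

# Any character outside letters, digits and underscore is invalid in a key.
_INVALID = re.compile(r"[^A-Za-z0-9_]")


def to_mapping_keys(names: Iterable[str]) -> List[str]:
    """Hypothetical helper: turn file names into valid, unique mapping keys."""
    seen = set()
    keys = []
    for name in names:
        # Replace invalid characters; fall back to "_" for an empty name.
        base = _INVALID.sub("_", name) or "_"
        key = base
        suffix = 1
        # "a.txt" and "a-txt" both sanitize to "a_txt", so append a counter.
        while key in seen:
            key = f"{base}_{suffix}"
            suffix += 1
        seen.add(key)
        keys.append(key)
    return keys


print(to_mapping_keys(["a.txt", "a-txt", "b c.csv"]))
```

Inside `iterate_files`, the keys could be zipped with the files, e.g. `for file, key in zip(files, to_mapping_keys(f.name for f in files)): yield DynamicOutput(file, mapping_key=key)`. A plain index such as `f"file_{i}"` is a simpler alternative when readable keys are not needed.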
Martin Preusse
03/10/2023, 1:05 AM

chris
03/10/2023, 1:06 AM

Martin Preusse
03/10/2023, 12:06 PM
… `graph_asset` and that's all I need?

chris
03/10/2023, 11:40 PM

Martin Preusse
03/15/2023, 2:01 PM
`dagster._check.CheckError: Invariant failed. Description: The set of output names keys specified in the keys_by_output_name argument must equal the set of asset keys outputted by load_files_to_postgres_parallel_as_asset.`
Do I have to define the ops as dependencies in the `graph_asset`?