# ask-community
m
I am looking for a way to explicitly call/execute an `asset` (or `op`) for a list of items. While `assets` are executed in parallel if they are independent, I did not find a way to include iteration. Simple example: iterate over a list of files and perform a task for each file. The tasks are independent.
```python
from pathlib import Path

from dagster import asset


@asset
def iterate_files():
    path = Path("/some/path")

    for file in path.iterdir():
        # here I would like to start parallel ops
        ...
```
If I pass a list to a downstream `asset`, the operation is performed sequentially:
```python
from pathlib import Path

from dagster import asset


@asset
def list_of_files():
    path = Path("/some/path")
    return list(path.iterdir())


@asset
def process_files(list_of_files):
    # Every file is handled one after another inside a single step
    for file in list_of_files:
        with open(file, "rt") as f:
            for line in f:
                ...  # write to a DB or other operation
```
Is it possible to achieve a parallel execution of the downstream task with Dagster? (cross posted from https://stackoverflow.com/questions/75681976/call-downstream-assets-or-ops-in-parallel-with-dagster)
c
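Use a graph-backed asset with dynamic ops:
```python
from pathlib import Path

from dagster import DynamicOut, DynamicOutput, graph_asset, op

@op(out=DynamicOut())
def iterate_files():
    # Fan out: one DynamicOutput per file (directory taken from the question);
    # mapping keys may only contain letters, digits and underscores
    for i, file in enumerate(Path("/some/path").iterdir()):
        yield DynamicOutput(str(file), mapping_key=f"file_{i}")

@op
def process_file(file):
    ...  # process a single file here
    return file

@op
def collect_files(files):
    # Fan in: receives the list of all mapped process_file outputs
    return files

@graph_asset
def processed_files():
    # Map process_file over every dynamic output, then collect the results
    return collect_files(iterate_files().map(process_file).collect())
```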
m
interesting! thanks for the pointer.
I have only used assets so far (and absolutely love them). Looks like I have to take the next step now 😄
c
now you’re thinking in dagster blob devil
m
Follow-up question: is it possible to trigger this op from an asset? Or do I include the upstream dependency in the `graph_asset` and that's all I need?
c
It's not possible to trigger an op from an asset. If you have an op that you want to run as part of your assets, it should likely be wrapped in a graph-backed asset (or you could check whether that op can be reframed as a software-defined asset).
m
@chris I tried the solution with the graph-backed asset (which seems the most logical one), but I'm running into an error that I don't understand:
```
dagster._check.CheckError: Invariant failed. Description: The set of output names keys specified in the keys_by_output_name argument must equal the set of asset keys outputted by load_files_to_postgres_parallel_as_asset.
```
Do I have to define the ops as dependencies in the `graph_asset`?