sk4la
03/10/2021, 2:34 PM
@pipeline
def ingestion_pipeline():
    def ingest(file):
        ingest_file(path=file)

    def spread(file):
        gather_files(path=file).map(ingest)

    gather_files().map(spread)
When I execute this pipeline, I get the following:
Solid "ingest_file" cannot be downstream of more than one dynamic output. It is downstream of both "gather_files" and "gather_files_2"
As I understand it, nesting dynamic outputs is not currently implemented.
Alright, so instead of mapping from inside the spread function, I tried to map over its result, like this:
@pipeline
def ingestion_pipeline():
    def ingest(file):
        ingest_file(path=file)

    def spread(file):
        return gather_files(path=file)

    spread_files = gather_files().map(spread)
    spread_files.map(ingest)
It does not work either, which makes sense since the underlying dependency graph should be the same.
Does anyone have tips on how to overcome this kind of situation using Dagster?
alex
03/10/2021, 4:23 PM
“files from inside of files”
Are these directories? I'm not sure exactly what this means. Either way, I think the workaround is to have the body of the gather_files solid, which is emitting the dynamic outputs, do the recursion into the “files that contain other files” and emit the whole set straight away, if that's possible.
sk4la
03/10/2021, 4:45 PM
alex
03/10/2021, 4:56 PM
collect for this upcoming release
sk4la
03/10/2021, 5:00 PM
alex
04/27/2021, 3:14 PM
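
The workaround alex suggested earlier in the thread (having the gather_files solid do the recursion itself and emit one flat set) could look roughly like the sketch below. It is plain Python with no Dagster imports, and it only illustrates the flattening step. The names flatten_files, list_children, mapping_key and the TREE data are hypothetical, and it assumes that only leaf files are ingested while containers are merely descended into.

```python
import re
from typing import Callable, Iterable, Iterator


def flatten_files(
    roots: Iterable[str],
    list_children: Callable[[str], Iterable[str]],
) -> Iterator[str]:
    """Yield every leaf reachable from `roots`, descending into containers.

    `list_children` is a hypothetical callback returning the files contained
    in `path`, or an empty iterable when `path` is a plain file.
    """
    stack = list(reversed(list(roots)))
    while stack:
        path = stack.pop()
        children = list(list_children(path))
        if children:
            # A "file that contains other files": descend instead of emitting it.
            stack.extend(reversed(children))
        else:
            yield path


def mapping_key(index: int, path: str) -> str:
    """Build a unique key from letters, digits and underscores (assumed scheme)."""
    return "f{}_{}".format(index, re.sub(r"[^A-Za-z0-9_]", "_", path))


# Toy stand-in for the real lookup of which entries contain other files.
TREE = {
    "archive.zip": ["a.txt", "inner.tar"],
    "inner.tar": ["b.txt", "c.txt"],
}

leaves = list(flatten_files(["archive.zip", "d.txt"], lambda p: TREE.get(p, [])))
keys = [mapping_key(i, p) for i, p in enumerate(leaves)]
```

Inside the gather_files solid, each leaf would then presumably be yielded as a single dynamic output with its own mapping key, so that the pipeline needs only one gather_files().map(ingest) and ingest_file sits downstream of a single dynamic output. The keys are sanitized here on the assumption that mapping keys only accept letters, digits and underscores.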