Charles Lariviere
02/09/2021, 11:35 PMDynamicOutput
to a composite_solid
? My pipeline works fine when I pass the DynamicOutput
to a single solid
, but I would like the .map()
to execute more than one solid, with the general flow being;
1. Fetch a list of IDs from the database (unknown before execution time);
2. For each; query an API, build a dataframe, output to database
But I’m getting the following error when I package step 2. as a `composite_solid`;
dagster.core.errors.DagsterSubprocessError: dagster.check.CheckError: Member of list mismatches type. Expected <class 'dagster.core.execution.plan.inputs.StepInput'>. Got UnresolvedStepInput(name='id', dagster_type_key='Int', source=FromPendingDynamicStepOutput(step_output_handle=StepOutputHandle(step_key='query_records', output_name='id', mapping_key=None), solid_handle=SolidHandle(name='do_multiple_steps', parent=None), input_name='id')) of type <class 'dagster.core.execution.plan.inputs.UnresolvedStepInput'>.
composite_solid
as follows:
@composite_solid(
input_defs=[InputDefinition("id", int)]
)
def do_multiple_steps(id):
output = do_something(id)
rows = do_something_else(output)
And my DynamicOutput
as follows:
@solid(
output_defs=[DynamicOutputDefinition(int, name="id")]
)
def query_records(_):
results = query_database()
for id in results:
yield DynamicOutput(values=id, mapping_key=str(id), output_name="id")
With the pipeline being defined as follows:
@pipeline()
def my_pipeline():
query_records().map(do_multiple_steps)
alex
02/10/2021, 4:53 PMCharles Lariviere
02/10/2021, 5:02 PMalex
02/10/2021, 9:53 PM