https://dagster.io/ logo
#announcements
Title
# announcements
c

Charles Lariviere

02/09/2021, 11:35 PM
Hey 👋 Can we pass a
DynamicOutput
to a
composite_solid
? My pipeline works fine when I pass the
DynamicOutput
to a single
solid
, but I would like the
.map()
to execute more than one solid, with the general flow being; 1. Fetch a list of IDs from the database (unknown before execution time); 2. For each; query an API, build a dataframe, output to database But I’m getting the following error when I package step 2. as a `composite_solid`;
Copy code
dagster.core.errors.DagsterSubprocessError: dagster.check.CheckError: Member of list mismatches type. Expected <class 'dagster.core.execution.plan.inputs.StepInput'>. Got UnresolvedStepInput(name='id', dagster_type_key='Int', source=FromPendingDynamicStepOutput(step_output_handle=StepOutputHandle(step_key='query_records', output_name='id', mapping_key=None), solid_handle=SolidHandle(name='do_multiple_steps', parent=None), input_name='id')) of type <class 'dagster.core.execution.plan.inputs.UnresolvedStepInput'>.
I’ve structured my
composite_solid
as follows:
Copy code
@composite_solid(
    input_defs=[InputDefinition("id", int)]
)
def do_multiple_steps(id):
    output = do_something(id)
    rows = do_something_else(output)
And my
DynamicOutput
as follows:
Copy code
@solid(
  output_defs=[DynamicOutputDefinition(int, name="id")]
)
def query_records(_):
  results = query_database()
  for id in results:
    yield DynamicOutput(values=id, mapping_key=str(id), output_name="id")
With the pipeline being defined as follows:
Copy code
@pipeline()
def my_pipeline():
  query_records().map(do_multiple_steps)
a

alex

02/10/2021, 4:53 PM
based on the error this looks like a real bug - thanks for the report!
c

Charles Lariviere

02/10/2021, 5:02 PM
Sounds good, thanks @alex!
a

alex

02/10/2021, 9:53 PM
fix should go out tomorrow
🙌 1