I'm attempting to run several composite solids in ...
# announcements
I'm trying to run several composite solids in parallel, fan them in, and then run another solid, but I get the following error:
dagster.core.errors.DagsterUserCodeProcessError: dagster.core.errors.DagsterInvalidDefinitionError: In @pipeline pipeline_fanin, received a list containing an invalid type at index 0 for input "lst" (at position 0) in solid invocation collect. Lists can only contain the output from previous solid invocations or input mappings, received <class 'list'>
load_table is a composite solid that runs fine on its own, but when I try to fan in as shown below I get the error. What is the proper way to do fan-in here?
from typing import List

from dagster import configured, lambda_solid, pipeline

@lambda_solid
def collect(lst: List) -> List:
    return lst

@pipeline
def pipeline_fanin():
    tables = ["table1", "table2", "table3"]
    all_tables_run = []
    for table in tables:
        run_table = configured(
            load_table, name=f"load_table_{table}"
        )({"table_name": table})
        all_tables_run.append(run_table())
    collect(all_tables_run)
Fixed it. Even though the final step of the composite solid doesn't return anything, the composite solid itself still needed to return the output of its last solid. This is what the code looked like when the error occurred:
@composite_solid(
    config_schema={
        "table_name": str,
    },
    config_fn=table_config,
)
def load_table():
    df = gzip_json_to_df_solid(download_full_table())
    load_df_to_db(df)
To fix it, add a return:
@composite_solid(
    config_schema={
        "table_name": str,
    },
    config_fn=table_config,
)
def load_table():
    df = gzip_json_to_df_solid(download_table())
    return load_df_to_db(df)
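For anyone wondering why the return matters, here is a rough plain-Python analogy. It is not Dagster code and not how Dagster is implemented: OutputHandle, compose, and fan_in are hypothetical stand-ins made up for this sketch. The idea it illustrates is that the body of a composite is ordinary Python, so if the body returns nothing there is no output handle to put in the fan-in list.

```python
from typing import Callable, List, Optional


class OutputHandle:
    """Hypothetical stand-in for the handle a solid invocation yields."""

    def __init__(self, producer: str) -> None:
        self.producer = producer


def compose(
    body: Callable[[], Optional[OutputHandle]], name: str
) -> Callable[[], Optional[OutputHandle]]:
    """Toy composite: its only output is whatever the body returns."""

    def invoke() -> Optional[OutputHandle]:
        result = body()
        # If the body returned no handle, there is no output to pass downstream.
        return OutputHandle(name) if isinstance(result, OutputHandle) else None

    return invoke


def fan_in(items: List[object]) -> List[str]:
    """Toy fan-in: every element of the list must be an output handle."""
    for index, item in enumerate(items):
        if not isinstance(item, OutputHandle):
            raise TypeError(f"invalid type at index {index}: {type(item)}")
    return [item.producer for item in items]


def body_without_return() -> None:
    OutputHandle("load_df_to_db")  # handle is created and then discarded


def body_with_return() -> OutputHandle:
    return OutputHandle("load_df_to_db")


# Without a return, the composite has nothing to offer to a fan-in list.
broken = compose(body_without_return, "load_table_broken")

# With a return, each configured copy yields a handle that can be collected.
fixed = [
    compose(body_with_return, f"load_table_{table}")
    for table in ("table1", "table2", "table3")
]
fan_in_result = fan_in([invoke() for invoke in fixed])
```

In this toy version, fan_in([broken()]) raises a TypeError at index 0, which loosely mirrors the DagsterInvalidDefinitionError above, while the composites whose bodies return their last output fan in cleanly.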