Jeff Hulbert
03/11/2021, 4:34 PM
@lambda_solid
def collect(lst: List) -> List:
    """Return the given list unchanged.

    Used as the fan-in point: it accepts a list of upstream results as a
    single input and passes that same list through as its output.

    Args:
        lst: The list of upstream values.

    Returns:
        The same list object that was passed in.
    """
    return lst
@pipeline
def pipeline_fanin():
    """Run one configured copy of ``load_table`` per table, then fan in.

    For each table name, a separately named copy of ``load_table`` is
    created with ``table_name`` pre-bound as its config. Each copy is
    invoked and its result is appended to a list, which is then passed
    as a single input to ``collect``.
    """
    tables = ["table1", "table2", "table3"]
    all_tables_run = []
    for table in tables:
        # The name is derived from the table so each copy is distinct.
        run_table = configured(
            load_table, name=f"load_table_{table}"
        )({"table_name": table})
        all_tables_run.append(run_table())
    collect(all_tables_run)
@composite_solid(
    config_schema={
        "table_name": str,
    },
    config_fn=table_config,
)
def load_table():
    """Download a table, convert it to a DataFrame, and load it into the DB.

    Takes a ``table_name`` string as config, which ``table_config`` maps
    onto the inner solids.
    """
    df = gzip_json_to_df_solid(download_full_table())
    # NOTE: the result of load_df_to_db is not returned, so invoking this
    # composite gives the caller nothing to pass downstream. This is the
    # problem version; the corrected version follows below.
    load_df_to_db(df)
To fix this, add a `return` to the composite solid so that it exposes an output:
@composite_solid(
    config_schema={
        "table_name": str,
    },
    config_fn=table_config,
)
def load_table():
    """Download a table, convert it to a DataFrame, and load it into the DB.

    Takes a ``table_name`` string as config, which ``table_config`` maps
    onto the inner solids.

    Returns:
        The result of ``load_df_to_db``, so that callers invoking this
        composite receive a value they can pass to downstream solids.
    """
    df = gzip_json_to_df_solid(download_table())
    # Returning the inner solid's result is what gives the composite an output.
    return load_df_to_db(df)