Ayrton Bourn
12/12/2020, 9:08 AMDagsterInvalidDefinitionError
. I tried moving the loop into a composite_solid but received the same error.
Is there a standard way to handle loops in Dagster?@composite_solid
def update_dataset(source: str, stream_meta: dict):
df_stream_latest = get_latest_df(source, stream_meta)
da_stream_latest = convert_df_to_da(df_stream_latest, stream_meta)
save_da_to_zarr(da_stream_latest, source, stream_meta)
return
@pipeline
def update_datasets_pipeline():
with open('../data/data_spec.json', 'r') as fp:
data_spec = json.load(fp)
for source in data_spec.keys():
for stream_meta in data_spec[source]:
update_dataset(source, stream_meta)
return
get_latest_df
, convert_df_to_da
, and save_da_to_zarr
are all solids as wellNoah K
12/12/2020, 9:18 AMAyrton Bourn
12/12/2020, 9:29 AM