Ayrton Bourn
12/12/2020, 11:15 AMcomposite_solid
.
@composite_solid(
config_schema={
'source': Field(str, is_optional=True, default_value='BMRS'),
'stream': Field(str, is_optional=True, default_value='FUELHH'),
'col_dim': Field(str, is_optional=True, default_value='fueltype'),
}
)
def update_dataset(context):
df_stream_latest = get_latest_df(source, context.solid_config['stream'])
da_stream_latest = convert_df_to_da(df_stream_latest, context.solid_config['col_dim'])
save_da_to_zarr(da_stream_latest, source, context.solid_config['stream'])
return
@pipeline
def update_datasets_pipeline():
update_dataset()
return
However composite_solid
can't accept a config_schema, which means I have to repeatedly specify the same variables both in the individual solids, but also the pipeline inputs yaml. E.g.
@solid(
config_schema={
'source': Field(str, is_optional=True, default_value='BMRS'),
'stream': Field(str, is_optional=True, default_value='FUELHH')
}
)
def get_latest_df(context) -> Any:
df_stream_latest = update.get_stream_latest_data(context.solid_config['source'], context.solid_config['stream'])
return df_stream_latest
@solid(
config_schema={
'col_dim': Field(str, is_optional=True, default_value='fueltype')
}
)
def convert_df_to_da(context, df_stream_latest: Any) -> Any:
da_stream_latest = update.convert_df_to_da(df_stream_latest, context.solid_config['col_dim'])
return da_stream_latest
@solid(
config_schema={
'source': Field(str, is_optional=True, default_value='BMRS'),
'stream': Field(str, is_optional=True, default_value='FUELHH')
}
)
def save_da_to_zarr(context, da_stream_latest: Any):
update.save_stream_data_to_zarr(da_stream_latest, context.solid_config['source'], context.solid_config['stream'])
return
Is there a way to specify these variables only once?