As a workaround I thought I could run a separate p...
# announcements
a
As a workaround I thought I could run a separate pipeline for processing each data stream. I was hoping that I'd be able to specify the relevant variables only in the
composite_solid
.
Copy code
@composite_solid(
    config_schema={
        'source': Field(str, is_optional=True, default_value='BMRS'),
        'stream': Field(str, is_optional=True, default_value='FUELHH'),
        'col_dim': Field(str, is_optional=True, default_value='fueltype'),
    }
)
def update_dataset(context):   
    df_stream_latest = get_latest_df(source, context.solid_config['stream'])
    da_stream_latest = convert_df_to_da(df_stream_latest, context.solid_config['col_dim'])
    save_da_to_zarr(da_stream_latest, source, context.solid_config['stream'])
    
    return 

@pipeline
def update_datasets_pipeline(): 
    update_dataset()
    return
However
composite_solid
can't accept a config_schema, which means I have to repeatedly specify the same variables both in the individual solids, but also the pipeline inputs yaml. E.g.
Copy code
@solid(
    config_schema={
        'source': Field(str, is_optional=True, default_value='BMRS'),
        'stream': Field(str, is_optional=True, default_value='FUELHH')
    }
)
def get_latest_df(context) -> Any:
    df_stream_latest = update.get_stream_latest_data(context.solid_config['source'], context.solid_config['stream'])
    return df_stream_latest

@solid(
    config_schema={
        'col_dim': Field(str, is_optional=True, default_value='fueltype')
    }
)
def convert_df_to_da(context, df_stream_latest: Any) -> Any:
    da_stream_latest = update.convert_df_to_da(df_stream_latest, context.solid_config['col_dim'])
    return da_stream_latest

@solid(
    config_schema={
        'source': Field(str, is_optional=True, default_value='BMRS'),
        'stream': Field(str, is_optional=True, default_value='FUELHH')
    }
)
def save_da_to_zarr(context, da_stream_latest: Any):
    update.save_stream_data_to_zarr(da_stream_latest, context.solid_config['source'], context.solid_config['stream'])
    return
Is there a way to specify these variables only once?