# announcements
a
Hi everyone! Apologies if this is a simple question, but I've looked around the documentation, including config_mapping and configured, and couldn't see anything for my use case. I'm trying to load a JSON file that defines my data streams, iterate over those stream definitions, and then run parts of my pipeline based on them. The issue is that because the iterative definitions are created in the pipeline rather than in a solid, I get a DagsterInvalidDefinitionError. I tried moving the loop into a composite_solid but received the same error. Is there a standard way to handle loops in Dagster?
For reference this is the code where I'm encountering issues:
import json

from dagster import composite_solid, pipeline


@composite_solid
def update_dataset(source: str, stream_meta: dict):
    df_stream_latest = get_latest_df(source, stream_meta)
    da_stream_latest = convert_df_to_da(df_stream_latest, stream_meta)
    save_da_to_zarr(da_stream_latest, source, stream_meta)


@pipeline
def update_datasets_pipeline():
    with open('../data/data_spec.json', 'r') as fp:
        data_spec = json.load(fp)

    # This loop is where the DagsterInvalidDefinitionError is raised
    for source in data_spec.keys():
        for stream_meta in data_spec[source]:
            update_dataset(source, stream_meta)
get_latest_df, convert_df_to_da, and save_da_to_zarr are all solids as well
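In case it helps: since the @pipeline body is plain Python that runs once at definition time, one workaround is a solid factory that bakes each stream's parameters into a uniquely named solid instead of passing them in as inputs. A rough sketch against the legacy solid/pipeline API; the 'name' key in each stream definition and the idea that the load/convert/save logic can be called as plain functions (rather than as solids) are assumptions for illustration:

import json

from dagster import pipeline, solid


def make_update_solid(source, stream_meta):
    # Parameters are captured by closure; each solid needs a unique, valid
    # identifier as its name, or Dagster raises DagsterInvalidDefinitionError.
    @solid(name=f"update_{source}_{stream_meta['name']}")
    def _update(context):
        df = get_latest_df(source, stream_meta)      # plain helpers here,
        da = convert_df_to_da(df, stream_meta)       # not solids
        save_da_to_zarr(da, source, stream_meta)

    return _update


# The spec must be loadable at import time, before the pipeline is defined.
with open('../data/data_spec.json', 'r') as fp:
    data_spec = json.load(fp)


@pipeline
def update_datasets_pipeline():
    for source, streams in data_spec.items():
        for stream_meta in streams:
            make_update_solid(source, stream_meta)()

The trade-off is that every stream becomes its own node in the static DAG, and the spec file has to exist wherever the pipeline module is imported.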
n
Not yet
Simple fan-out loops have a working prototype and should land shortly after the "no one gets anything done" EOY dead zone
Remember that pipelines are static data right now: while you write them in a Python DSL, they are compiled into a PipelineDefinition object and eventually into JSON 🙂
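To see that concretely, a minimal sketch (legacy API): the decorated body executes exactly once, at import, and the name ends up bound to a definition object rather than a function.

from dagster import pipeline, solid


@solid
def say_hello(context):
    context.log.info("hello")


@pipeline
def hello_pipeline():
    say_hello()


# The @pipeline decorator has already executed the body and compiled it:
# hello_pipeline is a PipelineDefinition object, not a plain function.
print(type(hello_pipeline).__name__)  # PipelineDefinition

That's why a loop over runtime values in the body can't work: by the time any data exists, the DAG has long since been fixed.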
a
Ah, that's unfortunate. I'll look at other options, thank you
Unfortunately, after some further investigation it doesn't look like I'm going to be able to achieve what I wanted with Dagster. It would be helpful if the static nature of the DAGs were more apparent in the documentation; it seems this has happened to others as well.