# ask-community
j
One thing I’ve been struggling with in Dagster is how to build dynamic DAGs based on some configuration state, like values in a resource config. This is quite common in ML experiment workflows, where the set of solids I want to execute differs depending on the config values. Take this pipeline for example:
```python
@pipeline(mode_defs=[cluster_mode_def])
def build_models():
    # determine if we're building or copying the training data set from production
    if config['BUILD_DATA_SET']:
        training_data_set = cmds.build_training_data_set(cmds.training_data_config())
    else:
        training_data_set = cmds.copy_training_data_set()

    # build vectors for each feature family, build full vector space, then build the model
    cmds.build_model(
        cmds.build_feature_space(
            cmds.build_feature_vectors(
                training_data_set
            )
        )
    )
```
Based on some config state I’d want to either build my training data set or just copy it from a production environment. But since pipeline functions don’t actually execute at run time (they just build the solid DAG structure and don’t have access to any configuration information), how would I do this?
a
Did you see this piece of the docs? https://docs.dagster.io/concepts/solids-pipelines/pipelines#conditional-branching That’s one approach. You can also use `solid_selection` arguments in most places where runs are kicked off, but based on your description I think using optional outputs may be a good fit for your case:
```python
from dagster import Output, OutputDefinition, Selector, pipeline, solid


@solid(
    config_schema=Selector({"build": build_config(), "copy": copy_config()}),
    output_defs=[
        OutputDefinition(name="build_info", is_required=False),
        OutputDefinition(name="copy_info", is_required=False),
    ],
)
def determine_training_data_source(context):
    # the Selector guarantees exactly one of "build" / "copy" is provided
    if context.solid_config.get("build"):
        yield Output(context.solid_config["build"], output_name="build_info")
    else:
        yield Output(context.solid_config["copy"], output_name="copy_info")


@pipeline(mode_defs=[cluster_mode_def])
def build_models():
    # determine if we're building or copying the training data set from production

    build_info, copy_info = determine_training_data_source()

    # only one of these paths will be taken at run time
    copied_training_data_set = cmds.copy_training_data(copy_info)
    built_training_data_set = cmds.build_training_data(build_info)

    # build vectors for each feature family, build full vector space, then build the model
    cmds.build_model(
        cmds.build_feature_space(
            cmds.build_feature_vectors(
                # here we "fan-in" to prevent the skipping behavior
                # of optional outputs from propagating further.
                # we can assert in this solid (or an added intermediate one)
                # that we have a list of only one item
                training_data_set=[built_training_data_set, copied_training_data_set]
            )
        )
    )
```
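For reference, a minimal sketch (not from the thread) of how a run of that pipeline might be kicked off, assuming `build_config()` / `copy_config()` are ordinary config schemas; the config field and solid-selection query here are made up for illustration. The `run_config` picks the branch via the `Selector`, and the commented-out `solid_selection` shows the alternative mentioned above:
```python
from dagster import execute_pipeline

# Hypothetical run config: supplying the "copy" key of the Selector means only
# the copy branch yields an output, so only that path executes.
run_config = {
    "solids": {
        "determine_training_data_source": {
            "config": {"copy": {"source_env": "production"}}  # illustrative field
        }
    }
}

result = execute_pipeline(build_models, run_config=run_config)

# Alternative: subset the DAG explicitly when launching the run, e.g. the copy
# solid and everything downstream of it.
# result = execute_pipeline(
#     build_models,
#     run_config=run_config,
#     solid_selection=["copy_training_data*"],
# )
```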
j
This’ll work, but fundamentally it points to a couple of areas of concern I have about adopting Dagster for ML / data science. First, it requires that you fully define every possible path through the DAG up front and then leave the execution flow to the branching structures you show above. This is not flexible, and it could be an issue when I need to quickly create and run different model experiments, where each one would be a different DAG path. Second, this is really verbose: I have to create an entire solid just to encapsulate a simple `if` branching statement. The pipeline structure ends up looking more like a graphical programming language than a graphical DSL, which is what I’m really looking for.
I think being able to access config and resources in pipeline functions and `composite_solid` functions would really add this type of flexibility.
Some of my early pipelines just used a hard-coded dict to hold my config instead of a resource. That allowed me to have conditional logic that built different DAG structures based on the values in the dict. But once I tried to integrate all my config into Dagster’s resource functionality, all that flexibility went away.
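Sketched below, reusing the solid names from the first snippet, is roughly what that hard-coded-dict pattern looks like; the dict contents are illustrative. Because the dict is plain Python, the `@pipeline` function can read it while it builds the DAG, which is exactly what resource config can’t offer:
```python
# Illustrative: a module-level dict, not a Dagster resource, decides which
# solids get wired together when the pipeline object is constructed.
EXPERIMENT = {"BUILD_DATA_SET": True}

@pipeline(mode_defs=[cluster_mode_def])
def build_models():
    # evaluated once, at DAG-construction time, not at run time
    if EXPERIMENT["BUILD_DATA_SET"]:
        training_data_set = cmds.build_training_data_set(cmds.training_data_config())
    else:
        training_data_set = cmds.copy_training_data_set()

    cmds.build_model(
        cmds.build_feature_space(
            cmds.build_feature_vectors(training_data_set)
        )
    )
```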
a
Thanks, that’s useful feedback. Are any of these dynamic decisions based on information from running solids, or just on config-time information? If different pipelines/DAGs were created for each permutation needed, approximately how many different pipelines would you guess would exist?
j
No, this is all based on config values. I might have 10 different experiments I want to try, each with a DAG structure that differs in some way. I want to be able to change the config, run an experiment, evaluate the results, and repeat. If at all possible I’d rather keep my pipeline code static across experiments; otherwise I have to change the code, deploy, and refresh the Dagster repository for each run.
I do think this workflow is probably pretty specific to data science / ML, where each run is slightly different. I can totally see that for standard production ETL pipelines you would want to build your pipeline to do exactly what it needs to do, with little flexibility needed.
a
Thanks for the extra context. To share something that some other users have done: they keep a repository of programmatically generated pipelines built from a bespoke DSL/config scheme (https://docs.dagster.io/concepts/solids-pipelines/pipelines#pipeline-dsl). If these are read from an external source like a database or a file, you only need to restart the process, as opposed to redeploying code, to pick up a new variant. One user made their own GUI DAG-creation tool that persisted their intermediate pipeline representation to a database, which the repository would then load from.
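A minimal sketch of that pattern, using the pre-1.0 `@solid`/`@pipeline`/`@repository` APIs discussed in this thread; the file name, solid names, and the `clean` flag are made up for illustration:
```python
import json

from dagster import pipeline, repository, solid


@solid
def load_data(_):
    return [1, 2, 3]

@solid
def clean_data(_, data):
    return [row for row in data if row is not None]

@solid
def train_model(_, data):
    return len(data)


def make_experiment_pipeline(name, spec):
    """Build one pipeline variant from a plain dict, e.g. {"clean": true}."""

    @pipeline(name=name)
    def _experiment():
        data = load_data()
        if spec.get("clean"):  # decided at definition time, per variant
            data = clean_data(data)
        train_model(data)

    return _experiment


@repository
def experiments():
    # Could just as well be a database query; restarting the repository
    # process (rather than redeploying code) picks up new variants.
    with open("experiments.json") as f:
        specs = json.load(f)
    return [make_experiment_pipeline(name, spec) for name, spec in specs.items()]
```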
m
Hi there, I am busy trawling through the comments to try to find out how to have a more generic pipeline that is static but can have lots of permutations at run time based on some config, OR be able to create a pipeline automatically from some config. This is also ML / data science focused: we are building out a core product, but the implementation of each pipeline will need to vary slightly depending on the data source. The above thread sounded promising, but it points to links that don't exist anymore. Do you have any updated resources about programmatically generating pipelines from a config scheme, or a GUI to do so?