is it possible to define a graph that uses a runti...
# ask-community
is it possible to define a graph that uses a runtime config? I want the graph to use an “op factory” pattern based off of a config variable but the value of that variable needs to be defined at the job level. does that make sense?
i’m trying to define a generic graph that will take a list of queries and generate the ops that will, in parallel, execute each query and then do some subsequent steps with the respective output
the jobs created from the graph will all follow the same pattern, but the queries (and other config) passed to each job will be different
so I want the graph to look something like this
Copy code
def configurable_graph():
    client = get_client()

    for query in graph_config["queries"]:
        op1, op2, op3 = op_factory(query)

        out1 = op1(client)

        out2 = op2(out1)

        out3 = op3(out2)
and the jobs would look like this:
Copy code
job1 = configurable_graph.to_job(
        "foo": bar1,
        "spam": eggs1,

job2 = configurable_graph.to_job(
        "foo": bar2,
        "spam": eggs2,
the graph should read the queries from
and be accessible through what I called
, but I don't know what Dagster component
actually is, if it even exists
Something similar I wrote, perhaps this is helpful:
Copy code
def get_query_dataframe_op(
    name: str,
    connection_resource: str,
    query: Optional[Dict[str, Any]] = None,
    parameters: Optional[Dict[str, Any]] = None,
) -> SolidDefinition:

    if "sql" not in query:
        sql_query_file_path = file_relative_path(__file__, f"queries/{name}.sql")
        with open(sql_query_file_path, "r") as query_file:
            query["sql"] =

    output_dataframe_name = f"{PREFIX_DATAFRAME}{name}"
    out = {
        output_dataframe_name: Out(
            metadata={"label": name},
            # asset_key=AssetKey(name),

    def query_dataframe(context: OpExecutionContext,) -> Iterator[pd.DataFrame]:

        if parameters is not None:
            query["sql"] = query["sql"].format(**parameters)
        dataframe = pd.read_sql_query(
            con=getattr(context.resources, connection_resource).engine, **query
        return Output(value=dataframe, output_name=output_dataframe_name)

    return query_dataframe
Sounds like you're maybe just needing 1 layer above that for example, here where they show building a graph from yaml:
In their graph dsl example they don't show loading the graph config or yaml through the graph or job definition or using launch pad. I think this is because the graph is created on compiling of the code for the registry, and I think the launchpad configuration is intended to fit the graph that exists at that time. If you didn't pass it a yaml there wouldn't be a graph. So it seems you need some configuration further back at the locations or dagster yaml level or like they did, built in to their graph compiling code. Seems like you're wanting configuration exposed in the UI that could be used to construct a graph or graphs? Prior to running them through launch pad? Or does the UI not factor into this and you just need to execute an dynamically created graph, and don't really care if it is exposed through the UI prior to asset materialization?
yeah UI doesn't really matter, building a graph from yaml is interesting but now that seems too far in the other direction lol
I've got this working by spooling the queries via DynamicOutput, but I haven't been successful in passing that output to an op factory
and I'm trying to use an op factory to leverage some more of the op definition features that I'm not able to hardcode (e.g. metadata)
Example Dynamic Query Graph:
Copy code
from typing import Tuple, List

from dagster import DynamicOutput, DynamicOutputDefinition, In, graph, job, op, resource
from dagster.utils.yaml_utils import load_yaml_from_path

import sqlalchemy as sa

def query_iterable_from_yaml(context):
    <|>("Loading query yaml: " + context.op_config["yaml_path"])
    yaml_data = load_yaml_from_path(context.op_config["yaml_path"])

    for name,query_config in yaml_data["queries"].items():
        yield DynamicOutput((name, query_config["sql"]), mapping_key=name)

def execute_query(context, query_tuple):
    name, sql = query_tuple
    result = context.resources.sqlalchemy_connection.execute(sql)
    return result

def result_fan_in(results: List):
    for result in results:

def query_graph():
    query_iterable = query_iterable_from_yaml()
    results =
    result_collection = result_fan_in(results)
    return result_collection

@resource(config_schema={"connection_string": str})
def sqlalchemy_resource(context) -> sa.engine.Engine:
    engine = sa.create_engine(context.resource_config["connection_string"])
    return engine

@job(resource_defs={"sqlalchemy_connection": sqlalchemy_resource.configured({
    "connection_string": "<mssql+pyodbc://server/database?driver=ODBC+Driver+17+for+SQL+Server&trusted_connection=yes>"
def query_job():

query_job.execute_in_process(run_config={"ops":{"query_graph": {"ops":{"query_iterable_from_yaml":{"config":{"yaml_path": "query.yaml"}}}}}})
I think the above graph is roughly what you're looking for.