# ask-community
v
Hi all, this feels like a stupid question but I can’t seem to figure out if what I’m using is the intended pattern: I have a project with a bunch of config files including information like `source`, `sink`, etc. I want to create a graph that does something like this:
```python
@graph
def load_to_sink():
    options = fetch_options()
    # fails: inside a @graph body, fetch_options() returns an output handle,
    # not a dict, so it can't be subscripted with options["source"]
    source_data = run_source_script(options["source"])
    return run_sink_script(options["sink"], source_data)
```
This resulted in a `DagsterInvalidDefinitionError`, with Dagster saying I should define multiple outs. Eventually I reached the following solution:
```python
import helper  # our own config-loading module
from dagster import Out, graph, op

@op(config_schema={"config_file_path": str}, out={"source": Out(), "sink": Out()})
def fetch_options(context):
    options = helper.fetch_options(context.op_config["config_file_path"])
    # multiple outs are returned as a tuple, in the order declared above
    return options["source"], options["sink"]

@graph
def load_to_sink():
    options = fetch_options()
    source_data = run_source_script(options.source)
    return run_sink_script(options.sink, source_data)
```
Is this the “correct” solution? Or is there a way to still return a single dictionary from the first `op` and use different properties to call subsequent `ops`?
z
I don't know what's necessarily correct, but you could also consider refactoring `fetch_options` into a resource; that way each op could just access the pieces of the configuration it needs on the resource object instead of having to take them in as inputs. The thing I like about that pattern is that it leaves inputs/outputs focused on the data being processed and generally helps decouple the data graph from the configuration. It also depends on how heavyweight `helper.fetch_options` is and whether you want op-level visibility into its execution. A rough sketch is below.
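Something like this, maybe: a minimal sketch assuming `helper.fetch_options` returns the options dict, where `read_from` / `write_to` are hypothetical stand-ins for the real source/sink logic:
```python
import helper  # the user's own config-loading module
from dagster import graph, op, resource

@resource(config_schema={"config_file_path": str})
def options_resource(init_context):
    # load the whole options dict once, when the resource is initialized
    return helper.fetch_options(init_context.resource_config["config_file_path"])

@op(required_resource_keys={"options"})
def run_source_script(context):
    # hypothetical read step; read_from stands in for the real source logic
    return read_from(context.resources.options["source"])

@op(required_resource_keys={"options"})
def run_sink_script(context, source_data):
    # hypothetical write step; write_to stands in for the real sink logic
    write_to(context.resources.options["sink"], source_data)

@graph
def load_to_sink():
    run_sink_script(run_source_script())

load_to_sink_job = load_to_sink.to_job(resource_defs={"options": options_resource})
```
The `config_file_path` would then be supplied under `resources: options: config:` in run config, or baked in via `to_job(config=...)`.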
y
Hi @Vinnie, what @Zach describes is exactly what I’d recommend. Here’s an example of how to pass config values to multiple ops: https://docs.dagster.io/concepts/configuration/config-schema#passing-configuration-to-multiple-ops-in-a-job
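For quick reference, the pattern on that page looks roughly like this with `make_values_resource` (the names and config values here are illustrative):
```python
from dagster import graph, make_values_resource, op

@op(required_resource_keys={"values"})
def needs_source(context):
    context.log.info(context.resources.values["source"])

@op(required_resource_keys={"values"})
def needs_sink(context):
    context.log.info(context.resources.values["sink"])

@graph
def my_graph():
    needs_source()
    needs_sink()

my_job = my_graph.to_job(
    resource_defs={"values": make_values_resource(source=str, sink=str)},
    config={"resources": {"values": {"config": {"source": "a", "sink": "b"}}}},
)
```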
v
Right, interesting. I like the approach; in our case it’s probably more about doing some dict manipulation after fetching the options, since all that function does is read from local or remote storage. I’ll play around with it a little bit!
Just wanted to say it worked out great. It took me a few tries to figure out that I should pass the configs when calling the `to_job()` function instead of calling `execute_in_process()` afterwards, since I’m dynamically creating jobs based on the config files:
```python
import helper  # our own config-loading module
from dagster import make_values_resource, repository

configs = [{"jobName": "foo", "filepath": "foo/bar.yaml"}]

def generate_jobs():
    jobs = {}
    for config in configs:
        job_options = helper.fetch_options(config["filepath"])
        # my_job here is the @graph being turned into one job per config file
        jobs[config["jobName"]] = my_job.to_job(
            name=config["jobName"],
            config={"resources": {"values": {"config": job_options}}},
            resource_defs={"values": make_values_resource()},
        )
    return jobs

@repository
def my_repo():
    return {"jobs": generate_jobs()}
```
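Since `execute_in_process()` came up: a job generated this way already carries its config, so it can still be sanity-checked locally without extra run config, e.g. (a sketch against the repo above):
```python
# the config was baked in by to_job(), so no run_config is needed here
result = generate_jobs()["foo"].execute_in_process()
assert result.success
```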
One short followup question I haven’t had too much time to look into just yet: am I correct in assuming `make_values_resource()` is called and configs are scoped only on repository refreshes?