Vinnie
05/09/2022, 3:10 PM
`source`, `sink`, etc. I wanna create a graph that does something like this:
@graph
def load_to_sink():
    options = fetch_options()
    source_data = run_source_script(options["source"])
    return run_sink_script(options["sink"], source_data)
This resulted in a `DagsterInvalidDefinitionError`, with Dagster saying I should define multiple outs. Eventually I reached the following solution:
from dagster import Out, graph, op

@op(config_schema={'config_file_path': str}, out={"source": Out(), "sink": Out()})
def fetch_options(context):
    options = helper.fetch_options(context.op_config['config_file_path'])
    return options["source"], options["sink"]

@graph
def load_to_sink():
    options = fetch_options()
    source_data = run_source_script(options.source)
    return run_sink_script(options.sink, source_data)
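To illustrate why the dictionary version failed and the multi-`Out` version works, here is a toy model in plain Python. It assumes, loosely mirroring how Dagster composes graphs, that calling an op inside a `@graph` body returns a placeholder for a future output rather than the output's value, so there is nothing to index with `["source"]` until the job actually runs. `OutputHandle` and `compose_op` are hypothetical names; this is a sketch of the idea, not Dagster's implementation.

```python
from collections import namedtuple


class OutputHandle:
    """Hypothetical stand-in for a not-yet-computed op output."""

    def __init__(self, op_name, output_name="result"):
        self.op_name = op_name
        self.output_name = output_name

    def __getitem__(self, key):
        # At composition time there is no dict yet, only a reference to one,
        # so indexing is rejected (in the thread, Dagster raised
        # DagsterInvalidDefinitionError at this point).
        raise TypeError(
            f"cannot index output '{self.output_name}' of '{self.op_name}' "
            "while building the graph; declare separate outputs instead"
        )


def compose_op(op_name, out_names=None):
    """Return one handle, or a namedtuple of handles when several outputs are declared."""
    if not out_names:
        return OutputHandle(op_name)
    Outputs = namedtuple("Outputs", out_names)
    return Outputs(*(OutputHandle(op_name, name) for name in out_names))


# Single output: indexing fails while the graph is being built.
single = compose_op("fetch_options")
try:
    single["source"]
except TypeError as exc:
    print(exc)

# Multiple named outputs: attribute access and tuple unpacking both work.
options = compose_op("fetch_options", ["source", "sink"])
source, sink = options
print(options.source.output_name, sink.output_name)  # source sink
```

In real graph code the unpacking style would read `source, sink = fetch_options()`; whether that reads better than `options.source` is mostly a matter of taste.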
Is this the “correct” solution? Or is there a way to still return a single dictionary from the first op and use its different keys to call subsequent ops?
Zach
05/09/2022, 4:08 PM
`fetch_options` into a resource, that way each op could just access the pieces of the configuration it needs on the resource object instead of having to take them in as inputs. the thing I like about that pattern is that it keeps inputs/outputs focused on the data being processed and generally helps decouple the data graph from the configuration. it also depends on how heavyweight `helper.fetch_options` is and whether you want op-level visibility into its execution.
yuhan
05/09/2022, 4:53 PM
Vinnie
05/10/2022, 7:03 AM
`to_job()` function instead of calling `execute_in_process()` on top, since I’m dynamically creating jobs based on the config files:
from dagster import make_values_resource, repository

configs = [{"jobName": "foo", "filepath": "foo/bar.yaml"}]

def generate_jobs():
    jobs = {}
    for config in configs:
        job_options = helper.fetch_options(config["filepath"])
        jobs[config["jobName"]] = my_job.to_job(
            name=config["jobName"],
            config={"resources": {"values": {"config": job_options}}},
            resource_defs={"values": make_values_resource()},
        )
    return jobs

@repository
def my_repo():
    return {"jobs": generate_jobs()}
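The pattern Zach suggested, which the code above adopts via `make_values_resource()`, amounts to putting the parsed options in one shared place that each op reads from, instead of threading them through op inputs. Below is a plain-Python sketch of that idea, assuming the run-config shape used above (`{"resources": {"values": {"config": ...}}}`). `build_run_config`, `fake_context`, and the toy `run_source_script`/`run_sink_script` bodies are hypothetical stand-ins, not Dagster APIs, and the `"postgres"`/`"s3"` values are made-up sample options.

```python
from types import SimpleNamespace


def build_run_config(job_options):
    """Wrap parsed options in the run-config shape used above for the "values" resource."""
    return {"resources": {"values": {"config": job_options}}}


def fake_context(run_config):
    """Hypothetical stand-in for an op context: each resource just exposes its config."""
    resources = {name: spec["config"] for name, spec in run_config["resources"].items()}
    return SimpleNamespace(resources=SimpleNamespace(**resources))


def run_source_script(context):
    # The op pulls only the option it needs from the shared resource.
    return f"data from {context.resources.values['source']}"


def run_sink_script(context, source_data):
    # Data still arrives as an input; configuration comes from the resource.
    return f"{source_data} -> {context.resources.values['sink']}"


job_options = {"source": "postgres", "sink": "s3"}  # hypothetical sample options
ctx = fake_context(build_run_config(job_options))
print(run_sink_script(ctx, run_source_script(ctx)))  # data from postgres -> s3
```

The design point is the one from the thread: the only edge between the two ops is `source_data`, so the graph describes data flow while the options live in configuration.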
One short follow-up question I haven’t had much time to look into yet: am I correct in assuming that `make_values_resource()` is called and the configs are scoped only on repository refreshes?
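On the follow-up question, the plain-Python part at least is clear: `generate_jobs()`, and with it every `helper.fetch_options(...)` and `make_values_resource()` call, runs whenever `my_repo()` is evaluated, i.e. whenever a process loads the repository. Which Dagster processes load it, and how often, is a deployment detail worth confirming in the Dagster docs for your version. The dictionary form of `@repository` is also documented as accepting zero-argument functions as values for lazy loading (again, worth double-checking). The toy below only contrasts eager and deferred evaluation; `fetch_options` is a hypothetical stub standing in for `helper.fetch_options`, and `load_calls` is a hypothetical counter.

```python
load_calls = []


def fetch_options(filepath):
    """Hypothetical stub for helper.fetch_options; records each call."""
    load_calls.append(filepath)
    return {"source": "src", "sink": "dst"}


configs = [{"jobName": "foo", "filepath": "foo/bar.yaml"}]


def generate_jobs_eager():
    # Every config file is parsed as soon as this function runs.
    return {c["jobName"]: fetch_options(c["filepath"]) for c in configs}


def generate_jobs_lazy():
    # Each value is a zero-argument function; parsing waits until it is called.
    # The default argument binds the current config (avoids late binding).
    return {c["jobName"]: (lambda c=c: fetch_options(c["filepath"])) for c in configs}


eager = generate_jobs_eager()
print(len(load_calls))  # 1: parsed during generation

lazy = generate_jobs_lazy()
print(len(load_calls))  # 1: nothing parsed yet

lazy["foo"]()
print(len(load_calls))  # 2: parsed on first request
```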