LP
02/17/2023, 7:17 PMdef op_factory(name):
@op(name=name):
def common_step():
pass
return common_step
def op_factory_v2(name):
    """Build a no-op Dagster op registered under ``name`` and return it."""
    # A decorator line takes no trailing colon.
    # NOTE(review): callers in this thread invoke the returned op with
    # upstream outputs as arguments, but the op declares no inputs --
    # confirm whether matching parameters should be added here.
    @op(name=name)
    def common_step_1():
        pass

    return common_step_1
# Runs two factory-built ops and exposes their results as the graph
# outputs "step_1" and "step_2".
@graph(out={"step_1": GraphOut(),
            "step_2": GraphOut()})
def graph_1():  # forecasting_pipeline
    step_output_dict = {}
    # Count from 1 so the generated keys match the declared output names
    # (range(2) would produce step_0/step_1 and the lookup of step_2 fails).
    for i in range(1, 3):
        step_name = f"step_{i}"
        # The op is named with the string key, not the bare integer.
        step_output_dict[step_name] = op_factory(step_name)()
    return {"step_1": step_output_dict["step_1"],
            "step_2": step_output_dict["step_2"]}
# Sub-graph that hands both upstream values to one factory-built op.
@graph
def graph_2(step_1, step_2):
    combine = op_factory_v2("combine_graph")
    combine(step_1, step_2)
# Top-level workflow: run graph_1, then pass its two outputs to graph_2.
@noodle_workflow()
def ml_workflow():
    first_output, second_output = graph_1()
    graph_2(first_output, second_output)
owen
02/17/2023, 7:26 PMLP
02/17/2023, 7:32 PM
step_output_dict. How can I access this dict downstream in graph_2()?
# Runs two factory-built ops and returns them as a dict keyed by the
# declared graph output names.
@graph(out={"step_1": GraphOut(),
            "step_2": GraphOut()})
def graph_1():  # forecasting_pipeline
    step_output_dict = {}
    # Count from 1 so the keys line up with the declared "step_1"/"step_2"
    # outputs; range(2) would yield step_0/step_1 instead.
    for i in range(1, 3):
        step_name = f"step_{i}"
        # The op is named with the string key, not the bare integer.
        step_output_dict[step_name] = op_factory(step_name)()
    return step_output_dict
# Sub-graph that passes the single upstream value to one factory-built op.
@graph
def graph_2(single_dict_output):
    op_factory_v2("combine_graph")(single_dict_output)
# Top-level workflow: feed whatever graph_1 produces straight into graph_2.
@noodle_workflow()
def ml_workflow():
    graph_2(graph_1())
owen
02/17/2023, 7:57 PMLP
02/17/2023, 8:05 PM
graph 'graph_1' returned problematic value of type <class 'list'>. Expected return value from invoked solid or dict mapping output name to return values from invoked solids
# Invokes two factory-built ops and collects their outputs in a list.
@graph(out={"step_output_list": GraphOut()})
def graph_1():  # forecasting_pipeline
    # Must be a list: a dict has no .append().
    step_output_list = []
    for i in range(2):
        # Name each op with a string rather than the bare loop integer.
        step_output_list.append(op_factory(f"op_{i}")())
    # NOTE(review): returning a plain list is what produces the
    # "returned problematic value of type ... list" error quoted with this
    # snippet -- the graph has to return an invoked op's output or a dict
    # mapping output names to such outputs.
    return step_output_list
# Sub-graph that passes the single upstream value to one factory-built op.
@graph
def graph_2(single_dict_output):
    op_factory_v2("combine_graph")(single_dict_output)
# Top-level workflow: feed graph_1's result straight into graph_2.
@noodle_workflow()
def ml_workflow():
    graph_2(graph_1())
owen
02/17/2023, 9:42 PM
from dagster import graph, GraphOut, op
def op_factory(name):
    # Factory: wrap a do-nothing function as a Dagster op called `name`.
    def common_step():
        pass

    # Explicit decorator application; same effect as @op(name=name).
    return op(name=name)(common_step)
def combine_op_factory(name):
    # Factory: wrap a one-input, do-nothing function as a Dagster op
    # called `name`.
    def combine_op(step_output_list):
        # step_output_list has the two outputs from graph_1
        pass

    # Explicit decorator application; same effect as @op(name=name).
    return op(name=name)(combine_op)
# Fan-in helper: receives the upstream op outputs as one list input and
# forwards that list as its single output.
@op
def _collect_step_outputs(step_outputs):
    return step_outputs


# Invokes two factory-built ops and exposes their results together as the
# single graph output "step_output_list".
@graph(out={"step_output_list": GraphOut()})
def graph_1():  # forecasting_pipeline
    step_output_list = [op_factory(f"op_{i}")() for i in range(2)]
    # A graph body may only return an invoked op's output (or a dict mapping
    # output names to such outputs); a tuple or list is rejected. Gathering
    # the outputs through a fan-in op yields one valid output handle.
    return _collect_step_outputs(step_output_list)
# Sub-graph that passes the upstream value to one factory-built combine op.
@graph
def combine_graph(step_output_list):
    combiner = combine_op_factory("combine_op_1")
    combiner(step_output_list)
# Top-level graph: feed graph_1's output straight into combine_graph.
@graph
def ml_workflow():
    combine_graph(graph_1())
owen
02/17/2023, 9:42 PMLP
02/18/2023, 7:38 PMgraph 'graph_1' returned problematic value of type <class 'tuple'>. Expected return value from invoked solid or dict mapping output name to return values from invoked solids