Hi <@U01EK4V7BM4>, is there a way I can return mul...
# ask-community
l
Hi @jordan, is there a way I can return multiple graph outputs to a single downstream graph? Example below. Basically, I want to combine the outputs of 2 or more graphs and send them to 1 single graph.
Copy code
def op_factory(name):
    """Build a placeholder Dagster op registered under ``name``."""

    # A decorator line must not end with a colon; `@op(name=name):` is a
    # SyntaxError.
    @op(name=name)
    def common_step():
        pass

    return common_step

def op_factory_v2(name):
    """Build a placeholder Dagster op registered under ``name``."""

    # A decorator line must not end with a colon; `@op(name=name):` is a
    # SyntaxError.
    # NOTE(review): callers in this thread invoke the returned op with
    # upstream outputs, but the placeholder declares no inputs -- add
    # parameters matching the call site before running this.
    @op(name=name)
    def common_step_1():
        pass

    return common_step_1

# Graph exposing one named output per generated step op.
@graph(out={"step_1": GraphOut(),
            "step_2": GraphOut()})
def graph_1(): # forecasting_pipeline
    step_output_dict = {}
    # Number the steps 1..2 so the keys line up with the declared GraphOut
    # names that the return statement reads.
    for i in range(1, 3):
        step_name = f"step_{i}"
        # Op names must be strings, so pass the formatted name rather than
        # the bare loop index.
        step_output_dict[step_name] = op_factory(step_name)()

    return {"step_1": step_output_dict["step_1"],
            "step_2": step_output_dict["step_2"]}


# Downstream graph: hands both upstream outputs to a single combining op.
@graph
def graph_2(step_1, step_2):
    combine = op_factory_v2("combine_graph")
    combine(step_1, step_2)


# Top-level workflow: wires graph_1's two outputs into graph_2.
@noodle_workflow()
def ml_workflow():
    first_out, second_out = graph_1()
    graph_2(first_out, second_out)
o
hi @LP! this sort of pattern is definitely possible -- any specific roadblocks you're running into when trying to implement this?
l
Hi @owen, thank you for the reply. If graph_1() returns more than 2 outputs, how can we pass them to graph_2? Is it possible to append those values to
step_output_dict
? How can I access this dict downstream in graph_2()?
Copy code
# Graph that returns its whole step-output dict, keyed by GraphOut name.
@graph(out={"step_1": GraphOut(),
            "step_2": GraphOut()})
def graph_1(): # forecasting_pipeline
    step_output_dict = {}
    # Number the steps 1..2 so every key matches a declared GraphOut name;
    # the dict is returned as the output-name -> op-output mapping.
    for i in range(1, 3):
        step_name = f"step_{i}"
        # Op names must be strings, so pass the formatted name rather than
        # the bare loop index.
        step_output_dict[step_name] = op_factory(step_name)()

    return step_output_dict

# Downstream graph: forwards its single input to the combining op.
# (The parameter list was missing its closing parenthesis.)
@graph
def graph_2(single_dict_output):
    op_factory_v2("combine_graph")(single_dict_output)


# Top-level workflow: passes whatever graph_1 yields straight into graph_2.
@noodle_workflow()
def ml_workflow():
    graph_2(graph_1())
o
ah i see — you should be able to pass in two graph outputs as a list like graph_2([step1, step2])
l
Can you please elaborate? Is this what you're trying to say? I tried this, but it gives an error:
graph graph_1 returned problematic value of type class list. Expected return value from invoked solid or dict mapping output name to return values from invoked solids
Copy code
@graph(out={"step_output_list": GraphOut()})
def graph_1(): # forecasting_pipeline
    # Must be a list: `{}` is a dict and has no `.append`.
    step_output_list = []
    for i in range(2):
        # Op names must be strings, so format the loop index into a name.
        step_output_list.append(op_factory(f"step_{i}")())

    # NOTE: returning a bare list from a graph body is what triggers the
    # "returned problematic value of type class list" error quoted above.
    return step_output_list

# Downstream graph: forwards its single input to the combining op.
# (The parameter list was missing its closing parenthesis.)
@graph
def graph_2(single_dict_output):
    op_factory_v2("combine_graph")(single_dict_output)


# Top-level workflow: passes graph_1's result straight into graph_2.
@noodle_workflow()
def ml_workflow():
    graph_2(graph_1())
o
Copy code
from dagster import graph, GraphOut, op


def op_factory(name):
    """Create a do-nothing op whose Dagster name is ``name``."""

    def common_step():
        pass

    # Apply the decorator by hand; same result as stacking @op(name=name)
    # on the def.
    return op(name=name)(common_step)


def combine_op_factory(name):
    """Create an op named ``name`` whose only input is ``step_output_list``."""

    def combine_op(step_output_list):
        # step_output_list has the two outputs from graph_1
        pass

    # Apply the decorator by hand; same result as stacking @op(name=name)
    # on the def.
    return op(name=name)(combine_op)


# Fan-in helper: an op input may be fed a list of upstream op outputs; this op
# re-emits that list as one ordinary output.
@op
def collect_step_outputs(step_outputs):
    return step_outputs


# A graph body may only return op outputs (or a dict of them keyed by output
# name); a bare list or tuple is rejected with "returned problematic value".
# Funnel the per-step outputs through one op so the graph has a single real
# output under the declared name.
@graph(out={"step_output_list": GraphOut()})
def graph_1():  # forecasting_pipeline
    step_outputs = [op_factory(f"op_{i}")() for i in range(2)]
    return {"step_output_list": collect_step_outputs(step_outputs)}


# Downstream graph: forwards the upstream output to one combining op.
@graph
def combine_graph(step_output_list):
    combiner = combine_op_factory("combine_op_1")
    combiner(step_output_list)


# Top-level graph: chains graph_1 directly into combine_graph.
@graph
def ml_workflow():
    combine_graph(graph_1())
this should work !
l
Hi @owen, again the same error
graph 'graph_1' returned problematic value of type <class 'tuple'>. Expected return value from invoked solid or dict mapping output name to return values from invoked solids