dinya

11/23/2021, 1:09 PM
Hello all. I want to pass both `param0` and `param1` from the `get_record` op to the `calc_record` op:
from dagster import (op, graph, In, Out, Optional, String)

@op(
    out={
        "param0": Out(Optional[String]),
        "param1": Out(Optional[String]),
    }
)
def get_record(_):
    param0 = None
    param1 = None
    return param0, param1


@op(
    ins={
        "param0": In(Optional[String]),
        "param1": In(Optional[String]),
    },
)
def calc_record(_, param0, param1):
    pass


@graph
def process_record_graph():
    calc_record(get_record())

process_record_job = process_record_graph.to_job()
But Dagster fails with
dagster.core.errors.DagsterInvalidDefinitionError: In @graph process_record_graph, received a tuple of multiple outputs for input "param0" (at position 0) in op invocation calc_record. Must pass individual output, available from tuple: ('param0', 'param1')
Ops with a single input and a single output work fine. According to the Dagster docs, returning multiple outputs as a Python tuple is supported for ops. What am I doing wrong? I'm using v0.13.8.

Mike Davison

11/23/2021, 4:44 PM
I'm new to Dagster, but I've gotten this sort of thing to work by yielding values instead of returning a tuple. I'd be interested to know if this is the "right" approach.
yield Output(value = None, output_name = "param0")
yield Output(value = None, output_name = "param1")

claire

11/23/2021, 4:49 PM
Yes, we recommend outputting multiple outputs within ops via `yield` statements.

Mike Davison

11/23/2021, 4:51 PM
Is it possible to pass the outputs from get_record right into calc_record without assigning the outputs from get_record to local variables? Or does it need to be like this:
@graph
def process_record_graph():
    my_param0, my_param1 = get_record()
    calc_record(my_param0, my_param1)

claire

11/23/2021, 5:00 PM
Yep, you would need to assign the outputs to local variables like in your code snippet above. If you want to avoid doing that, you can also consider condensing the two ops into one

dinya

11/24/2021, 6:58 AM
@Mike Davison, @claire thanks for the clarification. @claire, the `@dagster.op` docstring in v0.13.8 lists these options:
1. Return a value. This value will be wrapped in an Output and yielded by the compute function.
2. Return an Output. This output will be yielded by the compute function.
3. Yield Output or other event objects. Same as default compute behavior.
Maybe add a note to item 3 about the multiple-outputs case (forced `yield` instead of `return`)? About returning multiple values as a tuple: you wrote above "we recommend outputting multiple outputs within ops via yield statements". Did you mean that this is the current state of things and ops will be able to `return` multiple values (a tuple) on a par with `yield` in the future? Or did you mean "recommend == you must", and `yield` for multiple Outputs/values is a Dagster design feature (ops are generator-like objects)?
Btw, assigning the outputs from `get_record` to local variables is the only way that works:
from dagster import op, graph, job, In, Out, Optional, String, Output


@op(
    out={
        "param0": Out(Optional[String]),
        "param1": Out(Optional[String]),
    }
)
def get_record(_):
    yield Output(value=None, output_name="param0")
    yield Output(value=None, output_name="param1")


@op(
    ins={
        "param0": In(Optional[String]),
        "param1": In(Optional[String]),
    },
)
def calc_record(_, param0, param1):
    pass


# Works well
@job
def process_record_job_0():
    param0, param1 = get_record()
    calc_record(param0, param1)


# Doesn't work and fails with
# dagster.core.errors.DagsterInvalidDefinitionError: In @job process_record_job_1, received a tuple of multiple outputs for input "param0" (at position 0) in op invocation calc_record. Must pass individual output, available from tuple: ('param0', 'param1')
@job
def process_record_job_1():
    calc_record(get_record())


# Works well
@graph
def process_record_graph_0():
    param0, param1 = get_record()
    calc_record(param0, param1)

process_record_job_2 = process_record_graph_0.to_job(name="process_record_job_2")


# Doesn't work and fails with
# dagster.core.errors.DagsterInvalidDefinitionError: In @graph process_record_graph_1, received a tuple of multiple outputs for input "param0" (at position 0) in op invocation calc_record. Must pass individual output, available from tuple: ('param0', 'param1')
@graph
def process_record_graph_1():
    calc_record(get_record())

process_record_job_3 = process_record_graph_1.to_job(name="process_record_job_3")

claire

11/24/2021, 4:54 PM
Thanks @dinya for the response. I think "recommend" was too strong a word; we support both returning a tuple and yielding multiple outputs:
@op(out={"out1": Out(str), "out2": Out(int)})
def my_op():
    return "foo", 1
Though you will still have to unpack the tuple by assigning to local variables. In the dagster framework, ops do behave as generator-like objects and can yield multiple values outside of outputs (e.g. events). Either way is ok and we'll continue to support them both.
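To make the "either way is ok" point concrete, here is a toy model in plain Python, not Dagster's actual implementation. It sketches how a framework could normalize a returned tuple and a sequence of yielded outputs into the same name-to-value mapping. The `Output` class and the `collect_outputs` helper below are hypothetical stand-ins invented for this illustration.

```python
import inspect
from typing import Any, Callable, Dict, NamedTuple, Sequence


class Output(NamedTuple):
    """Hypothetical stand-in for a named output; this is not dagster.Output."""
    value: Any
    output_name: str


def collect_outputs(fn: Callable[[], Any], output_names: Sequence[str]) -> Dict[str, Any]:
    """Run fn and map its results onto output_names (toy logic, an assumption)."""
    result = fn()
    if inspect.isgenerator(result):
        # Yield style: every yielded Output names its own slot.
        return {out.output_name: out.value for out in result}
    # Return style: tuple positions follow the declared output order.
    return dict(zip(output_names, result))


def returns_tuple():
    return "foo", 1


def yields_outputs():
    yield Output(value="foo", output_name="out1")
    yield Output(value=1, output_name="out2")


names = ("out1", "out2")
from_return = collect_outputs(returns_tuple, names)
from_yield = collect_outputs(yields_outputs, names)
```

In this sketch both styles end up as the same mapping, which is why the choice between them does not change what a downstream consumer sees.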

dinya

11/25/2021, 8:31 AM
@claire thanks.
"Though you will still have to unpack the tuple by assigning to local variables."
I note that unpacking in a graph/job like `op2(*op1())` works well too for chained op calls.
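A plain-Python analogy (no Dagster involved) may help show why star-unpacking works where the direct call does not. The `RecordOutputs` namedtuple is a hypothetical stand-in for the tuple-like object that a multi-output op invocation hands back inside a graph body, and `param1` is given a default here only so that the direct call can run for comparison.

```python
from collections import namedtuple

# Hypothetical stand-in for the result of invoking a two-output op
# inside a graph body: one tuple-like object with a field per output.
RecordOutputs = namedtuple("RecordOutputs", ["param0", "param1"])


def get_record():
    return RecordOutputs(param0="a", param1="b")


def calc_record(param0, param1=None):
    # Echo the arguments back so the binding can be inspected.
    return param0, param1


# Passing the call result directly binds the whole tuple to param0.
direct = calc_record(get_record())

# Star-unpacking spreads the fields across the positional parameters.
unpacked = calc_record(*get_record())
```

In the direct call the whole tuple lands in `param0`, which corresponds to the situation Dagster rejects with the "received a tuple of multiple outputs" error quoted earlier in the thread. With `*`, each field reaches its own parameter, just as it does after assigning to local variables.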