https://dagster.io/ logo
#ask-community
Title
# ask-community
t

Tanguy Le Floch

08/29/2022, 9:07 AM
Hi all, I've got a question on dynamic outputs. Not a critical one, but it could improve the signature of most of our ops. I'm wondering if there is a way to achieve passing multiple separated arguments to an op with a dynamic input. As an example, this works fine:
Copy code
from dagster import DynamicOut, DynamicOutput, OpExecutionContext, graph, op


@op
def op_taking_dynamic_args(context: OpExecutionContext, arg1: int) -> None:
    <http://context.log.info|context.log.info>(arg1)

@op(out=DynamicOut())
def op_triggering_multiple_ops(context: OpExecutionContext):
    for i, c in enumerate(context.op_config):
        yield DynamicOutput(c, mapping_key=str(i))

@graph
def my_graph():
   op_triggering_multiple_ops().map(op_taking_dynamic_args)

my_job = my_graph.to_job(
    config={
        "ops": {"op_triggering_multiple_ops": {"config": [1, 2, 3]}},
    }
)

my_job.execute_in_process()
Is there a way I could achieve something similar if
op_taking_dynamic_args
has more than one argument, or do I have to absolutely pass a
dict
(or another data type) with all the inputs inside? If there is a more dagster-ish way to achieve this we'd also be very interested! @Antoine Valette
j

jamie

08/29/2022, 4:15 PM
hi @Tanguy Le Floch if you're trying to pass the output from another (non-dynamic) op as input to
op_taking_dynamic_args
you can use a lambda function https://docs.dagster.io/concepts/ops-jobs-graphs/dynamic-graphs#additional-arguments
t

Tanguy Le Floch

08/29/2022, 4:21 PM
Thanks @jamie. I had seen that, unfortunately my other outputs are also dynamic.
j

jamie

08/29/2022, 4:30 PM
ah i see. at this time we don't support being downstream of multiple dynamic outs. we have an issue for it here https://github.com/dagster-io/dagster/issues/4364
t

Tanguy Le Floch

08/29/2022, 4:55 PM
Sorry, maybe I wasn't clear but if I understood well what I mean is not exactly the same. My case is that each iteration of
op_triggering_multiple_ops
generates several things, but they are meant to be passed together to
op_taking_dynamic_args
, it's a 1<->1 relationship between outputs of the first to the inputs of the second. However, I was wondering if there was a way for the signature of
op_taking_dynamic_args
to stay nice with several parameters. For instance it could be:
Copy code
@op
def op_taking_dynamic_args(context: OpExecutionContext, arg1: int, arg2: int) -> None:
    <http://context.log.info|context.log.info>(arg1)
    <http://context.log.info|context.log.info>(arg2)

@op(out=DynamicOut())
def op_triggering_multiple_ops(context: OpExecutionContext):
    for i, c in enumerate(context.op_config):
        yield DynamicOutput(
           {"arg1": c, "arg2": c + 1}, mapping_key=str(i), unpack=True
         )
where the new thing (hypothetical api addition) is that
unpack
parameter. Currently without it I need to change
op_taking_dynamic_args
to:
Copy code
@op
def op_taking_dynamic_args(context: OpExecutionContext, params: Dict) -> None:
    <http://context.log.info|context.log.info>(params["arg1"])
    <http://context.log.info|context.log.info>(params["arg2"])
For that dynamic use case, while that op may be used in other places by itself. It is not as nice in terms of typing and usage in dagster.
👍 1
j

jamie

08/29/2022, 4:58 PM
oh ok yeah that is different. i misunderstood. i'm not sure if this would work but my initial idea is to do something like
Copy code
@op
def op_taking_dynamic_args(context: OpExecutionContext, arg1: int, arg2: int) -> None:
    <http://context.log.info|context.log.info>(arg1)
    <http://context.log.info|context.log.info>(arg2)

@op(out=DynamicOut())
def op_triggering_multiple_ops(context: OpExecutionContext):
    for i, c in enumerate(context.op_config):
        yield DynamicOutput(
           {"arg1": c, "arg2": c + 1}, mapping_key=str(i), unpack=True
         )

@graph
def my_graph():
   op_triggering_multiple_ops().map(lambda val: op_taking_dynamic_args(**val))
t

Tanguy Le Floch

08/29/2022, 5:16 PM
I did try that too before, should've mentioned. it is currently not accepted by dagster:
__main__.op_taking_dynamic_args() argument after ** must be a mapping, not InvokedSolidOutputHandle
j

jamie

08/29/2022, 11:21 PM
@alex do you know if something like this is possible with the way Dynamic Outputs work? if so i'll create an issue to track
j

JB

09/15/2022, 4:18 AM
Bump. I'm new to dagster and was also wondering about this. Is this considered an anti pattern or is this a legit thing to want to do in the context of dagster?
a

alex

09/15/2022, 2:05 PM
ya this is legit to want to do but a bit of an ergonomic stumble from the current limitations of dynamic graphs. I think the best pattern to do currently is to use
Tuple[...]
for the bundle of values
Copy code
from typing import Tuple

from dagster import DynamicOut, DynamicOutput, OpExecutionContext, graph, op


@op
def op_taking_dynamic_args(context: OpExecutionContext, vals: Tuple[str, int, float]) -> None:
    str_arg, int_arg, float_arg = vals
    <http://context.log.info|context.log.info>(str_arg)
    <http://context.log.info|context.log.info>(int_arg)
    <http://context.log.info|context.log.info>(float_arg)


@op(out=DynamicOut(Tuple[str, int, float]))
def op_triggering_multiple_ops(context: OpExecutionContext):
    for i, c in enumerate(context.op_config):
        yield DynamicOutput(
            (str(c), int(c), float(c)),
            mapping_key=str(i),
        )


@graph
def my_graph():
    op_triggering_multiple_ops().map(op_taking_dynamic_args)


my_job = my_graph.to_job(
    config={
        "ops": {"op_triggering_multiple_ops": {"config": [1, 2, 3]}},
    }
)

my_job.execute_in_process()
❤️ 1