Tanguy Le Floch
08/29/2022, 9:07 AM
from dagster import DynamicOut, DynamicOutput, OpExecutionContext, graph, op

@op
def op_taking_dynamic_args(context: OpExecutionContext, arg1: int) -> None:
    context.log.info(arg1)

@op(out=DynamicOut())
def op_triggering_multiple_ops(context: OpExecutionContext):
    for i, c in enumerate(context.op_config):
        yield DynamicOutput(c, mapping_key=str(i))

@graph
def my_graph():
    op_triggering_multiple_ops().map(op_taking_dynamic_args)

my_job = my_graph.to_job(
    config={
        "ops": {"op_triggering_multiple_ops": {"config": [1, 2, 3]}},
    }
)
my_job.execute_in_process()
Is there a way I could achieve something similar if op_taking_dynamic_args has more than one argument, or do I absolutely have to pass a dict (or another data type) with all the inputs inside?
If there is a more dagster-ish way to achieve this we'd also be very interested! @Antoine Valette
jamie
08/29/2022, 4:15 PM
op_taking_dynamic_args
you can use a lambda function
https://docs.dagster.io/concepts/ops-jobs-graphs/dynamic-graphs#additional-arguments
Tanguy Le Floch
08/29/2022, 4:21 PM
jamie
08/29/2022, 4:30 PM
Tanguy Le Floch
08/29/2022, 4:55 PM
op_triggering_multiple_ops generates several things, but they are meant to be passed together to op_taking_dynamic_args; it's a 1<->1 relationship between the outputs of the first and the inputs of the second.
However, I was wondering if there was a way for the signature of op_taking_dynamic_args to stay nice with several parameters. For instance it could be:
@op
def op_taking_dynamic_args(context: OpExecutionContext, arg1: int, arg2: int) -> None:
    context.log.info(arg1)
    context.log.info(arg2)

@op(out=DynamicOut())
def op_triggering_multiple_ops(context: OpExecutionContext):
    for i, c in enumerate(context.op_config):
        yield DynamicOutput(
            {"arg1": c, "arg2": c + 1}, mapping_key=str(i), unpack=True
        )
where the new thing (a hypothetical API addition) is that unpack parameter. Currently, without it, I need to change op_taking_dynamic_args to:
@op
def op_taking_dynamic_args(context: OpExecutionContext, params: Dict) -> None:
    context.log.info(params["arg1"])
    context.log.info(params["arg2"])
for that dynamic use case, even though that op may also be used elsewhere on its own. It is not as nice in terms of typing and usage in dagster.
jamie
08/29/2022, 4:58 PM
@op
def op_taking_dynamic_args(context: OpExecutionContext, arg1: int, arg2: int) -> None:
    context.log.info(arg1)
    context.log.info(arg2)

@op(out=DynamicOut())
def op_triggering_multiple_ops(context: OpExecutionContext):
    for i, c in enumerate(context.op_config):
        yield DynamicOutput(
            {"arg1": c, "arg2": c + 1}, mapping_key=str(i), unpack=True
        )

@graph
def my_graph():
    op_triggering_multiple_ops().map(lambda val: op_taking_dynamic_args(**val))
Tanguy Le Floch
08/29/2022, 5:16 PM
__main__.op_taking_dynamic_args() argument after ** must be a mapping, not InvokedSolidOutputHandle
jamie
08/29/2022, 11:21 PM
JB
09/15/2022, 4:18 AM
alex
09/15/2022, 2:05 PM
Tuple[...] for the bundle of values
from typing import Tuple
from dagster import DynamicOut, DynamicOutput, OpExecutionContext, graph, op

@op
def op_taking_dynamic_args(context: OpExecutionContext, vals: Tuple[str, int, float]) -> None:
    str_arg, int_arg, float_arg = vals
    context.log.info(str_arg)
    context.log.info(int_arg)
    context.log.info(float_arg)

@op(out=DynamicOut(Tuple[str, int, float]))
def op_triggering_multiple_ops(context: OpExecutionContext):
    for i, c in enumerate(context.op_config):
        yield DynamicOutput(
            (str(c), int(c), float(c)),
            mapping_key=str(i),
        )

@graph
def my_graph():
    op_triggering_multiple_ops().map(op_taking_dynamic_args)

my_job = my_graph.to_job(
    config={
        "ops": {"op_triggering_multiple_ops": {"config": [1, 2, 3]}},
    }
)
my_job.execute_in_process()