Sa'ar Elias
12/09/2021, 8:29 PM@graph
def resolve_input(inp):
sa, sb = match_to_resolver(inp) # out={'ra': Out(is_required=False), 'rb': Out(is_required=False)}
sum_all_results(resolver_a(sa), resolver_b(sb))
Sa'ar Elias
12/09/2021, 8:43 PMsum_all_results
is always skipped, I’m assuming because one of the arguments is always missing. I also tried using a list:
sum_all_results([resolver_a(sa), resolver_b(sb)])
But I get an exception:
dagster.check.CheckError: Failure condition: Unexpected dynamic output dependency in regular fan in, should have been caught at definition time.
alex
12/09/2021, 8:52 PMSa'ar Elias
12/09/2021, 8:59 PMSa'ar Elias
12/09/2021, 9:00 PMalex
12/09/2021, 9:02 PMalex
12/09/2021, 9:03 PMresolver_X
has dynamic outputs, something like
sum_all_results([partial_sum(resolver_a(sa).collect()), partial_sum(resolver_b(sb).collect())])
Sa'ar Elias
12/09/2021, 9:08 PMresolve_X
is a graph, not an opSa'ar Elias
12/09/2021, 9:09 PMSa'ar Elias
12/09/2021, 9:12 PMfrom dagster import op, job, graph, DynamicOut, DynamicOutput, Output, Out
@op
def resolver_a(num):
return num * 5
@op
def resolver_b(num):
return num
@op(out={'ra': Out(is_required=False), 'rb': Out(is_required=False)})
def match_to_resolver(inp):
yield Output(inp, 'ra' if inp == 1 else 'rb')
@op
def merge_resolvers(r):
return sum(r)
@graph
def resolve_input(inp):
sa, sb = match_to_resolver(inp)
return merge_resolvers([resolver_a(sa), resolver_b(sb)])
@op(out=DynamicOut(int))
def get_inputs():
for i in [1, 2]:
yield DynamicOutput(
value=i,
mapping_key=f'inp_{i}'
)
@job
def my_job():
get_inputs().map(resolve_input)
alex
12/09/2021, 9:28 PMget_inputs
dynamic output correctlyalex
12/09/2021, 9:29 PMDagster Bot
12/09/2021, 9:29 PMSa'ar Elias
12/09/2021, 9:34 PMalex
12/09/2021, 9:36 PMNone
or something equivalentalex
12/09/2021, 9:37 PMSkip
marker value if None
is meaningfulSa'ar Elias
12/09/2021, 9:39 PMalex
12/09/2021, 9:46 PMSa'ar Elias
12/09/2021, 10:03 PMMomchil Konstantinov
06/23/2022, 4:14 PMdagster.check.CheckError: Failure condition: Unexpected dynamic output dependency in regular fan in, should have been caught at definition time.