Hi! Is there a way to merge / run an op on multipl...
# ask-community
s
Hi! Is there a way to merge / run an op on multiple results of ops triggered by a conditional branch (and not the branching op’s result itself)? E.g.:
@graph
def resolve_input(inp):
    sa, sb = match_to_resolver(inp) # out={'ra': Out(is_required=False), 'rb': Out(is_required=False)}
    sum_all_results(resolver_a(sa), resolver_b(sb))
The current situation is that `sum_all_results` is always skipped, I’m assuming because one of the arguments is always missing. I also tried using a list:
sum_all_results([resolver_a(sa), resolver_b(sb)])
But I get an exception:
dagster.check.CheckError: Failure condition: Unexpected dynamic output dependency in regular fan in, should have been caught at definition time.
a
What version are you on? This looks similar to an error I believe was fixed. Using a list is the right way to do it, but you may need to introduce an intermediate collect op if there is a dynamic output involved.
s
I’m on 0.13.10, which I believe is the newest?
Do you mean the collect() method as in the one used in conjunction with map()?
a
yeah - it's not the most elegant, but I believe it's the workaround for your issue
assuming `resolver_X` has dynamic outputs, something like
sum_all_results([partial_sum(resolver_a(sa).collect()), partial_sum(resolver_b(sb).collect())])
s
I don’t have dynamic output in this case, but maybe worth mentioning that `resolve_X` is a graph, not an op
Basically I am trying to run predefined graphs dynamically (every “row” is mapped using a condition branch to its appropriate graph) and have all of their results processed together in the end
Here’s a more complete example of what I’m trying to do:
from dagster import op, job, graph, DynamicOut, DynamicOutput, Output, Out

@op
def resolver_a(num):
    return num * 5

@op
def resolver_b(num):
    return num

@op(out={'ra': Out(is_required=False), 'rb': Out(is_required=False)})
def match_to_resolver(inp):
    yield Output(inp, 'ra' if inp == 1 else 'rb')

@op
def merge_resolvers(r):
    return sum(r)

@graph
def resolve_input(inp):
    sa, sb = match_to_resolver(inp)
    return merge_resolvers([resolver_a(sa), resolver_b(sb)])


@op(out=DynamicOut(int))
def get_inputs():
    for i in [1, 2]:
        yield DynamicOutput(
            value=i,
            mapping_key=f'inp_{i}'
        )

@job
def my_job():
    get_inputs().map(resolve_input)
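
For reference, the result this job is meant to produce can be sketched in plain Python, without Dagster. This only illustrates the intended semantics, assuming each input is routed to exactly one resolver and the merge step sums whichever results exist. `expected_results` is a hypothetical helper that is not part of the example above, and `match_to_resolver` here returns a (branch, value) pair instead of yielding an `Output`.

```python
def resolver_a(num):
    return num * 5


def resolver_b(num):
    return num


def match_to_resolver(inp):
    # Mirrors the op above: choose 'ra' when inp == 1, otherwise 'rb'.
    return ('ra', inp) if inp == 1 else ('rb', inp)


def resolve_input(inp):
    branch, value = match_to_resolver(inp)
    # Only the selected branch runs; the other resolver is skipped.
    results = [resolver_a(value)] if branch == 'ra' else [resolver_b(value)]
    return sum(results)


def expected_results(inputs):
    # Hypothetical helper: one entry per mapping key, as get_inputs() would yield.
    return {f'inp_{i}': resolve_input(i) for i in inputs}


print(expected_results([1, 2]))  # {'inp_1': 5, 'inp_2': 2}
```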
a
ah ok thanks for that - yea it looks like we do not handle that “regular fan in” downstream of the `get_inputs` dynamic output correctly
@Dagster Bot issue graph with fan-in downstream of dynamic output
s
Oh ok, bummer. Thanks for opening the bug report. I’ll stay tuned. Thanks for trying to help, much appreciated!
a
if you don’t have ops you are trying to have skipped - you could use an optional type and return `None` or something equivalent, or your own special `Skip` marker value if `None` is meaningful
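
A minimal sketch of the marker-value idea in plain Python, with no Dagster APIs involved. `SKIP`, `_Skip`, `route`, and the resolver bodies are hypothetical names chosen for illustration. Note the trade-off: every resolver runs on every input and has to pass the marker through, since nothing is skipped by the framework.

```python
class _Skip:
    """Sentinel type meaning 'this branch did not apply'; distinct from None."""

    def __repr__(self):
        return 'SKIP'


SKIP = _Skip()


def route(inp):
    # Always returns both branch values; the unused branch carries SKIP.
    return (inp, SKIP) if inp == 1 else (SKIP, inp)


def resolver_a(num):
    return SKIP if num is SKIP else num * 5


def resolver_b(num):
    return SKIP if num is SKIP else num


def merge_resolvers(results):
    # Drop marker values before aggregating.
    return sum(r for r in results if r is not SKIP)


def resolve_input(inp):
    sa, sb = route(inp)
    return merge_resolvers([resolver_a(sa), resolver_b(sb)])
```

A dedicated sentinel object compared with `is` keeps `None` available as a legitimate resolver result.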
s
Not quite sure what you mean here, where would it help if I return None in this example? Are you suggesting I should drop the conditional branching approach and process the input in every resolver?
a
ya was trying to think of workarounds - it's not a good one for you since you are leveraging the skip / conditional behavior pretty fundamentally
s
Yeah, I also don't see it working without having to process the input in every op and determine whether it's right for that op. I'm trying to think whether I could use a materialization + sensor to trigger the final step as a different job.
m
Hey all! Sorry, new guy here. I think I'm running into the same issue -- when the graph passed to .map has conditional branching I get
dagster.check.CheckError: Failure condition: Unexpected dynamic output dependency in regular fan in, should have been caught at definition time.