# ask-community
Hi! Is there a way to merge / run an op on multiple results of ops triggered by a conditional branch (and not the branching op’s result itself)? E.g.:
def resolve_input(inp):
    sa, sb = match_to_resolver(inp) # out={'ra': Out(is_required=False), 'rb': Out(is_required=False)}
    sum_all_results(resolver_a(sa), resolver_b(sb))
The current situation is that sum_all_results is always skipped, I’m assuming because one of the arguments is always missing. I also tried using a list:
sum_all_results([resolver_a(sa), resolver_b(sb)])
But I get an exception:
dagster.check.CheckError: Failure condition: Unexpected dynamic output dependency in regular fan in, should have been caught at definition time.
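To make the two attempts above concrete, here is a toy model in plain Python. It is not Dagster code and does not use Dagster's API; `MISSING`, `run_op` and `run_fan_in_op` are made-up names, and the skip rules they encode are only my understanding of the behaviour discussed in this thread: an op with separate inputs is skipped when any of them is missing, while a list (fan-in) input only needs one upstream result.

```python
# Toy model only: MISSING, run_op and run_fan_in_op are hypothetical helpers,
# not part of Dagster.
MISSING = object()  # stands in for an optional output that was never yielded


def run_op(fn, *inputs):
    """Separate inputs: the op is skipped if any input is missing."""
    if any(i is MISSING for i in inputs):
        return MISSING
    return fn(*inputs)


def run_fan_in_op(fn, inputs):
    """List input: missing entries are dropped; skip only if none are left."""
    present = [i for i in inputs if i is not MISSING]
    if not present:
        return MISSING
    return fn(present)


def match_to_resolver(inp):
    # Emits on exactly one branch, like an op with two optional outputs.
    return (inp, MISSING) if inp == 1 else (MISSING, inp)


def resolve_two_args(inp):
    sa, sb = match_to_resolver(inp)
    ra = run_op(lambda n: n * 5, sa)  # resolver_a
    rb = run_op(lambda n: n, sb)      # resolver_b
    # One of ra / rb is always MISSING, so this final step is always skipped.
    return run_op(lambda a, b: a + b, ra, rb)


def resolve_fan_in(inp):
    sa, sb = match_to_resolver(inp)
    ra = run_op(lambda n: n * 5, sa)
    rb = run_op(lambda n: n, sb)
    # The list form runs with whichever branch actually produced a value.
    return run_fan_in_op(sum, [ra, rb])
```

In this model `resolve_two_args` never produces a result, which matches the "always skipped" symptom, while `resolve_fan_in` returns the value from whichever branch ran.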
what version are you on? this looks similar to an error i believe was fixed.
using a list is the right way to do it - but you may need to introduce an intermediate collect op if there is a dynamic output involved
I’m on 0.13.10, which I believe is the newest?
Do you mean the collect() method as in the one used in conjunction with map()?
yeah - it's not the most elegant but i believe it's the workaround for your issue
has dynamic outputs, something like
sum_all_results([partial_sum(resolver_a(sa).collect()), partial_sum(resolver_b(sb).collect())])
I don’t have dynamic output in this case, but maybe worth mentioning that
is a graph, not an op
Basically I am trying to run predefined graphs dynamically (every “row” is mapped using a condition branch to its appropriate graph) and have all of their results processed together in the end
Here’s a more complete example of what I’m trying to do:
from dagster import op, job, graph, DynamicOut, DynamicOutput, Output, Out

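# NOTE: every decorator below except the one on match_to_resolver is missing
# from the pasted snippet; they are restored here as an assumption based on the imports.
@op
def resolver_a(num):
    return num * 5

@op
def resolver_b(num):
    return num

@op(out={'ra': Out(is_required=False), 'rb': Out(is_required=False)})
def match_to_resolver(inp):
    # Emit on exactly one of the two optional outputs; the other branch is skipped.
    yield Output(inp, 'ra' if inp == 1 else 'rb')

@op
def merge_resolvers(r):
    # Fan-in: r is a list of resolver results.
    return sum(r)

@graph
def resolve_input(inp):
    sa, sb = match_to_resolver(inp)
    return merge_resolvers([resolver_a(sa), resolver_b(sb)])

@op(out=DynamicOut())
def get_inputs():
    for i in [1, 2]: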
        yield DynamicOutput(

def my_job():
ah ok thanks for that - yea it looks like we do not handle that “regular fan in” downstream of the dynamic output correctly
@Dagster Bot issue graph with fan-in downstream of dynamic output
Oh ok, bummer. Thanks for opening the bug report. I’ll stay tuned. Thanks for trying to help, much appreciated!
if you don’t have ops you are trying to have skipped - you could use an optional type and return None or something equivalent, or your own special marker value if None is meaningful
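A minimal sketch of that suggestion, written as plain Python functions standing in for the ops (no Dagster decorators, so this only illustrates the data flow, not the framework behaviour). The condition inside each resolver is an assumption copied from the `inp == 1` check in the example above: every resolver receives every input and returns None when the input is not meant for it, and the merge step ignores the None values.

```python
from typing import List, Optional


def resolver_a(num: int) -> Optional[int]:
    # Handles only inputs equal to 1; returns None for anything else.
    return num * 5 if num == 1 else None


def resolver_b(num: int) -> Optional[int]:
    # Handles everything resolver_a does not.
    return num if num != 1 else None


def merge_resolvers(results: List[Optional[int]]) -> int:
    # Drop the None placeholders before summing.
    return sum(r for r in results if r is not None)


def resolve_input(inp: int) -> int:
    # No branching op: both resolvers always run, so nothing is ever skipped.
    return merge_resolvers([resolver_a(inp), resolver_b(inp)])
```

The cost is that each resolver has to inspect the input and decide whether it applies, instead of relying on skipped branches.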
Not quite sure what you mean here, where would it help if I return None in this example? Are you suggesting I should drop the conditional branching approach and process the input in every resolver?
ya was trying to think of workarounds - it's not a good one for you since you are leveraging the skip / conditional behavior pretty fundamentally
Yeah, I also don't see it working without having to process the input in every op and determine if it's right for this op. I'm trying to think if maybe I could use materialization+sensor to trigger the final step somehow (as a different job)
Hey all! Sorry, new guy here. I think I'm running into the same issue -- when the graph passed to .map has conditional branching I get
dagster.check.CheckError: Failure condition: Unexpected dynamic output dependency in regular fan in, should have been caught at definition time.