Andrew Herbst

04/02/2021, 5:13 PM
Hi all! We're looking to build a pipeline with two branches that each fan out using the new dynamic mapping & collect feature. We'd like to collect() on both branches and do some post-processing. However, when we collect on the two branches and feed both results as inputs into a downstream solid, we hit this error: `collect index unexpectedly set twice`. So we've introduced an intermediate "fan-in" solid on each branch that collects that branch's results and then hands them off to the downstream post-processing solid. My question: is this a known issue, and is introducing the intermediate fan-in solid the correct approach here?

alex

04/02/2021, 5:17 PM
Sounds like a bug. Can you share a snippet of what the problematic collecting looked like?

Andrew Herbst

04/02/2021, 5:27 PM
Sure:
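```python
from typing import List

from dagster import pipeline, solid
# Dagster 0.11-era location of the dynamic mapping API (it was still experimental)
from dagster.experimental import DynamicOutput, DynamicOutputDefinition


@solid(
    output_defs=[DynamicOutputDefinition(int)]
)
def fan_out(_):
    # Emit one dynamic output per value; mapping keys must be strings
    for i in range(1, 10):
        yield DynamicOutput(value=i, mapping_key=str(i))


@solid
def post_process(_, fan_in1: List[int], fan_in2: List[int]):
    # Each input should receive the collected list from one fan-out branch
    pass


@pipeline
def dynamic_example() -> None:
    # Invoking the same solid twice gives two independent (auto-aliased) branches
    fan_out1 = fan_out()
    fan_out2 = fan_out()

    # Feeding both collects into one solid is what raised
    # "collect index unexpectedly set twice"
    post_process(fan_out1.collect(), fan_out2.collect())
```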

alex

04/02/2021, 6:08 PM
Will get this fixed. I think the intermediate solid is the best workaround for now.
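A minimal sketch of the workaround's data flow, modeled in plain Python rather than Dagster so it runs without a Dagster install. `fan_in` is a hypothetical name for the intermediate solid, and `post_process` here returns a sum purely as placeholder post-processing. In the pipeline itself, each branch would presumably be wired as `fan_in(fan_out1.collect())` / `fan_in(fan_out2.collect())`, with the two results then passed to `post_process`, so no solid receives more than one collect.

```python
from typing import Iterator, List, Tuple


# Plain-Python model only, NOT Dagster code.
# fan_out stands in for the dynamic solid: it emits (mapping_key, value) pairs.
def fan_out() -> Iterator[Tuple[str, int]]:
    for i in range(1, 10):
        yield str(i), i  # string mapping keys, mirroring DynamicOutput


# Hypothetical intermediate fan-in step: gathers one branch's dynamic
# outputs into a single list, i.e. the one place that branch is collected.
def fan_in(outputs: Iterator[Tuple[str, int]]) -> List[int]:
    return [value for _key, value in outputs]


# post_process now takes one ordinary list per branch instead of two collects.
# The sum is a placeholder for whatever real post-processing is needed.
def post_process(fan_in1: List[int], fan_in2: List[int]) -> int:
    return sum(fan_in1) + sum(fan_in2)


def dynamic_example() -> int:
    branch1 = fan_in(fan_out())  # collect branch 1 in its own step
    branch2 = fan_in(fan_out())  # collect branch 2 in its own step
    return post_process(branch1, branch2)
```

Here `dynamic_example()` returns 90: each branch gathers the values 1 through 9 (sum 45) into its own list before `post_process` sees them.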

Andrew Herbst

04/02/2021, 6:12 PM
ah great, thanks!