Clement Emmanuel

03/05/2023, 5:39 PM
Is there some issue with nesting graphs that both use dynamic outputs? I have a simple example that reproduces this, and I get the following error:
dagster._core.errors.DagsterInvariantViolationError: In job 'in_process_materialization_job' op 'outer_collect', input collect must get a value either (a) from a dependency or (b) from the inputs section of its configuration.
Here's the example:
from dagster import op, DynamicOut, DynamicOutput, graph, AssetsDefinition, materialize, DailyPartitionsDefinition, In


@op
def collector(collect):
    return collect


@op(out=DynamicOut())
def outer_dynamic():
    for x in range(5):
        yield DynamicOutput(x, mapping_key=str(x))


@op(out=DynamicOut())
def inner_dynamic(inp):
    for x in range(5):
        yield DynamicOutput(x, mapping_key=str(x))


@graph
def inner_graph(inp):
    return collector.alias("inner_collect")(
        inner_dynamic(inp).collect()  #.map(collector.alias("map_collect")).collect()
    )


@graph
def outer_graph():
    return collector.alias("outer_collect")(
        outer_dynamic().map(inner_graph).collect()
    )


ExAsset = AssetsDefinition.from_graph(
    outer_graph,
    partitions_def=DailyPartitionsDefinition(start_date="2023-01-01"),
)


if __name__ == '__main__':
    materialize([ExAsset], partition_key="2023-01-02")
I'm not sure why outer_collect is unsure where its input is coming from. It should be the result of the .collect() on the passed-in DynamicFanIn. This pattern works in general for graph returns when not nesting like this.

chris

03/06/2023, 8:31 PM
The error message you’re getting is misleading and rough, unfortunately, but I think what you’re trying to do here is unsupported at the moment: https://github.com/dagster-io/dagster/issues/4364. You’d end up with multiple layers of dynamism (one from outer_dynamic, one from inner_dynamic).