Clement Emmanuel
03/05/2023, 5:39 PM
dagster._core.errors.DagsterInvariantViolationError: In job 'in_process_materialization_job' op 'outer_collect', input collect must get a value either (a) from a dependency or (b) from the inputs section of its configuration.
Here's the example:
from dagster import op, DynamicOut, DynamicOutput, graph, AssetsDefinition, materialize, DailyPartitionsDefinition, In

@op
def collector(collect):
    return collect

@op(out=DynamicOut())
def outer_dynamic():
    for x in range(5):
        yield DynamicOutput(x, mapping_key=str(x))

@op(out=DynamicOut())
def inner_dynamic(inp):
    for x in range(5):
        yield DynamicOutput(x, mapping_key=str(x))

@graph
def inner_graph(inp):
    return collector.alias("inner_collect")(
        inner_dynamic(inp).collect()  #.map(collector.alias("map_collect")).collect()
    )

@graph
def outer_graph():
    return collector.alias("outer_collect")(
        outer_dynamic().map(inner_graph).collect()
    )

ExAsset = AssetsDefinition.from_graph(
    outer_graph,
    partitions_def=DailyPartitionsDefinition(start_date="2023-01-01"),
)

if __name__ == '__main__':
    materialize([ExAsset], partition_key="2023-01-02")
I'm not sure why outer_collect is unsure where its input is coming from. It should be the result of the .collect() on the passed-in DynamicFanIn. This pattern works in general for graph returns when not nesting like this.
chris
03/06/2023, 8:31 PM