Hi, I have a working pipeline that uses DynamicOut...
# ask-community
o
Hi, I have a working pipeline that uses DynamicOutput to run queries in parallel:
Copy code
@job
def MFCBenchMarkPipeline():
	queries_list = benchmarks()
    conf_dicts = generate_subtasks(queries_list).map(MFCPipeline).collect()
    MFCStats4DB(conf_dicts)
This code used to work well. "MFCPipeline" op used to run for each query in "queries_list" and the results were collected and passed to "MFCStats4DB". The only change I made, is turning "MFCPipeline" to "graph" from "op". When I try to run it from Dagit, the code gets loaded successfully, but when I try to launch a run I get the following error:
Copy code
dagster._core.errors.DagsterInvariantViolationError: In job 'MFCBenchMarkPipeline' op 'MFCStats4DB', input conf_dicts must get a value either (a) from a dependency or (b) from the inputs section of its configuration.
What am I missing here? Will appreciate any help. Thanks.
🤖 1
s
@alex is dynamic mapping meant to work with graphs?
a
yea it should work.
The only change I made, is turning “MFCPipeline” to “graph” from “op”
so
@op
has an implicit
Out
but
@graph
does not, so the missing piece could be adding the output mapping on the
MFCPipeline
@graph
o
Thanks for the quick response! Added
out=GraphOut()
which fixed it. But now I'm having another problem:
Copy code
AttributeError: 'tuple' object has no attribute 'collect'
Which is weird, because
DynamicOutput.map()
should return an object and not the return value of graph/op it was mapped to. Is
collect()
meant to be used with graphs?
a
do you have multiple outputs from the graph? I just tried this and it works:
Copy code
def test_graph_map():
    @op(out=DynamicOut())
    def dyn_vals():
        for i in range(3):
            yield DynamicOutput(i, mapping_key=f"num_{i}")

    @op
    def echo(x):
        return x

    @graph
    def wrapped(y):
        return echo(y)

    @job
    def with_graph():
        echo(dyn_vals().map(wrapped).collect())

    assert with_graph.execute_in_process().success
which also shows that i was wrong about the implicit out on @graph