Mark Fickett
03/04/2022, 4:18 PM
.collect() calls in an order dependency start argument? Like this:
@op(
    ins={"start": In(Nothing)}
)
def wait_for_many():
    pass

@graph
def mostly_parallel():
    parallel_done = []
    parallel_done.append(has_dynamic_out.map(do_one_thing))
    parallel_done.append(has_dynamic_out.map(do_another_thing))
    parallel_done.append(has_dynamic_out.map(do_different_thing))
    wait_for_many(start=parallel_done)
When I try to do it as written above, I get: DagsterInvalidDefinitionError: In @graph mostly_parallel, received a list containing an invalid type at index 0 for input "start" (passed by keyword) in op invocation wait_for_many. Lists can only contain the output from previous solid invocations or input mappings, received <class 'list'>
(actual op names changed to match my minimal example).
Zach
03/04/2022, 4:53 PM
Mark Fickett
03/04/2022, 4:58 PM
parallel_done.extend(has_dynamic_out.map(do_one_thing))
says Attempted to iterate over an InvokedSolidDynamicOutputWrapper. This object represents the dynamic output "result" from the op 'do_one_thing'. Use the "map" method on this object to create downstream dependencies that will be cloned for each DynamicOut that is resolved at runtime.
And if I make it .extend(...map().collect) then I get the "received a list" error again.
Zach
03/04/2022, 5:31 PM
Mark Fickett
03/04/2022, 5:52 PM