Yehuda Ornstein
02/06/2023, 8:31 PM
from dagster import job, DynamicOutput, DynamicOut, op, Definitions
import time
@op(out=DynamicOut())
def get_dynamic_numbers():
    # Fan-out source: emits one DynamicOutput per integer in 0..4, using the
    # integer's string form as the mapping key.
    for n in range(5):
        yield DynamicOutput(mapping_key=str(n), value=n)
@op
def get_number_result(number):
    # Pause for two seconds, then hand back the input multiplied by two.
    time.sleep(2)
    doubled = number * 2
    return doubled
@op
def get_number_result_from_result(number):
    # Second stage of the chain: same two-second pause and doubling as
    # get_number_result, applied to that op's output in dynamic_job.
    time.sleep(2)
    doubled = number * 2
    return doubled
@job()
def dynamic_job():
    # Per-branch pipeline: each dynamically emitted number goes through
    # get_number_result and then get_number_result_from_result.
    def _for_each_number(item):
        return get_number_result_from_result(get_number_result(item))

    # Map the two-step chain over every dynamic output, then collect the
    # mapped results (the collected value is not passed anywhere further).
    get_dynamic_numbers().map(_for_each_number).collect()
# Bundle dynamic_job into a Definitions object bound to the module-level name `defs`.
defs = Definitions(jobs=[dynamic_job])
However, if I try adding another dynamic output in the chained steps, like this:
from dagster import job, DynamicOutput, DynamicOut, op, Definitions
import time
@op(out=DynamicOut())
def get_dynamic_numbers():
    # Outer fan-out: emits one DynamicOutput per integer in 0..4, keyed by
    # the integer's string form.
    for n in range(5):
        yield DynamicOutput(mapping_key=str(n), value=n)
@op
def get_number_result(number):
    # Pause for two seconds, then hand back the input multiplied by two.
    time.sleep(2)
    doubled = number * 2
    return doubled
@op(out=DynamicOut())
def get_dynamic_numbers_for_result(number: int):
    # Inner fan-out: emits one DynamicOutput per integer in 0..number-1,
    # keyed by the integer's string form.
    for n in range(number):
        yield DynamicOutput(mapping_key=str(n), value=n)
@job()
def dynamic_job():
    # Per-branch pipeline: double the number, fan out again from that result,
    # then map get_number_result over the inner dynamic outputs and collect.
    # NOTE: this places the second get_number_result invocation downstream of
    # two dynamic outputs; the accompanying message reports that Dagster
    # rejects this with DagsterInvalidDefinitionError.
    def _for_each_number(item):
        inner_outputs = get_dynamic_numbers_for_result(get_number_result(item))
        inner_outputs.map(get_number_result).collect()

    get_dynamic_numbers().map(_for_each_number)
# Bundle dynamic_job into a Definitions object bound to the module-level name `defs`.
defs = Definitions(jobs=[dynamic_job])
I get the following error:
dagster._core.errors.DagsterInvalidDefinitionError: op 'get_number_result_2' cannot be downstream of more than one dynamic output. It is downstream of both "get_dynamic_numbers:result" and "get_dynamic_numbers_for_result:result"
Any help is appreciated. Thanks in advance!
sean
02/07/2023, 10:26 PM