Dmitry Mogilevsky
07/13/2021, 9:14 AM.map()
function isn't called, and neither do any of the subsequent solids in the graph. The graph works properly when not running on dask executor@solid
def hello_world(numbers):
return "Hello, World! " + str(numbers)
@solid
def to_upper(solid_str):
return solid_str.upper()
@solid
def to_file(string):
with open("test.txt", "w") as f:
f.write(string)
f.close()
@solid
def rand_number(mult):
num = 0
while num < 990:
num = random.random() * 1000
return num * mult
@solid(output_defs=[DynamicOutputDefinition(dagster_type=int, io_manager_key="io_manager")])
def fan_out_numbers():
for i in range(5):
yield DynamicOutput(value=i, mapping_key=str(i))
@pipeline(mode_defs=[ModeDefinition(resource_defs={"io_manager": fs_io_manager}, executor_defs=default_executors + [dask_executor],)])
def dask_pipeline():
all_numbers = fan_out_numbers().map(rand_number)
to_file(to_upper(hello_world(all_numbers.collect())))
execution:
dask:
config:
cluster:
existing:
address: "localhost:6766"
alex
07/13/2021, 4:33 PMDagster Bot
07/13/2021, 4:34 PM