Dmitry Mogilevsky

07/13/2021, 9:14 AM
Has anyone been successful in getting a fan-out solid to work with dagster_dask? My experience has been that while the fan-out step executes successfully, the solid passed to .map() is never called, and neither are any of the subsequent solids in the graph. The graph works properly when not running on the Dask executor.
Quick code example:
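import random

# On releases where dynamic outputs are still experimental, DynamicOutput and
# DynamicOutputDefinition may need to be imported from dagster.experimental instead.
from dagster import (
    DynamicOutput,
    DynamicOutputDefinition,
    ModeDefinition,
    default_executors,
    fs_io_manager,
    pipeline,
    solid,
)
from dagster_dask import dask_executor


@solid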
def hello_world(numbers):
    return "Hello, World! " + str(numbers)


@solid
def to_upper(solid_str):
    return solid_str.upper()


@solid
def to_file(string):
    with open("test.txt", "w") as f:
        # The with block closes the file on exit, so no explicit close() is needed.
        f.write(string)


@solid
def rand_number(mult):
    num = 0
    while num < 990:
        num = random.random() * 1000
    return num * mult


@solid(output_defs=[DynamicOutputDefinition(dagster_type=int, io_manager_key="io_manager")])
def fan_out_numbers():
    for i in range(5):
        yield DynamicOutput(value=i, mapping_key=str(i))


@pipeline(
    mode_defs=[
        ModeDefinition(
            resource_defs={"io_manager": fs_io_manager},
            executor_defs=default_executors + [dask_executor],
        )
    ]
)
def dask_pipeline():
    all_numbers = fan_out_numbers().map(rand_number)
    to_file(to_upper(hello_world(all_numbers.collect())))
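For reference, a plain-Python sketch of the data flow this pipeline is meant to produce, with no Dagster involved. The function names mirror the solids above; `run_plain` is a hypothetical helper, and the file write is left out so the sketch has no side effects:

```python
import random


def rand_number(mult):
    # Same logic as the rand_number solid: resample until the value is >= 990.
    num = 0
    while num < 990:
        num = random.random() * 1000
    return num * mult


def run_plain():
    # fan_out_numbers: five outputs, keyed by mapping key "0".."4".
    fanned = {str(i): i for i in range(5)}
    # .map(rand_number): one call per mapping key.
    mapped = {key: rand_number(value) for key, value in fanned.items()}
    # .collect(): gather the mapped results into a single list.
    collected = list(mapped.values())
    # hello_world followed by to_upper (to_file omitted: no disk writes here).
    return ("Hello, World! " + str(collected)).upper()


if __name__ == "__main__":
    print(run_plain())
```

On an executor that supports dynamic outputs, the string passed to to_file should have this shape: the greeting followed by a five-element list.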
When using the default executor:
When using the following config:
execution:
  dask:
    config:
      cluster:
        existing:
          address: "localhost:6766"

alex

07/13/2021, 4:33 PM
Dynamic execution is not supported on Dask or Airflow, and we should do a better job of erroring early here. Sorry for the thrash.
@Dagster Bot issue error early on dynamic map/collect in dask&airflow
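A minimal sketch of what "erroring early" could look like, assuming a pre-launch check over the execution plan. Every name here (`UNSUPPORTED_DYNAMIC_EXECUTORS`, `DynamicExecutionNotSupported`, `check_plan`) is hypothetical and not part of Dagster's API; the point is only that the failure surfaces before any step runs, rather than downstream solids being silently skipped:

```python
# Hypothetical names throughout; this is not Dagster's API.
UNSUPPORTED_DYNAMIC_EXECUTORS = {"dask", "airflow"}


class DynamicExecutionNotSupported(Exception):
    """Raised before launch when the plan cannot run on the chosen executor."""


def check_plan(executor_name, steps):
    """Validate a plan given as (step_name, uses_dynamic_output) pairs."""
    if executor_name not in UNSUPPORTED_DYNAMIC_EXECUTORS:
        return
    dynamic_steps = [name for name, is_dynamic in steps if is_dynamic]
    if dynamic_steps:
        raise DynamicExecutionNotSupported(
            "Executor '{}' does not support dynamic map/collect; "
            "offending steps: {}".format(executor_name, ", ".join(dynamic_steps))
        )


if __name__ == "__main__":
    plan = [("fan_out_numbers", True), ("rand_number", False)]
    check_plan("multiprocess", plan)  # passes silently
    try:
        check_plan("dask", plan)
    except DynamicExecutionNotSupported as err:
        print(err)
```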

Dagster Bot

07/13/2021, 4:34 PM