# ask-community
k
Hello! 👋 I am on dagster version `0.14.20` and am using the `dagster_k8s.k8s_job_executor` to spawn each step of a dagster job in its own pod. I am retrieving X amount of `DynamicOutputs` from an upstream op, but I fail to pass the `DynamicOutput` argument into a downstream op (the downstream ops are executed in parallel, and each op should be spawned in its own pod). I see that the upstream op writes its results through the IO manager (`fs_io_manager`) at:
```
/opt/dagster/dagster_home/storage/aff79dd6-af8e-4f24-9fa4-2ca59637b95a/upstream_op/result/plant_0
/opt/dagster/dagster_home/storage/aff79dd6-af8e-4f24-9fa4-2ca59637b95a/upstream_op/result/plant_1
…
```
But when I inspect the logs of each dagster-step pod (one step per `DynamicOutput`), it fails to find the respective result:
```
2022-07-20 08:37:04 +0000 - dagster - ERROR - Parallelization_Test - aff79dd6-af8e-4f24-9fa4-2ca59637b95a - 1 - op_parallel[plant_0] - STEP_FAILURE - Execution of step "op_parallel[plant_0]" failed.
dagster.core.errors.DagsterExecutionLoadInputError: Error occurred while loading input "settings" of step "op_parallel[plant_0]"::
FileNotFoundError: [Errno 2] No such file or directory: '/opt/dagster/dagster_home/storage/aff79dd6-af8e-4f24-9fa4-2ca59637b95a/upstream_op/result/plant_0'
```
When using the default (dagster-only, non-k8s) executor it works fine, and the `fs_io_manager` result path looks the same, with the same run id and everything. My ops and job are defined as follows (minimal working example):
```python
import os

import dagster_k8s
from dagster import (
    DynamicOut,
    DynamicOutput,
    In,
    config_from_files,
    file_relative_path,
    job,
    op,
)


@op(
    out=DynamicOut(str),
)
def upstream_op():
    names = [f"plant_{i}" for i in range(0, 5)]  # scaled down, can go up to 100-200
    for name in names:
        settings = name
        yield DynamicOutput(settings, mapping_key=name)


@op(
    ins={
        'settings': In(str)
    }
)
def op_parallel(settings: str):
    print(settings)


@job(
    name='Parallelization_Test',
    # parallel.yaml contains the config of the k8s_job_executor (namespace and service account)
    config=config_from_files([file_relative_path(__file__, os.path.join("..", "run_config", "parallel.yaml"))]),
    executor_def=dagster_k8s.k8s_job_executor,
)
def job_parallel():
    a = upstream_op()
    a.map(op_parallel)
```
But when executing, I get the `FileNotFoundError` shown above. I have also tried the `mem_io_manager`, but there I get a `KeyError`.
d
Hi Katrin - every pod has its own filesystem, so if each op is running in its own pod, the `fs_io_manager` won't work for persisting outputs between ops (unless you use a shared volume or something that all the ops can access). The easiest way to get this working is probably to use one of the other IO managers (like the `s3_pickle_io_manager`) that persist outputs outside of the local filesystem.
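For reference, here is a minimal sketch of what swapping in `s3_pickle_io_manager` could look like for the job above (the bucket name and prefix are placeholders, not values from this thread, and `upstream_op` / `op_parallel` are the ops defined earlier):

```python
import os

import dagster_k8s
from dagster import config_from_files, file_relative_path, job
from dagster_aws.s3 import s3_pickle_io_manager, s3_resource


@job(
    name='Parallelization_Test',
    config=config_from_files([file_relative_path(__file__, os.path.join("..", "run_config", "parallel.yaml"))]),
    executor_def=dagster_k8s.k8s_job_executor,
    resource_defs={
        # Persist op outputs as pickles in S3 so every step pod can load its
        # upstream inputs, no matter which pod/node produced them.
        "io_manager": s3_pickle_io_manager.configured(
            {"s3_bucket": "my-dagster-io-bucket", "s3_prefix": "parallelization-test"}  # placeholders
        ),
        # s3_pickle_io_manager requires an "s3" resource that provides the boto3 client.
        "s3": s3_resource,
    },
)
def job_parallel():
    a = upstream_op()
    a.map(op_parallel)
```

With something like this, each step pod reads and writes the same bucket instead of its own local filesystem; `dagster-aws` needs to be installed in the step images, and the pods need AWS credentials (for example via the service account or environment variables).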
k
Awesome! This is really good to know, and happy to hear that using a different IO manager is the way to go. Thanks for the swift response! :daggy-love: