Katrin Grunert
07/20/2022, 11:28 AM
I am on version 0.14.20 and am using the dagster_k8s.k8s_job_executor to spawn each step of a Dagster job in its own pod.
I am getting X DynamicOutputs from an upstream op, but I fail to pass the DynamicOutput argument into a downstream op (the downstream ops are executed in parallel, and each op should be spawned in its own pod).
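To make the fan-out concrete, here is a plain-Python sketch with no Dagster imports. It shows how each yielded mapping key becomes its own downstream step, named in the `op_parallel[plant_0]` form that appears in the step logs. The helpers `upstream_values` and `mapped_step_keys` are hypothetical illustrations, not Dagster APIs.

```python
from typing import Iterable, Iterator, List, Tuple


def upstream_values(n: int) -> Iterator[Tuple[str, str]]:
    """Mimic upstream_op: yield (mapping_key, value) pairs, one per plant."""
    for i in range(n):
        name = f"plant_{i}"
        yield name, name


def mapped_step_keys(op_name: str, outputs: Iterable[Tuple[str, str]]) -> List[str]:
    """Each mapping key becomes a separate step key, e.g. op_parallel[plant_0].

    Under a per-step executor such as k8s_job_executor, every one of these
    steps runs in its own pod, so each one must load its input from storage.
    """
    return [f"{op_name}[{key}]" for key, _value in outputs]


print(mapped_step_keys("op_parallel", upstream_values(3)))
```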
I can see that the upstream op writes its results through the IO manager (fs_io_manager) to:
/opt/dagster/dagster_home/storage/aff79dd6-af8e-4f24-9fa4-2ca59637b95a/upstream_op/result/plant_0
/opt/dagster/dagster_home/storage/aff79dd6-af8e-4f24-9fa4-2ca59637b95a/upstream_op/result/plant_1
…
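The paths above follow a visible pattern: storage base directory, run ID, step key, output name, then mapping key. The sketch below rebuilds them from those parts. The function `output_path` is hypothetical, and the layout is inferred from the paths shown rather than copied from Dagster's source. Because the string depends only on these parts, it comes out identical in every pod. Whether the file actually exists depends on which filesystem the path is resolved against.

```python
import posixpath

BASE_DIR = "/opt/dagster/dagster_home/storage"
RUN_ID = "aff79dd6-af8e-4f24-9fa4-2ca59637b95a"


def output_path(base_dir: str, run_id: str, step_key: str,
                output_name: str, mapping_key: str) -> str:
    """Rebuild the fs_io_manager file layout observed above.

    Layout (inferred): <base_dir>/<run_id>/<step_key>/<output_name>/<mapping_key>
    posixpath is used because the pods run Linux with POSIX paths.
    """
    return posixpath.join(base_dir, run_id, step_key, output_name, mapping_key)


for i in range(2):
    print(output_path(BASE_DIR, RUN_ID, "upstream_op", "result", f"plant_{i}"))
```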
But when I inspect the logs of each dagster-step pod (one step per DynamicOutput), the step fails to find its respective result:
2022-07-20 08:37:04 +0000 - dagster - ERROR - Parallelization_Test - aff79dd6-af8e-4f24-9fa4-2ca59637b95a - 1 - op_parallel[plant_0] - STEP_FAILURE - Execution of step "op_parallel[plant_0]" failed.
dagster.core.errors.DagsterExecutionLoadInputError: Error occurred while loading input "settings" of step "op_parallel[plant_0]"::
FileNotFoundError: [Errno 2] No such file or directory: '/opt/dagster/dagster_home/storage/aff79dd6-af8e-4f24-9fa4-2ca59637b95a/upstream_op/result/plant_0'
When I use the default (Dagster-only, non-k8s) executor, it works fine. The fs_io_manager result path also looks the same, with the same run ID and everything.
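One likely explanation for "same path, missing file": the default executor runs all steps on one machine that shares a single filesystem. k8s_job_executor instead starts a fresh pod per step, and fs_io_manager writes to the local disk of whichever pod produced the output. The toy model below reproduces both outcomes. The `Pod` class is hypothetical, a plain-Python simulation rather than Dagster code, with temporary directories standing in for each pod's disk. In a real deployment, one common approach is an IO manager backed by storage every pod can reach. Examples include `s3_pickle_io_manager` from `dagster_aws.s3`, or a volume shared by all step pods and mounted at the fs_io_manager base directory.

```python
import pickle
import posixpath
import tempfile
from pathlib import Path


class Pod:
    """Toy stand-in for a Kubernetes pod with its own filesystem root.

    `root` plays the role of this pod's view of the storage directory.
    Pods that share a volume are modelled by sharing the same root.
    """

    def __init__(self, root: Path) -> None:
        self.root = root

    def write_output(self, rel_path: str, value: object) -> None:
        path = self.root / rel_path
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_bytes(pickle.dumps(value))

    def load_input(self, rel_path: str) -> object:
        # Raises FileNotFoundError if this pod's filesystem lacks the file.
        return pickle.loads((self.root / rel_path).read_bytes())


REL = posixpath.join("run-id", "upstream_op", "result", "plant_0")

with tempfile.TemporaryDirectory() as a, tempfile.TemporaryDirectory() as b:
    upstream = Pod(Path(a))
    downstream = Pod(Path(b))  # separate pod, separate local disk
    upstream.write_output(REL, "plant_0")
    try:
        downstream.load_input(REL)
    except FileNotFoundError:
        print("separate filesystems: FileNotFoundError")

    shared_downstream = Pod(Path(a))  # both pods see the same storage
    print("shared storage:", shared_downstream.load_input(REL))
```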
My ops and job are defined as follows (minimal working example):
import os

import dagster_k8s
from dagster import DynamicOut, DynamicOutput, In, config_from_files, job, op
from dagster.utils import file_relative_path


@op(
    out=DynamicOut(str),
)
def upstream_op():
    names = [f"plant_{i}" for i in range(5)]  # scaled down, can go up to 100-200
    for name in names:
        settings = name
        # each yielded value becomes its own mapped downstream step, keyed by mapping_key
        yield DynamicOutput(settings, mapping_key=name)


@op(
    ins={
        'settings': In(str)
    }
)
def op_parallel(settings: str):
    print(settings)


@job(
    name='Parallelization_Test',
    config=config_from_files([file_relative_path(__file__, os.path.join("..", "run_config", "parallel.yaml"))]),  # contains the config of the job_executor (namespace and serviceaccount)
    executor_def=dagster_k8s.k8s_job_executor,
)
def job_parallel():
    a = upstream_op()
    a.map(op_parallel)  # fan out: one op_parallel step per DynamicOutput
But when executing, I get the FileNotFoundError. I have also tried the mem_io_manager, but then I get a KeyError.
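The KeyError plausibly has the same root cause in a different form. An in-memory IO manager keeps outputs in a dictionary inside the process that produced them, and a downstream step running in another pod cannot see that process's memory. The toy model below illustrates this. `InMemoryStore` and its method names are hypothetical, not Dagster's implementation.

```python
from typing import Any, Dict, Tuple

Key = Tuple[str, str, str]  # (step_key, output_name, mapping_key)


class InMemoryStore:
    """Toy stand-in for an in-memory IO manager: one dict per process.

    Each pod launched by a per-step executor is a new process, so it
    starts with an empty dict of its own.
    """

    def __init__(self) -> None:
        self.values: Dict[Key, Any] = {}

    def handle_output(self, key: Key, value: Any) -> None:
        self.values[key] = value

    def load_input(self, key: Key) -> Any:
        return self.values[key]  # KeyError if this process never stored it


key = ("upstream_op", "result", "plant_0")
upstream_pod = InMemoryStore()
upstream_pod.handle_output(key, "plant_0")

downstream_pod = InMemoryStore()  # fresh process, fresh memory
try:
    downstream_pod.load_input(key)
except KeyError:
    print("KeyError: value only exists in the upstream pod's memory")
```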
daniel
07/20/2022, 2:00 PM
Katrin Grunert
07/20/2022, 2:45 PM