Yassine Marzougui
09/05/2021, 1:59 PM
I'm running into random failures when executing a pipeline with the k8s_job_executor.
This is the pipeline code:
from dagster import solid, ModeDefinition, pipeline, default_executors, fs_io_manager
from dagster_k8s import k8s_job_executor


@solid(config_schema={"name": str})
def hello(_context):
    return f"Hello, {_context.solid_config['name']}!"


@solid
def bye(_context, hello_str):
    return f"Goodbye {hello_str.split(' ')[1]}"


MODE_PROD = ModeDefinition(
    name="prod",
    resource_defs={"io_manager": fs_io_manager},
    executor_defs=default_executors + [k8s_job_executor],
)
MODE_TEST = ModeDefinition(name="test", resource_defs={})


@pipeline(mode_defs=[MODE_PROD, MODE_TEST])
def my_pipeline():
    hello_str = hello()
    bye(hello_str)
And this is the run config:
solids:
  hello:
    config:
      name: "Dagster"
execution:
  k8s:
    config:
      job_namespace: "..."
The first solid (hello) succeeds, but the second one (bye) fails with the following error:
Traceback (most recent call last):
File "/home/appuser/.local/bin/dagster", line 8, in <module>
sys.exit(main())
File "/home/appuser/.local/lib/python3.9/site-packages/dagster/cli/__init__.py", line 48, in main
cli(auto_envvar_prefix=ENV_PREFIX) # pylint:disable=E1123
File "/home/appuser/.local/lib/python3.9/site-packages/click/core.py", line 829, in __call__
return self.main(*args, **kwargs)
File "/home/appuser/.local/lib/python3.9/site-packages/click/core.py", line 782, in main
rv = self.invoke(ctx)
File "/home/appuser/.local/lib/python3.9/site-packages/click/core.py", line 1259, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/home/appuser/.local/lib/python3.9/site-packages/click/core.py", line 1259, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/home/appuser/.local/lib/python3.9/site-packages/click/core.py", line 1066, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/home/appuser/.local/lib/python3.9/site-packages/click/core.py", line 610, in invoke
return callback(*args, **kwargs)
File "/home/appuser/.local/lib/python3.9/site-packages/dagster/cli/api.py", line 194, in execute_step_command
execution_plan = create_execution_plan(
File "/home/appuser/.local/lib/python3.9/site-packages/dagster/core/execution/api.py", line 718, in create_execution_plan
resolved_run_config = ResolvedRunConfig.build(pipeline_def, run_config, mode=mode)
File "/home/appuser/.local/lib/python3.9/site-packages/dagster/core/system_config/objects.py", line 179, in build
raise DagsterInvalidConfigError(
dagster.core.errors.DagsterInvalidConfigError: Error in config for pipeline my_pipeline
Error 1: Received unexpected config entry "k8s" at path root:execution. Expected: "{ in_process?: { config?: { marker_to_close?: String retries?: { disabled?: { } enabled?: { } } } } multiprocess?: { config?: { max_concurrent?: Int retries?: { disabled?: { } enabled?: { } } } } }".
Any idea what's going on here?

• #1 hello step succeeds, and bye step fails. Dagit doesn't show the error, but rather:
STEP_FAILURE Discovered failed Kubernetes job dagster-job-0d289bfa21ffe28982593fbfc8b703a9 for step bye
PIPELINE_FAILURE Execution of pipeline "my_pipeline" failed. Steps failed: ['bye'].
When inspecting the pod logs, we find the error traceback shared in the message above.
• #2 hello step fails with the same error as above.
• #3 The whole pipeline fails before the first step. Dagit shows the following error:
An exception was thrown during execution that is likely a framework error, rather than an error in user code.
dagster.core.errors.DagsterInvalidConfigError: Error in config for pipeline my_pipeline
Error 1: Received unexpected config entry "k8s" at path root:execution. Expected: "{ in_process?: { config?: { marker_to_close?: String retries?: { disabled?: { } enabled?: { } } } } multiprocess?: { config?: { max_concurrent?: Int retries?: { disabled?: { } enabled?: { } } } } }".
Stack Trace:
dagster.core.errors.DagsterInvalidConfigError: Error in config for pipeline my_pipeline
Error 1: Received unexpected config entry "k8s" at path root:execution. Expected: "{ in_process?: { config?: { marker_to_close?: String retries?: { disabled?: { } enabled?: { } } } } multiprocess?: { config?: { max_concurrent?: Int retries?: { disabled?: { } enabled?: { } } } } }".
File "/home/appuser/.local/lib/python3.9/site-packages/dagster/grpc/impl.py", line 86, in core_execute_run
yield from execute_run_iterator(recon_pipeline, pipeline_run, instance)
File "/home/appuser/.local/lib/python3.9/site-packages/dagster/core/execution/api.py", line 822, in __iter__
yield from self.execution_context_manager.prepare_context()
File "/home/appuser/.local/lib/python3.9/site-packages/dagster/utils/__init__.py", line 447, in generate_setup_events
obj = next(self.generator)
File "/home/appuser/.local/lib/python3.9/site-packages/dagster/core/execution/context_creation_pipeline.py", line 354, in orchestration_context_event_generator
context_creation_data = create_context_creation_data(
File "/home/appuser/.local/lib/python3.9/site-packages/dagster/core/execution/context_creation_pipeline.py", line 147, in create_context_creation_data
resolved_run_config = ResolvedRunConfig.build(pipeline_def, run_config, mode=pipeline_run.mode)
File "/home/appuser/.local/lib/python3.9/site-packages/dagster/core/system_config/objects.py", line 179, in build
raise DagsterInvalidConfigError(
And sometimes, the execution of step bye starts successfully, but then fails with:
dagster.core.errors.DagsterExecutionLoadInputError: Error occurred while loading input "hello_str" of step "bye":
The above exception was caused by the following exception:
FileNotFoundError: [Errno 2] No such file or directory: '/app/dagster/dagster_home/storage/d6af8072-eadf-4f86-a483-af1b3e3b6472/hello/result'
Which is normal and expected here because I'm using fs_io_manager with the k8s_job_executor instead of s3_pickle_io_manager.
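For context, here's a minimal sketch of what the prod mode could look like with s3_pickle_io_manager instead, so step outputs are persisted to S3 and readable from any step pod; the bucket name and resource wiring below are my assumptions, not code from this project:

# Sketch: swap fs_io_manager for s3_pickle_io_manager so each Kubernetes step pod
# reads upstream outputs from S3 rather than a pod-local filesystem path.
from dagster import ModeDefinition, default_executors
from dagster_aws.s3 import s3_pickle_io_manager, s3_resource  # the io manager requires an "s3" resource
from dagster_k8s import k8s_job_executor

MODE_PROD = ModeDefinition(
    name="prod",
    resource_defs={
        "io_manager": s3_pickle_io_manager,  # pickles step outputs into an S3 bucket
        "s3": s3_resource,                   # boto3-backed S3 client used by the io manager
    },
    executor_defs=default_executors + [k8s_job_executor],
)

# Corresponding run config addition (bucket name is a placeholder):
# resources:
#   io_manager:
#     config:
#       s3_bucket: "my-dagster-io-bucket"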
It turned out the pullPolicy of the job executor was not set, so it defaulted to IfNotPresent, which caused the steps to run with an old image containing only the default executors. This caused #1 and #2.
The same was happening with the run worker, which caused #3, but when trying to change the pullPolicy, it turned out I'm also impacted by this issue: https://dagster.slack.com/archives/C01U954MEER/p1630618928200600?thread_ts=1630177344.082400&cid=C01U954MEER
This also explains the random nature of the failures, because sometimes the pipeline was deployed on different nodes and the newer image was pulled.
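For reference, a sketch of the run config I'd use to force the step jobs to always pull the image; image_pull_policy is my reading of the k8s_job_executor config schema at the time, so treat the key name as an assumption and verify it against your dagster-k8s version:

# Sketch: the same run config expressed as a Python dict, with the pull policy pinned
# so step pods don't reuse a stale image cached on the node.
run_config = {
    "solids": {"hello": {"config": {"name": "Dagster"}}},
    "execution": {
        "k8s": {
            "config": {
                "job_namespace": "...",         # namespace elided, as in the original config
                "image_pull_policy": "Always",  # assumed key; avoids IfNotPresent reusing the old image
            }
        }
    },
}

The run worker's pull policy is configured separately from the executor (on the run launcher / Helm side rather than in the run config), which is why #3 needed its own change.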