# deployment-kubernetes

Yassine Marzougui

09/05/2021, 1:59 PM
Hi, I'm trying to run a simple pipeline with `k8s_job_executor`. This is the pipeline code:
from dagster import solid, ModeDefinition, pipeline, default_executors, fs_io_manager
from dagster_k8s import k8s_job_executor

@solid(config_schema={"name": str})
def hello(_context):
    return f"Hello, {_context.solid_config['name']}!"

@solid
def bye(_context, hello_str):
    return f"Goodbye {hello_str.split(' ')[1]}"

MODE_PROD = ModeDefinition(name="prod", resource_defs={"io_manager": fs_io_manager}, executor_defs=default_executors + [k8s_job_executor])
MODE_TEST = ModeDefinition(name="test", resource_defs={})


@pipeline(mode_defs=[MODE_PROD, MODE_TEST])
def my_pipeline():
    hello_str = hello()
    bye(hello_str)
And this is the run config:
solids:
  hello:
    config:
      name: "Dagster"
execution:
  k8s:
    config:
      job_namespace: "..."
The first solid (`hello`) succeeds, but the second one (`bye`) fails with the following error:
Traceback (most recent call last):
  File "/home/appuser/.local/bin/dagster", line 8, in <module>
    sys.exit(main())
  File "/home/appuser/.local/lib/python3.9/site-packages/dagster/cli/__init__.py", line 48, in main
    cli(auto_envvar_prefix=ENV_PREFIX)  # pylint:disable=E1123
  File "/home/appuser/.local/lib/python3.9/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/home/appuser/.local/lib/python3.9/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/home/appuser/.local/lib/python3.9/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/appuser/.local/lib/python3.9/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/appuser/.local/lib/python3.9/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/appuser/.local/lib/python3.9/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/home/appuser/.local/lib/python3.9/site-packages/dagster/cli/api.py", line 194, in execute_step_command
    execution_plan = create_execution_plan(
  File "/home/appuser/.local/lib/python3.9/site-packages/dagster/core/execution/api.py", line 718, in create_execution_plan
    resolved_run_config = ResolvedRunConfig.build(pipeline_def, run_config, mode=mode)
  File "/home/appuser/.local/lib/python3.9/site-packages/dagster/core/system_config/objects.py", line 179, in build
    raise DagsterInvalidConfigError(
dagster.core.errors.DagsterInvalidConfigError: Error in config for pipeline my_pipeline
    Error 1: Received unexpected config entry "k8s" at path root:execution. Expected: "{ in_process?: { config?: { marker_to_close?: String retries?: { disabled?: { } enabled?: { } } } } multiprocess?: { config?: { max_concurrent?: Int retries?: { disabled?: { } enabled?: { } } } } }".
Any idea what's going on here?
There seem to be 3 failure modes, happening randomly:
• #1: The `hello` step succeeds and the `bye` step fails. Dagit doesn't show the error, but rather:
STEP_FAILURE       Discovered failed Kubernetes job dagster-job-0d289bfa21ffe28982593fbfc8b703a9 for step bye
PIPELINE_FAILURE   Execution of pipeline "my_pipeline" failed. Steps failed: ['bye'].
When inspecting the pod logs, we find the error from the message above.
• #2: The `hello` step fails with the same error as above.
• #3: The whole pipeline fails before the first step. Dagit shows the following error:
An exception was thrown during execution that is likely a framework error, rather than an error in user code.
dagster.core.errors.DagsterInvalidConfigError: Error in config for pipeline my_pipeline
    Error 1: Received unexpected config entry "k8s" at path root:execution. Expected: "{ in_process?: { config?: { marker_to_close?: String retries?: { disabled?: { } enabled?: { } } } } multiprocess?: { config?: { max_concurrent?: Int retries?: { disabled?: { } enabled?: { } } } } }".
Stack Trace:
dagster.core.errors.DagsterInvalidConfigError: Error in config for pipeline my_pipeline
Error 1: Received unexpected config entry "k8s" at path root:execution. Expected: "{ in_process?: { config?: { marker_to_close?: String retries?: { disabled?: { } enabled?: { } } } } multiprocess?: { config?: { max_concurrent?: Int retries?: { disabled?: { } enabled?: { } } } } }".
  File "/home/appuser/.local/lib/python3.9/site-packages/dagster/grpc/impl.py", line 86, in core_execute_run
    yield from execute_run_iterator(recon_pipeline, pipeline_run, instance)
  File "/home/appuser/.local/lib/python3.9/site-packages/dagster/core/execution/api.py", line 822, in __iter__
    yield from self.execution_context_manager.prepare_context()
  File "/home/appuser/.local/lib/python3.9/site-packages/dagster/utils/__init__.py", line 447, in generate_setup_events
    obj = next(self.generator)
  File "/home/appuser/.local/lib/python3.9/site-packages/dagster/core/execution/context_creation_pipeline.py", line 354, in orchestration_context_event_generator
    context_creation_data = create_context_creation_data(
  File "/home/appuser/.local/lib/python3.9/site-packages/dagster/core/execution/context_creation_pipeline.py", line 147, in create_context_creation_data
    resolved_run_config = ResolvedRunConfig.build(pipeline_def, run_config, mode=pipeline_run.mode)
  File "/home/appuser/.local/lib/python3.9/site-packages/dagster/core/system_config/objects.py", line 179, in build
    raise DagsterInvalidConfigError(
And sometimes, the execution of step `bye` starts successfully, but then fails with:
dagster.core.errors.DagsterExecutionLoadInputError: Error occurred while loading input "hello_str" of step "bye":
The above exception was caused by the following exception:
FileNotFoundError: [Errno 2] No such file or directory: '/app/dagster/dagster_home/storage/d6af8072-eadf-4f86-a483-af1b3e3b6472/hello/result'
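This one is normal and expected, because I'm using `fs_io_manager` with `k8s_job_executor` instead of `s3_pickle_io_manager`: each step runs in its own pod, so the `bye` pod cannot see the file that the `hello` pod wrote to its local filesystem.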
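To illustrate why a filesystem-based IO manager breaks here, below is a minimal sketch in plain Python (not Dagster code). It assumes each step pod has its own private storage root, and it mimics the `storage/<run_id>/<step>/result` layout seen in the error above. The helper names (`output_path`, `store_output`, `load_input`, `simulate`) and the run ID `run-1` are hypothetical.

```python
import os
import pickle
import tempfile


def output_path(pod_root, run_id, step_key, output_name="result"):
    # Same layout as in the error message: <storage>/<run_id>/<step>/<output>
    return os.path.join(pod_root, "storage", run_id, step_key, output_name)


def store_output(pod_root, run_id, step_key, value):
    # What a filesystem IO manager does after a step: pickle the output locally.
    path = output_path(pod_root, run_id, step_key)
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "wb") as f:
        pickle.dump(value, f)


def load_input(pod_root, run_id, upstream_step):
    # What the downstream step does: read the upstream output from its own disk.
    with open(output_path(pod_root, run_id, upstream_step), "rb") as f:
        return pickle.load(f)


def simulate(shared_storage):
    """Run `hello` in one simulated pod and load its output from another."""
    with tempfile.TemporaryDirectory() as pod_a, tempfile.TemporaryDirectory() as pod_b:
        # With shared storage both steps see the same root; otherwise `bye`
        # looks in its own, empty filesystem.
        reader_root = pod_a if shared_storage else pod_b
        store_output(pod_a, "run-1", "hello", "Hello, Dagster!")
        try:
            return load_input(reader_root, "run-1", "hello")
        except FileNotFoundError as exc:
            return f"FileNotFoundError: {exc.filename}"
```

`simulate(True)` returns the stored string, while `simulate(False)` hits the same kind of `FileNotFoundError` as the `bye` step. An object-store-backed IO manager such as `s3_pickle_io_manager` avoids this because every pod reads from and writes to the same external location.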
Found the culprit: the `pullPolicy` of the job executor was not set, so it defaulted to `IfNotPresent`, which caused the steps to run with an old image containing only the default executors. This caused #1 and #2. The same was happening with the run worker, which caused #3, but when trying to change the `pullPolicy`, it turned out I'm also impacted by this issue: https://dagster.slack.com/archives/C01U954MEER/p1630618928200600?thread_ts=1630177344.082400&cid=C01U954MEER
This also explains the random nature of the failures, because sometimes the pipeline was deployed on different nodes and the newer image was pulled.
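For anyone hitting the same thing, here is a rough sketch of building the run config above with an explicit pull policy for the step jobs. The helper `build_run_config` is hypothetical, and the `image_pull_policy` key under `execution.k8s.config` is an assumption that should be checked against the `k8s_job_executor` config schema of your installed dagster-k8s version. It only covers the step jobs; the run worker's pull policy is configured separately.

```python
def build_run_config(name, job_namespace, image_pull_policy="Always"):
    """Return a run config dict for my_pipeline with an explicit pull policy.

    Assumption: the executor accepts `image_pull_policy` next to
    `job_namespace`; verify this against your dagster-k8s version.
    """
    # Values Kubernetes accepts for a container's imagePullPolicy.
    allowed = {"Always", "IfNotPresent", "Never"}
    if image_pull_policy not in allowed:
        raise ValueError(f"Unsupported image pull policy: {image_pull_policy!r}")

    return {
        "solids": {"hello": {"config": {"name": name}}},
        "execution": {
            "k8s": {
                "config": {
                    "job_namespace": job_namespace,
                    # "Always" makes each node re-check the registry instead of
                    # reusing a stale cached image under the same tag.
                    "image_pull_policy": image_pull_policy,
                }
            }
        },
    }
```

Calling `build_run_config("Dagster", "...")` yields the same config as the YAML earlier in the thread, plus the pull policy. Using a unique image tag per build is another way to avoid stale cached images, since `IfNotPresent` then has nothing old to reuse.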