Daniel Sef
07/12/2022, 3:05 PM
kubernetes.client.exceptions.ApiException: (403)
Reason: Forbidden
HTTP response headers: HTTPHeaderDict({'Audit-Id': '89394afc-8f31-479d-b626-afcbe7a6dae6', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Content-Type-Options': 'nosniff', 'X-Kubernetes-Pf-Flowschema-Uid': '9895fe88-9a7f-452d-a578-4ac38c897d2e', 'X-Kubernetes-Pf-Prioritylevel-Uid': '7718b4d7-f74e-42f3-9994-4bef4648ce4b', 'Date': 'Tue, 12 Jul 2022 12:43:43 GMT', 'Content-Length': '311'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"jobs.batch is forbidden: User \"system:serviceaccount:dagster:dagster\" cannot create resource \"jobs\" in API group \"batch\" in the namespace \"default\"","reason":"Forbidden","details":{"group":"batch","kind":"jobs"},"code":403}
  File "/usr/local/lib/python3.7/site-packages/dagster/core/instance/__init__.py", line 1762, in launch_run
    self._run_launcher.launch_run(LaunchRunContext(pipeline_run=run, workspace=workspace))
  File "/usr/local/lib/python3.7/site-packages/dagster_celery_k8s/launcher.py", line 240, in launch_run
    self._batch_api.create_namespaced_job(body=job, namespace=job_namespace)
  File "/usr/local/lib/python3.7/site-packages/kubernetes/client/api/batch_v1_api.py", line 210, in create_namespaced_job
    return self.create_namespaced_job_with_http_info(namespace, body, **kwargs)  # noqa: E501
  File "/usr/local/lib/python3.7/site-packages/kubernetes/client/api/batch_v1_api.py", line 323, in create_namespaced_job_with_http_info
    collection_formats=collection_formats)
  File "/usr/local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 353, in call_api
    _preload_content, _request_timeout, _host)
  File "/usr/local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 184, in __call_api
    _request_timeout=_request_timeout)
  File "/usr/local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 397, in request
    body=body)
  File "/usr/local/lib/python3.7/site-packages/kubernetes/client/rest.py", line 281, in POST
    body=body)
  File "/usr/local/lib/python3.7/site-packages/kubernetes/client/rest.py", line 234, in request
    raise ApiException(http_resp=r)
My current config is the following:
@job(
    resource_defs=df,
    executor_def=multiprocess_executor,
    config=_default_config_masseffect
)
Could you please help me figure out how to move forward? As far as I understand, I should define the namespace for the executor. Thanks in advance!
Daniel Gafni
07/12/2022, 3:14 PM
User \"system:serviceaccount:dagster:dagster\" cannot create resource \"jobs\" in API group \"batch\" in the namespace \"default\"","reason":"Forbidden","details":{"group":"batch","kind":"jobs"},"code":403}
Your service account doesn't have enough permissions.
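To make the permission gap concrete, here is a minimal sketch that reads the denied action out of the 403 body quoted above and builds the matching Role and RoleBinding manifests as plain Python dicts. The role name `dagster-job-launcher` and the helper `rbac_for_denial` are hypothetical, and the Role grants only the single verb named in the error; a real launcher may well need more verbs, so treat this as a starting point rather than a finished policy.

```python
import json
import re

# Response body copied from the 403 above (raw string keeps the \" escapes).
STATUS_BODY = r'''{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"jobs.batch is forbidden: User \"system:serviceaccount:dagster:dagster\" cannot create resource \"jobs\" in API group \"batch\" in the namespace \"default\"","reason":"Forbidden","details":{"group":"batch","kind":"jobs"},"code":403}'''

# Pattern for the "forbidden" message shape seen in this thread.
DENIAL = re.compile(
    r'User "system:serviceaccount:(?P<sa_ns>[^:]+):(?P<sa>[^"]+)" '
    r'cannot (?P<verb>\w+) resource "(?P<resource>[^"]+)" '
    r'in API group "(?P<group>[^"]*)" in the namespace "(?P<ns>[^"]+)"'
)


def rbac_for_denial(status_body, role_name="dagster-job-launcher"):
    """Build Role/RoleBinding dicts covering the action a 403 body reports.

    role_name is a hypothetical name chosen for illustration.
    """
    message = json.loads(status_body)["message"]
    match = DENIAL.search(message)
    if match is None:
        raise ValueError("unrecognised forbidden message: " + message)
    d = match.groupdict()

    # Role: allows the denied verb on the denied resource, in the target namespace.
    role = {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "Role",
        "metadata": {"name": role_name, "namespace": d["ns"]},
        "rules": [
            {"apiGroups": [d["group"]], "resources": [d["resource"]], "verbs": [d["verb"]]}
        ],
    }
    # RoleBinding: attaches that Role to the service account from the error.
    binding = {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "RoleBinding",
        "metadata": {"name": role_name, "namespace": d["ns"]},
        "subjects": [{"kind": "ServiceAccount", "name": d["sa"], "namespace": d["sa_ns"]}],
        "roleRef": {"apiGroup": "rbac.authorization.k8s.io", "kind": "Role", "name": role_name},
    }
    return role, binding


role, binding = rbac_for_denial(STATUS_BODY)
print(json.dumps(role, indent=2))
print(json.dumps(binding, indent=2))
```

The printed manifests could be saved to a file and applied with kubectl. The other route is to leave RBAC alone and launch the jobs in a namespace where the service account already has the permission.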
Seems like you would like to set the job_namespace variable to a non-default value too.
Daniel Sef
07/12/2022, 3:25 PM
executor_def=multiprocess_executor({"execution": {"config": {"job_namespace": "dagster"}}}),
If not, could you please send me a simple example?
Daniel Gafni
07/12/2022, 3:40 PM
• dagster_k8s.k8s_job_executor
• dagster_celery_k8s.celery_k8s_job_executor
The multiprocessing executor definitely can't take it as input (it doesn't know anything about namespaces).
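As a rough sketch of where the value goes for the two executors listed above: it sits under the executor's config in run config, the same `execution` / `config` / `job_namespace` nesting used in the snippet earlier in this thread. The helper name `execution_config` is hypothetical, and the name check only reflects the general Kubernetes rule that namespace names are DNS labels; it does not verify that the namespace exists or that the service account may create jobs there.

```python
import re

# Kubernetes namespace names are DNS-1123 labels: lowercase alphanumerics
# or '-', starting and ending with an alphanumeric, at most 63 characters.
_DNS_LABEL = re.compile(r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?$")


def execution_config(job_namespace):
    """Return run config that sets job_namespace on the job's executor.

    Hypothetical helper: only meaningful when the job's executor accepts a
    job_namespace field (the multiprocess executor does not).
    """
    if len(job_namespace) > 63 or not _DNS_LABEL.match(job_namespace):
        raise ValueError("not a valid namespace name: %r" % job_namespace)
    return {"execution": {"config": {"job_namespace": job_namespace}}}


run_config = execution_config("dagster")
print(run_config)
```

The resulting dict is what would be passed as the job's `config=` or supplied as run config at launch time.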
There is also a `jobNamespace` field in the Helm values for the `k8sRunLauncher`:
config:
  # This configuration will only be used if the K8sRunLauncher is selected
  k8sRunLauncher:
    # Change with caution! If you're using a fixed tag for pipeline run images, changing the
    # image pull policy to anything other than "Always" will use a cached/stale image, which is
    # almost certainly not what you want.
    imagePullPolicy: "Always"
    ## The image to use for the launched Job's Dagster container.
    ## The `pullPolicy` field is ignored. Use `imagePullPolicy` instead.
    # image:
    #   repository: ""
    #   tag: ""
    #   pullPolicy: Always
    # The K8s namespace where new jobs will be launched.
    # By default, the release namespace is used.
    jobNamespace: ~
Maybe this will help.
yuhan
07/13/2022, 6:59 PM
from dagster import executor, job
from dagster.core.execution.retries import RetryMode
from dagster.core.executor.multiprocess import MultiprocessExecutor
from dagster_celery_k8s.config import celery_k8s_executor_config


@executor(config_schema=celery_k8s_executor_config())
def celery_k8s_compat_multiproc_executor(_context):
    return MultiprocessExecutor(
        retries=RetryMode.ENABLED,
        max_concurrent=0,  # default to cpu count, can add config to set if desired
    )


@job(
    config={"execution": {"config": {"job_namespace": "dagster"}}},
    executor_def=celery_k8s_compat_multiproc_executor,
)
def test():
    pass