# ask-community
s
Hi Dagster Team! :dagster-yay: I am currently trying to trigger a Dagster pipeline from a Flask server endpoint, using reconstructable pipeline execution inside a k8s cluster, with `K8sRunLauncher` and `k8s_job_executor` creating a pod for each solid/op. I need to define the ops based on an external trigger, which is why I am using the `execute_pipeline` and `build_reconstructable_pipeline` functions (v0.12.4). The pipeline starts, but I am getting the following error and have little idea what is causing it:
Server initialized for threading.
 * Serving Flask app "src.pipeline_server.flask_app" (lazy loading)
 * Environment: production
   WARNING: This is a development server. Do not use it in a production deployment.
   Use a production WSGI server instead.
 * Debug mode: off
 * Running on <http://0.0.0.0:5000/> (Press CTRL+C to quit)
/src/dagster/pre_processing/pre_processing_pipeline.py:107: ExperimentalWarning: "build_reconstructable_pipeline" is an experimental function. It may break in future versions, even between dot releases. To mute warnings for experimental functionality, invoke warnings.filterwarnings("ignore", category=dagster.ExperimentalWarning) or use one of the other methods described at <https://docs.python.org/3/library/warnings.html#describing-warning-filters>.
  foo_pipeline_kwargs
/usr/local/lib/python3.7/site-packages/dagster/core/execution/context_creation_pipeline.py:490: ExperimentalWarning: "k8s_job_executor" is an experimental function. It may break in future versions, even between dot releases. To mute warnings for experimental functionality, invoke warnings.filterwarnings("ignore", category=dagster.ExperimentalWarning) or use one of the other methods described at <https://docs.python.org/3/library/warnings.html#describing-warning-filters>.
  return context_creation_data.executor_def.executor_creation_fn(init_context)
/usr/local/lib/python3.7/site-packages/dagster_k8s/executor.py:116: ExperimentalWarning: "K8sStepHandler" is an experimental class. It may break in future versions, even between dot releases. To mute warnings for experimental functionality, invoke warnings.filterwarnings("ignore", category=dagster.ExperimentalWarning) or use one of the other methods described at <https://docs.python.org/3/library/warnings.html#describing-warning-filters>.
  kubeconfig_file=run_launcher.kubeconfig_file,
/usr/local/lib/python3.7/site-packages/dagster_k8s/executor.py:116: ExperimentalWarning: "StepDelegatingExecutor" is an experimental class. It may break in future versions, even between dot releases. To mute warnings for experimental functionality, invoke warnings.filterwarnings("ignore", category=dagster.ExperimentalWarning) or use one of the other methods described at <https://docs.python.org/3/library/warnings.html#describing-warning-filters>.
  kubeconfig_file=run_launcher.kubeconfig_file,
2022-04-08 14:10:22 - dagster - DEBUG - sleepy - b7898d89-1142-463b-9630-d53f898be74b - 1 - PIPELINE_START - Started execution of pipeline "sleepy".
2022-04-08 14:10:22 - dagster - DEBUG - sleepy - b7898d89-1142-463b-9630-d53f898be74b - 1 - ENGINE_EVENT - Starting execution with step handler K8sStepHandler
2022-04-08 14:10:22 - dagster - ERROR - sleepy - b7898d89-1142-463b-9630-d53f898be74b - 1 - PIPELINE_FAILURE - Execution of pipeline "sleepy" failed. An exception was thrown during execution.

kubernetes.client.exceptions.ApiException: (403)
Reason: Forbidden
HTTP response headers: HTTPHeaderDict({'Audit-Id': 'e660c754-85bb-4f88-ab57-752daeff4ffe', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Content-Type-Options': 'nosniff', 'X-Kubernetes-Pf-Flowschema-Uid': 'bcc8641e-042d-4fc2-97c9-2bc19aecef88', 'X-Kubernetes-Pf-Prioritylevel-Uid': '6bc5accb-c0e4-48b4-a976-918b8e7ec9cf', 'Date': 'Fri, 08 Apr 2022 14:10:22 GMT', 'Content-Length': '307'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"jobs.batch is forbidden: User \"system:serviceaccount:spark:default\" cannot create resource \"jobs\" in API group \"batch\" in the namespace \"spark\"","reason":"Forbidden","details":{"group":"batch","kind":"jobs"},"code":403}



Stack Trace:
  File "/usr/local/lib/python3.7/site-packages/dagster/core/execution/api.py", line 756, in pipeline_execution_iterator
    for event in pipeline_context.executor.execute(pipeline_context, execution_plan):
  File "/usr/local/lib/python3.7/site-packages/dagster/core/executor/step_delegating/step_delegating_executor.py", line 150, in execute
    pipeline_context, [step], active_execution
  File "/usr/local/lib/python3.7/site-packages/dagster_k8s/executor.py", line 210, in launch_step
    self._batch_api.create_namespaced_job(body=job, namespace=self._job_namespace)
  File "/usr/local/lib/python3.7/site-packages/kubernetes/client/api/batch_v1_api.py", line 210, in create_namespaced_job
    return self.create_namespaced_job_with_http_info(namespace, body, **kwargs)  # noqa: E501
  File "/usr/local/lib/python3.7/site-packages/kubernetes/client/api/batch_v1_api.py", line 323, in create_namespaced_job_with_http_info
    collection_formats=collection_formats)
  File "/usr/local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 353, in call_api
    _preload_content, _request_timeout, _host)
  File "/usr/local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 184, in __call_api
    _request_timeout=_request_timeout)
  File "/usr/local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 397, in request
    body=body)
  File "/usr/local/lib/python3.7/site-packages/kubernetes/client/rest.py", line 281, in POST
    body=body)
  File "/usr/local/lib/python3.7/site-packages/kubernetes/client/rest.py", line 234, in request
    raise ApiException(http_resp=r)
Even partial help or a pointer in the right direction on this blocker would be great; any help is much appreciated! 🙂
j
It looks like your service account needs permission for the namespace:
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"jobs.batch is forbidden: User \"system:serviceaccount:spark:default\" cannot create resource \"jobs\" in API group \"batch\" in the namespace \"spark\"","reason":"Forbidden","details":{"group":"batch","kind":"jobs"},"code":403}
We attach this role to the service account for dagit/jobs, but it’s scoped to your main namespace: https://github.com/dagster-io/dagster/blob/master/helm/dagster/templates/role.yaml
It may be easier to stick to one namespace (if you already are, you just need to amend the new service accounts). There are other things you’ll need to add to another namespace for jobs to run, like configmaps and secrets.
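For reference, a minimal Role/RoleBinding of the kind that would grant the service account from the 403 (`system:serviceaccount:spark:default`) permission to create jobs is sketched below. This is an illustration based on the error message, not the chart-managed role linked above; the role name is a placeholder, and the verb list should be reconciled with what the Helm chart grants:

```yaml
# Sketch only: grants the "default" service account in the "spark"
# namespace the verbs the k8s executor needs on batch/v1 Jobs.
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: dagster-job-runner   # placeholder name
  namespace: spark
rules:
  - apiGroups: ["batch"]
    resources: ["jobs", "jobs/status"]
    verbs: ["create", "get", "list", "watch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: dagster-job-runner   # placeholder name
  namespace: spark
subjects:
  - kind: ServiceAccount
    name: default
    namespace: spark
roleRef:
  kind: Role
  name: dagster-job-runner
  apiGroup: rbac.authorization.k8s.io
```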
s
Thanks for the response, Johann! The namespace issue is resolved: I'm using a single namespace now and the pod is being created, but I'm getting this error from the job pod Dagster is trying to create:
Traceback (most recent call last):
  File "/usr/local/bin/dagster", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.7/site-packages/dagster/cli/__init__.py", line 48, in main
    cli(auto_envvar_prefix=ENV_PREFIX)  # pylint:disable=E1123
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/dagster/cli/api.py", line 176, in execute_step_command
    args.pipeline_run_id
  File "/usr/local/lib/python3.7/site-packages/dagster/check/__init__.py", line 131, in inst
    raise _type_mismatch_error(obj, ttype, desc)
dagster.check.CheckError: Object None is not a PipelineRun. Got None with type <class 'NoneType'>. Desc: Pipeline run with id '9f91962d-0b86-42b6-b7f4-95aa9c498516' not found for step execution
j
That’s a strange error: it implies that the step worker wasn’t able to load the run from the DB. Could that run have been deleted during execution? Is it in some way possible that the step worker is connecting to a different DB?