# ask-community
a
Hello 👋 I am trying to run a local Dagster with a `K8sRunLauncher` 😄 My goal is to host dagit locally and run a job's steps on the EKS cluster. Do you know if this is supported by Dagster? I currently have the following configuration:
```yaml
run_launcher:
  module: dagster_k8s.launcher
  class: K8sRunLauncher
  config:
    dagster_home: /usr/src/app
    job_image: <my-code-image>:0.0.1
    image_pull_policy: Always
    instance_config_map: dagster-instance
    job_namespace: system-dagster
    load_incluster_config: false
    kubeconfig_file: /Users/alexandreguitton/.kube/config
    postgres_password_secret: dagster-postgresql
    volumes:
      - configMap:
          defaultMode: 420
          name: dagster-instance
        name: dagster-instance
    resources:
#      limits:
#        cpu: 1
#        memory: 200Mi
      requests:
        cpu: 1
        memory: 200Mi
    service_account_name: dagster
```
It launches the pod correctly, but the pod can't connect because `load_incluster_config` is set to `false`: the configuration is passed on the command line, and `instance_config_map` doesn't seem to take effect (or maybe I didn't understand how it works 😕). Any help would be greatly appreciated 😄
Bump: anyone able to help me with this? 😕
d
Hey Alexandre - it might take some work on our side to fully support this, although I think it's a legitimate use case. Can you share the exact error that you're seeing when you try to do this?
a
Hey! Thanks for the reply 🙂 So my current local `dagster.yaml` configuration is:
```yaml
run_launcher:
  module: dagster_k8s.launcher
  class: K8sRunLauncher
  config:
    dagster_home: /usr/src/app
    job_image:
      env: DAGSTER_JOB_IMAGE
    image_pull_policy: Always
    instance_config_map: dagster-instance
    job_namespace: system-dagster
    load_incluster_config: false
    kubeconfig_file: ~/.kube/config
    postgres_password_secret: dagster-postgresql
    volumes:
      - configMap:
          defaultMode: 420
          name: dagster-instance
        name: dagster-instance
    resources:
      requests:
        cpu: 1
        memory: 200Mi
    service_account_name: dagster
    env_vars:
      - DAGSTER_PG_HOSTNAME
      - DAGSTER_PG_DB_NAME
      - DAGSTER_PG_PORT
      - DAGSTER_PG_USERNAME
      - TMP_DAGSTER_BASE_DIR
      - DAGSTER_JOB_IMAGE
```
When I launch the job, it has the following pod definition:
```yaml
apiVersion: v1
kind: Pod
metadata:
  annotations:
    kubernetes.io/psp: eks.privileged
  creationTimestamp: "2023-02-02T16:52:46Z"
  generateName: dagster-run-05ac286c-f6fc-4bb2-a7a7-d73d0daa6fa9-
  labels:
    app.kubernetes.io/component: run_worker
    app.kubernetes.io/instance: dagster
    app.kubernetes.io/name: dagster
    app.kubernetes.io/part-of: dagster
    app.kubernetes.io/version: 1.1.14
    controller-uid: 70971e8f-f283-4cee-bf0a-f8c9645ba518
    dagster/job: ASSET_JOB
    dagster/run-id: 05ac286c-f6fc-4bb2-a7a7-d73d0daa6fa9
    job-name: dagster-run-05ac286c-f6fc-4bb2-a7a7-d73d0daa6fa9
  name: dagster-run-05ac286c-f6fc-4bb2-a7a7-d73d0daa6fa9-vktln
  namespace: system-dagster
  ownerReferences:
  - apiVersion: batch/v1
    blockOwnerDeletion: true
    controller: true
    kind: Job
    name: dagster-run-05ac286c-f6fc-4bb2-a7a7-d73d0daa6fa9
    uid: 70971e8f-f283-4cee-bf0a-f8c9645ba518
  resourceVersion: "7411517"
  uid: 245a9713-bde9-4962-bd92-20c6f8e9b814
spec:
  containers:
  - args:
    - dagster
    - api
    - execute_run
    - '{"__class__": "ExecuteRunArgs", "instance_ref": {"__class__": "InstanceRef",
      "compute_logs_data": {"__class__": "ConfigurableClassData", "class_name": "LocalComputeLogManager",
      "config_yaml": "base_dir:\n  env: TMP_DAGSTER_BASE_DIR\n", "module_name": "dagster.core.storage.local_compute_log_manager"},
      "custom_instance_class_data": null, "event_storage_data": {"__class__": "ConfigurableClassData",
      "class_name": "PostgresEventLogStorage", "config_yaml": "postgres_db:\n  db_name:
      dagster\n  hostname:\n    env: DAGSTER_PG_HOSTNAME\n  params: {}\n  password:\n    env:
      DAGSTER_PG_PASSWORD\n  port:\n    env: DAGSTER_PG_PORT\n  username:\n    env:
      DAGSTER_PG_USERNAME\n", "module_name": "dagster_postgres"}, "local_artifact_storage_data":
      {"__class__": "ConfigurableClassData", "class_name": "LocalArtifactStorage",
      "config_yaml": "base_dir:\n  env: TMP_DAGSTER_BASE_DIR\n", "module_name": "dagster.core.storage.root"},
      "run_coordinator_data": {"__class__": "ConfigurableClassData", "class_name":
      "DefaultRunCoordinator", "config_yaml": "{}\n", "module_name": "dagster.core.run_coordinator"},
      "run_launcher_data": {"__class__": "ConfigurableClassData", "class_name": "K8sRunLauncher",
      "config_yaml": "dagster_home: /usr/src/app\nenv_vars:\n- DAGSTER_PG_HOSTNAME\n-
      DAGSTER_PG_DB_NAME\n- DAGSTER_PG_PORT\n- DAGSTER_PG_USERNAME\n- TMP_DAGSTER_BASE_DIR\n-
      DAGSTER_JOB_IMAGE\nimage_pull_policy: Always\ninstance_config_map: dagster-instance\njob_image:\n  env:
      DAGSTER_JOB_IMAGE\njob_namespace: system-dagster\nkubeconfig_file: ~/.kube/config\nload_incluster_config:
      false\npostgres_password_secret: dagster-postgresql\nresources:\n  requests:\n    cpu:
      1\n    memory: 200Mi\nservice_account_name: dagster\nvolumes:\n- configMap:\n    defaultMode:
      420\n    name: dagster-instance\n  name: dagster-instance\n", "module_name":
      "dagster_k8s.launcher"}, "run_storage_data": {"__class__": "ConfigurableClassData",
      "class_name": "PostgresRunStorage", "config_yaml": "postgres_db:\n  db_name:
      dagster\n  hostname:\n    env: DAGSTER_PG_HOSTNAME\n  params: {}\n  password:\n    env:
      DAGSTER_PG_PASSWORD\n  port:\n    env: DAGSTER_PG_PORT\n  username:\n    env:
      DAGSTER_PG_USERNAME\n", "module_name": "dagster_postgres"}, "schedule_storage_data":
      {"__class__": "ConfigurableClassData", "class_name": "PostgresScheduleStorage",
      "config_yaml": "postgres_db:\n  db_name: dagster\n  hostname:\n    env: DAGSTER_PG_HOSTNAME\n  params:
      {}\n  password:\n    env: DAGSTER_PG_PASSWORD\n  port:\n    env: DAGSTER_PG_PORT\n  username:\n    env:
      DAGSTER_PG_USERNAME\n", "module_name": "dagster_postgres"}, "scheduler_data":
      {"__class__": "ConfigurableClassData", "class_name": "DagsterDaemonScheduler",
      "config_yaml": "{}\n", "module_name": "dagster.core.scheduler"}, "secrets_loader_data":
      null, "settings": {}, "storage_data": {"__class__": "ConfigurableClassData",
      "class_name": "DagsterPostgresStorage", "config_yaml": "postgres_db:\n  db_name:
      dagster\n  hostname:\n    env: DAGSTER_PG_HOSTNAME\n  params: {}\n  password:\n    env:
      DAGSTER_PG_PASSWORD\n  port:\n    env: DAGSTER_PG_PORT\n  username:\n    env:
      DAGSTER_PG_USERNAME\n", "module_name": "dagster_postgres"}}, "pipeline_origin":
      {"__class__": "PipelinePythonOrigin", "pipeline_name": "__ASSET_JOB", "repository_origin":
      {"__class__": "RepositoryPythonOrigin", "code_pointer": {"__class__": "ModuleCodePointer",
      "fn_name": "defs", "module": "github_slack", "working_directory": "/usr/src/app"},
      "container_context": {}, "container_image": null, "entry_point": ["dagster"[],
      "executable_path": "/root/.cache/pypoetry/virtualenvs/github-slack-VA82Wl8V-py3.9/bin/python"}},
      "pipeline_run_id": "05ac286c-f6fc-4bb2-a7a7-d73d0daa6fa9", "set_exit_code_on_failure":
      null}'
    env:
    - name: DAGSTER_RUN_JOB_NAME
      value: __ASSET_JOB
    - name: DAGSTER_HOME
      value: /usr/src/app
    - name: DAGSTER_PG_PASSWORD
      valueFrom:
        secretKeyRef:
          key: postgresql-password
          name: dagster-postgresql
    - name: DAGSTER_PG_PORT
      value: "5432"
    - name: TMP_DAGSTER_BASE_DIR
      value: /usr/src/app/tmp
    - name: DAGSTER_JOB_IMAGE
      value: ...
    - name: DAGSTER_PG_DB_NAME
      value: dagster
    - name: DAGSTER_PG_USERNAME
      value: dagster
    - name: DAGSTER_PG_HOSTNAME
      value: ...
    image: ...
    imagePullPolicy: Always
    name: dagster
    resources:
      requests:
        cpu: "1"
        memory: 200Mi
    terminationMessagePath: /dev/termination-log
    terminationMessagePolicy: File
    volumeMounts:
    - mountPath: /var/run/secrets/kubernetes.io/serviceaccount
      name: kube-api-access-7x5rc
      readOnly: true
  dnsPolicy: ClusterFirst
  enableServiceLinks: true
  nodeName: ip-10-200-17-172.eu-central-1.compute.internal
  preemptionPolicy: PreemptLowerPriority
  priority: 0
  restartPolicy: Never
  schedulerName: default-scheduler
  securityContext: {}
  serviceAccount: dagster
  serviceAccountName: dagster
  terminationGracePeriodSeconds: 30
  tolerations:
  - effect: NoExecute
    key: node.kubernetes.io/not-ready
    operator: Exists
    tolerationSeconds: 300
  - effect: NoExecute
    key: node.kubernetes.io/unreachable
    operator: Exists
    tolerationSeconds: 300
  volumes:
  - configMap:
      defaultMode: 420
      name: dagster-instance
    name: dagster-instance
  - name: kube-api-access-7x5rc
    projected:
      defaultMode: 420
      sources:
      - serviceAccountToken:
          expirationSeconds: 3607
          path: token
      - configMap:
          items:
          - key: ca.crt
            path: ca.crt
          name: kube-root-ca.crt
      - downwardAPI:
          items:
          - fieldRef:
              apiVersion: v1
              fieldPath: metadata.namespace
            path: namespace
```
And the pod fails with the following error:
```
{"__class__": "DagsterEvent", "event_specific_data": {"__class__": "EngineEventData", "error": {"__class__": "SerializableErrorInfo", "cause": null, "cls_name": "ConfigException", "context": null, "message": "kubernetes.config.config_exception.ConfigException: Invalid kube-config file. No configuration found.\n", "stack": ["  File \"/usr/local/lib/python3.9/site-packages/dagster/_grpc/impl.py\", line 130, in core_execute_run\n    yield from execute_run_iterator(\n", "  File \"/usr/local/lib/python3.9/site-packages/dagster/_core/execution/api.py\", line 1097, in __iter__\n    yield from self.execution_context_manager.prepare_context()\n", "  File \"/usr/local/lib/python3.9/site-packages/dagster/_utils/__init__.py\", line 521, in generate_setup_events\n    obj = next(self.generator)\n", "  File \"/usr/local/lib/python3.9/site-packages/dagster/_core/execution/context_creation_pipeline.py\", line 333, in orchestration_context_event_generator\n    executor = create_executor(context_creation_data)\n", "  File \"/usr/local/lib/python3.9/site-packages/dagster/_core/execution/context_creation_pipeline.py\", line 426, in create_executor\n    return creation_fn(init_context)\n", "  File \"/usr/local/lib/python3.9/site-packages/dagster_k8s/executor.py\", line 96, in k8s_job_executor\n    run_launcher = init_context.instance.run_launcher\n", "  File \"/usr/local/lib/python3.9/site-packages/dagster/_core/instance/__init__.py\", line 659, in run_launcher\n    launcher = cast(InstanceRef, self._ref).run_launcher\n", "  File \"/usr/local/lib/python3.9/site-packages/dagster/_core/instance/ref.py\", line 491, in run_launcher\n    return self.run_launcher_data.rehydrate() if self.run_launcher_data else None\n", "  File \"/usr/local/lib/python3.9/site-packages/dagster/_serdes/config_class.py\", line 101, in rehydrate\n    return klass.from_config_value(self, check.not_none(result.value))\n", "  File \"/usr/local/lib/python3.9/site-packages/dagster_k8s/launcher.py\", line 200, in from_config_value\n    return cls(inst_data=inst_data, **config_value)\n", "  File \"/usr/local/lib/python3.9/site-packages/dagster_k8s/launcher.py\", line 87, in __init__\n    kubernetes.config.load_kube_config(kubeconfig_file)\n", "  File \"/usr/local/lib/python3.9/site-packages/kubernetes/config/kube_config.py\", line 813, in load_kube_config\n    loader = _get_kube_config_loader(\n", "  File \"/usr/local/lib/python3.9/site-packages/kubernetes/config/kube_config.py\", line 770, in _get_kube_config_loader\n    raise ConfigException(\n"]}, "marker_end": null, "marker_start": null, "metadata_entries": []}, "event_type_value": "ENGINE_EVENT", "logging_tags": {}, "message": "An exception was thrown during execution that is likely a framework error, rather than an error in user code.", "pid": null, "pipeline_name": "__ASSET_JOB", "solid_handle": null, "step_handle": null, "step_key": null, "step_kind_value": null}
{"__class__": "DagsterEvent", "event_specific_data": null, "event_type_value": "PIPELINE_FAILURE", "logging_tags": {}, "message": "This run has been marked as failed from outside the execution context.", "pid": null, "pipeline_name": "__ASSET_JOB", "solid_handle": null, "step_handle": null, "step_key": null, "step_kind_value": null}
Stream closed EOF for system-dagster/dagster-run-05ac286c-f6fc-4bb2-a7a7-d73d0daa6fa9-vktln (dagster)
```
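For context on this error: the stack trace bottoms out in the kubernetes Python client's config loading, which `K8sRunLauncher.__init__` calls (see `launcher.py` in the trace). A minimal sketch of the two paths the `load_incluster_config` flag selects between; inside the run pod there is no `~/.kube/config` on disk, so the second path raises the `ConfigException` shown above:
```python
import kubernetes

def load_k8s_config(load_incluster_config: bool, kubeconfig_file: str = None) -> None:
    if load_incluster_config:
        # Uses the service account token and CA cert that Kubernetes mounts
        # at /var/run/secrets/kubernetes.io/serviceaccount in every pod that
        # runs under a service account (visible in the pod spec above).
        kubernetes.config.load_incluster_config()
    else:
        # Looks for a kubeconfig file on the local filesystem. The run pod
        # on EKS has no such file, hence "Invalid kube-config file. No
        # configuration found."
        kubernetes.config.load_kube_config(kubeconfig_file)
```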
I guess it fails because the `run_launcher` configuration is passed through the command line:
```yaml
job_namespace: system-dagster\nkubeconfig_file: ~/.kube/config\nload_incluster_config:
      false
```
But I expected `instance_config_map` to let the launched job use the `dagster-instance` configmap instead, which has the following definition:
```yaml
apiVersion: v1
data:
  dagster.yaml: "scheduler:      \n  module: dagster.core.scheduler\n  class: DagsterDaemonScheduler\n\nschedule_storage:\n
    \ module: dagster_postgres.schedule_storage\n  class: PostgresScheduleStorage\n
    \ config:        \n    postgres_db:\n      username: dagster\n      password:\n
    \       env: DAGSTER_PG_PASSWORD\n      hostname: \"dagster.cluster-ctyrh80ang3y.eu-central-1.rds.amazonaws.com\"\n
    \     db_name: dagster\n      port: 5432\n      params:\n        {}\n\nrun_launcher:
    \     \n  module: dagster_k8s\n  class: K8sRunLauncher\n  config:\n    load_incluster_config:
    true\n    job_namespace: system-dagster\n    image_pull_policy: Always\n    service_account_name:
    dagster\n    dagster_home: \"/opt/dagster/dagster_home\"\n    instance_config_map:
    \"dagster-instance\"\n    postgres_password_secret: \"dagster-postgresql\"\n\nrun_storage:\n
    \ module: dagster_postgres.run_storage\n  class: PostgresRunStorage\n  config:
    \       \n    postgres_db:\n      username: dagster\n      password:\n        env:
    DAGSTER_PG_PASSWORD\n      hostname: \"...\"\n
    \     db_name: dagster\n      port: 5432\n      params:\n        {}\n\nevent_log_storage:\n
    \ module: dagster_postgres.event_log\n  class: PostgresEventLogStorage\n  config:
    \       \n    postgres_db:\n      username: dagster\n      password:\n        env:
    DAGSTER_PG_PASSWORD\n      hostname: \"...\"\n
    \     db_name: dagster\n      port: 5432\n      params:\n        {}\ncompute_logs:
    \     \n  module: dagster.core.storage.noop_compute_log_manager\n  class: NoOpComputeLogManager\n\ntelemetry:\n
    \ enabled: true\n"
kind: ConfigMap
metadata:
  annotations:
    meta.helm.sh/release-name: dagster
    meta.helm.sh/release-namespace: system-dagster
  creationTimestamp: "2023-01-30T17:03:33Z"
  labels:
    app: dagster
    app.kubernetes.io/managed-by: Helm
    chart: dagster-1.1.14
    heritage: Helm
    release: dagster
  name: dagster-instance
  namespace: system-dagster
  resourceVersion: "6397349"
  uid: 1cb5fe9b-ea71-4ee5-98db-5e6940a128d7
```
d
Are you using the `k8s_job_executor` as well?
I would suggest starting with getting it working without that - that executor requires the pod that it spins up to also be able to launch pods for each step, so it's probably a bit tricky to get both working with the same config
a
Yes, my `Definitions` is:
```python
defs = Definitions(
    # assets=[airbyte_assets, hex_notebook_asset] + dbt_assets,
    assets=[airbyte_assets] + dbt_assets,
    resources={
        "dbt": dbt_cli_resource.configured(
            {
                "project_dir": DBT_PROJECT_DIR,
                "profiles_dir": DBT_PROJECT_DIR,
            }
        ),
        # "hex": hex_resource.configured(
        #     {
        #         "api_key": {"env": "HEX_TOKEN"},
        #     }
        # ),
    },
    jobs=[update_dash_job],
    executor=k8s_job_executor,
    # schedules=[update_dash_schedule],
)
```
If I deploy my user deployment to Kubernetes, it works perfectly, but it means that every time I want to test my code (e.g. to test Kubernetes service account access), I have to push the user deployment to my k8s cluster and run it there. I was wondering if I could avoid the user deployment on k8s by just running dagit locally, with job execution on the k8s cluster.
d
does it work if you use the default executor instead (commenting out the `executor=` part)?
just to start
if it does then we can move on to getting it working with the executor
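For reference, a minimal sketch of the suggested test, with a hypothetical placeholder asset standing in for the real ones above. Dropping `executor=` falls back to Dagster's default executor, so every step runs inside the single run pod and nothing in the pod needs to talk to the Kubernetes API:
```python
from dagster import Definitions, asset

@asset
def placeholder_asset():
    # Hypothetical stand-in for airbyte_assets / dbt_assets above.
    return 1

defs = Definitions(
    assets=[placeholder_asset],
    # executor=k8s_job_executor,  # commented out: the default executor runs
    # all steps in the run pod, so no kubeconfig is needed there.
)
```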
a
Yes, it seems to work fine 🙂
d
OK, I think to make this work we would need to expose a `load_incluster_config` config field on the executor, and you would need to set it to `True` so that it loads the k8s config from within the cluster instead of trying to find your local kubeconfig file
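A hypothetical sketch of what that proposal might look like. Note that `load_incluster_config` did not exist as an executor config field at this point - exposing it is exactly the feature being requested:
```python
from dagster import Definitions
from dagster_k8s import k8s_job_executor

defs = Definitions(
    # assets=..., resources=..., jobs=... as before
    executor=k8s_job_executor.configured(
        {
            # HYPOTHETICAL field (the proposal above): if exposed, the
            # executor in the run pod would call load_incluster_config()
            # instead of looking for a local kubeconfig file.
            "load_incluster_config": True,
        }
    ),
)
```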
Alternatively, you could try to include the kubeconfig that your local machine is using on the launched pod for the dagster job - I'm not actually 100% sure if that will work
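And a hypothetical sketch of that alternative, written as the extra `K8sRunLauncher` config it would take. `volumes` and `volume_mounts` are existing launcher config fields; the `local-kubeconfig` configmap (holding a copy of `~/.kube/config`) and the mount path are made up, and as noted above it is untested whether the credentials in that kubeconfig would even work from inside the cluster:
```python
# Additional run_launcher config for dagster.yaml, shown as a Python dict.
run_launcher_config = {
    "load_incluster_config": False,
    "kubeconfig_file": "/mnt/kubeconfig/config",  # hypothetical mount path
    "volumes": [
        {"name": "kubeconfig", "configMap": {"name": "local-kubeconfig"}},
    ],
    "volume_mounts": [
        {"name": "kubeconfig", "mountPath": "/mnt/kubeconfig", "readOnly": True},
    ],
}
```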
is this something you'd be willing to file a feature request issue for on github?
(I'm assuming it's important to you to use the `k8s_job_executor` - i.e. you want each step to run in its own pod?)
a
Yes, we really need it. But I think I'm missing something in the solution. Why does the run pod try to load the kubeconfig file when I run from local, but not when it runs from a Kubernetes user deployment? Is this because it defaults to the `K8sRunLauncher` configuration? If so, what is the `instance_config_map` field for?
d
the run pod tries to load the kubeconfig file so that it can spin up a pod for each step
I think `instance_config_map` is more or less deprecated
although I see how it would be useful in this specific situation
🌈 1
instead, the instance config to use is passed in as an argument to the command
🌈 1
a
Just created the feature request! Thank you for this quick and efficient discussion
๐Ÿ™ 1
t
Maybe you guys managed to fix this? I'm wondering how you are able to join a local dagit with a Kubernetes dagit. I'm getting:
```
dagster._check.CheckError: Expected non-None value: Pipeline run with id '82c96833-8188-4cbd-bcaa-c382593a4158' not found for run execution.
```
Dagit on K8s cannot find this job and fails, since the executor is running on my machine >.>
d
They'll need to share storage for this to work, for example an external Postgres DB that both the local and Kubernetes instances have access to
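A minimal sketch of how to sanity-check that, assuming both sides' `dagster.yaml` point at the same Postgres storage (e.g. the RDS endpoint from the configmap above). Run it locally and inside the cluster; with shared storage, both should list the same recent runs:
```python
from dagster import DagsterInstance

# Loads the instance from $DAGSTER_HOME/dagster.yaml. With a shared Postgres
# backend, a run launched from the local instance is visible to dagit on k8s,
# and vice versa.
instance = DagsterInstance.get()
for run in instance.get_runs(limit=5):
    print(run.run_id, run.status)
```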
t
works! thanks 🙂