Alexandre Guitton — 01/30/2023, 5:57 PM
K8sRunLauncher 😄? My goal is to host dagit locally and run a job's steps on the EKS cluster.
Do you know if this is supported by Dagster?
I currently have the following configuration:
run_launcher:
  module: dagster_k8s.launcher
  class: K8sRunLauncher
  config:
    dagster_home: /usr/src/app
    job_image: <my-code-image>:0.0.1
    image_pull_policy: Always
    instance_config_map: dagster-instance
    job_namespace: system-dagster
    load_incluster_config: false
    kubeconfig_file: /Users/alexandreguitton/.kube/config
    postgres_password_secret: dagster-postgresql
    volumes:
      - configMap:
          defaultMode: 420
          name: dagster-instance
        name: dagster-instance
    resources:
      # limits:
      #   cpu: 1
      #   memory: 200Mi
      requests:
        cpu: 1
        memory: 200Mi
    service_account_name: dagster
It launches the pod correctly, but the pod can't connect because load_incluster_config is set to false: the configuration is passed on the command line, and instance_config_map doesn't seem to take effect (or maybe I didn't understand how it works 😕).
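For context, the failure mode can be sketched with a stdlib-only stand-in for the Kubernetes client's config loading (a hypothetical helper for illustration, not Dagster's or the kubernetes library's actual code): in-cluster loading reads the mounted service-account token, while kubeconfig loading needs a file that exists on the pod's own filesystem.

```python
import os

def choose_kube_credentials(load_incluster_config, kubeconfig_file=None):
    """Hypothetical stand-in for Kubernetes client config loading (sketch)."""
    if load_incluster_config:
        # In-cluster: credentials come from the mounted service-account token.
        return "/var/run/secrets/kubernetes.io/serviceaccount/token"
    path = os.path.expanduser(kubeconfig_file or "~/.kube/config")
    if not os.path.exists(path):
        # Mirrors the spirit of: "Invalid kube-config file. No configuration found."
        raise FileNotFoundError(f"Invalid kube-config file: {path}")
    return path
```

With load_incluster_config: false, the run worker pod takes the second branch and looks for a laptop-local path like /Users/alexandreguitton/.kube/config, which does not exist inside the container.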
Any help would be greatly appreciated 😄

daniel — 02/02/2023, 3:36 PM

Alexandre Guitton — 02/02/2023, 5:04 PM
My dagster.yaml is:
run_launcher:
  module: dagster_k8s.launcher
  class: K8sRunLauncher
  config:
    dagster_home: /usr/src/app
    job_image:
      env: DAGSTER_JOB_IMAGE
    image_pull_policy: Always
    instance_config_map: dagster-instance
    job_namespace: system-dagster
    load_incluster_config: false
    kubeconfig_file: ~/.kube/config
    postgres_password_secret: dagster-postgresql
    volumes:
      - configMap:
          defaultMode: 420
          name: dagster-instance
        name: dagster-instance
    resources:
      requests:
        cpu: 1
        memory: 200Mi
    service_account_name: dagster
    env_vars:
      - DAGSTER_PG_HOSTNAME
      - DAGSTER_PG_DB_NAME
      - DAGSTER_PG_PORT
      - DAGSTER_PG_USERNAME
      - TMP_DAGSTER_BASE_DIR
      - DAGSTER_JOB_IMAGE
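The job_image: {env: DAGSTER_JOB_IMAGE} form defers the value to an environment variable. A rough sketch of how such env: sources resolve (a simplified illustration, not Dagster's actual implementation):

```python
import os

def resolve_env_sources(node):
    """Recursively replace {"env": "VAR"} mappings with os.environ["VAR"]."""
    if isinstance(node, dict):
        if set(node) == {"env"}:
            return os.environ[node["env"]]
        return {key: resolve_env_sources(value) for key, value in node.items()}
    if isinstance(node, list):
        return [resolve_env_sources(value) for value in node]
    return node

os.environ["DAGSTER_JOB_IMAGE"] = "<my-code-image>:0.0.1"
config = {"job_image": {"env": "DAGSTER_JOB_IMAGE"}, "image_pull_policy": "Always"}
print(resolve_env_sources(config))
# {'job_image': '<my-code-image>:0.0.1', 'image_pull_policy': 'Always'}
```

This is why DAGSTER_JOB_IMAGE must be set wherever the config is read, which matters once the config travels into the launched pod.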
When I launch the job, it has the following pod definition:
apiVersion: v1
kind: Pod
metadata:
  annotations:
    kubernetes.io/psp: eks.privileged
  creationTimestamp: "2023-02-02T16:52:46Z"
  generateName: dagster-run-05ac286c-f6fc-4bb2-a7a7-d73d0daa6fa9-
  labels:
    app.kubernetes.io/component: run_worker
    app.kubernetes.io/instance: dagster
    app.kubernetes.io/name: dagster
    app.kubernetes.io/part-of: dagster
    app.kubernetes.io/version: 1.1.14
    controller-uid: 70971e8f-f283-4cee-bf0a-f8c9645ba518
    dagster/job: ASSET_JOB
    dagster/run-id: 05ac286c-f6fc-4bb2-a7a7-d73d0daa6fa9
    job-name: dagster-run-05ac286c-f6fc-4bb2-a7a7-d73d0daa6fa9
  name: dagster-run-05ac286c-f6fc-4bb2-a7a7-d73d0daa6fa9-vktln
  namespace: system-dagster
  ownerReferences:
  - apiVersion: batch/v1
    blockOwnerDeletion: true
    controller: true
    kind: Job
    name: dagster-run-05ac286c-f6fc-4bb2-a7a7-d73d0daa6fa9
    uid: 70971e8f-f283-4cee-bf0a-f8c9645ba518
  resourceVersion: "7411517"
  uid: 245a9713-bde9-4962-bd92-20c6f8e9b814
spec:
  containers:
  - args:
    - dagster
    - api
    - execute_run
    - '{"__class__": "ExecuteRunArgs", "instance_ref": {"__class__": "InstanceRef",
      "compute_logs_data": {"__class__": "ConfigurableClassData", "class_name": "LocalComputeLogManager",
      "config_yaml": "base_dir:\n env: TMP_DAGSTER_BASE_DIR\n", "module_name": "dagster.core.storage.local_compute_log_manager"},
      "custom_instance_class_data": null, "event_storage_data": {"__class__": "ConfigurableClassData",
      "class_name": "PostgresEventLogStorage", "config_yaml": "postgres_db:\n db_name:
      dagster\n hostname:\n env: DAGSTER_PG_HOSTNAME\n params: {}\n password:\n env:
      DAGSTER_PG_PASSWORD\n port:\n env: DAGSTER_PG_PORT\n username:\n env:
      DAGSTER_PG_USERNAME\n", "module_name": "dagster_postgres"}, "local_artifact_storage_data":
      {"__class__": "ConfigurableClassData", "class_name": "LocalArtifactStorage",
      "config_yaml": "base_dir:\n env: TMP_DAGSTER_BASE_DIR\n", "module_name": "dagster.core.storage.root"},
      "run_coordinator_data": {"__class__": "ConfigurableClassData", "class_name":
      "DefaultRunCoordinator", "config_yaml": "{}\n", "module_name": "dagster.core.run_coordinator"},
      "run_launcher_data": {"__class__": "ConfigurableClassData", "class_name": "K8sRunLauncher",
      "config_yaml": "dagster_home: /usr/src/app\nenv_vars:\n- DAGSTER_PG_HOSTNAME\n-
      DAGSTER_PG_DB_NAME\n- DAGSTER_PG_PORT\n- DAGSTER_PG_USERNAME\n- TMP_DAGSTER_BASE_DIR\n-
      DAGSTER_JOB_IMAGE\nimage_pull_policy: Always\ninstance_config_map: dagster-instance\njob_image:\n env:
      DAGSTER_JOB_IMAGE\njob_namespace: system-dagster\nkubeconfig_file: ~/.kube/config\nload_incluster_config:
      false\npostgres_password_secret: dagster-postgresql\nresources:\n requests:\n cpu:
      1\n memory: 200Mi\nservice_account_name: dagster\nvolumes:\n- configMap:\n defaultMode:
      420\n name: dagster-instance\n name: dagster-instance\n", "module_name":
      "dagster_k8s.launcher"}, "run_storage_data": {"__class__": "ConfigurableClassData",
      "class_name": "PostgresRunStorage", "config_yaml": "postgres_db:\n db_name:
      dagster\n hostname:\n env: DAGSTER_PG_HOSTNAME\n params: {}\n password:\n env:
      DAGSTER_PG_PASSWORD\n port:\n env: DAGSTER_PG_PORT\n username:\n env:
      DAGSTER_PG_USERNAME\n", "module_name": "dagster_postgres"}, "schedule_storage_data":
      {"__class__": "ConfigurableClassData", "class_name": "PostgresScheduleStorage",
      "config_yaml": "postgres_db:\n db_name: dagster\n hostname:\n env: DAGSTER_PG_HOSTNAME\n params:
      {}\n password:\n env: DAGSTER_PG_PASSWORD\n port:\n env: DAGSTER_PG_PORT\n username:\n env:
      DAGSTER_PG_USERNAME\n", "module_name": "dagster_postgres"}, "scheduler_data":
      {"__class__": "ConfigurableClassData", "class_name": "DagsterDaemonScheduler",
      "config_yaml": "{}\n", "module_name": "dagster.core.scheduler"}, "secrets_loader_data":
      null, "settings": {}, "storage_data": {"__class__": "ConfigurableClassData",
      "class_name": "DagsterPostgresStorage", "config_yaml": "postgres_db:\n db_name:
      dagster\n hostname:\n env: DAGSTER_PG_HOSTNAME\n params: {}\n password:\n env:
      DAGSTER_PG_PASSWORD\n port:\n env: DAGSTER_PG_PORT\n username:\n env:
      DAGSTER_PG_USERNAME\n", "module_name": "dagster_postgres"}}, "pipeline_origin":
      {"__class__": "PipelinePythonOrigin", "pipeline_name": "__ASSET_JOB", "repository_origin":
      {"__class__": "RepositoryPythonOrigin", "code_pointer": {"__class__": "ModuleCodePointer",
      "fn_name": "defs", "module": "github_slack", "working_directory": "/usr/src/app"},
      "container_context": {}, "container_image": null, "entry_point": ["dagster"],
      "executable_path": "/root/.cache/pypoetry/virtualenvs/github-slack-VA82Wl8V-py3.9/bin/python"}},
      "pipeline_run_id": "05ac286c-f6fc-4bb2-a7a7-d73d0daa6fa9", "set_exit_code_on_failure":
      null}'
    env:
    - name: DAGSTER_RUN_JOB_NAME
      value: __ASSET_JOB
    - name: DAGSTER_HOME
      value: /usr/src/app
    - name: DAGSTER_PG_PASSWORD
      valueFrom:
        secretKeyRef:
          key: postgresql-password
          name: dagster-postgresql
    - name: DAGSTER_PG_PORT
      value: "5432"
    - name: TMP_DAGSTER_BASE_DIR
      value: /usr/src/app/tmp
    - name: DAGSTER_JOB_IMAGE
      value: ...
    - name: DAGSTER_PG_DB_NAME
      value: dagster
    - name: DAGSTER_PG_USERNAME
      value: dagster
    - name: DAGSTER_PG_HOSTNAME
      value: ...
    image: ...
    imagePullPolicy: Always
    name: dagster
    resources:
      requests:
        cpu: "1"
        memory: 200Mi
    terminationMessagePath: /dev/termination-log
    terminationMessagePolicy: File
    volumeMounts:
    - mountPath: /var/run/secrets/kubernetes.io/serviceaccount
      name: kube-api-access-7x5rc
      readOnly: true
  dnsPolicy: ClusterFirst
  enableServiceLinks: true
  nodeName: ip-10-200-17-172.eu-central-1.compute.internal
  preemptionPolicy: PreemptLowerPriority
  priority: 0
  restartPolicy: Never
  schedulerName: default-scheduler
  securityContext: {}
  serviceAccount: dagster
  serviceAccountName: dagster
  terminationGracePeriodSeconds: 30
  tolerations:
  - effect: NoExecute
    key: node.kubernetes.io/not-ready
    operator: Exists
    tolerationSeconds: 300
  - effect: NoExecute
    key: node.kubernetes.io/unreachable
    operator: Exists
    tolerationSeconds: 300
  volumes:
  - configMap:
      defaultMode: 420
      name: dagster-instance
    name: dagster-instance
  - name: kube-api-access-7x5rc
    projected:
      defaultMode: 420
      sources:
      - serviceAccountToken:
          expirationSeconds: 3607
          path: token
      - configMap:
          items:
          - key: ca.crt
            path: ca.crt
          name: kube-root-ca.crt
      - downwardAPI:
          items:
          - fieldRef:
              apiVersion: v1
              fieldPath: metadata.namespace
            path: namespace
And the pod fails with the following error:
{"__class__": "DagsterEvent", "event_specific_data": {"__class__": "EngineEventData", "error": {"__class__": "SerializableErrorInfo", "cause": null, "cls_name": "ConfigException", "context": null, "message": "kubernetes.config.config_exception.ConfigException: Invalid kube-config file. No configuration found.\n", "stack": [" File \"/usr/local/lib/python3.9/site-packages/dagster/_grpc/impl.py\", line 130, in core_execute_run\n yield from execute_run_iterator(\n", " File \"/usr/local/lib/python3.9/site-packages/dagster/_core/execution/api.py\", line 1097, in __iter__\n yield from self.execution_context_manager.prepare_context()\n", " File \"/usr/local/lib/python3.9/site-packages/dagster/_utils/__init__.py\", line 521, in generate_setup_events\n obj = next(self.generator)\n", " File \"/usr/local/lib/python3.9/site-packages/dagster/_core/execution/context_creation_pipeline.py\", line 333, in orchestration_context_event_generator\n executor = create_executor(context_creation_data)\n", " File \"/usr/local/lib/python3.9/site-packages/dagster/_core/execution/context_creation_pipeline.py\", line 426, in create_executor\n return creation_fn(init_context)\n", " File \"/usr/local/lib/python3.9/site-packages/dagster_k8s/executor.py\", line 96, in k8s_job_executor\n run_launcher = init_context.instance.run_launcher\n", " File \"/usr/local/lib/python3.9/site-packages/dagster/_core/instance/__init__.py\", line 659, in run_launcher\n launcher = cast(InstanceRef, self._ref).run_launcher\n", " File \"/usr/local/lib/python3.9/site-packages/dagster/_core/instance/ref.py\", line 491, in run_launcher\n return self.run_launcher_data.rehydrate() if self.run_launcher_data else None\n", " File \"/usr/local/lib/python3.9/site-packages/dagster/_serdes/config_class.py\", line 101, in rehydrate\n return klass.from_config_value(self, check.not_none(result.value))\n", " File \"/usr/local/lib/python3.9/site-packages/dagster_k8s/launcher.py\", line 200, in from_config_value\n return cls(inst_data=inst_data, **config_value)\n", " File \"/usr/local/lib/python3.9/site-packages/dagster_k8s/launcher.py\", line 87, in __init__\n kubernetes.config.load_kube_config(kubeconfig_file)\n", " File \"/usr/local/lib/python3.9/site-packages/kubernetes/config/kube_config.py\", line 813, in load_kube_config\n loader = _get_kube_config_loader(\n", " File \"/usr/local/lib/python3.9/site-packages/kubernetes/config/kube_config.py\", line 770, in _get_kube_config_loader\n raise ConfigException(\n"]}, "marker_end": null, "marker_start": null, "metadata_entries": []}, "event_type_value": "ENGINE_EVENT", "logging_tags": {}, "message": "An exception was thrown during execution that is likely a framework error, rather than an error in user code.", "pid": null, "pipeline_name": "__ASSET_JOB", "solid_handle": null, "step_handle": null, "step_key": null, "step_kind_value": null}
{"__class__": "DagsterEvent", "event_specific_data": null, "event_type_value": "PIPELINE_FAILURE", "logging_tags": {}, "message": "This run has been marked as failed from outside the execution context.", "pid": null, "pipeline_name": "__ASSET_JOB", "solid_handle": null, "step_handle": null, "step_key": null, "step_kind_value": null}
Stream closed EOF for system-dagster/dagster-run-05ac286c-f6fc-4bb2-a7a7-d73d0daa6fa9-vktln (dagster)
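The run worker rebuilds its Dagster instance from the JSON blob in the container args, so the local launcher settings travel with it. A minimal sketch of where those settings sit in that payload, using a trimmed-down stand-in for the serialized ExecuteRunArgs shown in the pod spec above (not the full real payload):

```python
import json

# Trimmed-down stand-in for the serialized ExecuteRunArgs in the pod's args.
last_arg = json.dumps({
    "__class__": "ExecuteRunArgs",
    "instance_ref": {
        "run_launcher_data": {
            "class_name": "K8sRunLauncher",
            "config_yaml": "kubeconfig_file: ~/.kube/config\nload_incluster_config: false\n",
        }
    },
})

launcher = json.loads(last_arg)["instance_ref"]["run_launcher_data"]
print(launcher["config_yaml"])
# The pod rehydrates K8sRunLauncher from this YAML, hits
# load_incluster_config: false, and tries to read ~/.kube/config,
# which does not exist inside the container -> ConfigException.
```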
I guess it fails because the run_launcher configuration is passed through the command line:
job_namespace: system-dagster
kubeconfig_file: ~/.kube/config
load_incluster_config: false
But I expected instance_config_map to let the launched job use the dagster-instance configmap instead, which has the following definition:
apiVersion: v1
data:
  dagster.yaml: "scheduler: \n module: dagster.core.scheduler\n class: DagsterDaemonScheduler\n\nschedule_storage:\n
    \ module: dagster_postgres.schedule_storage\n class: PostgresScheduleStorage\n
    \ config: \n postgres_db:\n username: dagster\n password:\n
    \ env: DAGSTER_PG_PASSWORD\n hostname: \"dagster.cluster-ctyrh80ang3y.eu-central-1.rds.amazonaws.com\"\n
    \ db_name: dagster\n port: 5432\n params:\n {}\n\nrun_launcher:
    \ \n module: dagster_k8s\n class: K8sRunLauncher\n config:\n load_incluster_config:
    true\n job_namespace: system-dagster\n image_pull_policy: Always\n service_account_name:
    dagster\n dagster_home: \"/opt/dagster/dagster_home\"\n instance_config_map:
    \"dagster-instance\"\n postgres_password_secret: \"dagster-postgresql\"\n\nrun_storage:\n
    \ module: dagster_postgres.run_storage\n class: PostgresRunStorage\n config:
    \ \n postgres_db:\n username: dagster\n password:\n env:
    DAGSTER_PG_PASSWORD\n hostname: \"...\"\n
    \ db_name: dagster\n port: 5432\n params:\n {}\n\nevent_log_storage:\n
    \ module: dagster_postgres.event_log\n class: PostgresEventLogStorage\n config:
    \ \n postgres_db:\n username: dagster\n password:\n env:
    DAGSTER_PG_PASSWORD\n hostname: \"...\"\n
    \ db_name: dagster\n port: 5432\n params:\n {}\ncompute_logs:
    \ \n module: dagster.core.storage.noop_compute_log_manager\n class: NoOpComputeLogManager\n\ntelemetry:\n
    \ enabled: true\n"
kind: ConfigMap
metadata:
  annotations:
    meta.helm.sh/release-name: dagster
    meta.helm.sh/release-namespace: system-dagster
  creationTimestamp: "2023-01-30T17:03:33Z"
  labels:
    app: dagster
    app.kubernetes.io/managed-by: Helm
    chart: dagster-1.1.14
    heritage: Helm
    release: dagster
  name: dagster-instance
  namespace: system-dagster
  resourceVersion: "6397349"
  uid: 1cb5fe9b-ea71-4ee5-98db-5e6940a128d7
daniel — 02/02/2023, 5:05 PM

Alexandre Guitton — 02/02/2023, 5:13 PM
defs = Definitions(
    # assets=[airbyte_assets, hex_notebook_asset] + dbt_assets,
    assets=[airbyte_assets] + dbt_assets,
    resources={
        "dbt": dbt_cli_resource.configured(
            {
                "project_dir": DBT_PROJECT_DIR,
                "profiles_dir": DBT_PROJECT_DIR,
            }
        ),
        # "hex": hex_resource.configured(
        #     {
        #         "api_key": {"env": "HEX_TOKEN"},
        #     }
        # ),
    },
    jobs=[update_dash_job],
    executor=k8s_job_executor,
    # schedules=[update_dash_schedule],
)
If I deploy my user deployment to Kubernetes, it works perfectly, but it means that every time I want to test my code (e.g. to test Kubernetes service account access), I have to push the user deployment to my k8s cluster and run it.
I was wondering if I could avoid the user deployment on k8s by just running dagit locally with job execution on the k8s cluster.

daniel — 02/02/2023, 5:13 PM

Alexandre Guitton — 02/02/2023, 5:22 PM

daniel — 02/02/2023, 5:24 PM

Alexandre Guitton — 02/02/2023, 5:37 PM
What is the instance_config_map field for?

daniel — 02/02/2023, 5:37 PM

Alexandre Guitton — 02/02/2023, 5:51 PM

TadasG — 03/22/2023, 2:12 PM
dagster._check.CheckError: Expected non-None value: Pipeline run with id '82c96833-8188-4cbd-bcaa-c382593a4158' not found for run execution.
Dagit on K8s cannot find this run and fails, since the executor is running on my machine >.>
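The "not found" failure above is consistent with the local process and the in-cluster dagit reading different run storages: the run record exists only in the instance that created it. A toy illustration with hypothetical dicts standing in for the two run databases (the run id is the one from the error):

```python
RUN_ID = "82c96833-8188-4cbd-bcaa-c382593a4158"

# Run storage of the instance that launched the run (local machine).
local_runs = {RUN_ID: {"job": "ASSET_JOB", "status": "STARTING"}}

# Run storage the in-cluster dagit queries -- a different database.
cluster_runs = {}

if cluster_runs.get(RUN_ID) is None:
    # Analogue of: Expected non-None value: Pipeline run with id '...' not found
    print(f"Pipeline run with id '{RUN_ID}' not found for run execution.")
```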
daniel — 03/22/2023, 2:14 PM

TadasG — 03/22/2023, 2:34 PM