Quentin Gaborit
07/26/2023, 3:55 PMAndrea Giardini
07/26/2023, 3:58 PMQuentin Gaborit
07/26/2023, 4:01 PMAndrea Giardini
07/26/2023, 4:02 PMQuentin Gaborit
07/26/2023, 4:05 PMAndrea Giardini
07/26/2023, 4:07 PMQuentin Gaborit
07/26/2023, 4:56 PMQuentin Gaborit
07/26/2023, 4:57 PMjob_image
parameter so I might be able to get to what I wantQuentin Gaborit
07/26/2023, 4:58 PMAndrea Giardini
07/26/2023, 5:02 PMRESUL YURTTAKALAN
09/29/2023, 8:09 AMQuentin Gaborit
09/29/2023, 8:27 AMdef get_k8s_executor_config(**kwargs) -> dict:
"""Return the configuration for the k8s executor.
Accepts all the parameters that can be passed to the
k8s executor, as specified by `_K8S_EXECUTOR_CONFIG_SCHEMA`
:return: The configuration to pass to the `k8sJobExecutor`.
"""
image_name = kwargs.get("image_name", os.environ["DAGSTER_DEPLOYMENT_IMAGE_NAME"])
image_tag = kwargs.get("image_tag", os.environ["DAGSTER_DEPLOYMENT_IMAGE_TAG"])
return {
"job_image": get_image_tag(image_name, image_tag),
"load_incluster_config": kwargs.get("load_incluster_config", True),
"kubeconfig_file": kwargs.get("kubeconfig_file", None),
"job_namespace": kwargs.get("job_namespace", "dagster"),
"retries": kwargs.get("retries", {"enabled": {}}),
"max_concurrent": kwargs.get("max_concurrent", 3),
"image_pull_policy": kwargs.get("image_pull_policy", "Always"),
"image_pull_secrets": kwargs.get("image_pull_secrets", None),
"service_account_name": kwargs.get("service_account_name", None),
"env_config_maps": kwargs.get("env_config_maps", None),
"env_secrets": kwargs.get("env_secrets", None),
"env_vars": kwargs.get("env_vars", None),
"volume_mounts": kwargs.get("volume_mounts", []),
"volumes": kwargs.get("volumes", []),
"labels": kwargs.get("labels", {}),
"resources": kwargs.get("resources", {"limits": {}, "requests": {}}),
"security_context": kwargs.get("security_context", {}),
}
job_config = get_executor_config(
resources=dict(
job_image=...,
requests=dict(
cpu="750m",
memory="1000Mi",
)
),
)
assets = ...
extract_data = define_asset_job(
name="extract_data",
description="Extract data from an API.",
selection=AssetSelection.assets(*assets), # pyright: ignore
executor_def=executor.configured(job_config),
metadata={
....
},
)
However, the kubernetes executor does only accept the k8s parameters referenced in the get_k8s_executor_config
. For the rest like annotations, node_affinity and so on, the job tags is the way to go :)