# deployment-kubernetes
q
Hello, I'm planning to deploy an OSS instance of Dagster with Helm on Kubernetes. I think I'll use the Kubernetes run launcher, which I saw allows specifying pod resource config at the instance, job, and op/asset levels.
• Any chance we can also specify Docker images in job definitions?
• If yes, would you have an example of the JSON syntax?
Many thanks for your help
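For context, the op-level resource config I'm referring to would look roughly like this via the `dagster-k8s/config` tag (the op and its resource values below are just placeholders):
```python
from dagster import op

# Placeholder op: the resource requests/limits are illustrative only.
@op(
    tags={
        "dagster-k8s/config": {
            "container_config": {
                "resources": {
                    "requests": {"cpu": "250m", "memory": "256Mi"},
                    "limits": {"cpu": "500m", "memory": "1Gi"},
                }
            }
        }
    }
)
def my_op():
    ...
```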
a
By default, the job will use the same Docker image as your code-repository. If you want certain steps of your pipeline to use custom Docker images (maybe you have certain operations that don’t use Python) you can use https://docs.dagster.io/_apidocs/libraries/dagster-k8s#ops
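For example, a rough sketch with `k8s_job_op` (the image, command, and names are placeholders):
```python
from dagster import job
from dagster_k8s import k8s_job_op

# A single step that runs in its own container image instead of the
# code location's image. The image and command are placeholders.
non_python_step = k8s_job_op.configured(
    {
        "image": "busybox",
        "command": ["/bin/sh", "-c"],
        "args": ["echo 'running a non-Python step'"],
    },
    name="non_python_step",
)


@job
def mixed_image_job():
    non_python_step()
```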
q
Hi @Andrea Giardini, thanks for your answer 🙂 What I mean is that I'd like to have one Docker image per job, rather than a single image for N jobs (and a potentially wide dependency universe)
a
In that case I would have separate code-repositories!
q
Yeah, I thought about that, but I'm not sure it would be very relevant unless I have to deal with dependency conflicts between two jobs. The idea here is more to have very lightweight images with a PEX executable rather than a single fat image to run in all my pods.
a
I understand. Generally you would keep all the jobs that have the same dependencies and “grow together” in the same code-repository. As soon as the dependencies become different, the jobs do radically different things, or the jobs follow different release cycles, I would split them across different repositories. It’s also a way to test code and organize jobs.
q
Ok, understood. I was also thinking of workspaces as a way to split staging and production environments, thereby removing the need for a staging instance. Is this considered good practice?
Also, according to the documentation there is a `job_image` parameter, so I might be able to get what I want.
My bad for lazily reading the documentation 😶‍🌫️
a
I would keep staging and production on two separate Dagster instances; otherwise testing upgrades becomes tricky.
r
Hey @Quentin Gaborit, I suppose this issue is the same as mine. You can check this thread; maybe it helps you.
q
Thanks for the notification! In order to standardize the way I declare jobs, I've decided to configure the executors passed to jobs rather than the jobs themselves:
```python
import os

from dagster import AssetSelection, define_asset_job
from dagster_k8s import k8s_job_executor


def get_image_tag(image_name: str, image_tag: str) -> str:
    # Assumed helper: referenced but not shown in the original message.
    return f"{image_name}:{image_tag}"


def get_k8s_executor_config(**kwargs) -> dict:
    """Return the configuration for the k8s executor.

    Accepts all the parameters that can be passed to the
    k8s executor, as specified by `_K8S_EXECUTOR_CONFIG_SCHEMA`.

    :return: The configuration to pass to the `k8s_job_executor`.
    """
    # Fall back to the deployment image baked into the environment.
    image_name = kwargs.get("image_name", os.environ["DAGSTER_DEPLOYMENT_IMAGE_NAME"])
    image_tag = kwargs.get("image_tag", os.environ["DAGSTER_DEPLOYMENT_IMAGE_TAG"])
    return {
        "job_image": get_image_tag(image_name, image_tag),
        "load_incluster_config": kwargs.get("load_incluster_config", True),
        "kubeconfig_file": kwargs.get("kubeconfig_file", None),
        "job_namespace": kwargs.get("job_namespace", "dagster"),
        "retries": kwargs.get("retries", {"enabled": {}}),
        "max_concurrent": kwargs.get("max_concurrent", 3),
        "image_pull_policy": kwargs.get("image_pull_policy", "Always"),
        "image_pull_secrets": kwargs.get("image_pull_secrets", None),
        "service_account_name": kwargs.get("service_account_name", None),
        "env_config_maps": kwargs.get("env_config_maps", None),
        "env_secrets": kwargs.get("env_secrets", None),
        "env_vars": kwargs.get("env_vars", None),
        "volume_mounts": kwargs.get("volume_mounts", []),
        "volumes": kwargs.get("volumes", []),
        "labels": kwargs.get("labels", {}),
        "resources": kwargs.get("resources", {"limits": {}, "requests": {}}),
        "security_context": kwargs.get("security_context", {}),
    }

job_config = get_k8s_executor_config(
    job_image=...,  # placeholder kept from the original message
    resources=dict(
        requests=dict(
            cpu="750m",
            memory="1000Mi",
        ),
    ),
)

assets = ...

extract_data = define_asset_job(
    name="extract_data",
    description="Extract data from an API.",
    selection=AssetSelection.assets(*assets),  # pyright: ignore
    executor_def=k8s_job_executor.configured(job_config),
    metadata={
        ...
    },
)
```
However, the Kubernetes executor only accepts the k8s parameters referenced in `get_k8s_executor_config`. For the rest, like annotations, node affinity, and so on, job tags are the way to go :)
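For example, a rough sketch of the job-tag approach (the annotation and node-selector values are placeholders, and the node selector stands in for any affinity-style scheduling constraint):
```python
from dagster import AssetSelection, define_asset_job

# Sketch: an asset job that passes extra Kubernetes settings through the
# documented "dagster-k8s/config" tag. All values are placeholders.
extract_data_with_tags = define_asset_job(
    name="extract_data_with_tags",
    selection=AssetSelection.all(),
    tags={
        "dagster-k8s/config": {
            "pod_template_spec_metadata": {
                "annotations": {"example.com/owner": "data-team"},
            },
            "pod_spec_config": {
                "node_selector": {"disktype": "ssd"},
            },
        }
    },
)
```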