https://dagster.io/ logo
#deployment-kubernetes
Title
# deployment-kubernetes
r

raul

12/21/2021, 3:34 PM
Hello Everyone I'm trying to run a different container image as part of dagster op(This image will have its own dependency's and file with in it)... But i'm getting lost finding the documentation for that around( very new to dagster and its concepts).. i saw couple posts but couldn't get clarity on how its wired up. would anyone mind sharing sample code that you have implemented so i can understand the concepts myself.
d

daniel

12/21/2021, 3:37 PM
Hi raul - here's an example of how you can tag an op to run in a separate image (this assumes that you are using the k8s_job_executor to run each op in its own job)
Copy code
@op(
  tags = {
    'dagster-k8s/config': {
      'container_config': {
        'image': 'foobar:latest'
      },
    },
  },
)
Take caution though- this is an experimental feature and it’s easy to reach confusing errors. All the images need to have identical versions of your job defined in them, and in the same location (could be a module, file, etc. depending on your workspace). We’ll also try to use the same python executable path in all images.
@Dagster Bot docs document how to override the image for an op (along with the caveats)
d

Dagster Bot

12/21/2021, 3:38 PM
r

raul

12/21/2021, 3:45 PM
Thanks for the response @daniel. From your comment can i not execute container with different language or folder structure in a job ? meaning one op runs dbt image another run python image and another one runs bash.. And these images are build with different different dependency's and folder structures ? "All the images need to have identical versions of your job defined in them, and in the same location (could be a module, file, etc. depending on your workspace)."
d

daniel

12/21/2021, 3:46 PM
Ah, you can absolutely execute a container with a different language - but in that case it would be an op that runs the container over the k8s api. Somebody just posted some example code of that the other day, let me pull it up
Ah here it is just a couple of posts up actually, there's some example code here: https://dagster.slack.com/archives/C014N0PK37E/p1639479433039200
r

raul

12/21/2021, 3:48 PM
would you mind posting a complete example if you happen to have one around, sorry to ask just trying understand by deploying that locally ?
d

daniel

12/21/2021, 3:50 PM
I don't actually have a complete example lying around unfortunately. @Andrea Giardini or @Manny Schneck do you possibly have something that you'd be able/willing to share here based on what you built?
a

Andrea Giardini

12/21/2021, 3:54 PM
This is an example of an op creating a Job and waiting for it to complete:
Copy code
@op(
    ins={
        "output_folder": In(str)
    }
)
def create_k8s_job(context, el, output_folder):

    from kubernetes import client, config

    if os.getenv('KUBERNETES_SERVICE_HOST'):
        # If I am running in Kubernetes, use the "in cluster" configuration
        config.load_incluster_config()
    else:
        config.load_kube_config()

    v1_batch = client.BatchV1Api()
    v1_core = client.CoreV1Api()
    # Create Job
    job = client.V1Job(
        api_version="batch/v1",
        kind="Job",
        metadata=client.V1ObjectMeta(generate_name="sen2cor-", namespace="dagster"),
        spec=client.V1JobSpec(
            ttl_seconds_after_finished=86400,
            template=client.V1PodTemplateSpec(
                metadata=client.V1ObjectMeta(labels={"app": "sen2cor"}),
                spec=client.V1PodSpec(
                    containers=[
                        client.V1Container(
                            name="sen2cor",
                            image="dymaxionlabs/sen2cor",
                            args=[
                                el,
                                "--resolution",
                                "10",
                                "--output_dir",
                                output_folder
                            ],
                            volume_mounts=[
                                client.V1VolumeMount(
                                    name="gfs-data",
                                    mount_path="/gfs"
                                )
                            ]
                        )
                    ],
                    restart_policy="Never",
                    volumes=[
                        client.V1Volume(
                            name="gfs-data",
                            persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource(
                                claim_name="dagster-shared-volume"
                            )
                        )
                    ]
                ),
            )
        )
    )

    # Create Job
    api_response = v1_batch.create_namespaced_job("dagster", job)
    <http://context.log.info|context.log.info>("'%s' - Job created." % str(api_response.metadata.name))

    while True:
        job_status = v1_batch.read_namespaced_job_status(name=api_response.metadata.name, namespace="dagster")

        pod_list = v1_core.list_namespaced_pod(namespace="dagster", label_selector="app=sen2cor").items
        pod_list = [pod for pod in pod_list if pod.metadata.name.startswith(api_response.metadata.name)]
        for pod in pod_list:
            # Forward available logs to dagster
            try:
                for line in v1_core.read_namespaced_pod_log(name=pod.metadata.name, namespace='dagster', since_seconds=5).splitlines():
                    <http://context.log.info|context.log.info>(pod.metadata.name + " - " + line)
            except:
                pass

        if job_status.status.succeeded:
            <http://context.log.info|context.log.info>("'%s' - Job completed." % api_response.metadata.name)
            v1_batch.delete_namespaced_job(name=api_response.metadata.name, namespace="dagster")
            break
        elif job_status.status.failed:
            context.log.error("'%s' - Job failed." % api_response.metadata.name)
            v1_batch.delete_namespaced_job(name=api_response.metadata.name, namespace="dagster")
            raise DagsterError("Job failed")
        time.sleep(5)
👍 1
❤️ 4
🙌 1
d

daniel

12/21/2021, 3:56 PM
thanks!
Maybe we can use this as the basis for a more generic op for running k8s jobs in the dagster-k8s library
👍 3
a

Andrea Giardini

12/21/2021, 3:57 PM
It's very much a "dirty but works" kind of thing 😄 if you have any feedback let me know
❤️ 1
m

Manny Schneck

12/21/2021, 7:06 PM
Hah! Much cleaner than my prototype code (we didn't end up productionizing this use case):
Copy code
"""
I don't know how to python docstring can figure out later.
"""
@op(required_resource_keys={'batch_api'})
def k8s_job(context):
    manifest_path = context.op_config['manifest_path']

    with open(manifest_path, 'r') as wsyf:
        job_definition_manifest = yaml.safe_load(wsyf)

    K8S_MAX_JOB_NAME_LENGTH=63
    job_name = job_definition_manifest['metadata']['name'] + '-' + str(uuid.uuid4())
    job_name = job_name[:K8S_MAX_JOB_NAME_LENGTH]
    job_definition_manifest['metadata']['name'] = job_name

    api = context.resources.batch_api

    api.create_namespaced_job(body=job_definition_manifest, namespace='dagster')

    job_completed = False
    while not job_completed:
        api_response = api.read_namespaced_job_status(
            name=job_name,
            namespace="dagster")
        if not api_response:
            raise RuntimeError("api response should not be nil")
        if api_response.status.succeeded:
            job_completed = True
        if api_response.status.failed:
            raise RuntimeError(f"Oh no job failed very sad {api_response.status}")
        time.sleep(1)

def test_octopus_eats_captain():
    ctx = build_op_context(
            config={ "manifest_path": "./whale.yaml" },
            resources={
                "batch_api": batch_api.configured({"incluster": False})
                })
    k8s_job(ctx)
Andrea, that log forwarding is awesome!
🙏 1
r

Roel Hogervorst

01/05/2022, 4:22 PM
Thank you so much @Andrea Giardini, this is super useful for me!
I reworked @Andrea Giardini’s example into a more generic one. But I wonder if this is the way to go for these things, this example has • a jobname that also becomes the name of the container (and enables log copying when you have more than 1 container in a pod • In my kubernetes setup I cannot go outside this namespace but I have a env var that tells me what namespace I'm in • I need to supply resources (limits, requests) to my containers otherwise they will not run The cool thing is that I can use this operator to start different jobs, but you need to give it a lot of config, which feels, dirty?
Copy code
import os
import time
from typing import List

from dagster import In, op, DagsterError
from kubernetes import client, config


@op(
    ins = {
        'jobname':In(str,description="becomes names of container and first part of jobname"),
        "image_name": In(str, description="For instance …"),
        "cmdlineargs": In(List[str], description="The commands that need to run f.i. ['python runthis.py'] "),
        "max_cpu": In(str,default_value="750m"),
        "max_mem": In(str,default_value="256Mi"),
        "env": In(default_value=None, description="use client.V1EnvVar(name='',value=''"),
        "env_from": In(default_value=None, description= "use client.V1envFromSource(config_map_ref, secret_ref)")
        }
)
def create_k8s_job(context, jobname,image_name,cmdlineargs, max_cpu,max_mem, env, env_from):
    """Create and run a kubernetes job.

    For this operator to work we need: A name, a link to the image location and commands that 
    run the container. This operator will create the job on the cluster and
    stream the logs to dagster logging.
    You are not able to add volume mounts to this job.

    For more info about jobs see [kubernetes job docs](<https://kubernetes.io/docs/concepts/workloads/controllers/job/>)
    For more (sparse) info about the python library see [the library docs](<https://github.com/kubernetes-client/python/blob/master/kubernetes/README.md>)
    """
    if os.getenv('KUBERNETES_SERVICE_HOST'):
        context.log.debug("execution happening in kubernetes, so using `load_incluster_config`")
        config.load_incluster_config()
    else:
        context.log.debug("execution happening outside cluster, using `load_kube_config()`")
        config.load_kube_config()

    # Since the bucket and cluster namespace are the same in our case
    # we can use the bucketname as namespace variable too
    namespace = os.getenv('AWS_BUCKET','no-bucket')
    v1_batch = client.BatchV1Api()
    v1_core = client.CoreV1Api()
    # create jobdefinition
    job = client.V1Job(
        api_version="batch/v1",
        kind="Job",
        metadata=client.V1ObjectMeta(generate_name=f"{jobname}-", namespace=namespace),
        spec=client.V1JobSpec(
            # time to live (ttl) is 24 hours, but we delete it earlier when everything is alright
            ttl_seconds_after_finished=86400, 
            template=client.V1PodTemplateSpec(
                metadata=client.V1ObjectMeta(
                    annotations={"job.generated-by":"create_k8s_job"},
                    labels={
                    "app": jobname,
                    "<http://app.kubernetes.io/managed-by|app.kubernetes.io/managed-by>":"dagster"
                    }),
                spec=client.V1PodSpec(
                    containers=[
                        client.V1Container(
                            name=jobname,
                            image=image_name,
                            command=["/bin/sh", "-c"],
                            args=cmdlineargs,
                            resources=client.V1ResourceRequirements(
                                limits={"memory":max_mem, "cpu": max_cpu}, 
                                requests={"memory": "256Mi","cpu": "50m"}
                            ),
                            env=env,
                            env_from=env_from
                        )
                    ],
                    
                    restart_policy="Never",
                    service_account="default-editor"
                ),
            )
        )
    )
    # submit job to cluster
    api_response = v1_batch.create_namespaced_job(namespace, job)
    <http://context.log.info|context.log.info>("'%s' - k8sJob created." % str(api_response.metadata.name))
    # TODO: kill the job when not ready
    # poll and stream logs
    while True:
        job_status = v1_batch.read_namespaced_job_status(name=api_response.metadata.name, namespace=namespace)
        pod_list = v1_core.list_namespaced_pod(namespace=namespace, label_selector=f"app={jobname}").items
        pod_list = [pod for pod in pod_list if pod.metadata.name.startswith(api_response.metadata.name)]
        context.log.debug(f'There are {len(pod_list)} pods running of type "app={jobname}"')
        for pod in pod_list:
            context.log.debug(f'After filtering There are {len(pod_list)} pods running of type "app={jobname}"')
            # Forward available logs to dagster
            try:
                for line in v1_core.read_namespaced_pod_log(name=pod.metadata.name, namespace=namespace, since_seconds=5, container = jobname).splitlines():
                    <http://context.log.info|context.log.info>(pod.metadata.name + " - " + line)
                if api_response.status.conditions:
                    context.log.debug(pod.metadata.name + " - " + api_response.status.conditions)
            except:
                pass

        if job_status.status.succeeded:
            <http://context.log.info|context.log.info>("'%s' - k8sJob completed." % api_response.metadata.name)
            v1_batch.delete_namespaced_job(name=api_response.metadata.name, namespace=namespace)
            break
        elif job_status.status.failed:
            context.log.error("'%s' - k8sJob failed." % api_response.metadata.name)
            v1_batch.delete_namespaced_job(name=api_response.metadata.name, namespace=namespace)
            raise DagsterError("k8sJob failed")
        time.sleep(5)
If you want to run this op in a job I now need to do this:
Copy code
from dagster import job
from ops.kubernetes import create_k8s_job

@job(config={'ops':{"create_k8s_job":{"inputs":{"jobname":aname, "image_name": server/group/images/image:version, "cmdlineargs": "[sleep 3600 & echo 'job done' ]"}}}})
def run_k8s_job():
    create_k8s_job()
I can't help but feel a bit clumsy with this.
d

daniel

01/07/2022, 3:05 PM
Hi Roel - I assume this is for running code in the image that can't be written as a dagster op (e.g. because its not in Python)? Because if it can, another option is to use the k8s_job_executor to have dagster handle spinning up the pod for the op: https://docs.dagster.io/_apidocs/libraries/dagster-k8s#dagster_k8s.k8s_job_executor - that requires the command to be expressable in Python though. You could also use that executor, and have the op run the command in a subprocess (within the pod). But if none of those are possible, i think this kind of op is our best answer for running arbitrary k8s jobs within dagster right now.
r

Roel Hogervorst

01/07/2022, 5:14 PM
Alright thanks! It is indeed a job that is not python on an image that is outside of dagster's control.
I think I will change the INS to config settings that is cleaner
a

Andrea Giardini

01/08/2022, 9:43 AM
@daniel I think it would be nice to document in a gtihub issue the requirements and the expected interface for a "k8s-job-step" that does what we have been discussing in this thread: creating a new job with a custom image not containing dagster but rather any other application. Once we have written the requirements down we can think about doing a PR for it 🙂
r

Roel Hogervorst

01/13/2022, 8:13 AM
🙏 2
d

daniel

01/13/2022, 3:17 PM
Yes sorry for not following up here, thanks for doing that Roel!
a

Andrea Giardini

01/13/2022, 3:17 PM
@daniel Can we get some feedback from Elementl on what would be the preferred interface for this step?
d

daniel

01/13/2022, 3:21 PM
One high level thing is that I think it would be nice if we shared a similar scheme with the existing ways that we have to tag ops with k8s config when using the k8s executor (thinking of the tags here: https://docs.dagster.io/deployment/guides/kubernetes/customizing-your-deployment#job-or-op-kubernetes-configuration - maybe a similar scheme could become the config for the op when it's responsible for spinning up the k8s job?) cc @johann who has also done some thinking about these types of questions)
m

Michel Ebner

02/22/2022, 3:43 PM
Hi guys, crashing into this thread as I am facing a similar issue. My problem: • I want to run a k8s job as an "op" in dagster. Currently it runs using cronjob in k8s. My question: • The
k8s_job_executor
would fit my needs as I can pass along every info I need (image, nemespace, pull secrets...). But can I use it on an "op"? Because other steps are depending on this one. Ex: i have a singer tap which extract the data and sents it to stitch and then to the warehouse. After a 5 min delay I want the corresponding dbt job to start. This my whole job.
I guess, this partly answers my question. Will try to set it up!
7 Views