# deployment-kubernetes
a
Hello everyone! Is it possible to have a dagster Op in Kubernetes defined with a docker image and a command? I need to run the same command on many files, so I would like to write a dagster Op that lists all those files (I will write it myself) and then have dagster create a new Op for every file in the list, with a custom docker image and command.
d
Hi Andrea - you can absolutely write an op that launches a Kubernetes pod/job using the kubernetes python client. It's not something we have a built-in helper for yet, but I don't think it would be hard to write.
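For anyone finding this thread later, here is a minimal sketch of the kind of op Daniel describes. The manifest is built as a plain dict (which the kubernetes python client accepts as a request body); the image, command, namespace, and names below are illustrative placeholders, not anything dagster ships:

```python
def make_job_manifest(name, image, command):
    """Build a minimal Kubernetes Job manifest as a plain dict.

    The kubernetes python client accepts dict bodies, so this can be
    passed straight to BatchV1Api().create_namespaced_job(...).
    """
    return {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {"name": name},
        "spec": {
            # Don't retry failed pods; let dagster decide what to do.
            "backoffLimit": 0,
            "template": {
                "spec": {
                    "restartPolicy": "Never",
                    "containers": [
                        {"name": "main", "image": image, "command": command}
                    ],
                }
            },
        },
    }


# Inside an @op you would then do something like (requires a cluster):
#   from kubernetes import client, config
#   config.load_incluster_config()  # or load_kube_config() locally
#   manifest = make_job_manifest("process-file-1", "my-registry/my-image:latest", ["my-command", "file-1"])
#   client.BatchV1Api().create_namespaced_job(namespace="dagster", body=manifest)
```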
a
Thank you! I will check it out!
Hey @daniel, I successfully managed to run K8s jobs from dagster, and I was wondering if it would be possible to forward the job logs to dagster. I haven't found a way of doing it yet, but I'm digging into the dagster-k8s project.
FYI I managed to get it working with something like this:
import time

from kubernetes.client.rest import ApiException
from dagster import DagsterError

# v1_batch / v1_core are kubernetes BatchV1Api / CoreV1Api clients,
# api_response is the Job returned by create_namespaced_job,
# and context is the dagster op context.
while True:
    job_status = v1_batch.read_namespaced_job_status(name=api_response.metadata.name, namespace="dagster")

    pod_list = v1_core.list_namespaced_pod(namespace="dagster", label_selector="app=sen2cor").items
    pod_list = [pod for pod in pod_list if pod.metadata.name.startswith(api_response.metadata.name)]
    for pod in pod_list:
        # Forward available logs to dagster
        try:
            for line in v1_core.read_namespaced_pod_log(name=pod.metadata.name, namespace="dagster", since_seconds=5).splitlines():
                context.log.info(pod.metadata.name + " - " + line)
        except ApiException:
            # The pod may not be ready to serve logs yet
            pass

    if job_status.status.succeeded:
        context.log.info("'%s' - Job completed." % api_response.metadata.name)
        v1_batch.delete_namespaced_job(name=api_response.metadata.name, namespace="dagster")
        break
    elif job_status.status.failed:
        context.log.error("'%s' - Job failed." % api_response.metadata.name)
        v1_batch.delete_namespaced_job(name=api_response.metadata.name, namespace="dagster")
        raise DagsterError("Job failed")
    time.sleep(5)
Not as elegant as I would have liked, but it works 🙂
d
That code is more or less what I was going to suggest 🙂 Yeah, the downside of having the op call the k8s api, as opposed to taking advantage of, say, the k8s_job_executor (https://docs.dagster.io/_modules/dagster_k8s/executor#k8s_job_executor) to spin up a pod for each op, is that you lose some of the observability/programming model benefits and have to manage the logs and k8s lifecycle yourself. But sometimes it's unavoidable (for example, if you want to run a non-python command). If your op is generalizable, I think it would make a great PR, but no pressure of course.