raul
12/21/2021, 3:34 PMdaniel
12/21/2021, 3:37 PM@op(
tags = {
'dagster-k8s/config': {
'container_config': {
'image': 'foobar:latest'
},
},
},
)
daniel
12/21/2021, 3:37 PMdaniel
12/21/2021, 3:38 PMDagster Bot
12/21/2021, 3:38 PMraul
12/21/2021, 3:45 PMdaniel
12/21/2021, 3:46 PMdaniel
12/21/2021, 3:47 PMraul
12/21/2021, 3:48 PMdaniel
12/21/2021, 3:50 PMAndrea Giardini
12/21/2021, 3:54 PM@op(
ins={
"output_folder": In(str)
}
)
def create_k8s_job(context, el, output_folder):
from kubernetes import client, config
if os.getenv('KUBERNETES_SERVICE_HOST'):
# If I am running in Kubernetes, use the "in cluster" configuration
config.load_incluster_config()
else:
config.load_kube_config()
v1_batch = client.BatchV1Api()
v1_core = client.CoreV1Api()
# Create Job
job = client.V1Job(
api_version="batch/v1",
kind="Job",
metadata=client.V1ObjectMeta(generate_name="sen2cor-", namespace="dagster"),
spec=client.V1JobSpec(
ttl_seconds_after_finished=86400,
template=client.V1PodTemplateSpec(
metadata=client.V1ObjectMeta(labels={"app": "sen2cor"}),
spec=client.V1PodSpec(
containers=[
client.V1Container(
name="sen2cor",
image="dymaxionlabs/sen2cor",
args=[
el,
"--resolution",
"10",
"--output_dir",
output_folder
],
volume_mounts=[
client.V1VolumeMount(
name="gfs-data",
mount_path="/gfs"
)
]
)
],
restart_policy="Never",
volumes=[
client.V1Volume(
name="gfs-data",
persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource(
claim_name="dagster-shared-volume"
)
)
]
),
)
)
)
# Create Job
api_response = v1_batch.create_namespaced_job("dagster", job)
<http://context.log.info|context.log.info>("'%s' - Job created." % str(api_response.metadata.name))
while True:
job_status = v1_batch.read_namespaced_job_status(name=api_response.metadata.name, namespace="dagster")
pod_list = v1_core.list_namespaced_pod(namespace="dagster", label_selector="app=sen2cor").items
pod_list = [pod for pod in pod_list if pod.metadata.name.startswith(api_response.metadata.name)]
for pod in pod_list:
# Forward available logs to dagster
try:
for line in v1_core.read_namespaced_pod_log(name=pod.metadata.name, namespace='dagster', since_seconds=5).splitlines():
<http://context.log.info|context.log.info>(pod.metadata.name + " - " + line)
except:
pass
if job_status.status.succeeded:
<http://context.log.info|context.log.info>("'%s' - Job completed." % api_response.metadata.name)
v1_batch.delete_namespaced_job(name=api_response.metadata.name, namespace="dagster")
break
elif job_status.status.failed:
context.log.error("'%s' - Job failed." % api_response.metadata.name)
v1_batch.delete_namespaced_job(name=api_response.metadata.name, namespace="dagster")
raise DagsterError("Job failed")
time.sleep(5)
daniel
12/21/2021, 3:56 PMdaniel
12/21/2021, 3:56 PMAndrea Giardini
12/21/2021, 3:57 PMManny Schneck
12/21/2021, 7:06 PM"""
I don't know how to python docstring can figure out later.
"""
@op(required_resource_keys={'batch_api'})
def k8s_job(context):
manifest_path = context.op_config['manifest_path']
with open(manifest_path, 'r') as wsyf:
job_definition_manifest = yaml.safe_load(wsyf)
K8S_MAX_JOB_NAME_LENGTH=63
job_name = job_definition_manifest['metadata']['name'] + '-' + str(uuid.uuid4())
job_name = job_name[:K8S_MAX_JOB_NAME_LENGTH]
job_definition_manifest['metadata']['name'] = job_name
api = context.resources.batch_api
api.create_namespaced_job(body=job_definition_manifest, namespace='dagster')
job_completed = False
while not job_completed:
api_response = api.read_namespaced_job_status(
name=job_name,
namespace="dagster")
if not api_response:
raise RuntimeError("api response should not be nil")
if api_response.status.succeeded:
job_completed = True
if api_response.status.failed:
raise RuntimeError(f"Oh no job failed very sad {api_response.status}")
time.sleep(1)
def test_octopus_eats_captain():
    """Run k8s_job once against a local (non-incluster) batch API resource."""
    op_resources = {"batch_api": batch_api.configured({"incluster": False})}
    op_config = {"manifest_path": "./whale.yaml"}
    exec_context = build_op_context(config=op_config, resources=op_resources)
    k8s_job(exec_context)
Manny Schneck
12/21/2021, 7:08 PMRoel Hogervorst
01/05/2022, 4:22 PMRoel Hogervorst
01/07/2022, 7:57 AMimport os
import time
from typing import List
from dagster import In, op, DagsterError
from kubernetes import client, config
@op(
    ins = {
        'jobname':In(str,description="becomes names of container and first part of jobname"),
        "image_name": In(str, description="For instance …"),
        "cmdlineargs": In(List[str], description="The commands that need to run f.i. ['python runthis.py'] "),
        "max_cpu": In(str,default_value="750m"),
        "max_mem": In(str,default_value="256Mi"),
        "env": In(default_value=None, description="use client.V1EnvVar(name='',value=''"),
        "env_from": In(default_value=None, description= "use client.V1envFromSource(config_map_ref, secret_ref)")
    }
)
def create_k8s_job(context, jobname, image_name, cmdlineargs, max_cpu, max_mem, env, env_from):
    """Create and run a kubernetes job.

    For this operator to work we need: A name, a link to the image location and
    commands that run the container. This operator will create the job on the
    cluster and stream the logs to dagster logging.
    You are not able to add volume mounts to this job.

    For more info about jobs see the
    [kubernetes job docs](https://kubernetes.io/docs/concepts/workloads/controllers/job/).
    For more (sparse) info about the python library see
    [the library docs](https://github.com/kubernetes-client/python/blob/master/kubernetes/README.md).

    Raises:
        DagsterError: If the Kubernetes Job reports a failed status.
    """
    if os.getenv('KUBERNETES_SERVICE_HOST'):
        context.log.debug("execution happening in kubernetes, so using `load_incluster_config`")
        config.load_incluster_config()
    else:
        context.log.debug("execution happening outside cluster, using `load_kube_config()`")
        config.load_kube_config()

    # Since the bucket and cluster namespace are the same in our case
    # we can use the bucketname as namespace variable too
    namespace = os.getenv('AWS_BUCKET', 'no-bucket')
    v1_batch = client.BatchV1Api()
    v1_core = client.CoreV1Api()

    # create jobdefinition
    job = client.V1Job(
        api_version="batch/v1",
        kind="Job",
        metadata=client.V1ObjectMeta(generate_name=f"{jobname}-", namespace=namespace),
        spec=client.V1JobSpec(
            # time to live (ttl) is 24 hours, but we delete it earlier when everything is alright
            ttl_seconds_after_finished=86400,
            template=client.V1PodTemplateSpec(
                metadata=client.V1ObjectMeta(
                    annotations={"job.generated-by": "create_k8s_job"},
                    labels={
                        "app": jobname,
                        # Standard recommended-label key marking dagster as the manager.
                        "app.kubernetes.io/managed-by": "dagster"
                    }),
                spec=client.V1PodSpec(
                    containers=[
                        client.V1Container(
                            name=jobname,
                            image=image_name,
                            command=["/bin/sh", "-c"],
                            args=cmdlineargs,
                            resources=client.V1ResourceRequirements(
                                limits={"memory": max_mem, "cpu": max_cpu},
                                requests={"memory": "256Mi", "cpu": "50m"}
                            ),
                            env=env,
                            env_from=env_from
                        )
                    ],
                    restart_policy="Never",
                    # NOTE(review): V1PodSpec also offers `service_account_name`;
                    # `service_account` is the deprecated alias — confirm intent.
                    service_account="default-editor"
                ),
            )
        )
    )

    # submit job to cluster
    api_response = v1_batch.create_namespaced_job(namespace, job)
    context.log.info("'%s' - k8sJob created." % str(api_response.metadata.name))

    # TODO: kill the job when not ready
    # poll and stream logs
    while True:
        job_status = v1_batch.read_namespaced_job_status(
            name=api_response.metadata.name, namespace=namespace)
        pod_list = v1_core.list_namespaced_pod(
            namespace=namespace, label_selector=f"app={jobname}").items
        # Keep only pods spawned by this Job (their names share the Job's prefix).
        pod_list = [pod for pod in pod_list
                    if pod.metadata.name.startswith(api_response.metadata.name)]
        context.log.debug(f'There are {len(pod_list)} pods running of type "app={jobname}"')
        for pod in pod_list:
            # Forward available logs to dagster. The pod may not be ready to
            # serve logs yet; in that case skip it and retry on the next poll.
            try:
                log_text = v1_core.read_namespaced_pod_log(
                    name=pod.metadata.name, namespace=namespace,
                    since_seconds=5, container=jobname)
            except client.rest.ApiException:
                continue
            for line in log_text.splitlines():
                context.log.info(pod.metadata.name + " - " + line)
            if api_response.status.conditions:
                # conditions is a list of V1JobCondition objects — stringify
                # before concatenating, or this raises TypeError.
                context.log.debug(pod.metadata.name + " - " + str(api_response.status.conditions))
        if job_status.status.succeeded:
            context.log.info("'%s' - k8sJob completed." % api_response.metadata.name)
            v1_batch.delete_namespaced_job(name=api_response.metadata.name, namespace=namespace)
            break
        elif job_status.status.failed:
            context.log.error("'%s' - k8sJob failed." % api_response.metadata.name)
            v1_batch.delete_namespaced_job(name=api_response.metadata.name, namespace=namespace)
            raise DagsterError("k8sJob failed")
        time.sleep(5)
If you want to run this op in a job I now need to do this:
from dagster import job
from ops.kubernetes import create_k8s_job
@job(config={'ops': {"create_k8s_job": {"inputs": {"jobname": "aname", "image_name": "server/group/images/image:version", "cmdlineargs": ["sleep 3600 && echo 'job done'"]}}}})
def run_k8s_job():
create_k8s_job()
I can't help but feel a bit clumsy with this.daniel
01/07/2022, 3:05 PMRoel Hogervorst
01/07/2022, 5:14 PMRoel Hogervorst
01/07/2022, 5:14 PMAndrea Giardini
01/08/2022, 9:43 AMRoel Hogervorst
01/13/2022, 8:13 AMdaniel
01/13/2022, 3:17 PMAndrea Giardini
01/13/2022, 3:17 PMdaniel
01/13/2022, 3:21 PMMichel Ebner
02/22/2022, 3:43 PMk8s_job_executor
would fit my needs as I can pass along every info I need (image, namespace, pull secrets...). But can I use it on an "op"? Because other steps are depending on this one. Ex: I have a singer tap which extracts the data and sends it to stitch and then to the warehouse. After a 5 min delay I want the corresponding dbt job to start. This is my whole job.Michel Ebner
02/22/2022, 3:59 PM