Andrea Giardini
12/14/2021, 10:57 AM
daniel
12/14/2021, 1:17 PM
daniel
12/14/2021, 1:18 PM
Andrea Giardini
12/14/2021, 1:20 PM
Andrea Giardini
12/17/2021, 9:06 AM
Andrea Giardini
12/17/2021, 1:09 PMwhile True:
job_status = v1_batch.read_namespaced_job_status(name=api_response.metadata.name, namespace="dagster")
pod_list = v1_core.list_namespaced_pod(namespace="dagster", label_selector="app=sen2cor").items
pod_list = [pod for pod in pod_list if pod.metadata.name.startswith(api_response.metadata.name)]
for pod in pod_list:
# Forward available logs to dagster
try:
for line in v1_core.read_namespaced_pod_log(name=pod.metadata.name, namespace='dagster', since_seconds=5).splitlines():
<http://context.log.info|context.log.info>(pod.metadata.name + " - " + line)
except:
pass
if job_status.status.succeeded:
<http://context.log.info|context.log.info>("'%s' - Job completed." % api_response.metadata.name)
v1_batch.delete_namespaced_job(name=api_response.metadata.name, namespace="dagster")
break
elif job_status.status.failed:
context.log.error("'%s' - Job failed." % api_response.metadata.name)
v1_batch.delete_namespaced_job(name=api_response.metadata.name, namespace="dagster")
raise DagsterError("Job failed")
time.sleep(5)
Not as elegant as I would have liked, but it works 🙂
daniel
12/17/2021, 1:18 PM