Binoy Shah
06/06/2022, 9:22 PM
dagster and user-deployments
We need K8sJobExecutor to be customized for our deployments.
Some customizations are required due to infra needs:
• Kubernetes annotations for Vault integration
• Secrets management with Vault and Consul
• New Relic APM annotations
Came to know about the @job annotations/customizations from https://dagster.slack.com/archives/C014N0PK37E/p1654273239231269
I am not sure if there is a Pythonic way to make a couple of standard, predefined extensions to the @job definitions
i.e.
• @heavyjob is a predefined instance of @job with at least { 8G memory, all required annotations and some secrets }
• @lightjob is a predefined instance of @job with { 512M memory, annotations, TTL of 10 seconds, etc. }
• otherjob, etc.
This way our code is not cluttered with extensive @job annotations, and it becomes cleaner and more readable.
Currently the code appears a bit clunky:
import logging

from dagster import job

# download_cereals, display_results and the find_* ops are defined elsewhere.
@job(
    tags={
        "dagster-k8s/config": {
            "pod_template_spec_metadata": {
                "annotations": {
                    "vault.security.banzaicloud.io/enable-json-log": "true",
                    "vault.security.banzaicloud.io/vault-addr": "https://vault.blah.com",
                    "vault.security.banzaicloud.io/vault-env-daemon": "true",
                    "vault.security.banzaicloud.io/vault-path": "qak8s.blah.com",
                    "vault.security.banzaicloud.io/vault-role": "qak8s-services-role",
                    "vault.security.banzaicloud.io/vault-skip-verify": "false",
                    "vault.security.banzaicloud.io/vault-tls-secret": "vault-ca",
                }
            },
            "job_spec_config": {
                "ttl_seconds_after_finished": 300
            },
        },
    },
)
def classify_cereal_by_nutrition():
    cereals = download_cereals()
    display_results(
        most_calories=find_highest_calorie_cereal(cereals),
        most_protein=find_highest_protein_cereal(cereals),
        most_sugar=find_highest_sugar_cereal(cereals),
    )
    logging.info("Cereal Categorization complete")
Or have a baseJobSpec configuration of the K8s job runner embedded in the values.yaml file, so that we can tweak it directly in our CI/CD-based deployments.
Any thoughts / hints?
daniel
06/06/2022, 9:27 PM
Binoy Shah
06/06/2022, 9:40 PM
@my_op_factory annotation instead? Or @job_factory
daniel
06/06/2022, 10:24 PM
Stephen Bailey
06/07/2022, 12:58 AM
@heavyjob, but just passing in kwargs. It's a useful pattern and works for kube spec, other tags, hooks, etc.
from dagster import job
from my_repo.utils import heavy_job_spec

@job(**heavy_job_spec)
def my_job():
    ...
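The contents of heavy_job_spec are not shown in the thread. A minimal sketch of what such a module could look like, assuming the "dagster-k8s/config" tag layout from the original question. The names COMMON_ANNOTATIONS, k8s_job_spec and light_job_spec, the memory values, and the container_config resources block are illustrative assumptions, not code from the thread:

```python
# Hypothetical contents of my_repo/utils.py; plain dicts, no Dagster import needed here.

# Shared annotations, copied from the example earlier in the thread (subset only).
COMMON_ANNOTATIONS = {
    "vault.security.banzaicloud.io/vault-addr": "https://vault.blah.com",
    "vault.security.banzaicloud.io/vault-role": "qak8s-services-role",
    "vault.security.banzaicloud.io/vault-tls-secret": "vault-ca",
}


def k8s_job_spec(memory, ttl_seconds, extra_annotations=None):
    """Build keyword arguments for @job with the Kubernetes config filled in."""
    annotations = {**COMMON_ANNOTATIONS, **(extra_annotations or {})}
    return {
        "tags": {
            "dagster-k8s/config": {
                # Assumed resource settings; adjust to the cluster's conventions.
                "container_config": {
                    "resources": {
                        "requests": {"memory": memory},
                        "limits": {"memory": memory},
                    }
                },
                "pod_template_spec_metadata": {"annotations": annotations},
                "job_spec_config": {"ttl_seconds_after_finished": ttl_seconds},
            }
        }
    }


# Presets matching the "heavy" and "light" profiles described in the question.
heavy_job_spec = k8s_job_spec(memory="8Gi", ttl_seconds=300)
light_job_spec = k8s_job_spec(memory="512Mi", ttl_seconds=10)
```

Each call builds fresh dictionaries, so changing one preset after the fact does not leak into the other. If a bare decorator is preferred over @job(**heavy_job_spec), a thin wrapper such as def heavy_job(fn): return job(**heavy_job_spec)(fn) should give the @heavyjob spelling from the question.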
We also have a function version that will merge with default specs, so you can customize per run too. Something like:
def default_job_tags(tags=None):
    default_tags = {"foo": "bar"}
    # dict.update() mutates in place and returns None, so return the dict itself
    default_tags.update(tags or {})
    return default_tags
Stephen Bailey
06/07/2022, 1:01 AM
def default_job_spec(tags={}):
    default_tags = {"foo": "bar"}
    return {**default_tags, **tags}
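One caveat with the {**default_tags, **tags} merge: it is shallow, so a caller who passes their own "dagster-k8s/config" key replaces the whole default block instead of adding to it. A possible recursive variant, as a sketch only. The names deep_merge, DEFAULT_TAGS and merged_job_tags, and the sample values, are hypothetical and not part of Dagster:

```python
from copy import deepcopy


def deep_merge(base, override):
    """Return a new dict with `override` layered over `base`, merging nested dicts."""
    merged = deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            # Both sides are dicts: merge them key by key instead of replacing.
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = deepcopy(value)
    return merged


# Hypothetical defaults, following the tag layout used earlier in the thread.
DEFAULT_TAGS = {
    "dagster-k8s/config": {
        "job_spec_config": {"ttl_seconds_after_finished": 300},
    }
}


def merged_job_tags(tags=None):
    """Layer per-job tags over the defaults without mutating either input."""
    return deep_merge(DEFAULT_TAGS, tags or {})


# A job that only adds an annotation still keeps the default TTL setting.
example_tags = merged_job_tags(
    {"dagster-k8s/config": {"pod_template_spec_metadata": {"annotations": {"team": "data"}}}}
)
```

The result can be passed as @job(tags=merged_job_tags({...})). Lists and scalar values are replaced rather than combined here, which is a design choice worth revisiting if defaults ever include list-valued settings.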