Adrian Rumpold
06/17/2021, 10:38 AM
daniel
06/17/2021, 2:01 PM
Yasin Zaehringer
06/17/2021, 2:26 PM
workspace.yml - and triggers the GQL call to reload the file
Adrian Rumpold
06/17/2021, 3:10 PM
Yasin Zaehringer
06/17/2021, 3:20 PM
`operator` services, which it assumes to be Dagster deployments:
# Standard library modules
import logging
import os
import subprocess
import sys
from datetime import timedelta
from itertools import compress
from pathlib import Path
from threading import Thread
from time import sleep
# Third party modules
import requests
import yaml
from kubernetes import client, config
from kubernetes.client import V1Service
from kubernetes.config import ConfigException
# Configure logging once at import time. basicConfig() only installs a handler
# (it is a no-op if the root logger already has handlers), so the INFO level is
# applied to the root logger explicitly rather than via basicConfig(level=...).
logging.basicConfig()
logging.root.setLevel(logging.INFO)
logger = logging.getLogger("dagit")
# Workspace file generated by this process. The location must be supplied via
# the environment; a missing variable raises KeyError at import time.
WORKSPACE_YML_PATH = Path(os.environ["WORKSPACE_YML_PATH"])

# dagit executable, plus the arguments that are always appended last so that
# dagit reads the generated workspace file.
DAGIT_CMD = ["dagit"]
DAGIT_ARGS = ["-w", f"{WORKSPACE_YML_PATH}"]
# Tokens stripped from the END of the incoming argv before DAGIT_ARGS is
# appended (presumably the deployment's default `-w <workspace>` pair — confirm
# against the chart template).
DAGIT_ARGS_IGNORE = {"/dagster-workspace/workspace.yaml", "-w"}

# Interval used by main_dagit between process polls, and by main_workspace_yml
# between workspace refreshes.
DAGIT_TIMEOUT = timedelta(seconds=10)
WORKSPACE_TIMEOUT = timedelta(minutes=5)

# Services with this exact name are treated as Dagster gRPC deployments.
OPERATOR_NAME = "operator"
# dagit's GraphQL endpoint on the local host. Assumes dagit listens on port 80 —
# confirm against the dagit arguments supplied by the deployment.
GQL_ENDPOINT = "http://127.0.0.1:80/graphql"
# Triggers dagit's `reloadWorkspace` mutation; on success the response lists the
# name, id and loadStatus of every location entry in the reloaded workspace.
GQL_RELOAD_QUERY = """
mutation ReloadWorkspaceMutation {
reloadWorkspace {
... on Workspace {
locationEntries {
name
id
loadStatus
}
}
}
}
"""
# JSON payload POSTed to GQL_ENDPOINT (standard GraphQL-over-HTTP body).
GQL_RELOAD_DATA = {
    "query": GQL_RELOAD_QUERY,
    "operationName": "ReloadWorkspaceMutation",
    "variables": {},
}
def main_dagit() -> None:
    """Run dagit as a child process and return once it exits.

    Any trailing tokens of ``sys.argv`` that appear in ``DAGIT_ARGS_IGNORE``
    are dropped, and ``DAGIT_ARGS`` is appended, so dagit is pointed at the
    workspace file maintained by this script. Returning ends the calling
    thread, which the supervisor loop in ``main`` treats as a failure.
    """
    dagit_args = sys.argv[1:]
    # Only a trailing run of ignored tokens is stripped; earlier arguments are
    # passed through to dagit unchanged.
    while dagit_args and dagit_args[-1] in DAGIT_ARGS_IGNORE:
        dagit_args.pop()
    cmd = DAGIT_CMD + dagit_args + DAGIT_ARGS
    logger.info("Dagster command: %s", cmd)
    process = subprocess.Popen(cmd)
    # Block until dagit exits instead of sleeping and polling, so a crash is
    # noticed immediately rather than up to DAGIT_TIMEOUT later.
    returncode = process.wait()
    logger.warning("dagit exited with return code %s", returncode)
def setup_workspace_yml() -> None:
    """Load the Kubernetes client configuration and write the initial workspace file.

    In-cluster configuration is tried first; if it raises ``ConfigException``
    the local kubeconfig is loaded instead. Errors from ``load_kube_config``
    propagate to the caller.
    """
    try:
        config.load_incluster_config()
    except ConfigException:
        config.load_kube_config()
        logger.info("Using kubeconfig.")
    else:
        logger.info("Using incluster configuration.")
    # Initial write, so the file exists before main() starts the dagit thread.
    _write_workspace_yml()
def _write_workspace_yml() -> None:
    """Regenerate the workspace yaml file from the cluster's services.

    Every Service named ``OPERATOR_NAME`` (in any namespace) becomes one
    ``grpc_server`` code location whose host is ``<name>.<namespace>.svc`` and
    whose location name is the namespace. ``WORKSPACE_YML_PATH`` is then
    overwritten with the YAML dump.
    """
    v1 = client.CoreV1Api()
    # Let the API server filter by name instead of listing every service in the
    # cluster; the name check below is kept as a defensive guard.
    services = v1.list_service_for_all_namespaces(
        watch=False, field_selector=f"metadata.name={OPERATOR_NAME}"
    )
    service: V1Service
    namespace_to_host = {}
    for service in services.items:
        name, namespace = service.metadata.name, service.metadata.namespace
        if name == OPERATOR_NAME:
            namespace_to_host[namespace] = f"{name}.{namespace}.svc"
    if not namespace_to_host:
        logger.warning(
            "No %r services found; the workspace will have no locations.",
            OPERATOR_NAME,
        )
    data = {
        "load_from": [
            # Assumes each operator service exposes the Dagster gRPC server on
            # port 3030 — confirm against the deployment's service definition.
            {"grpc_server": {"host": host, "port": 3030, "location_name": location}}
            for location, host in namespace_to_host.items()
        ]
    }
    data_yaml = yaml.safe_dump(data)
    logger.info("workspace.yml:\n%s", data_yaml)
    WORKSPACE_YML_PATH.write_text(data_yaml)
def _main_workspace_yml() -> None:
    """Rewrite the workspace yaml file once, then ask dagit to reload it.

    A failed reload request is logged rather than raised. The file has already
    been written, and ``main_workspace_yml`` calls this again on its next
    cycle. A transient outage of dagit's HTTP endpoint therefore does not kill
    the updater thread, which would make ``main`` stop the whole application.
    """
    _write_workspace_yml()
    # Short pause before triggering the reload (presumably to let the new file
    # settle before dagit re-reads it — TODO confirm it is still needed).
    sleep(5)
    try:
        # Bounded timeout so an unresponsive dagit cannot block this thread forever.
        r = requests.post(GQL_ENDPOINT, json=GQL_RELOAD_DATA, timeout=60)
    except requests.RequestException:
        logger.exception("reload: request to %s failed", GQL_ENDPOINT)
        return
    logger.info("reload: status_code=%s", r.status_code)
    logger.info("reload: text=%s", r.text)
def main_workspace_yml() -> None:
    """Refresh the workspace file and trigger a dagit reload, forever.

    Each cycle waits ``WORKSPACE_TIMEOUT`` first and then refreshes once, so
    the first periodic refresh happens one full interval after start-up.
    """
    interval_seconds = WORKSPACE_TIMEOUT.total_seconds()
    while True:
        sleep(interval_seconds)
        _main_workspace_yml()
def main() -> None:
    """Start dagit and the workspace updater; exit with status 1 if either stops.

    The workspace file is written once up front. Then ``main_dagit`` and
    ``main_workspace_yml`` run in daemon threads, and their liveness is checked
    every 2 seconds. As soon as any thread has finished, the process exits with
    status 1 (presumably so the pod is restarted by Kubernetes).
    """
    logger.info("Starting up...")
    # Set up the workspace.yaml file.
    setup_workspace_yml()
    # Create the manager threads and start them. Daemon threads do not keep the
    # interpreter alive, so sys.exit() below ends the process.
    thread_functions = [main_dagit, main_workspace_yml]
    threads = [
        Thread(target=f, name=f.__name__, daemon=True) for f in thread_functions
    ]
    for thread in threads:
        thread.start()
    # When a thread dies, bring down the whole application.
    while True:
        sleep(2)
        done_status = [not thread.is_alive() for thread in threads]
        if any(done_status):
            logger.critical(f"A thread is not alive anymore: done_status={done_status}")
            logger.critical(
                f"Failed function(s): {list(compress(thread_functions, done_status))}"
            )
            logger.critical("The application will stop now.")
            sys.exit(1)


if __name__ == "__main__":
    main()
_deployment-dagit.tpl
file to actually use this file