#ask-community

Adrian Rumpold

06/17/2021, 10:38 AM
Hi all! I was wondering if Dagster supports dynamic discovery of user deployments in Kubernetes, e.g. based on annotations on a pod or service exposing the gRPC API? I've read through the docs on how to separate the Dagster infrastructure and user code, but that approach currently requires specifying the hosts and ports of the user code deployments as part of the Dagster deployment (see https://docs.dagster.io/deployment/guides/kubernetes/customizing-your-deployment#separately-deploying-dagster-infrastructure-and-user-code)

daniel

06/17/2021, 2:01 PM
Hi Adrian - this is a good idea that's come up before and that we'd like to do in the future, but it isn't currently supported. I'd be curious about your exact use case: do you imagine frequently spinning new user code deployments up and down?

Yasin Zaehringer

06/17/2021, 2:26 PM
I have a process running in parallel that monitors the cluster, writes the `workspace.yml` based on what it finds, and triggers the GQL call to reload the file.

Adrian Rumpold

06/17/2021, 3:10 PM
Thanks for your quick response, @daniel! The scenario I currently have in mind is being able to dynamically spin up user code deployments from a CI pipeline (e.g. on individual feature branches), execute some workflows, and tear down the user repo deployment afterwards. If the Dagster deployment was able to automatically discover these user code deployments, that would be great. 🙂
@Yasin Zaehringer that sounds promising! Would you be able to share your script? I was thinking of hacking together something similar in a small app, but would like to avoid reinventing the wheel.

Yasin Zaehringer

06/17/2021, 3:20 PM
A few caveats first: I had to copy the Dagster Helm chart to modify it accordingly, and it is a bit of a pain to set up correctly. This code searches all Kubernetes cluster namespaces and looks for `operator` services, which it assumes to be Dagster deployments:
# Standard library modules
import logging
import os
import subprocess
import sys
from datetime import timedelta
from itertools import compress
from pathlib import Path
from threading import Thread
from time import sleep

# Third party modules
import requests
import yaml
from kubernetes import client, config
from kubernetes.client import V1Service
from kubernetes.config import ConfigException

logging.basicConfig()
logging.root.setLevel(logging.INFO)
logger = logging.getLogger("dagit")

WORKSPACE_YML_PATH = Path(os.environ["WORKSPACE_YML_PATH"])

DAGIT_CMD = ["dagit"]
DAGIT_ARGS = ["-w", str(WORKSPACE_YML_PATH)]
DAGIT_ARGS_IGNORE = {"-w", "/dagster-workspace/workspace.yaml"}
DAGIT_TIMEOUT = timedelta(seconds=10)
WORKSPACE_TIMEOUT = timedelta(minutes=5)
OPERATOR_NAME = "operator"


GQL_ENDPOINT = "http://127.0.0.1:80/graphql"
GQL_RELOAD_QUERY = """
mutation ReloadWorkspaceMutation {
  reloadWorkspace {
    ... on Workspace {
      locationEntries {
        name
        id
        loadStatus
      }
    }
  }
}
"""
GQL_RELOAD_DATA = {
    "query": GQL_RELOAD_QUERY,
    "operationName": "ReloadWorkspaceMutation",
    "variables": {},
}


def main_dagit() -> None:
    """Start dagit."""

    dagit_args = sys.argv[1:]
    while dagit_args and dagit_args[-1] in DAGIT_ARGS_IGNORE:
        dagit_args.pop(-1)
    cmd = DAGIT_CMD + dagit_args + DAGIT_ARGS
    logger.info(f"Dagster command: {cmd}")

    process = subprocess.Popen(cmd)

    while True:
        sleep(DAGIT_TIMEOUT.total_seconds())
        poll = process.poll()
        if poll is not None:
            return


def setup_workspace_yml() -> None:
    """Set up the workspace yaml file writer."""

    try:
        config.load_incluster_config()
        logger.info("Using incluster configuration.")
    except ConfigException:
        config.load_kube_config()
        logger.info("Using kubeconfig.")

    # Initial write.
    _write_workspace_yml()


def _write_workspace_yml() -> None:
    """Overwrite the workspace yaml file."""

    v1 = client.CoreV1Api()
    services = v1.list_service_for_all_namespaces(watch=False)
    service: V1Service
    name_to_hosts = {}
    for service in services.items:
        name, namespace = service.metadata.name, service.metadata.namespace
        if name == OPERATOR_NAME:
            uri = f"{name}.{namespace}.svc"
            name_to_hosts[namespace] = uri

    data = {
        "load_from": [
            {"grpc_server": {"host": host, "port": 3030, "location_name": name}}
            for name, host in name_to_hosts.items()
        ]
    }
    data_yaml = yaml.safe_dump(data)
    logger.info(f"workspace.yml:\n{data_yaml}")
    WORKSPACE_YML_PATH.write_text(data_yaml)


def _main_workspace_yml() -> None:
    """Rewrite the workspace yaml file once and ask dagit to reload it."""

    # Reload
    _write_workspace_yml()
    sleep(5)

    r = requests.post(GQL_ENDPOINT, json=GQL_RELOAD_DATA)
    logger.info(f"reload: status_code={r.status_code}")
    logger.info(f"reload: text={r.text}")


def main_workspace_yml() -> None:
    """Updates the workspace yaml file periodically."""

    while True:
        sleep(WORKSPACE_TIMEOUT.total_seconds())
        _main_workspace_yml()


def main() -> None:
    logger.info("Starting up...")

    # Setup the workspace.yaml file.
    setup_workspace_yml()

    # Create the manager threads and start them.
    thread_functions = [main_dagit, main_workspace_yml]
    threads = [Thread(target=f, daemon=True) for f in thread_functions]
    for thread in threads:
        thread.start()

    # When a thread dies, bring down the whole application.
    while True:
        sleep(2)
        done_status = [not thread.is_alive() for thread in threads]
        if any(done_status):
            logger.fatal(f"A thread is not alive anymore: done_status={done_status}")
            logger.fatal(
                f"Failed function(s): {list(compress(thread_functions, done_status))}"
            )
            logger.fatal("The application will stop now.")
            sys.exit(1)


if __name__ == "__main__":
    main()
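For the annotation-based discovery asked about at the top of the thread, the name check in `_write_workspace_yml` above could in principle be swapped for an annotation lookup. The following is only a sketch under assumptions: the annotation key `example.com/dagster-grpc-port` and the helper `build_workspace` are hypothetical and not part of Dagster or Kubernetes, and the services are passed in as plain dicts so the logic can be tried without a cluster.

```python
from typing import Any, Dict, Iterable, List

# Hypothetical annotation key (an assumption, not a Dagster or Kubernetes convention).
PORT_ANNOTATION = "example.com/dagster-grpc-port"


def build_workspace(services: Iterable[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
    """Build a workspace dict from the services that carry the port annotation."""
    entries = []
    for svc in services:
        # Annotations may be missing or None, so fall back to an empty dict.
        annotations = svc.get("annotations") or {}
        port = annotations.get(PORT_ANNOTATION)
        if port is None:
            continue  # Not marked as a user code deployment.
        name, namespace = svc["name"], svc["namespace"]
        entries.append(
            {
                "grpc_server": {
                    "host": f"{name}.{namespace}.svc",
                    "port": int(port),
                    "location_name": f"{namespace}-{name}",
                }
            }
        )
    # Sort so the generated file is stable between runs.
    entries.sort(key=lambda entry: entry["grpc_server"]["location_name"])
    return {"load_from": entries}
```

With the Kubernetes client from the script, each input dict could be filled from `service.metadata.name`, `service.metadata.namespace`, and `service.metadata.annotations`, and the result passed to `yaml.safe_dump` as before.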
What is missing:
• One also has to set up a cluster role binding for the service, with the list services permission, to find all the services
• One has to build a Docker image with this file installed
• One has to download the Helm chart and modify the `_deployment-dagit.tpl` file to actually use this file
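The first missing piece, the permission to list services across all namespaces, might look roughly like the sketch below. It generates a ClusterRole and a ClusterRoleBinding as JSON, which `kubectl apply -f` accepts. The role name `dagit-service-reader`, the helper `rbac_manifests`, and the service account name and namespace `dagster` are assumptions for illustration; use whichever service account the dagit pod actually runs under.

```python
import json
from typing import Any, Dict, List


def rbac_manifests(
    service_account: str,
    namespace: str,
    role_name: str = "dagit-service-reader",  # hypothetical name
) -> List[Dict[str, Any]]:
    """Return a ClusterRole and ClusterRoleBinding granting 'list' on services."""
    cluster_role = {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "ClusterRole",
        "metadata": {"name": role_name},
        # Services live in the core API group, written as the empty string.
        "rules": [{"apiGroups": [""], "resources": ["services"], "verbs": ["list"]}],
    }
    binding = {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "ClusterRoleBinding",
        "metadata": {"name": role_name},
        "roleRef": {
            "apiGroup": "rbac.authorization.k8s.io",
            "kind": "ClusterRole",
            "name": role_name,
        },
        "subjects": [
            {"kind": "ServiceAccount", "name": service_account, "namespace": namespace}
        ],
    }
    return [cluster_role, binding]


if __name__ == "__main__":
    # Assumed service account "dagster" in namespace "dagster".
    manifest = {"apiVersion": "v1", "kind": "List", "items": rbac_manifests("dagster", "dagster")}
    print(json.dumps(manifest, indent=2))
```

Saving the printed output to a file and applying it with `kubectl apply -f` would grant only the `list` verb on services, which is what `list_service_for_all_namespaces` in the script needs.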