# announcements
e
Hey! I have a question. I want to "spin up" multiple docker containers asynchronously, check when the computation inside has completed and then "spin down" each container as they complete. Is this possible with Dagster?
d
Hi Eduardo, this is how the `DockerRunLauncher` used in this example works: https://docs.dagster.io/deployment/guides/docker#example Each run is launched in its own container, which is spun down when the run completes.
s
Hm, @Eduardo Santizo, are you trying to run Dagster code inside these containers, or do you have pre-built containers that do some generic computation that you just want to execute?
Either one is totally possible. @daniel just described how to do the former.
e
I think that both will probably be needed. How can you do the latter?
s
The Docker Python SDK is quite nice: https://docker-py.readthedocs.io/en/stable/ You can create a container, run a command in it, and tail its logs while it runs, all with the SDK. You can do all of that inside a solid.
We'll probably ship a solid for this in one of our libraries in the near future, but for now it's not hard to write yourself.
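A minimal sketch of what that could look like, assuming the Docker SDK for Python (`pip install docker`) and a `client` created with `docker.from_env()`. The helper name `run_and_tail` and its `log` callback are hypothetical; inside a solid you could pass `context.log.info` as `log`.

```python
def run_and_tail(client, image, command=None, log=print):
    """Run `command` in a new container from `image`, forward its output to `log`,
    and return the container's exit code.

    `client` is assumed to be a docker.DockerClient, e.g. docker.from_env().
    """
    # detach=True makes run() return a Container object right away instead of blocking
    container = client.containers.run(image, command, detach=True)
    try:
        # stream=True + follow=True yields output chunks (bytes) until the container stops
        for chunk in container.logs(stream=True, follow=True):
            log(chunk.decode("utf-8", errors="replace").rstrip())
        # wait() returns a dict such as {"StatusCode": 0, ...}
        return container.wait()["StatusCode"]
    finally:
        # Remove the container so repeated runs don't pile up;
        # force=True also kills it if we got here because of an exception
        container.remove(force=True)
```

Usage inside a solid body might look like `exit_code = run_and_tail(docker.from_env(), "alpine", "echo hello", log=context.log.info)`.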
e
Thanks a lot @sashank and @daniel! I will look into it. Thanks for the help.
Hey @sashank! I wanted to follow up on this question. I developed a solid similar to what you described (it was surprisingly easy btw), but I've run into a problem. When the external container is launched from a solid, I use the `docker.containers.run()` command so that the solid's execution shown in Dagit stays in sync with the container's execution. If I enable the `detach` option in that function, Dagit only counts the container's startup period as the solid's execution time. And if the pipeline is terminated manually in Dagit, the pipeline appears halted, but the container keeps running. Is there a way to keep the container and the solid that spins it up in sync? Is a new RunLauncher necessary for this?
d
Hi Eduardo - what's the motivation for enabling the detach option? If you want the solid to start when the container starts and finish when it finishes, wouldn't keeping it attached give you that for free?
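A minimal sketch of the attached behaviour, assuming the Docker SDK for Python: without `detach=True`, `containers.run()` blocks until the container exits, returns its output as bytes, and raises `docker.errors.ContainerError` on a non-zero exit code, so the solid's duration matches the container's. The function name `run_attached` is hypothetical.

```python
def run_attached(client, image, command=None):
    """Block until the container exits and return its output as text.

    `client` is assumed to be a docker.DockerClient, e.g. docker.from_env().
    """
    # No detach=True: run() waits for the container to finish before returning.
    # remove=True deletes the container afterwards so names and disk don't pile up.
    output = client.containers.run(image, command, remove=True)
    return output.decode("utf-8")
```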
e
Yes, the detach option was only a test, but the second situation does pop up from time to time, so I wanted to know what I could do.
d
By 'the second situation' do you mean having the solid terminate the container when the pipeline is terminated?
e
Yep, that issue
d
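Got it. I'm a little surprised Docker doesn't already work that way (stopping the container for you if the `run()` call is interrupted). You could do something like this (rough sketch):
```python
import time

import docker

client = docker.from_env()
check_interval = 5  # Seconds between status checks; whatever makes sense for your use case
container = None
try:
    container = client.containers.run(image_name, detach=True)  # image_name and other run() args come from your solid
    while True:
        container.reload()  # Refresh container.status from the Docker daemon
        if container.status in ("exited", "dead"):
            break  # The container finished
        time.sleep(check_interval)
except BaseException:  # Also catches interrupts, not just ordinary errors
    if container is not None:
        container.stop()  # Stop the container if the solid is interrupted
    raise  # Re-raise so the failure/termination still propagates
```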
e
Thanks @daniel! I will report back when I get it to work.
Thank you @daniel, this solution worked perfectly! Here is my final solid for anyone interested:
```python
import docker
from dagster import InputDefinition, Optional, String, solid


@solid(
    description="Launch an existing docker container from Dagster",
    input_defs=[
        InputDefinition(name="image_name", dagster_type=String),
        InputDefinition(name="image_tag", dagster_type=Optional[String]),
    ],
    required_resource_keys={"docker"},
)
def launch_container(context, image_name, image_tag):

    # image_tag is optional; fall back to Docker's default tag so the concatenations below work
    image_tag = image_tag or "latest"

    # Get the container name from the image name (all characters after last "/")
    container_name = image_name[image_name.rfind("/") + 1:] + "_" + image_tag

    # Build the full image name (include the tag)
    full_image_name = image_name + ":" + image_tag

    # Get list of the names of all the available images
    all_images = context.resources.docker.images.list()
    available_image_names = [image.tags for image in all_images]

    # The list of sub-lists is flattened to create a single flat list
    available_image_names = [item for sublist in available_image_names for item in sublist]

    # Pull image if it's not available locally
    if full_image_name not in available_image_names:

        context.log.info("Info: Pulling image from repo.")

        # Pull the image from the specified Docker Hub repo
        context.resources.docker.images.pull(repository=image_name, tag=image_tag)

        context.log.info("Info: Finished image pull.")

    # Get list of the names of all the available containers (including stopped ones)
    all_containers = context.resources.docker.containers.list(all=True)
    available_container_names = [container.name for container in all_containers]

    # Try to run the container.
    # If an error or manual termination occurs, stop the container
    try:

        # If container already exists
        if container_name in available_container_names:

            # Restart the container
            container = context.resources.docker.containers.get(container_name)
            context.log.info("Info: Restarting existing container.")
            container.restart()

            # Block the program here until the container finishes
            container.wait()

        # Create and run the container if it does not exist
        else:
            context.log.info("Info: Beginning new container execution.")
            # Without detach=True, run() blocks until the container exits
            context.resources.docker.containers.run(image=full_image_name, name=container_name)

        # Signal that the container has finished
        context.log.info("Info: The container has finished its execution.")

        # Get the previously created container
        container = context.resources.docker.containers.get(container_name)

        # Reload the container's attributes
        container.reload()

        # Remove the container and image once it exits
        if container.status == "exited":
            container.remove()
            context.resources.docker.images.remove(image=full_image_name)

    # Catch BaseException (not just Exception) so interrupts also stop the container
    except BaseException:
        try:
            container = context.resources.docker.containers.get(container_name)
            container.stop()
            context.log.info("Info: Stopped container due to an exception.")
        except docker.errors.NotFound:
            # The container was never created, so there is nothing to stop
            pass
        # Re-raise so Dagster still marks the solid as failed or terminated
        raise
```
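The naming logic at the top of `launch_container` is plain string handling, so it can be pulled into a small function and checked without a Docker daemon. A sketch with a hypothetical `derive_names` helper; the `"latest"` fallback for a missing tag is an assumption that mirrors Docker's default tag.

```python
def derive_names(image_name, image_tag=None):
    """Return (container_name, full_image_name) using the solid's naming convention.

    Falls back to the "latest" tag when image_tag is None (an assumption).
    """
    tag = image_tag or "latest"
    # Everything after the last "/" is the bare image name, e.g. "user/repo" -> "repo";
    # rfind returns -1 when there is no "/", so the whole name is kept
    base = image_name[image_name.rfind("/") + 1:]
    return f"{base}_{tag}", f"{image_name}:{tag}"
```

For example, `derive_names("myuser/etl", "v1")` gives `("etl_v1", "myuser/etl:v1")`, and a registry host with a port such as `registry.example.com:5000/team/etl` still yields the container name prefix `etl`.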
Also a note: apparently you can start the container in detached mode, do something else, and then "re-block" the program until the container finishes by calling `container.wait()`.
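A minimal sketch of that detach-then-wait pattern, assuming the Docker SDK for Python and a `client` from `docker.from_env()`. The function name `start_then_wait` is hypothetical; it also stops the container if anything (including an interrupt) happens before the container finishes.

```python
def start_then_wait(client, image, command=None, log=print):
    """Start a container in detached mode, do other work, then block on wait()."""
    container = client.containers.run(image, command, detach=True)
    try:
        # The container is now running in the background; other work can happen here
        log(f"Started container {container.name}")
        # "Re-block" until the container exits; wait() returns e.g. {"StatusCode": 0, ...}
        result = container.wait()
        return result["StatusCode"]
    except BaseException:
        # Interrupted (error or manual termination) before the container finished
        container.stop()
        raise
```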