Eduardo Santizo
04/09/2021, 3:35 PM
daniel
04/09/2021, 3:40 PM
sashank
04/09/2021, 3:42 PM
Eduardo Santizo
04/09/2021, 3:45 PM
sashank
04/09/2021, 3:46 PM
Eduardo Santizo
04/09/2021, 5:02 PM
docker.containers.run() command to sync up the execution of the solid displayed in Dagit with the execution of the container. If I enable the detach option in that function, Dagit will only register the startup period as the solid execution time. If the pipeline is terminated manually in Dagit, the pipeline would appear halted, but the container would still be running. Is there a way to sync up both the container and the solid that spins it up? Is a new RunLauncher necessary for this?
daniel
04/30/2021, 3:16 PM
Eduardo Santizo
04/30/2021, 3:19 PM
daniel
04/30/2021, 3:22 PM
Eduardo Santizo
04/30/2021, 3:23 PM
daniel
04/30/2021, 3:27 PM
import time

check_interval = 5  # Whatever makes sense for your use case
container = None  # Defined up front so the except block can test it safely
try:
    container = docker.run(..., detach=True)
    while True:
        # check container.status to see if it's still running, break if it finished
        time.sleep(check_interval)
except:
    if container:
        container.stop()  # Stop container if the solid is interrupted
    raise  # Re-raise so the solid is not reported as successful
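The polling loop above can be fleshed out roughly as follows. This is only a sketch: it assumes the object returned by a detached run behaves like a docker SDK Container (reload() refreshes its attributes, status is a string such as "running" or "exited", stop() halts it). The names wait_for_container and FakeContainer are hypothetical; FakeContainer exists only so the example runs without a Docker daemon.

```python
import time


def wait_for_container(container, check_interval=5.0):
    """Poll a detached container until it exits; stop it if interrupted.

    Assumption: `container` behaves like a docker SDK Container.
    """
    try:
        while True:
            container.reload()  # refresh the cached status
            if container.status in ("exited", "dead"):
                return container.status
            time.sleep(check_interval)
    except BaseException:
        # Covers ordinary errors as well as interrupts such as KeyboardInterrupt.
        container.stop()
        raise  # let the caller (the solid) see the failure


class FakeContainer:
    """Hypothetical stand-in used only to exercise the loop locally."""

    def __init__(self, polls_until_exit):
        self.status = "running"
        self.stopped = False
        self._polls_left = polls_until_exit

    def reload(self):
        # Pretend the container exits after a fixed number of polls.
        self._polls_left -= 1
        if self._polls_left <= 0:
            self.status = "exited"

    def stop(self):
        self.stopped = True
        self.status = "exited"


fake = FakeContainer(polls_until_exit=3)
final_status = wait_for_container(fake, check_interval=0.01)
```

Inside a solid, the same helper would be called with the container returned by the detached run, so the solid stays "running" in Dagit for as long as the container does.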
Eduardo Santizo
04/30/2021, 3:30 PM
from dagster import InputDefinition, Optional, String, solid

@solid(
    description="Launch an existing docker container from Dagster",
    input_defs=[
        InputDefinition(name="image_name", dagster_type=String),
        InputDefinition(name="image_tag", dagster_type=Optional[String]),
    ],
    required_resource_keys={'docker'},
)
def launch_container(context, image_name, image_tag):
    # Get the container name from the image name (all characters after last "/")
    container_name = image_name[image_name.rfind('/') + 1:] + "_" + image_tag
    # Build the full image name (include the tag)
    full_image_name = image_name + ":" + image_tag
    # Get list of the names of all the available images
    all_images = context.resources.docker.images.list()
    available_image_names = [image.tags for image in all_images]
    # The list of sub-lists is flattened to create a single flat list
    available_image_names = [item for sublist in available_image_names for item in sublist]
    # Pull image if it's not available locally
    if full_image_name not in available_image_names:
        context.log.info("Info: Pulling container from repo.")
        # Pull the image from the specified dockerhub repo
        context.resources.docker.images.pull(repository=image_name, tag=image_tag)
        context.log.info("Info: Finished container pull.")
    # Get list of the names of all the available containers
    all_containers = context.resources.docker.containers.list(all=True)
    available_container_names = [container.name for container in all_containers]
    # Try to run the container.
    # If an error or manual termination occurs, stop the container
    try:
        # If container already exists
        if container_name in available_container_names:
            # Restart the container (log first, since wait() blocks)
            context.log.info("Info: Restarting existing container.")
            container = context.resources.docker.containers.get(container_name)
            container.restart()
            # Block the program here until the container finishes
            container.wait()
        # Build and run container if it does not exist
        else:
            # Log first: without detach, run() blocks until the container exits
            context.log.info("Info: Beginning new container execution.")
            context.resources.docker.containers.run(image=full_image_name, name=container_name)
        # Signal that the container has finished
        context.log.info("Info: The container has finished its execution.")
        # Get the previously created container
        container = context.resources.docker.containers.get(container_name)
        # Reload the container's attributes
        container.reload()
        # Remove the container and image once it exits
        if container.status == "exited":
            container.remove()
            context.resources.docker.images.remove(image=full_image_name)
    except:
        container = context.resources.docker.containers.get(container_name)
        container.stop()
        context.log.info("Info: Stopped container due to an exception")
        # Re-raise so Dagit does not report the solid as successful
        raise
container.wait() command.
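For the blocking variant mentioned here, a minimal sketch might look like the following. It assumes the container object exposes wait(), stop() and remove() like a docker SDK Container, and that wait() returns a dict containing a "StatusCode" key. The names run_to_completion and FakeContainer are hypothetical, and raising on a non-zero exit code is a design choice of this sketch rather than something from the thread.

```python
def run_to_completion(container):
    """Block until the container exits and return its exit code.

    Assumption: `container` behaves like a docker SDK Container and
    wait() returns a dict with a "StatusCode" entry.
    """
    try:
        result = container.wait()  # blocks until the container stops
    except BaseException:
        # Interrupted or failed while waiting: do not leave it running.
        container.stop()
        raise
    exit_code = result["StatusCode"]
    container.remove()  # clean up the exited container
    if exit_code != 0:
        raise RuntimeError(f"container exited with status {exit_code}")
    return exit_code


class FakeContainer:
    """Hypothetical stand-in so the sketch runs without a Docker daemon."""

    def __init__(self, exit_code):
        self._exit_code = exit_code
        self.stopped = False
        self.removed = False

    def wait(self):
        return {"StatusCode": self._exit_code}

    def stop(self):
        self.stopped = True

    def remove(self):
        self.removed = True


ok_container = FakeContainer(exit_code=0)
ok_code = run_to_completion(ok_container)
```

Raising on a non-zero exit code means a failed container also fails the solid, so the state shown in Dagit matches what happened inside the container.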