
Marco

03/03/2023, 5:35 PM
Team, what is the best way to avoid persisting materialisations to storage (disk, DB, etc.) when they are only needed until a given job has finished running? I was thinking of using the mem_io_manager, but it does not work with multiprocess execution
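For reference, the wiring I mean is just this (a sketch; the op names are made up). mem_io_manager holds outputs in the process’s own memory, which is exactly why they can’t cross the process boundaries the multiprocess executor creates:

from dagster import job, mem_io_manager, op

@op
def make_value():
    return 42

@op
def use_value(value):
    print(value)

# mem_io_manager keeps outputs in an in-process dict, so this only
# works with executors that run all steps in a single process
@job(resource_defs={"io_manager": mem_io_manager})
def in_memory_job():
    use_value(make_value())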

Spencer Nelson

03/03/2023, 5:38 PM
I don’t have a short answer, but I suspect you could build an IOManager on top of multiprocessing.shared_memory (https://docs.python.org/3/library/multiprocessing.shared_memory.html) to achieve this
Some trickiness with that is that you’d need to know how many bytes of shared memory to allocate ahead of time, when you first create the shared memory block
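Something like this might work as a starting point (untested; pickling first sidesteps the size question, since you know the byte count before creating the block, and the block-naming scheme here is a hypothetical simplification):

import pickle
from multiprocessing import shared_memory

from dagster import IOManager, io_manager

class SharedMemoryIOManager(IOManager):
    def _block_name(self, step_key, output_name):
        # shared-memory names have platform-specific length limits,
        # so a real implementation would need something sturdier
        return f"{step_key}_{output_name}"[-30:]

    def handle_output(self, context, obj):
        data = pickle.dumps(obj)  # size is known before allocating
        block = shared_memory.SharedMemory(
            name=self._block_name(context.step_key, context.name),
            create=True,
            size=len(data),
        )
        block.buf[: len(data)] = data
        block.close()  # the block itself outlives this process

    def load_input(self, context):
        upstream = context.upstream_output
        block = shared_memory.SharedMemory(
            name=self._block_name(upstream.step_key, upstream.name)
        )
        obj = pickle.loads(bytes(block.buf))
        block.close()
        return obj

@io_manager
def shared_memory_io_manager(_):
    return SharedMemoryIOManager()

Note that nothing here ever calls unlink(), so the blocks would leak until something releases them, which is of course the same cleanup problem again.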

Danny Steffy

03/03/2023, 5:46 PM
Would that work for a job being executed via DockerRunLauncher? We have a similar use-case

Marco

03/03/2023, 5:47 PM
What I was also thinking is to use standard IO managers and instead have a ‘cleanup routine’ when the job exits. Are there Dagster functions to facilitate that?

Spencer Nelson

03/03/2023, 5:47 PM
@Danny Steffy If the ops are running in the same container, sure. The shared memory space can’t cross container boundaries. I don’t know about DockerRunLauncher but I think it launches all ops in one container
@Marco yes, io managers are resources so you can use them as context managers. You could use that to implement cleanup, kinda like in https://docs.dagster.io/concepts/resources#context-manager-resources
for example, maybe something like this (just a sketch; I haven’t run this anywhere and can’t promise anything about whether it works)
import shutil

from dagster import io_manager
# note: PickledObjectFilesystemIOManager is internal, not a top-level export
from dagster._core.storage.fs_io_manager import PickledObjectFilesystemIOManager

@io_manager
def fs_io_manager_with_cleanup():
    # a resource function that yields already acts as a context manager,
    # so no separate @contextmanager decorator is needed
    fs_io_mgr = PickledObjectFilesystemIOManager(base_dir="/tmp/dagster_scratch")  # placeholder dir
    yield fs_io_mgr
    # teardown: remove everything the manager persisted
    shutil.rmtree(fs_io_mgr.base_dir, ignore_errors=True)

Marco

03/03/2023, 5:58 PM
Thanks @Spencer Nelson - so if I understand correctly, you are suggesting doing the cleanup after yielding the object. Would that work when the object is used by several nodes?

Spencer Nelson

03/03/2023, 6:04 PM
What do you mean by “nodes” there?
If you’re using the multiprocessing executor it might not be what you want, certainly
An important nuance is that resources are initialized (and torn down) once per process. This means that if using the in-process executor, which runs all steps in a single process, resources will be initialized at the beginning of execution, and torn down after every single step is finished executing. In contrast, when using the multiprocess executor (or other out-of-process executors), where there is a single process for each step, at the beginning of each step execution, the resource will be initialized, and at the end of that step’s execution, the finally block will be run.
from the docs I linked

Marco

03/03/2023, 6:07 PM
Nodes = ops in the job

Spencer Nelson

03/03/2023, 6:08 PM
Yeah, then it depends on the executor you’re using. It kind of sounds like you’d rather use a single-process executor
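e.g., pinning a job to the in-process executor is just this (a sketch; op names are made up):

from dagster import in_process_executor, job, op

@op
def produce():
    return "intermediate"

@op
def consume(value):
    print(value)

# all steps share one process, so a context-manager resource like the
# IO manager above is set up once and torn down once, at the end of the run
@job(executor_def=in_process_executor)
def single_process_job():
    consume(produce())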

Marco

03/03/2023, 6:10 PM
Ok - sounds like this is not the right solution for me then. What I was thinking of is different though, i.e. have an end-of-job routine (op) that does the cleanup, as opposed to embedding that into the IO manager
Maybe I can subclass ‘job’ and have a new decorator that has this additional functionality?
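Something like this is what I have in mind (just a sketch; the op names and path are made up), using a Nothing dependency so the cleanup op runs last:

import shutil

from dagster import In, Nothing, job, op

@op
def build_table():
    return [1, 2, 3]

@op
def summarize(table):
    return len(table)

@op(ins={"start": In(Nothing)})
def cleanup_intermediates():
    # hypothetical location of the persisted intermediates
    shutil.rmtree("/tmp/dagster_intermediates", ignore_errors=True)

@job
def job_with_cleanup():
    # the Nothing input only enforces ordering; no data is passed
    cleanup_intermediates(start=summarize(build_table()))

One catch: that cleanup op only runs when the upstream steps succeed, so cleaning up after failures would need something else, e.g. a run-status sensor.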

Spencer Nelson

03/03/2023, 6:11 PM
Seems like that would work, but it definitely indicates a problem with Dagster’s abstractions if you have to do cleanup IO in an op instead of handling that in IO managers
I should probably hold off and let someone who actually works at Dagster chime in 🙂

Marco

03/03/2023, 6:24 PM
Sounds good - thanks for the tips Spencer! I think I’ll hold off on any implementation for now, waiting for ‘official’ Dagster feedback

sandy

03/03/2023, 11:29 PM
Hey folks - I filed an issue to track adding some way of doing this cleanup: https://github.com/dagster-io/dagster/issues/12707

Marco

03/10/2023, 10:47 AM
Great! Any updates or plans to implement this?