# dagster-support

Andrew Smith

09/06/2022, 4:23 PM
There are a few threads in this channel's history about using a temporary directory for an IO manager, so that all files written to disk are removed once the job completes. Based on these threads, I created the following IO manager:
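```python
from tempfile import TemporaryDirectory

from dagster import Field, StringSource, io_manager
# Private module path in dagster 1.x; it may differ between versions.
from dagster._core.storage.fs_io_manager import PickledObjectFilesystemIOManager

@io_manager(config_schema={"base_dir": Field(StringSource, is_required=False)})
def tempdir_io_manager(init_context):
    # Fall back to the instance's storage directory when no base_dir is configured
    base_dir = init_context.resource_config.get(
        "base_dir", init_context.instance.storage_directory()
    )
    # The temp dir (and every pickled output in it) is deleted at resource teardown
    with TemporaryDirectory(dir=base_dir) as temp_dir:
        yield PickledObjectFilesystemIOManager(base_dir=temp_dir)
```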
This IO manager is then assigned to a job via `resource_defs` in the graph's `to_job` method, like so:
```python
table_migration_job = update_latest_tables.to_job(
    name=f"TM_{table}",
    resource_defs={
        "source_db": sql_resources.sql_res,
        "sink_db": sql_resources.sql_res,
        "fs": io_managers.tempdir_io_manager,
    },
    config=configs[table],
)
```
However, it seems the temporary directory isn't kept alive for the entire graph execution. How should I yield the temporary directory so that every step in the graph uses the same one?

yuhan

09/07/2022, 12:13 AM
Hi Andrew, this is because by default a job is run by the multiprocess executor: every step executes in a separate process, and the resource is initialized in each of those processes. So when the temp dir is created in the resource body, every step/op gets its own temp dir, which is removed when that step finishes, so a downstream step can't read what an upstream step wrote. There are several ways to get this working:
• switch to the in-process executor so that tempdir_io_manager is only initialized once per run:
```python
@job(resource_defs={"io_manager": tempdir_io_manager}, executor_def=in_process_executor)
```
• if it's in a unit test, pass the temp dir as config to the IO manager (example)