Andrew Smith
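09/06/2022, 4:23 PM
I have an IO manager that stores outputs in a temporary directory:
```python
from tempfile import TemporaryDirectory

from dagster import Field, StringSource, io_manager
from dagster._core.storage.fs_io_manager import PickledObjectFilesystemIOManager  # internal path, may move


@io_manager(config_schema={"base_dir": Field(StringSource, is_required=False)})
def tempdir_io_manager(init_context):
    # Fall back to the Dagster instance's storage directory if base_dir isn't configured
    base_dir = init_context.resource_config.get(
        "base_dir", init_context.instance.storage_directory()
    )
    # The directory is deleted when the resource is torn down (after the yield)
    with TemporaryDirectory(dir=base_dir) as temp_dir:
        yield PickledObjectFilesystemIOManager(base_dir=temp_dir)
```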
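Because `tempdir_io_manager` is a generator, its temporary directory exists only between resource setup (running up to the `yield`) and resource teardown (resuming after it). The stdlib-only sketch below shows that lifecycle; `tempdir_resource` and `demo_lifecycle` are hypothetical stand-ins, and `next()` is called by hand where Dagster would drive the generator itself.

```python
import contextlib
import os
from tempfile import TemporaryDirectory


# Hypothetical stand-in for the Dagster resource function:
# a generator that yields while a TemporaryDirectory is open.
def tempdir_resource(base_dir=None):
    with TemporaryDirectory(dir=base_dir) as temp_dir:
        yield temp_dir  # setup finished; the directory exists until teardown


def demo_lifecycle():
    gen = tempdir_resource()
    temp_dir = next(gen)  # "init": run up to the yield
    exists_during = os.path.isdir(temp_dir)
    with contextlib.suppress(StopIteration):
        next(gen)  # "teardown": resume past the yield, exiting the with-block
    exists_after = os.path.isdir(temp_dir)
    return exists_during, exists_after


print(demo_lifecycle())  # (True, False)
```

Whatever holds the generator open decides how long the directory lives, so the question becomes how often the framework initializes and tears down the resource.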
This IO manager is then assigned to a job using `resource_defs` in the graph's `to_job` method, like so:
```python
table_migration_job = update_latest_tables.to_job(
    name=f"TM_{table}",
    resource_defs={
        "source_db": sql_resources.sql_res,
        "sink_db": sql_resources.sql_res,
        # Used only by outputs that set io_manager_key="fs"; the default key is "io_manager"
        "fs": io_managers.tempdir_io_manager,
    },
    config=configs[table],
)
```
However, it seems that the temporary directory isn't kept alive for the entire graph execution. How can I yield the temporary directory so that every step in the graph uses it?
yuhan
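09/07/2022, 12:13 AM
• if it’s a regular job run, use the in-process executor so that all steps run in one process and the IO manager (with its temp dir) is set up only once:
```python
@job(resource_defs={"io_manager": tempdir_io_manager}, executor_def=in_process_executor)
```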
• if it’s in a unit test, pass the temp dir as a config to the io manager (example)
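A likely cause, assuming the job ran with Dagster's default multiprocess executor: each step executes in its own process and initializes its own copy of the resource, so each step gets a fresh temp directory that is deleted when that step finishes. Both suggestions above avoid this by having every step share one directory that outlives the steps. The stdlib-only sketch below simulates the two cases; `produce`, `consume`, and `run_steps` are hypothetical helpers, and no Dagster APIs are involved.

```python
import os
import pickle
from tempfile import TemporaryDirectory


def produce(base_dir: str) -> None:
    # Upstream step: pickle an output into the IO manager's directory
    with open(os.path.join(base_dir, "out.pkl"), "wb") as f:
        pickle.dump(42, f)


def consume(base_dir: str) -> str:
    # Downstream step: try to load the upstream output
    path = os.path.join(base_dir, "out.pkl")
    if not os.path.exists(path):
        return "missing"
    with open(path, "rb") as f:
        return f"loaded {pickle.load(f)}"


def run_steps(per_step_resource: bool) -> str:
    """Hypothetical simulation of a two-step run.

    per_step_resource=True mimics one process per step: each step sets up and
    tears down its own TemporaryDirectory. False mimics a single shared
    directory (in-process executor, or a test-owned dir passed in as config).
    """
    if per_step_resource:
        with TemporaryDirectory() as step1_dir:
            produce(step1_dir)
        # step1_dir is already deleted; step 2 gets a fresh, empty directory
        with TemporaryDirectory() as step2_dir:
            return consume(step2_dir)
    with TemporaryDirectory() as shared_dir:
        produce(shared_dir)
        return consume(shared_dir)


print(run_steps(per_step_resource=True))   # missing
print(run_steps(per_step_resource=False))  # loaded 42
```

For the unit-test route, the shared case would roughly correspond to creating the directory in the test with `TemporaryDirectory()` and passing its path as run config, for example `{"resources": {"io_manager": {"config": {"base_dir": tmp}}}}` with Dagster's built-in `fs_io_manager`, assuming the resource is bound to the `io_manager` key.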