Mycchaka Kleinbort
03/11/2024, 5:13 PM
```python
'io_manager': RemoteIOManagerWithLocalCache(base_dir='<az://dgster-assets/>', cachedir='<c://dagster-cachedir/>')
```
Chris Roth
03/12/2024, 8:54 PM
import pandas as pd
from upath import UPath
from dagster import (
Field,
InitResourceContext,
InputContext,
OutputContext,
UPathIOManager,
io_manager,
)
class PandasParquetIOManager(UPathIOManager):
    """IO manager that persists pandas DataFrames as Parquet files.

    Subclasses ``UPathIOManager`` and supplies the file extension plus the
    two hooks that write a DataFrame to, and read one back from, a ``UPath``.
    """

    # Parquet file suffix (presumably consumed by UPathIOManager when it
    # builds storage paths -- confirm against the Dagster docs).
    extension: str = ".parquet"

    def dump_to_path(self, context: OutputContext, obj: pd.DataFrame, path: UPath):
        """Serialize ``obj`` to ``path`` in Parquet format.

        Args:
            context: Output context supplied by the caller (unused here).
            obj: The DataFrame to store.
            path: Destination; opened in binary write mode.
        """
        with path.open("wb") as sink:
            obj.to_parquet(sink)

    def load_from_path(self, context: InputContext, path: UPath) -> pd.DataFrame:
        """Read the Parquet file at ``path`` and return it as a DataFrame.

        Args:
            context: Input context supplied by the caller (unused here).
            path: Source; opened in binary read mode.

        Returns:
            The DataFrame parsed from the file.
        """
        print("loading file...")
        with path.open("rb") as source:
            frame = pd.read_parquet(source)
        return frame
@io_manager(config_schema={"base_path": Field(str, is_required=False)})
def local_pandas_parquet_io_manager(
    init_context: InitResourceContext,
) -> PandasParquetIOManager:
    """Create a ``PandasParquetIOManager`` rooted at the configured base path.

    The optional ``base_path`` resource-config entry selects the root; when it
    is absent, the instance's storage directory is used instead.

    Args:
        init_context: Resource initialization context providing the resource
            config and the instance.

    Returns:
        A ``PandasParquetIOManager`` whose ``base_path`` is a ``UPath``.
    """
    instance = init_context.instance
    assert instance is not None  # to please mypy

    resource_config = init_context.resource_config
    # The fallback is computed eagerly, even when "base_path" is configured.
    root = resource_config.get("base_path", instance.storage_directory())
    return PandasParquetIOManager(base_path=UPath(root))
Chris Roth
03/12/2024, 8:55 PM
Add `cachedir` to the `config_schema`, and then edit your `dump_to_path` and `load_from_path` according to the cachedir logic you want.
Mycchaka Kleinbort
03/13/2024, 11:18 PM
Mycchaka Kleinbort
03/13/2024, 11:19 PM
Mycchaka Kleinbort
03/13/2024, 11:19 PM