martin o leary
08/10/2023, 6:07 PMZach
08/10/2023, 6:30 PMmartin o leary
08/10/2023, 6:34 PMZach
08/10/2023, 6:35 PMmartin o leary
08/10/2023, 6:36 PMZach
08/10/2023, 6:37 PMDaniel Gafni
08/11/2023, 7:51 AMUPathIOManager
to easily implement your own filesystem-based IOManager with custom logic. It would work with S3 out of the box too.martin o leary
08/11/2023, 10:18 AMDaniel Gafni
08/11/2023, 10:21 AMDaniel Gafni
08/11/2023, 10:23 AMimport json
from typing import Any, Optional
import dagster._check as check
from dagster import ConfigurableIOManager, InitResourceContext, InputContext, OutputContext, UPathIOManager
from pydantic import Field, PrivateAttr
from upath import UPath
class JSONIOManager(ConfigurableIOManager, UPathIOManager):
base_dir: Optional[str] = Field(default=None, description="Base directory for storing files.")
_base_path: UPath = PrivateAttr()
def setup_for_execution(self, context: InitResourceContext) -> None:
self._base_path = (
UPath(self.base_dir)
if self.base_dir is not None
else UPath(check.not_none(context.instance).storage_directory())
)
def load_from_path(self, context: InputContext, path: UPath) -> str:
with path.open("rb") as file:
return json.loads(file)
def dump_to_path(self, context: OutputContext, obj: Any, path: UPath):
with path.open("wb") as file:
json.dumps(obj, file)
Daniel Gafni
08/11/2023, 10:24 AMUPathIOManager
usage examples in the docs?martin o leary
08/11/2023, 10:30 AMmartin o leary
08/11/2023, 10:31 AM•ands3:
AWS S3 (requiress3a:
to be installed)s3fs
Daniel Gafni
08/11/2023, 10:31 AMS3FileSystem
from s3fs
internally, which is used for all the FS operations