What is the right dagster abstraction if you want ...
# announcements
j
What is the right dagster abstraction if you want to persist a file to cloud storage (ADLS,S3) with a specific path and then use it in a subsequent solid but have it also work when running locally (to file without cloud storage? The file manager doesn't have ability to choose the file name. Is the best approach to have 1 resource name (i.e. adls2) and then using FakeADLS2Resource for local and just be aware that FakeADLS2Resource stores in memory and not the filesystem? or write an alternate version of FakeADLS2Resource that uses files?
a
p
@Jeff Hulbert I wrote a custom IO manager like @alex is suggesting when doing something similar
j
what type do you have the solid return to the IO manager if its a file?
p
my use case was a little different - the data was being extracted from an API and i didn’t want to read it into memory so the solid yields a generator. the IO manager then iterates through it and writes each line to a
.jsonl
file and then uploads that to GCS.
dagsir 1
👍 1
but i imagine you could just return a file path to the IO Manager in your case
m
cc @sandy
s
Hey @Jeff Hulbert here's something we prototyped for a similar use case: https://dagster.phacility.com/D5934 The idea was that the solid would return a byte array or stream of bytes. I'd be happy to talk through this with you on a call if it would be helpful
👀 1
j
Copy code
class CustomPathFileObjectFilesystemIOManager(IOManager):
    def __init__(self, base_dir=None):
        self.base_dir = check.opt_str_param(base_dir, "base_dir")
        self.write_mode = "wb"
        self.read_mode = "rb"

    def _get_path(self, path):
        print(f"path: {path}")
        return os.path.join(self.base_dir, path)

    def handle_output(self, context, obj):
        check.inst_param(context, "context", OutputContext)
        metadata = context.metadata
        print(f"context.metadata{context.metadata}")
        path = check.str_param(metadata.get("path"), "metadata.path")

        filepath = self._get_path(path)

        # Ensure path exists
        mkdir_p(os.path.dirname(filepath))
        context.log.debug(f"Writing file at: {filepath}")

        with open(filepath, self.write_mode) as write_obj, open(obj, self.read_mode) as read_obj:
            shutil.copyfileobj(read_obj, write_obj)

        return AssetMaterialization(
            asset_key=AssetKey([context.pipeline_name, context.step_key, context.name]),
            metadata_entries=[EventMetadataEntry.fspath(os.path.abspath(filepath))],
        )

    def load_input(self, context):
        check.inst_param(context, "context", InputContext)
        metadata = context.upstream_output.metadata
        path = check.str_param(metadata.get("path"), "metadata.path")
        filepath = self._get_path(path)
        context.log.debug(f"Return filepath from: {filepath}")

        return filepath
Here is an IO manager I wrote based on @Prratek Ramchandani suggestion - this is just the file based one and seems to do what I need. Need to make an ADLS2 based version. This is based on, writing a file instead of pickle
Copy code
class CustomPathPickledObjectFilesystemIOManager(IOManager):
Here is a version for ADLS based on the existing pickle version.
Copy code
class CustomPathFileObjectADLS2IOManager(IOManager):
    def __init__(self, file_system, adls2_client, base_dir):
        self.adls2_client = adls2_client
        self.file_system_client = self.adls2_client.get_file_system_client(file_system)
        self.lease_duration = _LEASE_DURATION
        self.base_dir = base_dir

    def _get_local_path(self, key):
        return os.path.join(self.base_dir, key)

    def _rm_object(self, key):
        check.str_param(key, "key")
        check.param_invariant(len(key) > 0, "key")

        # This operates recursively already so is nice and simple.
        self.file_system_client.delete_file(key)

    def _has_object(self, key):
        check.str_param(key, "key")
        check.param_invariant(len(key) > 0, "key")

        try:
            file = self.file_system_client.get_file_client(key)
            file.get_file_properties()
            return True
        except ResourceNotFoundError:
            return False

    def _uri_for_key(self, key, protocol=None):
        check.str_param(key, "key")
        protocol = check.opt_str_param(protocol, "protocol", default="abfss://")
        return "{protocol}{filesystem}@{account}.<http://dfs.core.windows.net/{key}%22.format(|dfs.core.windows.net/{key}".format(>
            protocol=protocol,
            filesystem=self.file_system_client.file_system_name,
            account=self.file_system_client.account_name,
            key=key,
        )

    def load_input(self, context):
        check.inst_param(context, "context", InputContext)
        metadata = context.upstream_output.metadata
        key = check.str_param(metadata.get("path"), "metadata.path")
        context.log.debug(f"Loading ADLS2 object from: {self._uri_for_key(key)}")
        file = self.file_system_client.get_file_client(key)
        filepath = self._get_local_path(key)
        with open(filepath, mode='wb') as fp:
            fp.write(file.download_file())

        return filepath

    def handle_output(self, context, obj):
        check.inst_param(context, "context", OutputContext)
        metadata = context.metadata
        key = check.str_param(metadata.get("path"), "metadata.path")
        context.log.debug(f"Writing ADLS2 object at: {self._uri_for_key(key)}")

        if self._has_object(key):
            context.log.warning(f"Removing existing ADLS2 key: {key}")
            self._rm_object(key)

        file = self.file_system_client.create_file(key)
        with file.acquire_lease(self.lease_duration) as lease:
            file.upload_data(open(obj, mode='rb'), lease=lease, overwrite=True)


@io_manager(
    config_schema={
        "adls2_file_system": Field(StringSource, description="ADLS Gen2 file system name"),
        "base_dir": Field(StringSource, description="Local file path to save files"),
    },
    required_resource_keys={"adls2"},
)
def adls2_file_io_manager(init_context):
    adls_resource = init_context.resources.adls2
    adls2_client = adls_resource.adls2_client
    adls_io_manager = CustomPathFileObjectADLS2IOManager(
        init_context.resource_config["adls2_file_system"],
        adls2_client,
        init_context.resource_config.get("base_dir"),
    )
    return adls_io_manager