# dagster-feedback
a
Just curious, has anyone tried creating an IOManager that utilizes zstd or lz4 compression in addition to standard pickling? It almost seems like an option that could be available out of the box. I had an incident where I was sending json data between a couple of ops for transforms and was surprised to see 500GB of disk space disappear 😛
👀 1
y
Hey Alex, that’s a great idea - we currently don’t have built-in support for that. Would love to hear if anyone from the community has things to share :big-dag-eyes:
🎉 1
a
For anyone who’s interested, there are many pages comparing various compression algorithms. What got me thinking about it was this comparison that focused on the two methods I mentioned above.
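To make the idea concrete, here is a minimal sketch of what such an IOManager could look like on the local filesystem, using Dagster's ConfigurableIOManager base class and the zstandard package. The class name, the base_dir default, and the path construction via get_identifier() are illustrative assumptions, not an existing built-in.

import pickle
from pathlib import Path
from typing import Any

import zstandard
from dagster import ConfigurableIOManager, InputContext, OutputContext


class ZstdPickleIOManager(ConfigurableIOManager):
    """Pickles outputs and zstd-compresses them before writing to disk."""

    base_dir: str = "/tmp/dagster_storage"
    level: int = 3

    def _path_for(self, context) -> Path:
        # Derive a file path from the output's identifier (run id, step, output name)
        return Path(self.base_dir, *context.get_identifier())

    def handle_output(self, context: OutputContext, obj: Any):
        path = self._path_for(context)
        path.parent.mkdir(parents=True, exist_ok=True)
        data = zstandard.ZstdCompressor(level=self.level).compress(
            pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
        )
        path.write_bytes(data)
        context.log.debug(f"Wrote {len(data)} compressed bytes to {path}")

    def load_input(self, context: InputContext) -> Any:
        path = self._path_for(context.upstream_output)
        return pickle.loads(zstandard.ZstdDecompressor().decompress(path.read_bytes()))

Swapping zstandard for lz4.frame would be a one-line change; both generally trade some compression ratio for much faster compression and decompression than bz2.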
t
I just added bz2 compression to our io_manager, which reduced pickled object sizes by roughly 78% overall. For an example DataFrame, the pickled size was 205 MB and the compressed size 41 MB, an 80% reduction. I didn't record timings for any of these operations.
    def load_input(self, context: InputContext):
        path = self._get_path(context.upstream_output)
        context.log.debug(f"Loading Blob object from: {self._uri_for_path(path)}")

        obj_pickle: StorageStreamDownloader = self.container_client.download_blob(
            blob=path
        )

        # bz2-decompress the downloaded bytes, then unpickle the original object
        obj = pickle.loads(bz2.decompress(obj_pickle.readall()))  # type: ignore

        return obj

    def handle_output(self, context: OutputContext, obj: Any):
        path = self._get_path(context)
        blob_client = self.container_client.get_blob_client(path)
        context.log.debug(f"Writing Blob object at: {self._uri_for_path(path)}")

        with io.BytesIO() as pickled_obj_bytes:
            # Pickle the object (protocol 4) and bz2-compress it before upload
            pickled_obj = bz2.compress(pickle.dumps(obj, 4))
            pickled_obj_bytes.write(pickled_obj)
            length = pickled_obj_bytes.tell()
            pickled_obj_bytes.seek(0)
            blob_client.upload_blob(pickled_obj_bytes, length=length, overwrite=True)

        portal_url = self._get_portal_url(context=context)
        azure_blob_path: str = blob_client.url

        metadata = self._get_metadata(azure_blob_path, portal_url, length, obj)

        # Record the blob URL and compressed size as observation metadata on the asset
        context.log_event(
            AssetObservation(
                asset_key=AssetKey(self._get_namespace(context)),
                metadata_entries=metadata,
            )
        )
a
Oh I love it, thanks! I hope to have a chance to test a similar approach myself
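For anyone who wants to reproduce the kind of size comparison quoted above before wiring a codec into an io_manager, a standalone helper along these lines (hypothetical, not part of the io_manager posted above) prints pickled vs. compressed sizes for any object:

import bz2
import pickle

import zstandard


def compressed_sizes(obj) -> dict:
    """Return the pickled size and the bz2/zstd-compressed sizes in bytes."""
    raw = pickle.dumps(obj, protocol=4)
    return {
        "pickle": len(raw),
        "bz2": len(bz2.compress(raw)),
        "zstd": len(zstandard.ZstdCompressor(level=3).compress(raw)),
    }


if __name__ == "__main__":
    sample = {"rows": list(range(1_000_000))}
    print(compressed_sizes(sample))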