# dagster-feedback

Alex Service

05/08/2022, 7:20 PM
Just curious, has anyone tried creating an IOManager that utilizes zstd or lz4 compression in addition to standard pickling? It almost seems like an option that could be available out of the box. I had an incident where I was sending JSON data between a couple of ops for transforms and was surprised to see 500 GB of disk space disappear 😛
👀 1
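(For illustration: a minimal sketch of what such a compressing IOManager could look like, assuming the third-party zstandard package. The class name and base directory are hypothetical, not a built-in Dagster API.)

import os
import pickle

import zstandard as zstd
from dagster import InputContext, IOManager, OutputContext


class ZstdPickleIOManager(IOManager):
    """Pickles op outputs and compresses them with zstd before writing to disk."""

    def __init__(self, base_dir: str = "/tmp/dagster_storage"):
        self._base_dir = base_dir
        os.makedirs(self._base_dir, exist_ok=True)

    def _path(self, context) -> str:
        # One file per output, keyed by run id and output name.
        return os.path.join(self._base_dir, f"{context.run_id}_{context.name}.pkl.zst")

    def handle_output(self, context: OutputContext, obj) -> None:
        # Pickle, compress, and write in one shot.
        data = zstd.ZstdCompressor().compress(pickle.dumps(obj, protocol=4))
        with open(self._path(context), "wb") as f:
            f.write(data)

    def load_input(self, context: InputContext):
        # Reverse the process using the upstream output's path.
        with open(self._path(context.upstream_output), "rb") as f:
            return pickle.loads(zstd.ZstdDecompressor().decompress(f.read()))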

yuhan

05/09/2022, 5:39 PM
Hey Alex, that’s a great idea - we currently do not have built-in support for that. Would love to hear if anyone from the community has things to share :big-dag-eyes:
🎉 1

Alex Service

05/09/2022, 6:18 PM
For anyone who’s interested, there are many pages comparing the various compression algorithms. What got me thinking about it was this comparison, which focused on the two methods I mentioned above.
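(A rough way to reproduce that kind of comparison locally, assuming the zstandard and lz4 packages; the payload here is synthetic, and numbers will vary with the shape of your data.)

import pickle
import time

import lz4.frame
import zstandard as zstd

# A synthetic payload standing in for the kind of JSON-ish data discussed above.
payload = pickle.dumps([{"id": i, "value": "x" * 100} for i in range(100_000)])

for name, compress in [
    ("zstd", zstd.ZstdCompressor().compress),
    ("lz4", lz4.frame.compress),
]:
    start = time.perf_counter()
    compressed = compress(payload)
    elapsed = time.perf_counter() - start
    print(f"{name}: {len(payload)} -> {len(compressed)} bytes in {elapsed:.3f}s")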

Travis Little

05/18/2022, 10:37 PM
I just added bz2 compression to our io_manager. This change reduced the pickled object size by 78%. For an example DataFrame, the pickled size is 205 MB and the compressed size is 41 MB, an 80% reduction. I didn't record the timing of any of these operations.
    # Methods from the Azure Blob Storage io_manager; helpers such as
    # _get_path, _uri_for_path, _get_portal_url, _get_metadata, and
    # _get_namespace live elsewhere in the class.
    def load_input(self, context: InputContext):
        path = self._get_path(context.upstream_output)
        context.log.debug(f"Loading Blob object from: {self._uri_for_path(path)}")

        # Download the blob, then decompress and unpickle it.
        obj_pickle: StorageStreamDownloader = self.container_client.download_blob(
            blob=path
        )

        obj = pickle.loads(bz2.decompress(obj_pickle.readall()))  # type: ignore

        return obj

    def handle_output(self, context: OutputContext, obj: Any):
        path = self._get_path(context)
        blob_client = self.container_client.get_blob_client(path)
        context.log.debug(f"Writing Blob object at: {self._uri_for_path(path)}")

        # Pickle, compress with bz2, and upload the resulting bytes.
        with io.BytesIO() as pickled_obj_bytes:
            pickled_obj = bz2.compress(pickle.dumps(obj, 4))
            pickled_obj_bytes.write(pickled_obj)
            length = pickled_obj_bytes.tell()
            pickled_obj_bytes.seek(0)
            blob_client.upload_blob(pickled_obj_bytes, length=length, overwrite=True)

        portal_url = self._get_portal_url(context=context)
        azure_blob_path: str = blob_client.url

        # Attach blob location and size as an observation on the asset.
        metadata = self._get_metadata(azure_blob_path, portal_url, length, obj)

        context.log_event(
            AssetObservation(
                asset_key=AssetKey(self._get_namespace(context)),
                metadata_entries=metadata,
            )
        )
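(One note on the choice of codec: bz2 has the advantage of being in the standard library, but it is among the slower common codecs; swapping in zstd is a small change to the same two methods and is usually much faster at a comparable ratio. The helper names below are hypothetical stand-ins for the bz2 calls above.)

import pickle

import zstandard as zstd

def _compress(obj) -> bytes:
    # Higher zstd levels trade speed for ratio; 10 is an arbitrary middle ground.
    return zstd.ZstdCompressor(level=10).compress(pickle.dumps(obj, 4))

def _decompress(data: bytes):
    return pickle.loads(zstd.ZstdDecompressor().decompress(data))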

Alex Service

05/19/2022, 3:25 PM
Oh, I love it, thanks! I hope to have a chance to test a similar approach myself.