Alex Service
05/08/2022, 7:20 PM
yuhan
05/09/2022, 5:39 PM
Alex Service
05/09/2022, 6:18 PM
Travis Little
# 05/18/2022, 10:37 PM
def load_input(self, context: InputContext):
    """Fetch the upstream output's blob and return the object stored in it.

    The blob path is derived from ``context.upstream_output``. The blob's
    contents are bz2-decompressed and then unpickled.

    Args:
        context: Input context. Its ``upstream_output`` is used to build the
            blob path and its ``log`` receives a debug message.

    Returns:
        The object obtained by unpickling the decompressed blob contents.
    """
    path = self._get_path(context.upstream_output)
    context.log.debug(f"Loading Blob object from: {self._uri_for_path(path)}")

    downloader: StorageStreamDownloader = self.container_client.download_blob(
        blob=path
    )
    # Undo the two encoding layers in reverse order: bz2 first, then pickle.
    decompressed = bz2.decompress(downloader.readall())  # type: ignore
    return pickle.loads(decompressed)
def handle_output(self, context: OutputContext, obj: Any):
    """Pickle ``obj``, bz2-compress it, upload it as a blob and log an observation.

    The blob path is derived from ``context``. Any existing blob at that path
    is overwritten. After the upload, an ``AssetObservation`` carrying the
    metadata built by ``self._get_metadata`` is logged on ``context``.

    Args:
        context: Output context used to derive the blob path, portal URL and
            asset key, and to emit the debug log and the observation event.
        obj: The object to serialize and store.
    """
    path = self._get_path(context)
    blob_client = self.container_client.get_blob_client(path)
    context.log.debug(f"Writing Blob object at: {self._uri_for_path(path)}")

    # Serialize with pickle protocol 4, then bz2-compress the result.
    pickled_obj = bz2.compress(pickle.dumps(obj, 4))
    # The payload is already an in-memory bytes object, so it is uploaded
    # directly; copying it into a BytesIO just to call tell()/seek(0) added
    # an extra copy without changing what gets uploaded.
    length = len(pickled_obj)
    blob_client.upload_blob(pickled_obj, length=length, overwrite=True)

    portal_url = self._get_portal_url(context=context)
    azure_blob_path: str = blob_client.url
    metadata = self._get_metadata(azure_blob_path, portal_url, length, obj)
    context.log_event(
        AssetObservation(
            asset_key=AssetKey(self._get_namespace(context)),
            metadata_entries=metadata,
        )
    )
Alex Service
05/19/2022, 3:25 PM