Jeff Hulbert
03/05/2021, 4:31 PMalex
03/05/2021, 4:45 PMPrratek Ramchandani
03/05/2021, 4:57 PMJeff Hulbert
03/05/2021, 5:57 PMPrratek Ramchandani
03/05/2021, 6:20 PM.jsonl
file and then uploads that to GCS.max
03/05/2021, 6:57 PMsandy
03/05/2021, 7:18 PMJeff Hulbert
03/05/2021, 8:20 PMclass CustomPathFileObjectFilesystemIOManager(IOManager):
def __init__(self, base_dir=None):
    """Set up the manager with an optional local base directory.

    Args:
        base_dir: Directory that metadata-supplied paths are joined onto.
            Must be a string or None (validated by ``check.opt_str_param``).
    """
    self.base_dir = check.opt_str_param(base_dir, "base_dir")
    # Files are treated as raw bytes in both directions.
    self.write_mode, self.read_mode = "wb", "rb"
def _get_path(self, path):
print(f"path: {path}")
return os.path.join(self.base_dir, path)
def handle_output(self, context, obj):
    """Copy the file at ``obj`` to the path named in the output's metadata.

    Args:
        context: The ``OutputContext`` of the output being handled. Its
            metadata must contain a string ``"path"`` entry.
        obj: Path of the local file to copy; it is opened for binary reading.

    Returns:
        An ``AssetMaterialization`` keyed by pipeline name, step key and
        output name, whose metadata records the absolute destination path.
    """
    check.inst_param(context, "context", OutputContext)
    metadata = context.metadata
    context.log.debug(f"Output metadata: {metadata}")
    path = check.str_param(metadata.get("path"), "metadata.path")
    filepath = self._get_path(path)

    # Ensure path exists
    mkdir_p(os.path.dirname(filepath))
    context.log.debug(f"Writing file at: {filepath}")

    # Open the source first: opening the destination in write mode truncates
    # it, so an unreadable source must fail before that happens.
    with open(obj, self.read_mode) as read_obj, open(filepath, self.write_mode) as write_obj:
        shutil.copyfileobj(read_obj, write_obj)

    return AssetMaterialization(
        asset_key=AssetKey([context.pipeline_name, context.step_key, context.name]),
        metadata_entries=[EventMetadataEntry.fspath(os.path.abspath(filepath))],
    )
def load_input(self, context):
    """Return the local path recorded for the upstream output.

    The file is not opened here; the ``"path"`` entry of the upstream
    output's metadata is resolved through ``_get_path`` and handed back.

    Args:
        context: The ``InputContext`` of the input being loaded.

    Returns:
        The resolved file path as a string.
    """
    check.inst_param(context, "context", InputContext)
    upstream_metadata = context.upstream_output.metadata
    relative_path = check.str_param(upstream_metadata.get("path"), "metadata.path")
    resolved = self._get_path(relative_path)
    context.log.debug(f"Return filepath from: {resolved}")
    return resolved
# NOTE(review): only the header of this class is present; no body is visible
# here, so the definition is incomplete as written. Presumably the body was
# dropped when the snippet was pasted; confirm against the original source.
class CustomPathPickledObjectFilesystemIOManager(IOManager):
class CustomPathFileObjectADLS2IOManager(IOManager):
    """IO manager that moves local files to and from an ADLS Gen2 file system.

    The ADLS2 key of each output is the ``"path"`` entry of that output's
    metadata. Outputs are passed in as local file paths and uploaded; inputs
    are downloaded beneath ``base_dir`` and returned as local file paths.
    """

    def __init__(self, file_system, adls2_client, base_dir):
        """Bind the manager to one ADLS2 file system and a local directory.

        Args:
            file_system: Name of the ADLS2 file system to read and write.
            adls2_client: Client object exposing ``get_file_system_client``.
            base_dir: Local directory that downloaded files are placed under.
        """
        self.adls2_client = adls2_client
        self.file_system_client = self.adls2_client.get_file_system_client(file_system)
        # _LEASE_DURATION is a module-level constant defined outside this class.
        self.lease_duration = _LEASE_DURATION
        self.base_dir = base_dir

    def _get_local_path(self, key):
        """Return the local path for ``key`` beneath ``base_dir``."""
        return os.path.join(self.base_dir, key)

    def _rm_object(self, key):
        """Delete the file stored at ``key`` in the file system."""
        check.str_param(key, "key")
        check.param_invariant(len(key) > 0, "key")

        self.file_system_client.delete_file(key)

    def _has_object(self, key):
        """Return True when a file exists at ``key``, otherwise False."""
        check.str_param(key, "key")
        check.param_invariant(len(key) > 0, "key")

        try:
            # Fetching the properties is used as the existence probe.
            self.file_system_client.get_file_client(key).get_file_properties()
        except ResourceNotFoundError:
            return False
        return True

    def _uri_for_key(self, key, protocol=None):
        """Build a URI for ``key``, used in log messages.

        Args:
            key: Path of the file inside the file system.
            protocol: URI scheme prefix; defaults to ``"abfss://"``.

        Returns:
            ``{protocol}{filesystem}@{account}.dfs.core.windows.net/{key}``.
        """
        check.str_param(key, "key")
        protocol = check.opt_str_param(protocol, "protocol", default="abfss://")
        return "{protocol}{filesystem}@{account}.dfs.core.windows.net/{key}".format(
            protocol=protocol,
            filesystem=self.file_system_client.file_system_name,
            account=self.file_system_client.account_name,
            key=key,
        )

    def load_input(self, context):
        """Download the upstream output's file and return its local path.

        Args:
            context: The ``InputContext`` of the input being loaded; the
                upstream output's metadata must contain a string ``"path"``.

        Returns:
            Path of the downloaded file beneath ``base_dir``.
        """
        check.inst_param(context, "context", InputContext)
        metadata = context.upstream_output.metadata
        key = check.str_param(metadata.get("path"), "metadata.path")
        context.log.debug(f"Loading ADLS2 object from: {self._uri_for_key(key)}")

        file_client = self.file_system_client.get_file_client(key)
        # download_file() returns a downloader object, not bytes. Request it
        # before touching the local file so a failed request leaves no file.
        downloader = file_client.download_file()

        filepath = self._get_local_path(key)
        # Keys may contain directory components that do not exist locally yet.
        parent_dir = os.path.dirname(filepath)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        with open(filepath, mode="wb") as fp:
            downloader.readinto(fp)
        return filepath

    def handle_output(self, context, obj):
        """Upload the local file at ``obj`` to the key named in the metadata.

        Any file already stored at the key is removed first.

        Args:
            context: The ``OutputContext`` of the output being handled; its
                metadata must contain a string ``"path"`` entry.
            obj: Path of the local file to upload.
        """
        check.inst_param(context, "context", OutputContext)
        metadata = context.metadata
        key = check.str_param(metadata.get("path"), "metadata.path")
        context.log.debug(f"Writing ADLS2 object at: {self._uri_for_key(key)}")

        # Open the source before touching remote state so an unreadable
        # source cannot cause the existing remote file to be deleted, and so
        # the handle is always closed.
        with open(obj, mode="rb") as data:
            if self._has_object(key):
                context.log.warning(f"Removing existing ADLS2 key: {key}")
                self._rm_object(key)

            file_client = self.file_system_client.create_file(key)
            with file_client.acquire_lease(self.lease_duration) as lease:
                file_client.upload_data(data, lease=lease, overwrite=True)
@io_manager(
    config_schema={
        "adls2_file_system": Field(StringSource, description="ADLS Gen2 file system name"),
        "base_dir": Field(StringSource, description="Local file path to save files"),
    },
    required_resource_keys={"adls2"},
)
def adls2_file_io_manager(init_context):
    """Build a ``CustomPathFileObjectADLS2IOManager`` from resource config.

    The ADLS2 client is taken from the required ``adls2`` resource; the file
    system name and local base directory come from the resource config.
    """
    client = init_context.resources.adls2.adls2_client
    settings = init_context.resource_config
    return CustomPathFileObjectADLS2IOManager(
        file_system=settings["adls2_file_system"],
        adls2_client=client,
        base_dir=settings.get("base_dir"),
    )