Jan Samuel Matuba
02/14/2023, 1:25 AM@asset(
io_manager_key="local_pyarrowparquet_manager",
ins={"DIPC_Raw_daily": AssetIn(metadata={"allow_missing_partitions": True})},
partitions_def=bmonth_partitions,
group_name=DATASET_GROUP_BMONTH,
metadata={
"Dataset Name": WESM_Dataset["PMR"]["PMR_DIPC_Raw"].NAME,
},
)
def DIPC_Raw_bmonth(
DIPC_Raw_daily,
) -> pyarrow.Table:
zipfiles = DIPC_Raw_daily.values()
csvfileslist = map(
lambda zf: fileiter_from_zip(BytesIO(zf), file_ext=".csv"), zipfiles
)
csvfiles = [csvfile for csvfilelist in csvfileslist for csvfile in csvfilelist]
return WESM_Dataset["PMR"]["PMR_DIPC_Raw"].pyarrow_table_from_csv(csvfiles)
@io_manager(config_schema={"base_path": Field(str, is_required=False)})
def local_pyarrowparquet_io_manager(
    init_context: InitResourceContext,
) -> PyarrowParquetIOManager:
    """Create a ``PyarrowParquetIOManager`` rooted at the configured base path.

    The optional ``base_path`` resource config entry selects the root
    location; when it is absent, the Dagster instance's storage directory
    is used instead.

    Args:
        init_context: Resource initialization context supplying the
            resource config and the Dagster instance.

    Returns:
        An IO manager whose ``base_path`` is a ``UPath`` of the chosen root.

    Raises:
        AssertionError: If ``init_context.instance`` is ``None``.
    """
    instance = init_context.instance
    assert instance is not None

    resource_config = init_context.resource_config
    # The fallback is computed up front, whether or not "base_path" is set.
    fallback_dir = instance.storage_directory()
    root = resource_config.get("base_path", fallback_dir)
    return PyarrowParquetIOManager(base_path=UPath(root))
owen
02/14/2023, 5:31 PM
`context.metadata["group_name"]`
Drew You
04/27/2023, 2:40 PM
Has `group_name` been added to the context since this message was posted?