Hi everyone, I have assets defined and the custom ...
# ask-community
j
Hi everyone, I have assets defined and a custom UPathIOManager as follows. How can I pass the asset's group name into the io_manager so that it creates a subdirectory named after the asset's group, i.e. {base path}/{asset_group_name}/{asset_name}, instead of using the default path storage/{asset_name}?
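from io import BytesIO

import pyarrow
from dagster import AssetIn, Field, InitResourceContext, asset, io_manager
from upath import UPath

@asset(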
    io_manager_key="local_pyarrowparquet_manager",
    ins={"DIPC_Raw_daily": AssetIn(metadata={"allow_missing_partitions": True})},
    partitions_def=bmonth_partitions,
    group_name=DATASET_GROUP_BMONTH,
    metadata={
        "Dataset Name": WESM_Dataset["PMR"]["PMR_DIPC_Raw"].NAME,
    },
)
def DIPC_Raw_bmonth(
    DIPC_Raw_daily,
) -> pyarrow.Table:
    zipfiles = DIPC_Raw_daily.values()
    csvfileslist = map(
        lambda zf: fileiter_from_zip(BytesIO(zf), file_ext=".csv"), zipfiles
    )
    csvfiles = [csvfile for csvfilelist in csvfileslist for csvfile in csvfilelist]
    return WESM_Dataset["PMR"]["PMR_DIPC_Raw"].pyarrow_table_from_csv(csvfiles)

@io_manager(config_schema={"base_path": Field(str, is_required=False)})
def local_pyarrowparquet_io_manager(
    init_context: InitResourceContext,
) -> PyarrowParquetIOManager:
    assert init_context.instance is not None
    base_path = UPath(
        init_context.resource_config.get(
            "base_path", init_context.instance.storage_directory()
        )
    )
    return PyarrowParquetIOManager(base_path=base_path)
o
Hi @Jan Samuel Matuba! If you want this IO manager to work for multiple asset groups at the same time, you'll need to add logic within the PyarrowParquetIOManager itself. However you choose to structure your code, you'll want to determine from the context which asset group the output you're storing or loading belongs to. Unfortunately, it looks like the asset group name is not currently available on the relevant context argument (but feel free to file a GitHub issue for this!). As a quick workaround, you can add the group name to the asset's metadata and pull it from context.metadata["group_name"].
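A rough sketch of that workaround, in case it helps. The helper name group_scoped_relative_path is made up for illustration, and the context here is a plain stand-in object rather than a real Dagster context. It assumes the context exposes metadata, asset_key.path and (when loading an input) upstream_output, and how you hook the result into PyarrowParquetIOManager's path logic depends on your Dagster version.

```python
from pathlib import PurePosixPath
from types import SimpleNamespace


def group_scoped_relative_path(context) -> PurePosixPath:
    """Return {group_name}/{asset_name} for an output- or input-like context."""
    # When loading, the asset's own metadata sits on the upstream output
    # context (assumption); when storing, it sits on the context itself.
    source = getattr(context, "upstream_output", None) or context
    metadata = source.metadata or {}
    asset_path = PurePosixPath(*context.asset_key.path)
    group_name = metadata.get("group_name")
    # Fall back to the plain asset path when no group name was provided.
    return PurePosixPath(group_name) / asset_path if group_name else asset_path


# Stand-in for the context Dagster would pass to the IO manager.
fake_output_context = SimpleNamespace(
    metadata={"group_name": "bmonth"},
    asset_key=SimpleNamespace(path=["DIPC_Raw_bmonth"]),
)
relative_path = group_scoped_relative_path(fake_output_context)
```

On the asset side this relies on adding "group_name": DATASET_GROUP_BMONTH to the metadata dict passed to @asset, next to "Dataset Name". The IO manager would then join base_path with the returned relative path.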
d
Has group_name been added to the context since this message was posted?
I don't think it has. I added a feature request for it: https://github.com/dagster-io/dagster/issues/13945#issue-1687037202