# ask-community
m
Does someone have examples of using the UPathIOManager? I checked the docs and it's not that obvious to me atm
j
The UPathIOManager is a pretty recent community contribution. @Daniel Gafni I believe it was your PR, right? Do you have some examples of using it?
d
Sure, I have some examples, but they don't differ much from the docs (because you need to write so little code to use `UPathIOManager`). Here is the example from the docs. Here are my examples:
from typing import Any

import orjson
import polars as pl
from dagster import InputContext, MetadataValue, OutputContext, UPathIOManager
from upath import UPath


class JSONIOManager(UPathIOManager):
    extension: str = ".json"

    def dump_to_path(self, context: OutputContext, obj: Any, path: UPath):
        with path.open("wb") as file:
            file.write(orjson.dumps(obj))

    def load_from_path(self, context: InputContext, path: UPath) -> Any:
        with path.open("rb") as file:
            return orjson.loads(file.read())


class ParquetIOManager(UPathIOManager):
    extension: str = ".pq"

    def dump_to_path(self, context: OutputContext, obj: pl.DataFrame, path: UPath):
        with path.open("wb") as file:
            obj.write_parquet(file)

    def load_from_path(self, context: InputContext, path: UPath) -> pl.DataFrame:
        columns = context.metadata.get("columns")
        if columns is not None:
            context.log.debug(f"Loading {columns=}")

        with path.open("rb") as file:
            return pl.read_parquet(file, columns=columns)

    def get_metadata(self, context: OutputContext, obj: pl.DataFrame) -> dict[str, MetadataValue]:
        return {
            "report": MetadataValue.json(polars_df_report(obj)),
            "row_count": <http://MetadataValue.int|MetadataValue.int>(len(obj)),
            "head": MetadataValue.json(orjson.loads(obj.head().write_json())),
        }
Example for the actual `io_manager`:
@io_manager(config_schema={"base_path": Field(str, is_required=False)})
def local_parquet_io_manager(init_context: InitResourceContext) -> ParquetIOManager:
    assert init_context.instance is not None
    base_path = UPath(init_context.resource_config.get("base_path", init_context.instance.storage_directory()))
    return ParquetIOManager(base_path=base_path)
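A minimal sketch of wiring this resource to an asset (the asset here is made up just for illustration):
import polars as pl
from dagster import Definitions, asset


@asset(io_manager_key="parquet_io_manager")
def my_frame() -> pl.DataFrame:
    # the returned DataFrame is written by ParquetIOManager.dump_to_path
    return pl.DataFrame({"a": [1, 2, 3]})


defs = Definitions(
    assets=[my_frame],
    resources={"parquet_io_manager": local_parquet_io_manager},
)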
Usage with S3 would be the same; you would just have to define the `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables when creating the IOManager instance. You can specify them as required resources for your `s3_parquet_io_manager`. They will be recognized by `UPath` if the `base_path` starts with `s3://` (rough sketch below).
P.S. Overriding `UPathIOManager.get_metadata` is a little broken right now (it fails when loading single inputs), but this PR should fix it. Hopefully the 1.1.8 release will have this merged.
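The S3 variant mentioned above might look roughly like this (the bucket/prefix is just a placeholder):
@io_manager(config_schema={"base_path": Field(str, is_required=False)})
def s3_parquet_io_manager(init_context: InitResourceContext) -> ParquetIOManager:
    # s3fs (used by UPath for s3:// paths) picks up AWS_ACCESS_KEY_ID /
    # AWS_SECRET_ACCESS_KEY from the environment
    base_path = UPath(init_context.resource_config.get("base_path", "s3://my-bucket/dagster"))
    return ParquetIOManager(base_path=base_path)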
m
Hey Daniel, thanks a lot for replying. I'm using Azure, so I ended up using the Azure Data Lake filesystem, which worked perfectly. I was stuck with an issue with the path, but I realized I had to cast the UPath to a string and it works beautifully. I'm using pyarrow parquet, so the next step will be to figure out how partitioning works with UPathIOManager.
d
OK, cool!
Re "I had to cast the UPath to a string": do you mind elaborating on this issue? Was it on Dagster's side or on yours?
Re "figure out how partitioning works":
1. Loading single partitions works out of the box.
2. Loading multiple partitions with `PartitionMapping` also works out of the box (rough sketch below).
`UPathIOManager` writes asset partitions in the same directory, like this:
asset/key/2022-01-01.parquet
asset/key/2022-01-02.parquet
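The multi-partition loading from point 2 could look roughly like this (asset names and the 7-day window are made up; `UPathIOManager` passes the upstream partitions to the downstream asset as a dict keyed by partition key):
from typing import Dict

import polars as pl
from dagster import AssetIn, DailyPartitionsDefinition, TimeWindowPartitionMapping, asset

daily = DailyPartitionsDefinition(start_date="2022-01-01")


@asset(partitions_def=daily, io_manager_key="parquet_io_manager")
def upstream(context) -> pl.DataFrame:
    # one file per partition, e.g. .../upstream/2022-01-01.parquet
    return pl.DataFrame({"date": [context.partition_key], "value": [1]})


@asset(
    partitions_def=daily,
    io_manager_key="parquet_io_manager",
    ins={
        "upstream": AssetIn(
            # load the current partition plus the 6 preceding ones
            partition_mapping=TimeWindowPartitionMapping(start_offset=-6, end_offset=0),
        )
    },
)
def downstream(upstream: Dict[str, pl.DataFrame]) -> pl.DataFrame:
    return pl.concat(list(upstream.values()))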
I don't think this directory layout plays well with `pyarrow` partitioning out of the box. @sandy what do you think of extending `UPathIOManager`'s functionality by allowing the end user to specify how to format the partition path? So something like this would be possible:
# in my awesome IOManager
def format_partition_path(context, obj: pl.DataFrame) -> str:
    dates_in_df = obj["date"].unique().to_list()
    assert len(dates_in_df) == 1
    assert dates_in_df[0] == context.partition_key  # need to actually parse the date here

    return f"date={context.partition_key}"
This would result in a partitioning structure compatible with the Hive scheme (pyarrow or Apache Spark would understand it out of the box):
asset/key/date=2022-01-01.parquet
asset/key/date=2022-01-02.parquet
Using an empty path if the partitioning is handled inside PySpark and not by Dagster:
def format_partition_path(context, obj: pl.DataFrame) -> str:
    return ""
Of course the default formatting would not change.
m
from adlfs import AzureBlobFileSystem
from azure.identity.aio import DefaultAzureCredential
from dagster import Any, Field, InputContext, OutputContext, StringSource, UPathIOManager, io_manager
from pyarrow import fs
from upath import UPath

from data_pipeline import parquet


@io_manager(
    config_schema={
        "container_name": Field(StringSource),
        "storage_account": Field(StringSource),
    },
)
def parquet_io_manager(context):
    """Persistent IO manager for PyArrow Parquet."""
    account_name = context.resource_config["storage_account"]
    credential = DefaultAzureCredential(exclude_environment_credential=True)
    file_system = AzureBlobFileSystem(account_name=account_name, credential=credential)
    base_path = UPath(context.resource_config["container_name"])
    return ParquetIOManager(base_path=base_path, file_system=file_system)


class ParquetIOManager(UPathIOManager):
    extension: str = ".parquet"

    def __init__(self, base_path: UPath, file_system: fs.FileSystem):
        super().__init__(base_path)
        self._fs = file_system

    def load_from_path(self, context: InputContext, path: UPath) -> Any:
        return parquet.read_table(path, filesystem=self._fs)

    def dump_to_path(self, context: OutputContext, obj: Any, path: UPath):
        parquet.write_table(obj, str(path), filesystem=self._fs)
The last line was where I was mistakenly using UPath, but pyarrow.parquet.write_table wants a str or "path-like object", which is confusing because I thought UPath was already "path-like", but pyarrow meant something different, haha.
d
I see, so you had to define your own fs object. My original idea was to make the UPathIOManager work with pure UPaths, since the fs can already be supplied inside the user-provided base_path.
m
Doesn't work with Azure unfortunately, I need to pass Azure credentials to the fs.
I am experiencing weird issues, however.
I think I know the problem, but I am not sure how to fix it. Here is the stacktrace:
The above exception was caused by the following exception:
PermissionError: [Errno 13] Permission denied: 'container_name'
  File "/opt/data_pipeline/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py", line 56, in op_execution_error_boundary
    yield
  File "/opt/data_pipeline/.venv/lib/python3.9/site-packages/dagster/_utils/__init__.py", line 458, in iterate_with_context
    next_output = next(iterator)
  File "/opt/data_pipeline/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 626, in _gen_fn
    gen_output = output_manager.handle_output(output_context, output.value)
  File "/opt/data_pipeline/.venv/lib/python3.9/site-packages/dagster/_core/storage/upath_io_manager.py", line 223, in handle_output
    path.parent.mkdir(parents=True, exist_ok=True)
  File "/usr/local/lib/python3.9/pathlib.py", line 1323, in mkdir
    self._accessor.mkdir(self, mode)
In the above trace, it's running in k8s and it's trying to create a file on the local fs.
I see some weird behaviour locally as well: it creates the "container_name" directory in my local fs, then pushes the parquet file to Azure Blob.
I tried different path combinations, for example
base_path = UPath(f"abfss://{context.resource_config['container_name']}")
but I got
ValueError: unable to connect to account for Must provide either a connection_string or account_name with credentials!!
  File "/Users/martin/workspace/experiment-data-pipeline/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py", line 56, in op_execution_error_boundary
    yield
  File "/Users/martin/workspace/experiment-data-pipeline/.venv/lib/python3.9/site-packages/dagster/_utils/__init__.py", line 458, in iterate_with_context
    next_output = next(iterator)
  File "/Users/martin/workspace/experiment-data-pipeline/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 626, in _gen_fn
    gen_output = output_manager.handle_output(output_context, output.value)
  File "/Users/martin/workspace/experiment-data-pipeline/.venv/lib/python3.9/site-packages/dagster/_core/storage/upath_io_manager.py", line 229, in handle_output
    path.parent.mkdir(parents=True, exist_ok=True)
  File "/Users/martin/workspace/experiment-data-pipeline/.venv/lib/python3.9/site-packages/upath/core.py", line 403, in mkdir
    self._accessor.mkdir(
  File "/Users/martin/workspace/experiment-data-pipeline/.venv/lib/python3.9/site-packages/upath/core.py", line 134, in __getattr__
    self._accessor = _accessor = self._default_accessor(
  File "/Users/martin/workspace/experiment-data-pipeline/.venv/lib/python3.9/site-packages/upath/core.py", line 28, in __init__
    self._fs = cls(**url_kwargs)
  File "/Users/martin/workspace/experiment-data-pipeline/.venv/lib/python3.9/site-packages/fsspec/spec.py", line 76, in __call__
    obj = super().__call__(*args, **kwargs)
  File "/Users/martin/workspace/experiment-data-pipeline/.venv/lib/python3.9/site-packages/adlfs/spec.py", line 268, in __init__
    self.do_connect()
  File "/Users/martin/workspace/experiment-data-pipeline/.venv/lib/python3.9/site-packages/adlfs/spec.py", line 444, in do_connect
    raise ValueError(f"unable to connect to account for {e}")
so I am not sure how/if I will get UPathIOManager to work with adlfs
d
So that's the thing, you need to pass credentials to the base_path when creating it. This can be done either with kwargs or with environment variables. Check out the example notebook here and the adlfs docs here. Edit: sorry, I completely missed that you are passing file_system to UPath; however, I'm not near my PC right now and not sure if that's a valid way of creating the UPath.
Re: local filesystem in the container: this has to do with your Docker image setup. Most likely you are running your code in the container under a non-root user and didn't chown this directory for that user.
m
Yeah, I never wanted to touch the local filesystem, so it's probably the wrong UPath (with no protocol) doing that, right?
Yeah, I think in this edge case I won't be able to use the UPathIOManager, because I need to pass `storage_options={'account_name': ACCOUNT_NAME, 'anon': False}` to use the `DefaultAzureCredential`, because I don't/can't use the typical env vars to authenticate; I'm using `oauth`.
Thanks for your help though!
d
Why can't you use `oauth` in the `@io_manager` code and pass the credentials as `storage_options` kwargs to `UPath`? Something like this:
base_path = UPath("<abfs://my/path>", account_name="account", anon=False, account_key="password")
m
Woah, cool, I did not know that I could pass additional arguments to UPath beyond the first (path) argument, thanks a lot Daniel!
d
Great! Does it work? We should probably add this to Dagster's docs.
m
I used this:
base_path = UPath(f"abfss://{container_name}/", account_name=account_name, anon=False)
and it doesn't work, I get a "Please specify a container name" error.
It comes from dagster/_core/storage/upath_io_manager.py, line 223.
d
this line?
Could you post the error?
m
The above exception was caused by the following exception:
ValueError: Failed to fetch container properties for for Please specify a container name.
  File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py", line 56, in op_execution_error_boundary
    yield
  File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/dagster/_utils/__init__.py", line 458, in iterate_with_context
    next_output = next(iterator)
  File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 626, in _gen_fn
    gen_output = output_manager.handle_output(output_context, output.value)
  File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/dagster/_core/storage/upath_io_manager.py", line 224, in handle_output
    path.parent.mkdir(parents=True, exist_ok=True)
  File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/upath/core.py", line 403, in mkdir
    self._accessor.mkdir(
  File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/upath/core.py", line 67, in mkdir
    return self._fs.mkdir(
  File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/fsspec/asyn.py", line 113, in wrapper
    return sync(self.loop, func, *args, **kwargs)
  File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/fsspec/asyn.py", line 98, in sync
    raise return_result
  File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/fsspec/asyn.py", line 53, in _runner
    result[0] = await coro
  File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/adlfs/spec.py", line 1109, in _mkdir
    container_exists = await self._container_exists(container_name)
  File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/adlfs/spec.py", line 1084, in _container_exists
    raise ValueError(
The above exception was caused by the following exception:
ValueError: Please specify a container name.
  File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/adlfs/spec.py", line 1077, in _container_exists
    async with self.service_client.get_container_client(
  File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/azure/storage/blob/aio/_blob_service_client_async.py", line 631, in get_container_client
    return ContainerClient(
  File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/azure/storage/blob/aio/_container_client_async.py", line 120, in __init__
    super(ContainerClient, self).__init__(
  File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/azure/storage/blob/_container_client.py", line 154, in __init__
    raise ValueError("Please specify a container name.")
path.parent looks valid, and so does path.
path.parent is abfss://dagster, which is a container I already have in the storage account.
d
Can you try specifying a subdirectory for the `base_path`?
m
If I add /test to the end, I get another weird error:
pyarrow.lib.ArrowInvalid: Unrecognized filesystem type in URI: abfss://dagster/test/birthdays.parquet
  File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py", line 56, in op_execution_error_boundary
    yield
  File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/dagster/_utils/__init__.py", line 458, in iterate_with_context
    next_output = next(iterator)
  File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 626, in _gen_fn
    gen_output = output_manager.handle_output(output_context, output.value)
  File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/dagster/_core/storage/upath_io_manager.py", line 226, in handle_output
    self.dump_to_path(context=context, obj=obj, path=path)
  File "/Users/martin/workspace/data-pipeline/data_pipeline/parquet/io_managers.py", line 58, in dump_to_path
    parquet.write_table(obj, str(path))
  File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/pyarrow/parquet/core.py", line 2964, in write_table
    with ParquetWriter(
  File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/pyarrow/parquet/core.py", line 948, in __init__
    filesystem, path = _resolve_filesystem_and_path(
  File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/pyarrow/fs.py", line 185, in _resolve_filesystem_and_path
    filesystem, path = FileSystem.from_uri(path)
  File "pyarrow/_fs.pyx", line 470, in pyarrow._fs.FileSystem.from_uri
  File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
d
You don't pass the UPath's file object to pyarrow here; you pass the string instead. You need to do something like:
with path.open("wb") as file:
    parquet.write_table(obj, file)
m
Still doesn't work
@io_manager(
    config_schema={
        "container_name": Field(StringSource),
        "storage_account": Field(StringSource),
    },
)
def parquet_io_manager(context):
    """Persistent IO manager for PyArrow Parquet."""
    container_name = context.resource_config["container_name"]
    storage_account = context.resource_config["storage_account"]
    credential = DefaultAzureCredential(exclude_environment_credential=True)

    base_path = UPath(
        f"abfss://{container_name}",
        account_name=storage_account,
        credential=credential,
    )
    return ParquetIOManager(base_path=base_path)


class ParquetIOManager(UPathIOManager):
    extension: str = ".parquet"

    def load_from_path(self, context: InputContext, path: UPath) -> Any:
        columns = context.metadata.get("columns")
        if columns is not None:
            context.log.debug(f"Loading {columns=}")

        with path.open("rb") as file:
            return parquet.read_table(file, columns=columns)

    def dump_to_path(self, context: OutputContext, obj: Any, path: UPath):
        with path.open("wb") as file:
            parquet.write_table(obj, file)
dagster._core.errors.DagsterExecutionHandleOutputError: Error occurred while handling output "result" of step "birthdays":

  File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_plan.py", line 266, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 394, in core_dagster_event_sequence_for_step
    for evt in _type_check_and_store_output(step_context, user_event, input_lineage):
  File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 451, in _type_check_and_store_output
    for evt in _store_output(step_context, step_output_handle, output, input_lineage):
  File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 636, in _store_output
    for elt in iterate_with_context(
  File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/dagster/_utils/__init__.py", line 460, in iterate_with_context
    return
  File "/opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/contextlib.py", line 137, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py", line 86, in op_execution_error_boundary
    raise error_cls(

The above exception was caused by the following exception:
ValueError: Failed to fetch container properties for for Please specify a container name.

  File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py", line 56, in op_execution_error_boundary
    yield
  File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/dagster/_utils/__init__.py", line 458, in iterate_with_context
    next_output = next(iterator)
  File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 626, in _gen_fn
    gen_output = output_manager.handle_output(output_context, output.value)
  File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/dagster/_core/storage/upath_io_manager.py", line 223, in handle_output
    path.parent.mkdir(parents=True, exist_ok=True)
  File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/upath/core.py", line 403, in mkdir
    self._accessor.mkdir(
  File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/upath/core.py", line 67, in mkdir
    return self._fs.mkdir(
  File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/fsspec/asyn.py", line 113, in wrapper
    return sync(self.loop, func, *args, **kwargs)
  File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/fsspec/asyn.py", line 98, in sync
    raise return_result
  File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/fsspec/asyn.py", line 53, in _runner
    result[0] = await coro
  File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/adlfs/spec.py", line 1109, in _mkdir
    container_exists = await self._container_exists(container_name)
  File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/adlfs/spec.py", line 1084, in _container_exists
    raise ValueError(

The above exception was caused by the following exception:
ValueError: Please specify a container name.

  File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/adlfs/spec.py", line 1077, in _container_exists
    async with self.service_client.get_container_client(
  File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/azure/storage/blob/aio/_blob_service_client_async.py", line 631, in get_container_client
    return ContainerClient(
  File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/azure/storage/blob/aio/_container_client_async.py", line 120, in __init__
    super(ContainerClient, self).__init__(
  File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/azure/storage/blob/_container_client.py", line 154, in __init__
    raise ValueError("Please specify a container name.")
d
base_path = UPath(
        f"abfss://{container_name}",
        account_name=storage_account,
        credential=credential,
    )
You removed the subdir again.
What happens here is we are trying to create the parent directory, but it can't exist (as the container_name gets removed). Seems like `path.parent` should fail in this case; not sure if this is fsspec's bug. Anyway, just specify any subdir for the base_path.
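Roughly like this, reusing your io_manager code (the "dagster" subdirectory name is just an example):
base_path = UPath(
    f"abfss://{container_name}/dagster",
    account_name=storage_account,
    credential=credential,
)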
m
Ah, I see.
So for me the container name could be anything, as I don't really see it change anything; the subdir ends up being the real container name in the Azure portal. Weird.
I guess I will put f"abfss://{storage_account}/{container_name}".
Thanks for helping me debug!
d
You are welcome, hopefully `UPathIOManager` will be convenient to use!
Hey @Martin Picard, Hive formatting can be implemented with `UPathIOManager` by simply using, for example, `year=%Y/month=%m/day=%d` for `DailyPartitionsDefinition`'s `fmt`. Can you try it out?
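A rough sketch of what that could look like (the asset is made up; note that `start_date` is passed as a datetime so it doesn't have to be written in the custom `fmt`):
from datetime import datetime

import polars as pl
from dagster import DailyPartitionsDefinition, asset

# partition keys become e.g. "year=2022/month=01/day=02", so UPathIOManager
# writes to paths like .../my_asset/year=2022/month=01/day=02.parquet
hive_daily = DailyPartitionsDefinition(
    start_date=datetime(2022, 1, 1),
    fmt="year=%Y/month=%m/day=%d",
)


@asset(partitions_def=hive_daily, io_manager_key="parquet_io_manager")
def my_asset(context) -> pl.DataFrame:
    return pl.DataFrame({"partition": [context.partition_key]})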