Martin Picard
12/15/2022, 12:05 PM
jamie
12/15/2022, 3:37 PM
Daniel Gafni
12/15/2022, 4:20 PM
UPathIOManager.
Here is the example from the docs.
Here are my examples:
import orjson
from dagster import InputContext, OutputContext, UPathIOManager
from typing import Any
from upath import UPath


class JSONIOManager(UPathIOManager):
    extension: str = ".json"

    def dump_to_path(self, context: OutputContext, obj: Any, path: UPath):
        with path.open("wb") as file:
            file.write(orjson.dumps(obj))

    def load_from_path(self, context: InputContext, path: UPath) -> Any:
        with path.open("rb") as file:
            return orjson.loads(file.read())
import polars as pl
from dagster import MetadataValue


class ParquetIOManager(UPathIOManager):
    extension: str = ".pq"

    def dump_to_path(self, context: OutputContext, obj: pl.DataFrame, path: UPath):
        with path.open("wb") as file:
            obj.write_parquet(file)

    def load_from_path(self, context: InputContext, path: UPath) -> pl.DataFrame:
        columns = context.metadata.get("columns")
        if columns is not None:
            context.log.debug(f"Loading {columns=}")
        with path.open("rb") as file:
            return pl.read_parquet(file, columns=columns)

    def get_metadata(self, context: OutputContext, obj: pl.DataFrame) -> dict[str, MetadataValue]:
        # polars_df_report is a user-defined helper returning a JSON-serializable summary
        return {
            "report": MetadataValue.json(polars_df_report(obj)),
            "row_count": MetadataValue.int(len(obj)),
            "head": MetadataValue.json(orjson.loads(obj.head().write_json())),
        }
Example for the actual `io_manager`:
from dagster import Field, InitResourceContext, io_manager


@io_manager(config_schema={"base_path": Field(str, is_required=False)})
def local_parquet_io_manager(init_context: InitResourceContext) -> ParquetIOManager:
    assert init_context.instance is not None
    base_path = UPath(init_context.resource_config.get("base_path", init_context.instance.storage_directory()))
    return ParquetIOManager(base_path=base_path)
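For illustration, wiring this io_manager into a code location could look roughly like the following sketch (the asset name and base path are placeholders, not from the thread):
from dagster import Definitions, asset


@asset
def my_table() -> pl.DataFrame:
    # hypothetical asset; stored via the default "io_manager" key
    return pl.DataFrame({"a": [1, 2, 3]})


defs = Definitions(
    assets=[my_table],
    resources={"io_manager": local_parquet_io_manager.configured({"base_path": "/tmp/dagster"})},
)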
Usage with S3 would be the same; you would just have to define the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables when creating the IOManager instance. You can specify them as required resources for your s3_parquet_io_manager. They will be recognized by UPath if the base_path starts with s3://.
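A minimal sketch of such an s3_parquet_io_manager, reusing the ParquetIOManager from above (the config key is an assumption):
@io_manager(config_schema={"base_path": Field(str)})
def s3_parquet_io_manager(init_context: InitResourceContext) -> ParquetIOManager:
    # a base_path like "s3://my-bucket/dagster" makes UPath dispatch to s3fs,
    # which picks up AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY from the environment
    return ParquetIOManager(base_path=UPath(init_context.resource_config["base_path"]))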
P.S. Overriding UPathIOManager.get_metadata is a little broken right now (failing when loading single inputs), but this PR should fix it. Hopefully the 1.1.8 release will have it merged.
Martin Picard
12/16/2022, 9:39 AM
Daniel Gafni
12/16/2022, 11:26 AM
> I had to cast the UPath to a string
Do you mind elaborating on this issue? Was it on Dagster's side or on yours?
> figure out how partitioning works
1. Loading single partitions works out of the box
2. Loading multiple partitions with PartitionMapping also works out of the box
UPathIOManager writes asset partitions into the same directory, like this:
asset/key/2022-01-01.parquet
asset/key/2022-01-02.parquet
I don't think this plays well with pyarrow partitioning out of the box.
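To make the contrast concrete, pyarrow's dataset discovery assumes key=value directory segments, roughly like this sketch (the local paths are placeholders):
import pyarrow.dataset as ds

# with a hive layout such as asset/key/date=2022-01-01/data.parquet, pyarrow
# infers a "date" column from the directory names
dataset = ds.dataset("asset/key", format="parquet", partitioning="hive")
table = dataset.to_table(filter=ds.field("date") == "2022-01-01")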
@sandy what do you think of extending `UPathIOManager`'s functionality by allowing the end user to specify how to format the partition path?
So something like this would be possible:
# in my awesome IOManager
def format_partition_path(self, context, obj: pl.DataFrame) -> str:
    dates_in_df = obj["date"].unique().to_list()
    assert len(dates_in_df) == 1
    assert dates_in_df[0] == context.partition_key  # need to actually parse the date here
    return f"date={context.partition_key}"
This would result in a partitioning structure compatible with the Hive scheme (pyarrow or Apache Spark would understand it out of the box):
asset/key/date=2022-01-01.parquet
asset/key/date=2022-01-02.parquet
Using an empty path if the partitioning is handled inside PySpark and not by Dagster:
def format_partition_path(self, context, obj: pl.DataFrame) -> str:
    return ""
Of course, the default formatting would not change.
Martin Picard
12/16/2022, 2:52 PM
from adlfs import AzureBlobFileSystem
from azure.identity.aio import DefaultAzureCredential
from dagster import Any, InputContext, OutputContext, UPathIOManager
from pyarrow import fs
from upath import UPath
from data_pipeline import parquet
def parquet_io_manager(context):
    """Persistent IO manager for PyArrow Parquet."""
    account_name = context.resource_config["storage_account"]
    credential = DefaultAzureCredential(exclude_environment_credential=True)
    file_system = AzureBlobFileSystem(account_name=account_name, credential=credential)
    base_path = UPath(context.resource_config["container_name"])
    return ParquetIOManager(base_path=base_path, file_system=file_system)
class ParquetIOManager(UPathIOManager):
    extension: str = ".parquet"

    def __init__(self, base_path: UPath, file_system: fs.FileSystem):
        super().__init__(base_path)
        self._fs = file_system

    def load_from_path(self, context: InputContext, path: UPath) -> Any:
        return parquet.read_table(path, filesystem=self._fs)

    def dump_to_path(self, context: OutputContext, obj: Any, path: UPath):
        parquet.write_table(obj, str(path), filesystem=self._fs)
the last line was where I was mistakenly using UPath, but pyarrow.parquet.write_table wants str or a "path-like object", which is confusing because I thought UPath was already "path-like", but pyarrow meant something different haha
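For reference, two call styles that should satisfy pyarrow here, sketched under the assumption that data_pipeline.parquet forwards to pyarrow.parquet:
# inside dump_to_path: pass a plain string plus an explicit filesystem...
parquet.write_table(obj, str(path), filesystem=self._fs)
# ...or hand pyarrow an already-open binary file object
with path.open("wb") as file:
    parquet.write_table(obj, file)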
Daniel Gafni
12/16/2022, 3:47 PM
Martin Picard
12/16/2022, 4:26 PM
The above exception was caused by the following exception:
PermissionError: [Errno 13] Permission denied: 'container_name'
File "/opt/data_pipeline/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py", line 56, in op_execution_error_boundary
yield
File "/opt/data_pipeline/.venv/lib/python3.9/site-packages/dagster/_utils/__init__.py", line 458, in iterate_with_context
next_output = next(iterator)
File "/opt/data_pipeline/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 626, in _gen_fn
gen_output = output_manager.handle_output(output_context, output.value)
File "/opt/data_pipeline/.venv/lib/python3.9/site-packages/dagster/_core/storage/upath_io_manager.py", line 223, in handle_output
path.parent.mkdir(parents=True, exist_ok=True)
File "/usr/local/lib/python3.9/pathlib.py", line 1323, in mkdir
self._accessor.mkdir(self, mode)
base_path = UPath(f"abfss://{context.resource_config['container_name']}")
but I got
ValueError: unable to connect to account for Must provide either a connection_string or account_name with credentials!!
File "/Users/martin/workspace/experiment-data-pipeline/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py", line 56, in op_execution_error_boundary
yield
File "/Users/martin/workspace/experiment-data-pipeline/.venv/lib/python3.9/site-packages/dagster/_utils/__init__.py", line 458, in iterate_with_context
next_output = next(iterator)
File "/Users/martin/workspace/experiment-data-pipeline/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 626, in _gen_fn
gen_output = output_manager.handle_output(output_context, output.value)
File "/Users/martin/workspace/experiment-data-pipeline/.venv/lib/python3.9/site-packages/dagster/_core/storage/upath_io_manager.py", line 229, in handle_output
path.parent.mkdir(parents=True, exist_ok=True)
File "/Users/martin/workspace/experiment-data-pipeline/.venv/lib/python3.9/site-packages/upath/core.py", line 403, in mkdir
self._accessor.mkdir(
File "/Users/martin/workspace/experiment-data-pipeline/.venv/lib/python3.9/site-packages/upath/core.py", line 134, in __getattr__
self._accessor = _accessor = self._default_accessor(
File "/Users/martin/workspace/experiment-data-pipeline/.venv/lib/python3.9/site-packages/upath/core.py", line 28, in __init__
self._fs = cls(**url_kwargs)
File "/Users/martin/workspace/experiment-data-pipeline/.venv/lib/python3.9/site-packages/fsspec/spec.py", line 76, in __call__
obj = super().__call__(*args, **kwargs)
File "/Users/martin/workspace/experiment-data-pipeline/.venv/lib/python3.9/site-packages/adlfs/spec.py", line 268, in __init__
self.do_connect()
File "/Users/martin/workspace/experiment-data-pipeline/.venv/lib/python3.9/site-packages/adlfs/spec.py", line 444, in do_connect
raise ValueError(f"unable to connect to account for {e}")
so I am not sure how/if I will get UPathIOManager to work with adlfs
Daniel Gafni
12/16/2022, 7:47 PM
Martin Picard
12/19/2022, 8:30 AM
I had to pass storage_options={'account_name': ACCOUNT_NAME, 'anon': False} to use the DefaultAzureCredential, because I don't/can't use the typical env vars to authenticate; I'm using oauth
Daniel Gafni
12/19/2022, 10:03 AM
Can you do the oauth in the @io_manager code and pass the credentials as storage_options kwargs to UPath?
Something like this:
base_path = UPath("abfs://my/path", account_name="account", anon=False, account_key="password")
Martin Picard
12/19/2022, 11:38 AM
Daniel Gafni
12/19/2022, 11:51 AM
Martin Picard
12/19/2022, 1:45 PM
base_path = UPath(f"abfss://{container_name}/", account_name=account_name, anon=False)
and it doesn't work, I get a "please specify container name" error
Daniel Gafni
12/19/2022, 1:47 PM
Martin Picard
12/19/2022, 1:48 PM
The above exception was caused by the following exception:
ValueError: Failed to fetch container properties for for Please specify a container name.
File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py", line 56, in op_execution_error_boundary
yield
File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/dagster/_utils/__init__.py", line 458, in iterate_with_context
next_output = next(iterator)
File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 626, in _gen_fn
gen_output = output_manager.handle_output(output_context, output.value)
File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/dagster/_core/storage/upath_io_manager.py", line 224, in handle_output
path.parent.mkdir(parents=True, exist_ok=True)
File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/upath/core.py", line 403, in mkdir
self._accessor.mkdir(
File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/upath/core.py", line 67, in mkdir
return self._fs.mkdir(
File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/fsspec/asyn.py", line 113, in wrapper
return sync(self.loop, func, *args, **kwargs)
File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/fsspec/asyn.py", line 98, in sync
raise return_result
File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/fsspec/asyn.py", line 53, in _runner
result[0] = await coro
File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/adlfs/spec.py", line 1109, in _mkdir
container_exists = await self._container_exists(container_name)
File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/adlfs/spec.py", line 1084, in _container_exists
raise ValueError(
The above exception was caused by the following exception:
ValueError: Please specify a container name.
File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/adlfs/spec.py", line 1077, in _container_exists
async with self.service_client.get_container_client(
File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/azure/storage/blob/aio/_blob_service_client_async.py", line 631, in get_container_client
return ContainerClient(
File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/azure/storage/blob/aio/_container_client_async.py", line 120, in __init__
super(ContainerClient, self).__init__(
File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/azure/storage/blob/_container_client.py", line 154, in __init__
raise ValueError("Please specify a container name.")
Daniel Gafni
12/19/2022, 1:50 PM
base_path?
Martin Picard
12/19/2022, 1:54 PM
pyarrow.lib.ArrowInvalid: Unrecognized filesystem type in URI: abfss://dagster/test/birthdays.parquet
File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py", line 56, in op_execution_error_boundary
yield
File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/dagster/_utils/__init__.py", line 458, in iterate_with_context
next_output = next(iterator)
File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 626, in _gen_fn
gen_output = output_manager.handle_output(output_context, output.value)
File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/dagster/_core/storage/upath_io_manager.py", line 226, in handle_output
self.dump_to_path(context=context, obj=obj, path=path)
File "/Users/martin/workspace/data-pipeline/data_pipeline/parquet/io_managers.py", line 58, in dump_to_path
parquet.write_table(obj, str(path))
File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/pyarrow/parquet/core.py", line 2964, in write_table
with ParquetWriter(
File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/pyarrow/parquet/core.py", line 948, in __init__
filesystem, path = _resolve_filesystem_and_path(
File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/pyarrow/fs.py", line 185, in _resolve_filesystem_and_path
filesystem, path = FileSystem.from_uri(path)
File "pyarrow/_fs.pyx", line 470, in pyarrow._fs.FileSystem.from_uri
File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
Daniel Gafni
12/19/2022, 1:57 PM
You should pass UPath's file to pyarrow, not the string:
with path.open("wb") as file:
    parquet.write_table(obj, file)
Martin Picard
12/19/2022, 3:05 PM
@io_manager(
    config_schema={
        "container_name": Field(StringSource),
        "storage_account": Field(StringSource),
    },
)
def parquet_io_manager(context):
    """Persistent IO manager for PyArrow Parquet."""
    container_name = context.resource_config["container_name"]
    storage_account = context.resource_config["storage_account"]
    credential = DefaultAzureCredential(exclude_environment_credential=True)
    base_path = UPath(
        f"abfss://{container_name}",
        account_name=storage_account,
        credential=credential,
    )
    return ParquetIOManager(base_path=base_path)


class ParquetIOManager(UPathIOManager):
    extension: str = ".parquet"

    def load_from_path(self, context: InputContext, path: UPath) -> Any:
        columns = context.metadata.get("columns")
        if columns is not None:
            context.log.debug(f"Loading {columns=}")
        with path.open("rb") as file:
            return parquet.read_table(file, columns=columns)

    def dump_to_path(self, context: OutputContext, obj: Any, path: UPath):
        with path.open("wb") as file:
            parquet.write_table(obj, file)
dagster._core.errors.DagsterExecutionHandleOutputError: Error occurred while handling output "result" of step "birthdays":
File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_plan.py", line 266, in dagster_event_sequence_for_step
for step_event in check.generator(step_events):
File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 394, in core_dagster_event_sequence_for_step
for evt in _type_check_and_store_output(step_context, user_event, input_lineage):
File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 451, in _type_check_and_store_output
for evt in _store_output(step_context, step_output_handle, output, input_lineage):
File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 636, in _store_output
for elt in iterate_with_context(
File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/dagster/_utils/__init__.py", line 460, in iterate_with_context
return
File "/opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/contextlib.py", line 137, in __exit__
self.gen.throw(typ, value, traceback)
File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py", line 86, in op_execution_error_boundary
raise error_cls(
The above exception was caused by the following exception:
ValueError: Failed to fetch container properties for for Please specify a container name.
File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py", line 56, in op_execution_error_boundary
yield
File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/dagster/_utils/__init__.py", line 458, in iterate_with_context
next_output = next(iterator)
File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 626, in _gen_fn
gen_output = output_manager.handle_output(output_context, output.value)
File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/dagster/_core/storage/upath_io_manager.py", line 223, in handle_output
path.parent.mkdir(parents=True, exist_ok=True)
File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/upath/core.py", line 403, in mkdir
self._accessor.mkdir(
File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/upath/core.py", line 67, in mkdir
return self._fs.mkdir(
File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/fsspec/asyn.py", line 113, in wrapper
return sync(self.loop, func, *args, **kwargs)
File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/fsspec/asyn.py", line 98, in sync
raise return_result
File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/fsspec/asyn.py", line 53, in _runner
result[0] = await coro
File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/adlfs/spec.py", line 1109, in _mkdir
container_exists = await self._container_exists(container_name)
File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/adlfs/spec.py", line 1084, in _container_exists
raise ValueError(
The above exception was caused by the following exception:
ValueError: Please specify a container name.
File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/adlfs/spec.py", line 1077, in _container_exists
async with self.service_client.get_container_client(
File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/azure/storage/blob/aio/_blob_service_client_async.py", line 631, in get_container_client
return ContainerClient(
File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/azure/storage/blob/aio/_container_client_async.py", line 120, in __init__
super(ContainerClient, self).__init__(
File "/Users/martin/workspace/data-pipeline/.venv/lib/python3.9/site-packages/azure/storage/blob/_container_client.py", line 154, in __init__
raise ValueError("Please specify a container name.")
Daniel Gafni
12/19/2022, 3:06 PM
base_path = UPath(
    f"abfss://{container_name}",
    account_name=storage_account,
    credential=credential,
)
You removed the subdir again. path.parent should fail in this case; not sure if this is fsspec's bug. Anyway, just specify any subdir for the base_path.
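In other words, something along these lines (the "dagster" suffix is an arbitrary choice of subdirectory):
base_path = UPath(
    f"abfss://{container_name}/dagster",
    account_name=storage_account,
    credential=credential,
)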
Martin Picard
12/19/2022, 3:09 PM
f"abfss://{storage_account}/{container_name}"
Daniel Gafni
12/19/2022, 3:11 PM
Hope the UPathIOManager will be convenient to use!
Hive-style formatting can be implemented with UPathIOManager by simply using, for example, year=%Y/month=%m/day=%d for `DailyPartitionsDefinition`'s fmt.
Can you try it out?
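A sketch of that suggestion, with a placeholder start date and asset body:
import polars as pl
from dagster import DailyPartitionsDefinition, asset

daily_hive = DailyPartitionsDefinition(
    start_date="2022-01-01", fmt="year=%Y/month=%m/day=%d"
)


@asset(partitions_def=daily_hive)
def daily_table(context) -> pl.DataFrame:
    # partition keys render as e.g. "year=2022/month=01/day=01", so UPathIOManager
    # would write to <base_path>/daily_table/year=2022/month=01/day=01.parquet
    return pl.DataFrame({"value": [1, 2, 3]})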