I have a custom parquet IO manager that extends `U...
# ask-community
p
I have a custom parquet IO manager that extends
UPathIOManager
than can provide
pandas.DataFrame
or
pyarrow.Table
This generally works fine and I can switch back and forth, but when I have a downstream asset that requires multiple upstream partitions, I get type mismatch error i.e.:
DagsterTypeCheckDidNotPass
. The only way I can get it to work is to have the downstream asset use
upstream: Dict[str, X]
where
X
is the type annotation on the IO manager’s
def load_from_path(...) -> X
method. I can’t seem to find the way to annotate my methods to make this dynamic.
o
hi @Philippe Laflamme! this would be expected behavior for the UPathIOManager + partitioned assets. When it loads multiple partitions, it will load them in a dictionary of {partition_key: value} (so load_from_path will get invoked multiple times). If you want different behavior (like loading every partition into one large dataframe), you'll likely need to override more than just load_from_path on the UPathIOManager (potentially overriding the load_input method)
p
Ok, but is it not possible to annotate the
load_from_path
method in such a way that type checking works for both
upstream: Dict[str, pd.DataFrame]
and
upstream: Dict[str, pa.Table]
?
I tried
load_from_path() -> Union[pd.DataFrame, pa.Table]
but that doesn’t type check
o
ah I see what you mean -- I the code inside the UPathIOManager is being overly-strict, requiring that the input type annotation is of the form
Dict[str, A]
, where
A
is exactly equal to the return annotation of
load_from_path
.
nod2 1
I think for now, it might be for the best to just annotate
load_from_path()
as
Any
(this dodges the type checking behavior I believe)
p
I think I tried that, but let me give it another try, I did change a few things around this
Yeah, that also fails. I get
CheckError
in this case
o
mind providing the stack trace?
p
yep, just a sec
Copy code
dagster._check.CheckError: Failure condition: Received `typing.Dict[str, polars.internals.dataframe.frame.DataFrame]` type in input of DagsterType <dagster._core.types.python_dict._TypedPythonDict object at 0x7fff811c2800>, but `<bound method PartitionedParquetIOManager.load_from_path of <core.parquet_io_manager.PartitionedParquetIOManager object at 0x7fff80f9b670>>` has typing.Any type annotation for obj. They should be both specified with type annotations and match. If you are loading multiple partitions, the upstream asset type annotation should be a typing.Dict.
o
ahh it's actually leaving the input unannotated which will turn off this behavior, my bad. this does seem like an overly-aggressive error, I think a warning might suffice in these sorts of cases
p
removing
Dict[str, X]
from the downstream asset gives me this:
Copy code
dagster._check.CheckError: Failure condition: Inputs of type <dagster._core.types.dagster_type._Any object at 0x7fffe974afb0> not supported. Please specify a valid type for this input either on the argument of the @asset-decorated function.
is that what you meant by “leaving the input unannotated”?
removing
-> Any
from
load_from_path
produces this:
Copy code
dagster._check.CheckError: Failure condition: Received `typing.Dict[str, polars.internals.dataframe.frame.DataFrame]` type in input of DagsterType <dagster._core.types.python_dict._TypedPythonDict object at 0x7fff811c2800>, but `<bound method PartitionedParquetIOManager.load_from_path of <core.parquet_io_manager.PartitionedParquetIOManager object at 0x7fff80f9f670>>` has <class 'inspect._empty'> type annotation for obj. They should be both specified with type annotations and match. If you are loading multiple partitions, the upstream asset type annotation should be a typing.Dict.
o
huh... sorry this is a bit of a rat's nest. not to keep giving you random suggestions, but my guess is that
load_from_path() -> Union[pandas.DataFrame, pyarrow.Table]
@asset
def downstream_asset(inp: Dict[str, Union[pandas.DataFrame, pyarrow.Table]])
might do the trick
pretty rough experience though, sorry you're running into this
p
Ok, thanks, this isn’t blocking, but it’s annoying. Should I open an issue about this?
o
yeah opening an issue here would be great
👍 1
p