Daniel Gafni
03/14/2023, 4:21 PM
dagster._core.errors.DagsterInvalidDefinitionError: Problem using type 'upath.core.UPath | pathlib.Path' from type annotation for argument 'test_path', correct the issue or explicitly set the dagster_type via In().
yuhan
03/15/2023, 12:32 AM
Daniel Gafni
03/15/2023, 2:53 PM
class RawUPathInputManager(UPathIOManager):
def dump_to_path(self, context: OutputContext, obj: Any, path: UPath | Path) -> None:
    """Reject every write: this manager is only meant to load inputs.

    Args:
        context: Output context of the step attempting the write (unused).
        obj: Object that was supposed to be stored (unused).
        path: Destination the object would have been written to.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError(
        f"{type(self).__name__} is input-only and cannot dump to {path}"
    )
def load_from_path(self, path: UPath | Path, context: InputContext) -> UPath | Path:
    """Return the (suffix-adjusted) path itself instead of loading its contents.

    The optional ``"extension"`` entry of ``context.metadata`` is applied to
    *path* as its suffix, and the resulting path must already exist.

    Args:
        path: Path computed for the upstream asset.
        context: Input context whose ``metadata`` may carry ``"extension"``.

    Returns:
        The resolved path, which is guaranteed to exist at call time.

    Raises:
        FileNotFoundError: If the resolved path does not exist.
    """
    # NOTE(review): metadata is presumably a mapping, but fall back to an
    # empty one in case no metadata was attached to the input.
    extension = (context.metadata or {}).get("extension", "")
    # ``with_suffix("")`` would strip an existing suffix, so only rewrite the
    # suffix when an extension was actually requested.
    if extension:
        path = path.with_suffix(extension)
    # Explicit raise instead of ``assert`` so the check survives ``python -O``.
    if not path.exists():
        raise FileNotFoundError(f"Input path does not exist: {path}")
    return path
And then I'm just trying to use it in my asset:
# Source asset stored under the key ["ar-resp", "df_train_seq"]; it is loaded
# through the `source_assets_static_raw_upath_input_manager` IO manager.
train_path = SourceAsset(
    key=["ar-resp", "df_train_seq"],
    io_manager_def=source_assets_static_raw_upath_input_manager,
)
# Asset that consumes the ["ar-resp", "df_train_seq"] input as a path. The
# ".parquet" extension is passed through the input metadata, and the Dagster
# type is set explicitly on the AssetIn alongside the `UPath | Path` annotation.
@asset(
    ins={
        "train_path": AssetIn(
            asset_key=["ar-resp", "df_train_seq"],
            metadata={"extension": ".parquet"},
            dagster_type=path_like,  # had to create this DagsterType
        )
    },
)
def model(
    train_path: UPath | Path  # <- here is the error
) -> dict:
    ...