Jeremy Lyman
04/05/2023, 6:47 PM
ASSET_MATERIALIZATION - Materialized value component myasset.
I only see it calling handle_output on the IO manager once with a range, and the IO manager has saved it as a single xxxxxxx_xxxxxx.pq with all assets inside of it. But the asset job keeps repeating the debug log (possibly once for each of the thousands of partitions). Any thoughts?
Daniel Gafni
04/05/2023, 6:50 PM
Where is PartitionedParquetIOManager from? Is it from the GitHub example? Just interested.
Also, it seems like you're materializing a range of partitions in the same run, which is not the same as backfilling with multiple runs (one per partition).
Jeremy Lyman
04/05/2023, 6:54 PM
Daniel Gafni
04/05/2023, 6:54 PM
Daniel Gafni
04/05/2023, 6:55 PM
Jeremy Lyman
04/05/2023, 6:55 PM
Daniel Gafni
04/05/2023, 6:55 PM
Jeremy Lyman
04/05/2023, 6:56 PM
Daniel Gafni
04/05/2023, 6:56 PM
Jeremy Lyman
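04/05/2023, 6:57 PM
import pandas as pd
from dagster import asset, get_dagster_logger

# hourly_partitions_def (presumably an HourlyPartitionsDefinition) and the "ods_client" resource are defined elsewhere
@asset(
    io_manager_key="parquet_io_manager",
    key_prefix=["component"],
    metadata={
        "partition_expr": "time",
    },
    partitions_def=hourly_partitions_def,
    required_resource_keys={"ods_client"},
)
def ods_components(context) -> pd.DataFrame:
    """Components from the ODS."""
    # When a range of partitions is materialized in a single run, this window spans the whole range
    start, end = context.asset_partitions_time_window_for_output()
    # partition_date_str = context.asset_partition_key_for_output()
    get_dagster_logger().info(f"Materializing ODS assets between: {start} {end}")
    return context.resources.ods_client.extract_between(start, end)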
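To make the single-run vs. backfill distinction concrete, here is a minimal, Dagster-free sketch. It assumes hourly partition keys use the "%Y-%m-%d-%H:%M" format (believed to be Dagster's default for hourly partitions), and hourly_keys_in_window / plan_runs are hypothetical helpers, not Dagster APIs. A range materialized in one run yields a single time window, so the asset body and handle_output execute once; a backfill with one run per partition yields one window per key.

```python
from datetime import datetime, timedelta
from typing import List

# Assumption: hourly partition keys are formatted like "2023-04-05-18:00"
HOURLY_KEY_FORMAT = "%Y-%m-%d-%H:%M"


def hourly_keys_in_window(start: datetime, end: datetime) -> List[str]:
    """Return the hourly partition keys covered by the half-open window [start, end)."""
    keys = []
    current = start
    while current < end:
        keys.append(current.strftime(HOURLY_KEY_FORMAT))
        current += timedelta(hours=1)
    return keys


def plan_runs(start: datetime, end: datetime, one_run_per_partition: bool) -> List[List[str]]:
    """Group partition keys into runs (hypothetical helper).

    Single run over a range: one group, i.e. one window and one handle_output call.
    Backfill with one run per partition: one group per partition key.
    """
    keys = hourly_keys_in_window(start, end)
    if one_run_per_partition:
        return [[key] for key in keys]
    return [keys]


if __name__ == "__main__":
    start = datetime(2023, 4, 5, 0, 0)
    end = datetime(2023, 4, 5, 3, 0)
    print(plan_runs(start, end, one_run_per_partition=False))
    # [['2023-04-05-00:00', '2023-04-05-01:00', '2023-04-05-02:00']]
    print(plan_runs(start, end, one_run_per_partition=True))
    # [['2023-04-05-00:00'], ['2023-04-05-01:00'], ['2023-04-05-02:00']]
```

With thousands of hourly partitions, the single-run path means one asset call that has to handle the entire range at once.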
Daniel Gafni
04/05/2023, 6:59 PM
Jeremy Lyman
04/05/2023, 7:01 PM
Daniel Gafni
04/05/2023, 7:03 PM
pl.DataFrame or pl.LazyFrame based on how you typed the input. It will also work with gcs, s3, etc.
Perhaps you want to write something like this.
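from __future__ import annotations

from typing import Dict, Mapping

import fsspec
import polars as pl
import pyarrow.dataset as ds
from dagster import InputContext, OutputContext, UPathIOManager
from upath import UPath


class PolarsParquetIOManager(UPathIOManager):
    """
    Will return polars.DataFrame or polars.LazyFrame based on the type hint or dagster_type.typing_type
    """

    extension: str = ".parquet"

    def dump_to_path(self, context: OutputContext, obj: pl.DataFrame, path: UPath):
        # Write the DataFrame as one Parquet file; UPath handles local and remote (s3, gcs, ...) paths
        with path.open("wb") as file:
            obj.write_parquet(file)

    def load_from_path(self, context: InputContext, path: UPath) -> pl.DataFrame | pl.LazyFrame:
        # Reuse the fsspec filesystem behind a remote UPath, if there is one.
        # _accessor._fs is a private universal_pathlib attribute and may differ between versions.
        fs: fsspec.AbstractFileSystem | None = None
        try:
            fs = path._accessor._fs
        except AttributeError:
            pass
        # Scan lazily (older Polars releases call this function pl.scan_ds)
        ldf = pl.scan_pyarrow_dataset(ds.dataset(str(path), filesystem=fs))
        # Optionally project the columns listed in the input's metadata
        columns = (context.metadata or {}).get("columns")
        if columns is not None:
            context.log.debug(f"Loading {columns=}")
            ldf = ldf.select(columns)
        # Each partition is loaded by a separate call, so a Dict/Mapping annotation
        # still means one DataFrame (or LazyFrame) per call
        if context.dagster_type.typing_type in (
            pl.DataFrame,
            Dict[str, pl.DataFrame],
            Mapping[str, pl.DataFrame],
            None,
        ):
            return ldf.collect(streaming=True)
        elif context.dagster_type.typing_type in (
            pl.LazyFrame,
            Dict[str, pl.LazyFrame],
            Mapping[str, pl.LazyFrame],
        ):
            return ldf
        else:
            raise NotImplementedError(f"Can't load object for type annotation {context.dagster_type.typing_type}")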
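The Dict[str, ...] and Mapping[str, ...] branches exist because, when a downstream asset depends on several partitions, UPathIOManager calls load_from_path once per partition and hands back a dictionary keyed by partition. The sketch below is a simplified, stdlib-only model of that dispatch as understood from the thread; load_partitioned_input is a hypothetical name, and the real class also handles missing partitions and non-partitioned inputs.

```python
from typing import Any, Callable, Dict, List, Optional, Union


def load_partitioned_input(
    partition_keys: List[str], load_one: Callable[[str], Any]
) -> Optional[Union[Any, Dict[str, Any]]]:
    """Simplified model of how a partitioned input is assembled (hypothetical helper).

    No partitions -> None; one partition -> the loaded object itself;
    several partitions -> {partition_key: loaded object}.
    """
    if not partition_keys:
        return None
    if len(partition_keys) == 1:
        return load_one(partition_keys[0])
    return {key: load_one(key) for key in partition_keys}


if __name__ == "__main__":
    # Toy stand-in for files on disk: partition key -> stored rows
    store = {"2023-04-05-00:00": [1, 2], "2023-04-05-01:00": [3]}
    print(load_partitioned_input(["2023-04-05-00:00"], store.__getitem__))
    # [1, 2]
    print(load_partitioned_input(list(store), store.__getitem__))
    # {'2023-04-05-00:00': [1, 2], '2023-04-05-01:00': [3]}
```

In the Polars IO manager, load_one corresponds to load_from_path, which is why a downstream input annotated as Dict[str, pl.DataFrame] still receives a DataFrame per partition.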
Jeremy Lyman
04/05/2023, 7:04 PM
Daniel Gafni
04/05/2023, 7:05 PM
UPathIOManager
Binoy Shah
04/06/2023, 2:23 PM
ds.dataset in your code: what is ds an alias for?
Daniel Gafni
04/06/2023, 2:23 PM
import pyarrow.dataset as ds
Binoy Shah
04/06/2023, 2:23 PM
Daniel Gafni
04/06/2023, 2:23 PM
Binoy Shah
04/06/2023, 2:24 PM
UPathIOManager
Daniel Gafni
04/06/2023, 2:27 PM
UPathIOManager supports exactly this. The partitions would then be returned in a dictionary with a partition_key -> obj mapping.
Binoy Shah
04/06/2023, 2:28 PM
Daniel Gafni
04/06/2023, 2:28 PM
Binoy Shah
04/06/2023, 2:29 PM
Daniel Gafni
04/06/2023, 2:36 PM
Binoy Shah
04/06/2023, 2:36 PM
Jeremy Lyman
04/06/2023, 3:11 PM
Binoy Shah
04/06/2023, 7:10 PM
@resource definition for the above Polars IO Manager?
Daniel Gafni
04/06/2023, 7:11 PM
from dagster import Field, InitResourceContext, io_manager
from upath import UPath

# PolarsParquetIOManager is the class from the previous snippet
@io_manager(config_schema={"base_path": Field(str, is_required=False)})
def polars_parquet_io_manager(
    init_context: InitResourceContext,
) -> PolarsParquetIOManager:
    # Fall back to the Dagster instance's storage directory when no base_path is configured
    assert init_context.instance is not None
    base_path = UPath(init_context.resource_config.get("base_path", init_context.instance.storage_directory()))
    return PolarsParquetIOManager(base_path=base_path)
base_path can be a remote storage URI like s3://bucket/directory
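To see where a given partition ends up under base_path, here is a rough, stdlib-only sketch. It assumes (to the best of our understanding, not confirmed in this thread) that UPathIOManager stores a partitioned asset at base_path / asset key path / partition key + extension; partition_path is a hypothetical helper, and it joins with "/" rather than using pathlib so that URIs like s3://bucket/directory are not collapsed.

```python
from typing import Sequence


def partition_path(
    base_path: str, asset_key: Sequence[str], partition_key: str, extension: str = ".parquet"
) -> str:
    """Approximate storage location of one partition (hypothetical helper).

    Plain "/" joining keeps the "//" in remote URIs such as s3://bucket/directory intact.
    """
    return "/".join([base_path.rstrip("/"), *asset_key, partition_key + extension])


if __name__ == "__main__":
    # ods_components with key_prefix=["component"] has the asset key ["component", "ods_components"]
    print(partition_path("s3://bucket/directory", ["component", "ods_components"], "2023-04-05-18:00"))
    # s3://bucket/directory/component/ods_components/2023-04-05-18:00.parquet
```

Under this assumed layout, each hourly partition is its own file, unlike the single combined .pq file described at the start of the thread.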
Binoy Shah
04/06/2023, 7:14 PM
Daniel Gafni
04/06/2023, 7:14 PM