# ask-community
j
I took the PartitionedParquetIOManager and was testing it out with an HourlyPartitioned definition. When I run a backfill as a single job, I just get spammed with
ASSET_MATERIALIZATION - Materialized value component myasset.
I only see handle_output being called on the IO manager once with a range, and the IO manager has saved it as a single xxxxxxx_xxxxxx.pq with all the assets inside it. But the asset job keeps repeating that debug log (possibly once for each of the thousands of partitions). Any thoughts?
🤖 1
d
Where did you get PartitionedParquetIOManager from? Is it from the GitHub example? Just interested. Also, it seems like you're materializing a range of partitions in the same run, which is not the same as backfilling with multiple runs (one per partition).
j
Yes, it's from the project_fully_featured example.
d
Could you send me the link please?
d
ok so this example is a bit outdated
j
And, yes, when I go to the UI to materialize, I select to backfill in a single run.
d
just curious, why do you need to do that?
j
As opposed to multiple runs?
d
Yeah
j
I have no good reason. It seemed like the right thing to do. I set up my asset to handle a range and then load the data to materialize from the start time (for an hourly partition).
I am just playing with things to see how it works. 😄
Copy code
import pandas as pd
from dagster import asset, get_dagster_logger


@asset(
    io_manager_key="parquet_io_manager",
    key_prefix=["component"],
    metadata={
        "partition_expr": "time",
    },
    partitions_def=hourly_partitions_def,  # hourly partitions defined elsewhere in the project
    required_resource_keys={"ods_client"},
)
def ods_components(context) -> pd.DataFrame:
    """Components from the ODS."""
    # time window spanning every partition covered by this run
    start, end = context.asset_partitions_time_window_for_output()
    # partition_date_str = context.asset_partition_key_for_output()
    get_dagster_logger().info(f"Materializing ODS assets between: {start} {end}")
    return context.resources.ods_client.extract_between(start, end)
d
I see. It doesn't seem like your case actually needs the range, so I would suggest just using normal runs. You could use the range if your code could handle multiple partitions efficiently (like some SQL that doesn't care how many partitions it works with), or perhaps if you are using PySpark. With pandas DataFrames you're probably safer launching multiple runs; there's less risk of OOMing with a lot of data, too.
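For example, this is what I mean by code that doesn't care how many partitions it covers (just a sketch, with a made-up warehouse resource; hourly_partitions_def is from your snippet):
Copy code
from dagster import asset


@asset(
    partitions_def=hourly_partitions_def,  # same hourly partitions as above
    required_resource_keys={"warehouse"},  # hypothetical SQL warehouse resource
)
def ods_components_sql(context) -> None:
    # one query covers the whole time window, whether this run holds 1 partition or 1000
    start, end = context.asset_partitions_time_window_for_output()
    context.resources.warehouse.execute(
        f"""
        INSERT INTO ods.components_clean
        SELECT * FROM ods.components_raw
        WHERE time >= '{start}' AND time < '{end}'
        """
    )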
j
Yeah, that's a good consideration.
d
Here is my IOManager for Parquet (using Polars). It's a bit fancy because it returns either pl.DataFrame or pl.LazyFrame based on how you type the input. It will also work with gcs, s3, etc. Perhaps you want to write something like this.
Copy code
from __future__ import annotations

from typing import Dict, Mapping

import fsspec
import polars as pl
import pyarrow.dataset as ds
from dagster import InputContext, OutputContext, UPathIOManager
from upath import UPath


class PolarsParquetIOManager(UPathIOManager):
    """
    Will return polars.DataFrame or polars.LazyFrame based on the type hint or dagster_type.typing_type
    """

    extension: str = ".parquet"

    def dump_to_path(self, context: OutputContext, obj: pl.DataFrame, path: UPath):
        with path.open("wb") as file:
            obj.write_parquet(file)

    def load_from_path(self, path: UPath, context: InputContext) -> pl.DataFrame | pl.LazyFrame:
        # reach into universal-pathlib's private attributes to get the underlying fsspec filesystem
        fs: fsspec.AbstractFileSystem | None = None
        try:
            fs = path._accessor._fs
        except AttributeError:
            pass

        # lazily scan the path as a pyarrow dataset (works for single files and partitioned datasets)
        ldf = pl.scan_ds(ds.dataset(str(path), filesystem=fs))

        # optional "columns" metadata on the input limits which columns get loaded
        columns = context.metadata.get("columns")
        if columns is not None:
            context.log.debug(f"Loading {columns=}")
            ldf = ldf.select(columns)

        if context.dagster_type.typing_type in (
            pl.DataFrame,
            Dict[str, pl.DataFrame],
            Mapping[str, pl.DataFrame],
            None,
        ):
            return ldf.collect(streaming=True)
        elif context.dagster_type.typing_type in (
            pl.LazyFrame,
            Dict[str, pl.LazyFrame],
            Mapping[str, pl.LazyFrame],
        ):
            return ldf
        else:
            raise NotImplementedError(f"Can't load object for type annotation {context.dagster_type.typing_type}")
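For example (a sketch with made-up asset names), the downstream annotation decides whether you get the collected DataFrame or the LazyFrame:
Copy code
import polars as pl
from dagster import asset


@asset(io_manager_key="polars_parquet_io_manager")
def raw_events() -> pl.DataFrame:
    return pl.DataFrame({"user": ["a", "b", "a"], "value": [1, 2, 3]})


@asset(io_manager_key="polars_parquet_io_manager")
def eager_events(raw_events: pl.DataFrame) -> pl.DataFrame:
    # the IO manager already collected the scan because of the pl.DataFrame annotation
    return raw_events.filter(pl.col("value") > 1)


@asset(io_manager_key="polars_parquet_io_manager")
def lazy_events(raw_events: pl.LazyFrame) -> pl.DataFrame:
    # the input stays a LazyFrame; nothing is read until .collect()
    return raw_events.filter(pl.col("value") > 1).collect()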
j
Cool, let me have a look and see what I get rewriting this to handle the multi-run.
d
It would be great if you could contribute the partitions range run logic to the UPathIOManager
b
Hi Daniel, you have used ds.dataset in your code; what is the ds alias for?
d
import pyarrow.dataset as ds
b
ah .. okay Thanks
d
This allows reading not only single Parquet files but also, for example, partitioned datasets produced by Spark.
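e.g. (just a sketch; the path is made up):
Copy code
import polars as pl
import pyarrow.dataset as ds

# reads either a single parquet file or a directory of hive-partitioned
# parquet files (the key=value layout Spark writes)
dataset = ds.dataset("s3://bucket/path/", format="parquet", partitioning="hive")
ldf = pl.scan_ds(dataset)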
b
Just out of curiosity, so this is compatible with partitioned Parquet files?
👍 1
If yes, then I'm just wondering why you said “It would be great if you could contribute the partitions range run logic to the UPathIOManager”.
d
So the partitions range run is a new thing in Dagster; I don't use it myself. It's when you combine all the backfill runs into a single run. There is a checkbox for it in Dagit. By supporting partitions I mean supporting loading multiple partitions with a partitions mapping. This is for situations when, for example, a daily asset depends on 24 partitions of an upstream hourly asset, so even a single run for that asset requires loading multiple partitions. UPathIOManager supports exactly this: the partitions are returned in a dictionary with a partition_key -> obj mapping.
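Something like this (a sketch, asset names made up): the daily asset types its input as a dict, and UPathIOManager loads all 24 upstream hourly partitions into it:
Copy code
from typing import Dict

import polars as pl
from dagster import DailyPartitionsDefinition, HourlyPartitionsDefinition, asset

hourly = HourlyPartitionsDefinition(start_date="2023-01-01-00:00")
daily = DailyPartitionsDefinition(start_date="2023-01-01")


@asset(partitions_def=hourly, io_manager_key="polars_parquet_io_manager")
def hourly_events(context) -> pl.DataFrame:
    # one hour of data per run (contents made up)
    return pl.DataFrame({"hour": [context.asset_partition_key_for_output()], "n": [1]})


@asset(partitions_def=daily, io_manager_key="polars_parquet_io_manager")
def daily_events(hourly_events: Dict[str, pl.DataFrame]) -> pl.DataFrame:
    # the IO manager passes a partition_key -> DataFrame mapping
    # (the 24 hourly partitions covered by this daily partition)
    return pl.concat(list(hourly_events.values()))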
b
Oh okay, got it
d
And it obviously supports normal partitioned assets without mappings, e.g. when a daily asset depends on another daily asset.
b
Just yesterday I was wondering how I can re-use my Kubernetes pod to do more work instead of having more pods generated, since I already have spare CPU. This answers my question. Do you by any chance recall what that feature is called?
d
I think it's called a partitions range run. Here is what comes up in Dagit. I don't think there are a lot of docs about it available right now. I also don't see how this will help you in Kubernetes; you will still use the same compute. At best you would be able to run the same thing in parallel (by writing some parallel code and using more CPUs for your single pod), otherwise you will run it sequentially and it will take more time than launching multiple pods at the same time. A partitions range run can help when the compute happens externally and can be stretched (like Spark, BigQuery, etc.), and when running all the partitions at the same time provides some sort of optimization over running them individually in parallel jobs (I can't provide any examples right now). Like I said, I'm not using it personally.
b
Ooh.. Interesting.. Thank you Daniel
👍 1
j
I am playing with it now. A multi-run backfill over 10k extract-load operations has a lot of run spin-up overhead, whereas if I can get a range run to work, it will load the entire range of datasets in a single run, with controlled parallelism or even just a looped iteration, without adding each run's spin-up latency to the total processing time. Though I imagine for a use case like this, I could have a job for a one-time full load and then have a partitioned job take over after that time period. I'm just exploring right now.
👍 1
b
One last thing @Daniel Gafni, would you mind sharing your @resource definition for the above Polars IO manager?
d
Copy code
from dagster import Field, InitResourceContext, io_manager
from upath import UPath


@io_manager(config_schema={"base_path": Field(str, is_required=False)})
def polars_parquet_io_manager(
    init_context: InitResourceContext,
) -> PolarsParquetIOManager:
    assert init_context.instance is not None
    # fall back to the local Dagster storage directory when base_path isn't configured
    base_path = UPath(init_context.resource_config.get("base_path", init_context.instance.storage_directory()))
    return PolarsParquetIOManager(base_path=base_path)
base_path can be a remote storage URI like s3://bucket/directory
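e.g. one way to wire it up (bucket name made up; you could also pass base_path via run config):
Copy code
from dagster import Definitions

defs = Definitions(
    assets=[ods_components],
    resources={
        "parquet_io_manager": polars_parquet_io_manager.configured(
            {"base_path": "s3://my-bucket/dagster-storage"}
        ),
    },
)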
b
Wonderful, so universal-pathlib picks up the default AWS creds to connect.
d
yes