
Jeremy Lyman

04/05/2023, 6:47 PM
I took the PartitionedParquetIOManager and was testing it out with an HourlyPartitioned definition. When I run a backfill as a single job, I just get spammed with `ASSET_MATERIALIZATION - Materialized value component myasset.` I only see it calling `handle_output` on the IO manager once with a range; the IO manager has saved it as a single xxxxxxx_xxxxxx.pq with all assets inside of it. But the asset job keeps repeating the debug log (possibly for each of the thousands of partitions). Any thoughts?

Daniel Gafni

04/05/2023, 6:50 PM
Where did you get `PartitionedParquetIOManager` from? Is it from the GitHub example? Just interested. Also, it seems like you're materializing a range of partitions in the same run, which is not the same as backfilling with multiple runs (one per partition).

Jeremy Lyman

04/05/2023, 6:54 PM
Yes, it's from the project fully featured example.

Daniel Gafni

04/05/2023, 6:54 PM
Could you send me the link please?

Daniel Gafni

04/05/2023, 6:55 PM
ok so this example is a bit outdated

Jeremy Lyman

04/05/2023, 6:55 PM
And, yes, when I go to the UI to materialize, I select to backfill in a single run.

Daniel Gafni

04/05/2023, 6:55 PM
just curious, why do you need to do that?

Jeremy Lyman

04/05/2023, 6:56 PM
As opposed to multiple runs?

Daniel Gafni

04/05/2023, 6:56 PM
Yeah

Jeremy Lyman

04/05/2023, 6:57 PM
I have no good reason. It seemed like the right thing to do. I set up my asset to handle a range and then load the data to materialize from the start time (for an hourly partition).
I am just playing with things to see how it works. 😄
import pandas as pd
from dagster import asset, get_dagster_logger


# hourly_partitions_def and the "ods_client" resource are defined elsewhere in the project.
@asset(
    io_manager_key="parquet_io_manager",
    key_prefix=["component"],
    metadata={
        "partition_expr": "time",
    },
    partitions_def=hourly_partitions_def,
    required_resource_keys={"ods_client"},
)
def ods_components(context) -> pd.DataFrame:
    """Components from the ODS."""
    # In a single-run backfill, this window covers the whole selected partition range.
    start, end = context.asset_partitions_time_window_for_output()
    # partition_date_str = context.asset_partition_key_for_output()
    get_dagster_logger().info(f"Materializing ODS assets between: {start} {end}")
    return context.resources.ods_client.extract_between(start, end)

Daniel Gafni

04/05/2023, 6:59 PM
I see. It doesn't seem like your case actually needs the range, so I would suggest just using normal runs. You could use the range if your code could handle multiple partitions efficiently (for example, some SQL that doesn't care how many partitions it works with), or perhaps if you are using PySpark. With DataFrames you would probably be safer launching multiple runs. There is also less risk of running out of memory if there is a lot of data.
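To make the trade-off concrete, here is a minimal plain-Python sketch (no Dagster APIs) of what a range run has to do by itself if it wants to keep memory bounded: split the requested window into hourly slices and handle them one at a time. `hourly_windows`, `process_range`, and the `handle_slice` callback are hypothetical names used only for illustration.

```python
from datetime import datetime, timedelta
from typing import Callable, Iterator, List, Tuple

Window = Tuple[datetime, datetime]


def hourly_windows(start: datetime, end: datetime) -> Iterator[Window]:
    """Yield consecutive one-hour [start, end) windows covering the range."""
    step = timedelta(hours=1)
    current = start
    while current < end:
        yield current, min(current + step, end)
        current += step


def process_range(
    start: datetime,
    end: datetime,
    handle_slice: Callable[[datetime, datetime], int],
) -> List[int]:
    """Handle one hourly slice at a time and collect the row count of each.

    handle_slice is a hypothetical callback that extracts and stores a single
    slice, so only that slice needs to be in memory at any moment.
    """
    return [handle_slice(lo, hi) for lo, hi in hourly_windows(start, end)]
```

With one run per partition, the orchestrator does this slicing for you, which is why separate runs are the safer default for in-memory DataFrames.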

Jeremy Lyman

04/05/2023, 7:01 PM
Yeah, that's a good consideration.

Daniel Gafni

04/05/2023, 7:03 PM
Here is my IOManager for Parquet (using Polars). It's a bit fancy because it returns either `pl.DataFrame` or `pl.LazyFrame` based on how you typed the input. It will also work with `gcs`, `s3`, etc. Perhaps you want to write something like this.
class PolarsParquetIOManager(UPathIOManager):
    """
    Will return polars.DataFrame or polars.LazyFrame based on the type hint or dagster_type.typing_type
    """

    extension: str = ".parquet"

    def dump_to_path(self, context: OutputContext, obj: pl.DataFrame, path: UPath):
        with path.open("wb") as file:
            obj.write_parquet(file)

    def load_from_path(self, path: UPath, context: InputContext) -> pl.DataFrame | pl.LazyFrame:
        fs: fsspec.AbstractFileSystem | None = None

        try:
            fs = path._accessor._fs
        except AttributeError:
            pass

        ldf = pl.scan_ds(ds.dataset(str(path), filesystem=fs))

        columns = context.metadata.get("columns")
        if columns is not None:
            context.log.debug(f"Loading {columns=}")
            ldf = ldf.select(columns)

        if context.dagster_type.typing_type in (
            pl.DataFrame,
            Dict[str, pl.DataFrame],
            Mapping[str, pl.DataFrame],
            None,
        ):
            return ldf.collect(streaming=True)
        elif context.dagster_type.typing_type in (
            pl.LazyFrame,
            Dict[str, pl.LazyFrame],
            Mapping[str, pl.LazyFrame],
        ):
            return ldf
        else:
            raise NotImplementedError(f"Can't load object for type annotation {context.dagster_type.typing_type}")
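The dispatch on the input's type annotation can be tried in isolation. The sketch below uses only the standard library; `EagerFrame` and `LazyFrame` are hypothetical stand-ins for `pl.DataFrame` and `pl.LazyFrame`, and `load_mode` is an illustrative helper rather than part of Dagster or Polars.

```python
from typing import Dict, Mapping, Optional, get_type_hints


class EagerFrame:
    """Stand-in for an eagerly loaded frame such as pl.DataFrame."""


class LazyFrame:
    """Stand-in for a lazily evaluated frame such as pl.LazyFrame."""


# None means the downstream input has no annotation; treat that as eager.
EAGER_TYPES = (EagerFrame, Dict[str, EagerFrame], Mapping[str, EagerFrame], None)
LAZY_TYPES = (LazyFrame, Dict[str, LazyFrame], Mapping[str, LazyFrame])


def load_mode(annotation: Optional[object]) -> str:
    """Return "eager" or "lazy" depending on how the input was annotated."""
    if annotation in EAGER_TYPES:
        return "eager"
    if annotation in LAZY_TYPES:
        return "lazy"
    raise NotImplementedError(f"Can't load object for type annotation {annotation}")


def downstream_lazy(upstream: LazyFrame) -> None:
    """Hypothetical downstream asset that asks for a lazy frame."""


def downstream_untyped(upstream) -> None:
    """Hypothetical downstream asset with no annotation on its input."""


mode_lazy = load_mode(get_type_hints(downstream_lazy).get("upstream"))
mode_untyped = load_mode(get_type_hints(downstream_untyped).get("upstream"))
```

The `Dict[str, ...]` and `Mapping[str, ...]` entries cover the case where several upstream partitions are loaded at once and handed over as a dictionary.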

Jeremy Lyman

04/05/2023, 7:04 PM
Cool, let me have a look and see what I get rewriting this to handle the multi-run.

Daniel Gafni

04/05/2023, 7:05 PM
It would be great if you could contribute the partitions range run logic to the `UPathIOManager`.

Binoy Shah

04/06/2023, 2:23 PM
Hi Daniel, you have used `ds.dataset` in your code. What is `ds` an alias for?

Daniel Gafni

04/06/2023, 2:23 PM
import pyarrow.dataset as ds

Binoy Shah

04/06/2023, 2:23 PM
ah .. okay Thanks

Daniel Gafni

04/06/2023, 2:23 PM
This allows reading not only single Parquet files but also, for example, partitioned datasets produced by Spark.

Binoy Shah

04/06/2023, 2:24 PM
Just out of curiosity, so this is compatible with partitions on Parquet files?
If yes, then I'm just wondering why you said “It would be great if you could contribute the partitions range run logic to the `UPathIOManager`”.

Daniel Gafni

04/06/2023, 2:27 PM
So the partitions range run is a new thing in Dagster. I don't use it myself. This is when you combine all backfill runs into a single run. There is a checkbox for it in Dagit. By supporting partitions, I mean supporting loading multiple partitions via partition mappings. This is for situations when, for example, a daily asset depends on 24 partitions of an upstream hourly asset, so even a single run for this asset requires loading multiple partitions. `UPathIOManager` supports exactly this. The partitions are then returned in a dictionary with a partition_key -> obj mapping.
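As a rough illustration of the daily-on-hourly case, the plain-Python sketch below builds the partition_key -> obj dictionary for one daily partition. It does not call Dagster; the key formats are assumptions about the default daily and hourly formats, and `upstream_hourly_keys`, `load_partitions`, and `load_one` are hypothetical names.

```python
from datetime import datetime, timedelta
from typing import Callable, Dict, List

DAILY_FMT = "%Y-%m-%d"         # assumed daily partition key format
HOURLY_FMT = "%Y-%m-%d-%H:%M"  # assumed hourly partition key format


def upstream_hourly_keys(daily_key: str) -> List[str]:
    """Return the 24 hourly partition keys that fall inside one daily partition."""
    day_start = datetime.strptime(daily_key, DAILY_FMT)
    return [(day_start + timedelta(hours=h)).strftime(HOURLY_FMT) for h in range(24)]


def load_partitions(daily_key: str, load_one: Callable[[str], object]) -> Dict[str, object]:
    """Load every upstream hourly partition and key the results by partition key.

    load_one is a hypothetical loader for a single upstream partition.
    """
    return {key: load_one(key) for key in upstream_hourly_keys(daily_key)}
```

A downstream asset receiving this dictionary would typically annotate its input as a `Dict[str, ...]` of frames and combine the 24 values itself.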

Binoy Shah

04/06/2023, 2:28 PM
Oh okay, got it

Daniel Gafni

04/06/2023, 2:28 PM
And it obviously supports normal partitioned assets without mappings
When a daily asset depends on another daily asset

Binoy Shah

04/06/2023, 2:29 PM
Just yesterday I was wondering how I can reuse my Kubernetes pod to do more work instead of having more pods generated, since I already have spare CPU. This answers my question. Do you by any chance recall what that feature is called?

Daniel Gafni

04/06/2023, 2:36 PM
I think it's called partitions range run. Here is what comes up in Dagit. I don't think there are a lot of docs about this available right now. I also don't see how this will help you in Kubernetes: you will still use the same compute. At best you would be able to run the same thing in parallel (by writing some parallel code and using more CPUs for your single pod); otherwise you will run it sequentially, and it will take more time than launching multiple pods at the same time. A partitions range run can help when the compute happens externally and can be stretched (Spark, BigQuery, etc.), and when running all the partitions at once provides some sort of optimization over running them individually in parallel jobs (I can't provide any examples right now). Like I said, I'm not using it personally.

Binoy Shah

04/06/2023, 2:36 PM
Ooh.. Interesting.. Thank you Daniel

Jeremy Lyman

04/06/2023, 3:11 PM
I am playing with it now, and a multi-run backfill over 10k extract-load operations has a lot of overhead from spinning up each run. If I can get a range run to work, it would load the entire range of datasets in a single run, with controlled parallelism or even just a loop, without adding each run's spin-up latency to the processing time. Though I imagine that for a use case like this I could have one job for the entire one-time load and then let the partitioned job take over after that time period, but I'm just exploring right now.
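The "controlled parallelism" idea can be sketched with the standard library alone. This assumes the work is I/O-bound (extract and load), so threads are adequate; `run_range` and `load_one` are hypothetical names, and nothing here uses a Dagster API.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def run_range(items: Iterable[T], load_one: Callable[[T], R], max_workers: int = 4) -> List[R]:
    """Process every item inside one process, at most max_workers at a time.

    load_one is a hypothetical function that extracts and loads a single
    partition (or time window) and returns some result for it.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() keeps the input order; a worker's exception is re-raised
        # when its result is reached while building the list.
        return list(pool.map(load_one, items))
```

Setting `max_workers=1` reduces this to the plain loop mentioned above, which makes it easy to compare both variants against a one-run-per-partition backfill.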

Binoy Shah

04/06/2023, 7:10 PM
One last thing, @Daniel Gafni: would you mind sharing your `@resource` definition for the above Polars IO manager?

Daniel Gafni

04/06/2023, 7:11 PM
from dagster import Field, InitResourceContext, io_manager
from upath import UPath

@io_manager(config_schema={"base_path": Field(str, is_required=False)})
def polars_parquet_io_manager(
    init_context: InitResourceContext,
) -> PolarsParquetIOManager:
    assert init_context.instance is not None
    base_path = UPath(init_context.resource_config.get("base_path", init_context.instance.storage_directory()))
    return PolarsParquetIOManager(base_path=base_path)
`base_path` can be a remote storage URI like `s3://bucket/directory`.

Binoy Shah

04/06/2023, 7:14 PM
Wonderful, so universal-pathlib picks up the default AWS creds to connect?

Daniel Gafni

04/06/2023, 7:14 PM
yes
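For reference, the `base_path` handling in the resource above boils down to two small steps: fall back to a default directory when nothing is configured, and let the URI scheme decide which storage backend is used. The standard-library sketch below only illustrates that logic; `resolve_base_path` and `storage_protocol` are hypothetical helpers, not part of Dagster or universal-pathlib.

```python
from typing import Mapping
from urllib.parse import urlsplit


def resolve_base_path(resource_config: Mapping[str, str], default_dir: str) -> str:
    """Use the configured base_path if present, else the default storage directory."""
    return resource_config.get("base_path", default_dir)


def storage_protocol(base_path: str) -> str:
    """Return the URI scheme ("s3", "gs", ...) or "file" for a plain POSIX path."""
    return urlsplit(base_path).scheme or "file"
```

For example, `storage_protocol("s3://bucket/directory")` yields `"s3"`, while a local directory such as `/tmp/dagster/storage` yields `"file"`.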