
James Hale

10/28/2022, 8:20 PM
Is there a way to configure a sliding window for materializing partitioned assets? E.g., if I have a daily partition, can I set up a schedule to materialize the last 3 days every day?

Daniel Gafni

10/29/2022, 5:46 PM
You can create a downstream asset and use a `PartitionMapping` to define the sliding window you need.
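To make the idea concrete, here is a minimal, Dagster-independent sketch of the date arithmetic such a mapping has to perform: given a downstream daily partition key, which upstream daily keys fall inside its window. The helper name `window_keys`, the `%Y-%m-%d` key format, and the `days`/`offset` semantics (a window of exactly `days` keys ending `offset` days before the downstream key) are assumptions for illustration.

```python
from datetime import datetime, timedelta
from typing import List

DATE_FMT = "%Y-%m-%d"  # assumed daily partition key format


def window_keys(downstream_key: str, days: int, offset: int = 0) -> List[str]:
    """Return the upstream daily partition keys in a sliding window.

    Hypothetical helper: the window holds exactly `days` consecutive keys
    and ends `offset` days before `downstream_key` (inclusive).
    """
    if days < 1:
        raise ValueError("days must be >= 1")
    end = datetime.strptime(downstream_key, DATE_FMT) - timedelta(days=offset)
    start = end - timedelta(days=days - 1)
    return [(start + timedelta(days=i)).strftime(DATE_FMT) for i in range(days)]


print(window_keys("2022-11-04", days=3))
# ['2022-11-02', '2022-11-03', '2022-11-04']
```

Using `datetime` arithmetic rather than string manipulation means month and year boundaries are handled for free (e.g. the window for `2022-03-01` starts on `2022-02-27`).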

James Hale

11/04/2022, 3:20 AM
@Daniel Gafni could you provide a conceptual example? I'm not sure what you mean.

Daniel Gafni

11/04/2022, 3:18 PM
The main ideas are:
1. Your IOManager has to be able to load multiple upstream partitions. Dagster 1.0.16 just got `UPathIOManager`, which can do this for filesystem data, and it's easy to subclass.
2. You have to define a custom `PartitionMapping` class.
3. You need to define an upstream asset that is daily partitioned.
4. You need to define a downstream asset that is also daily partitioned and uses your `PartitionMapping` class to load 3 partitions from the upstream asset.

Here is a piece of my old code. You should write something similar:
```python
from datetime import datetime, timedelta
from typing import List

import pandas as pd
from dagster import (
    DailyPartitionsDefinition,
    OpExecutionContext,
    PartitionsDefinition,
    asset,
)
from dagster.core.definitions.partition_key_range import PartitionKeyRange
from dagster.core.definitions.partition_mapping import PartitionMapping

DATE_FMT = "%Y-%m-%d"


class NDaysPartitionMapping(PartitionMapping):
    """Maps each daily downstream partition to a window of `days` upstream
    partitions that ends `offset` days before the downstream partition."""

    def __init__(self, days: int, offset: int = 0):
        self.days = days
        self.offset = offset

    def get_downstream_partitions_for_partition_range(
        self,
        upstream_partition_key_range: PartitionKeyRange,
        downstream_partitions_def: PartitionsDefinition,
        upstream_partitions_def: PartitionsDefinition,
    ) -> PartitionKeyRange:
        assert isinstance(upstream_partitions_def, DailyPartitionsDefinition)
        # Simplification: only the downstream partition with the same key is reported
        return upstream_partition_key_range

    def get_upstream_partitions_for_partition_range(
        self,
        downstream_partition_key_range: PartitionKeyRange,
        downstream_partitions_def: PartitionsDefinition,  # pylint: disable=unused-argument
        upstream_partitions_def: PartitionsDefinition,  # pylint: disable=unused-argument
    ) -> PartitionKeyRange:
        # Window end: `offset` days before the downstream end key.
        # Window start: `days - 1` further days back, so the window holds exactly
        # `days` keys (days=3 -> d-2, d-1, d).
        # Note: for the first partitions the window can reach before start_date.
        start = datetime.strptime(downstream_partition_key_range.start, DATE_FMT) - timedelta(
            days=self.days - 1 + self.offset
        )
        end = datetime.strptime(downstream_partition_key_range.end, DATE_FMT) - timedelta(days=self.offset)
        return PartitionKeyRange(start=start.strftime(DATE_FMT), end=end.strftime(DATE_FMT))


# raw_start_date and timezone are placeholders from my code; set your own values.
raw_partitions = DailyPartitionsDefinition(start_date=str(raw_start_date), fmt=DATE_FMT, timezone=timezone)


@asset(
    partitions_def=raw_partitions,
    io_manager_key="parquet_io_manager",
    required_resource_keys={"parquet_io_manager"},
)
def upstream(context: OpExecutionContext) -> pd.DataFrame:
    # your_data is a placeholder for whatever produces this partition's data
    return pd.DataFrame(your_data)


@asset(
    partitions_def=raw_partitions,
    partition_mappings={"upstream": NDaysPartitionMapping(days=3)},
    io_manager_key="parquet_io_manager",
    required_resource_keys={"parquet_io_manager"},
)
def downstream(context: OpExecutionContext, upstream: List[pd.DataFrame]) -> pd.DataFrame:
    # The IO manager must load all 3 upstream partitions and pass them in as a list
    return pd.concat(upstream)
```
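The snippet's `get_downstream_partitions_for_partition_range` just returns the upstream range unchanged, but with a 3-day window each upstream day is actually read by several downstream days. Here is a stdlib-only sketch of that inverse mapping, assuming a window of exactly `days` keys ending `offset` days before the downstream key; `affected_downstream_range` is a hypothetical helper, not part of Dagster.

```python
from datetime import datetime, timedelta
from typing import Tuple

DATE_FMT = "%Y-%m-%d"


def affected_downstream_range(
    upstream_start: str, upstream_end: str, days: int, offset: int = 0
) -> Tuple[str, str]:
    """Downstream daily keys whose window touches [upstream_start, upstream_end].

    Downstream key d reads upstream keys [d - offset - (days - 1), d - offset],
    so upstream key u is read by downstream keys [u + offset, u + offset + days - 1].
    """
    start = datetime.strptime(upstream_start, DATE_FMT) + timedelta(days=offset)
    end = datetime.strptime(upstream_end, DATE_FMT) + timedelta(days=offset + days - 1)
    return start.strftime(DATE_FMT), end.strftime(DATE_FMT)


print(affected_downstream_range("2022-11-02", "2022-11-02", days=3))
# ('2022-11-02', '2022-11-04')
```

A real mapping would probably also need to clip this range to partitions that exist, since the end can land in the future.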
Regarding a partitioned IOManager, here is how easy it is to build one for JSON with `UPathIOManager` (it's a little different from my code above because it outputs a dict of partitions, not a list):
```python
import json
from typing import Any

from dagster import InputContext, OutputContext
from dagster._core.storage.upath_io_manager import UPathIOManager
from upath import UPath


class JSONIOManager(UPathIOManager):
    extension: str = ".json"  # appended to each stored file name

    def dump_to_path(self, context: OutputContext, obj: Any, path: UPath):
        # Text mode, so json.dump can write str directly
        with path.open("w") as file:
            json.dump(obj, file)

    def load_from_path(self, context: InputContext, path: UPath) -> Any:
        with path.open("r") as file:
            return json.load(file)
```
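Daniel notes that this IO manager hands a multi-partition input to the asset as a dict keyed by partition key rather than as a list. A stdlib-only sketch of that shape, assuming one JSON file per partition named `<partition_key>.json` in a per-asset directory; the helper names and the layout are hypothetical, not Dagster's code.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict, Iterable


def dump_partition(base: Path, partition_key: str, obj: Any) -> Path:
    """Write one partition to <base>/<partition_key>.json (hypothetical layout)."""
    path = base / f"{partition_key}.json"
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w") as file:
        json.dump(obj, file)
    return path


def load_partitions(base: Path, partition_keys: Iterable[str]) -> Dict[str, Any]:
    """Load several partitions into a dict keyed by partition key."""
    loaded = {}
    for key in partition_keys:
        with (base / f"{key}.json").open("r") as file:
            loaded[key] = json.load(file)
    return loaded


with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp) / "upstream"
    for day, value in [("2022-11-02", 1), ("2022-11-03", 2), ("2022-11-04", 3)]:
        dump_partition(base, day, {"value": value})
    window = load_partitions(base, ["2022-11-02", "2022-11-03", "2022-11-04"])

total = sum(v["value"] for v in window.values())
print(window)
# {'2022-11-02': {'value': 1}, '2022-11-03': {'value': 2}, '2022-11-04': {'value': 3}}
```

A dict keeps each value labeled with its partition key; with a list, the keys are only implied by the order of the elements.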

nickvazz

12/09/2022, 1:02 AM
Hi @Daniel Gafni, how does https://dagster.slack.com/archives/C01U954MEER/p1667575276984239?thread_ts=1666988436.894189&cid=C01U954MEER save partitions? It seems like it's taking only the asset keys into consideration.

Daniel Gafni

12/09/2022, 7:35 AM
It uses the asset key and the partition key. What else would you expect it to use?

nickvazz

12/09/2022, 4:55 PM
It doesn't seem to write out the partition key. Should it?

Daniel Gafni

12/09/2022, 5:16 PM
The partition_key is part of the file name. You can check out the exact code here
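In other words, a partitioned output is written to a path built from the asset key plus the partition key, so the key shows up in the file name rather than inside the file. A rough stdlib sketch of that idea, assuming a layout of `<base>/<asset key parts...>/<partition_key><extension>`; this is a hypothetical approximation, not the exact `UPathIOManager` code linked above.

```python
from pathlib import PurePosixPath
from typing import Optional, Sequence


def output_path(
    base: str,
    asset_key_path: Sequence[str],
    partition_key: Optional[str] = None,
    extension: str = ".json",
) -> PurePosixPath:
    """Build a storage path from the asset key and (optionally) the partition key.

    Hypothetical layout: unpartitioned -> <base>/<key...><extension>,
    partitioned -> <base>/<key...>/<partition_key><extension>.
    """
    path = PurePosixPath(base).joinpath(*asset_key_path)
    if partition_key is None:
        return path.with_name(path.name + extension)
    return path / f"{partition_key}{extension}"


print(output_path("/data", ["upstream"], "2022-11-04"))
# /data/upstream/2022-11-04.json
```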

nickvazz

12/09/2022, 5:19 PM
Maybe it's because I'm still using `build_context` etc. to generate/debug. Will report back!