#ask-community

Daniel Mosesson

02/28/2023, 8:59 AM
How can I have a Daily partitioned asset depend on the previous day's hourly partitioned asset?

sandy

02/28/2023, 4:47 PM
Alas, this isn't possible yet. A little while back, a user (@Daniel Gafni) was investigating making this possible: https://github.com/dagster-io/dagster/pull/10190

Daniel Gafni

02/28/2023, 4:49 PM
I mean it’s completely possible by writing a custom partition mapping. Dagster just doesn’t provide such a mapping right now.
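To make the idea concrete, here is a dependency-free sketch (plain Python, not Dagster's API) of what such a mapping has to compute: given a daily partition key, list the hourly partition keys it should read. It assumes the key formats `%Y-%m-%d` (daily) and `%Y-%m-%d-%H:%M` (hourly), which are believed to match Dagster's defaults for partitions without a timezone; `hourly_keys_for_day` and its `day_offset` parameter are hypothetical names. `day_offset=0` gives the same day, `day_offset=-1` gives the previous day, which is what the question asks for.

```python
from datetime import datetime, timedelta
from typing import List

# Assumed key formats (Dagster's defaults for daily/hourly partitions without a timezone).
DAILY_FMT = "%Y-%m-%d"
HOURLY_FMT = "%Y-%m-%d-%H:%M"


def hourly_keys_for_day(daily_key: str, day_offset: int = 0) -> List[str]:
    """Hypothetical helper: return the 24 hourly partition keys for the day
    `daily_key` shifted by `day_offset` days (-1 means the previous day).

    Uses naive datetimes, so DST transitions are ignored.
    """
    day_start = datetime.strptime(daily_key, DAILY_FMT) + timedelta(days=day_offset)
    return [(day_start + timedelta(hours=h)).strftime(HOURLY_FMT) for h in range(24)]


keys = hourly_keys_for_day("2023-02-28", day_offset=-1)
print(keys[0], keys[-1], len(keys))  # -> 2023-02-27-00:00 2023-02-27-23:00 24
```

In a real `PartitionMapping`, the first and last of these keys would become the `start` and `end` of the returned `PartitionKeyRange`.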

Daniel Mosesson

02/28/2023, 4:52 PM
How do I do that?

Daniel Gafni

02/28/2023, 5:02 PM
I was not entirely clear.
1. Right now the default partition mapping would map the 24 upstream hourly partitions into a single downstream daily partition. You don't need to write any special code at the asset level to get that, but your IOManager has to support loading multiple partitions. Dagster's default IOManager supports that (as does any IOManager built on top of the `UPathIOManager`). An example can be found here.
2. If you need a more complex mapping, you have to write a custom class for it. You do that by inheriting from the `PartitionMapping` base class and implementing `get_downstream_partitions_for_partition_range` (I never had to use this one, so the implementation can be just anything) and `get_upstream_partitions_for_partition_range` (this one is important and actually does the mapping). Here is my example, which maps N upstream days into a single downstream day.
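```python
from datetime import datetime, timedelta
from typing import Optional

from dagster import (
    DailyPartitionsDefinition,
    PartitionKeyRange,
    PartitionMapping,
    PartitionsDefinition,
    TimeWindowPartitionsDefinition,
)


class NDaysPartitionMapping(PartitionMapping):
    """Maps each downstream daily partition to a range of upstream partitions that
    ends `offset` days before it and starts `days` days before that end."""

    def __init__(self, days: int, offset: int = 0):
        self.days = days  # how far back the upstream range starts (relative to its end)
        self.offset = offset  # shifts the whole upstream range further into the past

    def get_downstream_partitions_for_partition_range(
        self,
        upstream_partition_key_range: PartitionKeyRange,
        downstream_partitions_def: Optional[PartitionsDefinition],
        upstream_partitions_def: PartitionsDefinition,
    ) -> PartitionKeyRange:
        # Not needed for this use case, so the range is simply passed through.
        assert isinstance(upstream_partitions_def, DailyPartitionsDefinition)
        assert downstream_partitions_def is not None  # mypy

        return upstream_partition_key_range

    def get_upstream_partitions_for_partition_range(
        self,
        downstream_partition_key_range: Optional[PartitionKeyRange],
        downstream_partitions_def: Optional[PartitionsDefinition],
        upstream_partitions_def: PartitionsDefinition,
    ) -> PartitionKeyRange:
        assert isinstance(downstream_partitions_def, TimeWindowPartitionsDefinition)
        assert isinstance(upstream_partitions_def, TimeWindowPartitionsDefinition)
        assert downstream_partition_key_range is not None  # mypy
        assert downstream_partitions_def is not None  # mypy

        # Parse the downstream keys, shift them back in time, and re-format them with
        # the upstream definition's key format. PartitionKeyRange is inclusive, so the
        # result spans days + 1 upstream days.
        mapped_range = PartitionKeyRange(
            start=(
                datetime.strptime(downstream_partition_key_range.start, downstream_partitions_def.fmt)
                - timedelta(days=self.days)
                - timedelta(days=self.offset)
            ).strftime(upstream_partitions_def.fmt),
            end=(
                datetime.strptime(downstream_partition_key_range.end, downstream_partitions_def.fmt)
                - timedelta(days=self.offset)
            ).strftime(upstream_partitions_def.fmt),
        )
        return mapped_range
```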
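The date arithmetic in `get_upstream_partitions_for_partition_range` can be checked without Dagster. This sketch reproduces it for a single downstream daily key, assuming both partitions definitions use the `%Y-%m-%d` key format; `n_days_upstream_range` and `count_days` are hypothetical helper names. Since a `PartitionKeyRange` includes both its start and its end, `days=N` covers N + 1 upstream days, and `days=0, offset=1` selects only the previous day.

```python
from datetime import datetime, timedelta
from typing import Tuple

FMT = "%Y-%m-%d"  # assumed daily key format for both partitions definitions


def n_days_upstream_range(downstream_key: str, days: int, offset: int = 0) -> Tuple[str, str]:
    """Hypothetical helper mirroring NDaysPartitionMapping's start/end arithmetic."""
    day = datetime.strptime(downstream_key, FMT)
    start = (day - timedelta(days=days) - timedelta(days=offset)).strftime(FMT)
    end = (day - timedelta(days=offset)).strftime(FMT)
    return start, end


def count_days(start: str, end: str) -> int:
    """Number of daily keys in the inclusive range [start, end]."""
    return (datetime.strptime(end, FMT) - datetime.strptime(start, FMT)).days + 1


print(n_days_upstream_range("2023-02-28", days=3))            # -> ('2023-02-25', '2023-02-28')
print(n_days_upstream_range("2023-02-28", days=0, offset=1))  # -> ('2023-02-27', '2023-02-27')
```

If exactly N upstream days are wanted, passing `days=N - 1` (or subtracting one day less in the mapping) would give that.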

Daniel Mosesson

02/28/2023, 5:14 PM
Got it, so it's really an explicit `ins` + `AssetIn(partition_mapping=...)` to make it work?

Daniel Gafni

02/28/2023, 5:15 PM
Yes. Sorry, I forgot to mention it lol

Daniel Mosesson

02/28/2023, 5:15 PM
perfect, thank you