#ask-community

Peter Davidson

02/16/2023, 7:03 AM
Is there a trick to override the partition of an output?

Case I have:
1. At the end of the year (month = 12), produce a file for each of the next 12 months.
2. Every month of the next year, refer to the file produced at the end of the previous year.

Pattern ideas:
• Have a function that writes to file (step 1), using specific file path logic to put it in the target folder for the next year.
• Then step 2 can define a source asset with standard file path logic (IO manager) to read the file.

If you can override the partition key of outputs, then it could simply be a factory pattern of assets for x in range(12), setting the output partition = 2023-x-01, and using a standard IO manager. Maybe there are other solutions to this, e.g. partition mappings (but that would need a custom mapping from the current month to the previous year end).

owen

02/16/2023, 10:45 PM
hi @Peter Davidson! I think partition mappings are the answer here actually -- the TimeWindowPartitionMapping allows you to specify a start/end offset, which sounds like exactly what you'd want here (e.g. setting both to -12 would tell the downstream asset for month Jan 2023 to read from the upstream asset's Jan 2022 partition)
actually, looking further, I don't think you would even need a partition mapping, as you still want to map (e.g.) Jan 2023 to Jan 2023. You just need to make sure that the partitions def of step1 has the ability to write files for future partitions. You can do that with the end_offset parameter:
from dagster import MonthlyPartitionsDefinition, asset

@asset(partitions_def=MonthlyPartitionsDefinition(start_date=..., end_offset=12))
def step1():
    ...

@asset(partitions_def=MonthlyPartitionsDefinition(start_date=...))
def step2(step1):
    ...
this way, at the beginning of the year, step1 will have partitions available for the next 12 months, so it can write out all of those partitions. then, those partitions will be available to read from in step2 each month
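To make the effect of end_offset concrete, here is a minimal standard-library sketch of which monthly keys would be available to each asset. It does not call Dagster; `add_months` and `monthly_partition_keys` are hypothetical helpers, and the sketch assumes that with end_offset=0 the newest partition is the last fully completed month.

```python
from datetime import date
from typing import List


def add_months(d: date, months: int) -> date:
    """Return the first day of the month that is `months` away from d's month."""
    index = d.year * 12 + (d.month - 1) + months
    return date(index // 12, index % 12 + 1, 1)


def monthly_partition_keys(start: date, today: date, end_offset: int = 0) -> List[str]:
    """List monthly keys from `start` to the last completed month plus end_offset.

    Assumption: with end_offset=0 the newest key is the last fully completed month.
    """
    last = add_months(today, end_offset - 1)
    keys = []
    current = add_months(start, 0)  # normalize to the first of the month
    while current <= last:
        keys.append(current.isoformat())
        current = add_months(current, 1)
    return keys


# Viewed from mid-January 2023:
step2_keys = monthly_partition_keys(date(2022, 1, 1), today=date(2023, 1, 15))
step1_keys = monthly_partition_keys(date(2022, 1, 1), today=date(2023, 1, 15), end_offset=12)
```

Under these assumptions step1 already exposes keys up to 2023-12-01 while step2 stops at 2022-12-01, so step1 can write each future month before step2 reaches it.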

Peter Davidson

02/17/2023, 6:55 AM
ok, so the offset isn't fixed. For example:
• Jan 2023 looks back to Dec 2022
• Feb 2023 also looks back to Dec 2022
• ...
• Dec 2023 looks back to Dec 2022
• Jan 2024 looks to Dec 2023
• ...
So it is a bit of a dynamic offset. (Context is financial reporting, looking at previous year end)
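The difference between a fixed offset and the year-end lookup described above can be sketched with the standard library alone. The function names below are hypothetical and the keys are assumed to be 'YYYY-MM-DD' strings dated on the first of the month; this illustrates the mapping logic, not Dagster's implementation.

```python
from datetime import date


def fixed_offset_key(key: str, months: int) -> str:
    """Shift a monthly key by a constant number of months."""
    d = date.fromisoformat(key)
    index = d.year * 12 + (d.month - 1) + months
    return date(index // 12, index % 12 + 1, 1).isoformat()


def previous_year_end_key(key: str) -> str:
    """Map any monthly key to December of the previous year."""
    d = date.fromisoformat(key)
    return date(d.year - 1, 12, 1).isoformat()


def months_back_to_year_end(key: str) -> int:
    """The lookback needed to reach the previous December equals the month number."""
    return date.fromisoformat(key).month


jan = previous_year_end_key("2023-01-01")   # 1 month back
dec = previous_year_end_key("2023-12-01")   # 12 months back
shifted = fixed_offset_key("2023-01-01", -12)
```

Because the lookback ranges from 1 to 12 months depending on the partition, no single start/end offset reproduces it, which is why a custom mapping comes up later in the thread.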
Writing future partitions is an excellent trick though, gonna investigate this
So I still think there is a mapping needed -> As at December, there are a lot of upstream assets (all available at December) which contribute to the generation of the next 12 months (summarized into a single asset at end Dec)
from dagster import MonthlyPartitionsDefinition, asset

@asset(partitions_def=MonthlyPartitionsDefinition(start_date=...))
def step0():
    """Summarize all the year-end information."""
    ...

@asset(partitions_def=MonthlyPartitionsDefinition(start_date=..., end_offset=12))
def step1(step0):
    """Produce a file for each of the following 12 months based on December information."""
    ...

@asset(partitions_def=MonthlyPartitionsDefinition(start_date=...))
def step2(step1):
    """Do the monthly reports using information produced at the previous year end."""
    ...
One idea would be to use LastPartitionMapping() in step 1, but this fails since Jan 2023 does exist for step0 (even if never materialized). If there was an AnnualPartitionsDefinition this would work (I opened a feature request for this: https://github.com/dagster-io/dagster/issues/12415).

I also have the complication of using a MultiPartitionsDefinition (static & monthly).
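A rough standard-library sketch of why "take the last upstream partition" behaves differently for monthly and annual keys. The helpers are hypothetical, the annual keys are represented here by their December date purely for illustration, and only completed periods are listed.

```python
from datetime import date
from typing import List


def completed_month_keys(start: date, today: date) -> List[str]:
    """Monthly keys for every month that has fully ended before `today`'s month."""
    keys = []
    year, month = start.year, start.month
    while (year, month) < (today.year, today.month):
        keys.append(date(year, month, 1).isoformat())
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)
    return keys


def completed_year_end_keys(start_year: int, today: date) -> List[str]:
    """One key per completed year, labeled by that year's December (illustrative only)."""
    return [f"{year}-12-01" for year in range(start_year, today.year)]


today = date(2023, 3, 10)
monthly_last = completed_month_keys(date(2022, 1, 1), today)[-1]
annual_last = completed_year_end_keys(2022, today)[-1]
```

With monthly upstream keys the last partition keeps moving forward during the year (here 2023-02-01), whereas with one key per year it stays at the previous year end (2022-12-01), which is the behavior the report needs.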
So, where I have got to now is to build a custom partition mapping which refers to an earlier version of an asset:
from datetime import date
from typing import Optional

import dagster._check as check
from dagster._annotations import experimental
from dagster._core.definitions.partition import PartitionsDefinition, PartitionsSubset
from dagster._core.definitions.partition_key_range import PartitionKeyRange
from dagster._core.definitions.partition_mapping import PartitionMapping
from dagster._core.instance import DynamicPartitionsStore

from dagster import (
    MultiPartitionKey,
    MultiPartitionsDefinition,
    MultiDependencyDefinition,
)

@experimental
class MultiDimensionPriorPeriodMapping(PartitionMapping):
    """
    Defines correspondence between an asset at a different time period.
    Use case:
    - refer to an asset at previous year end.
    Args:
        prior_period (str): The name of the previous period to map to,e.g. year_end.
    """

    def __init__(self, prior_period: str):
        self.prior_period = prior_period
 
    def get_upstream_partitions_for_partition_range(
        self,
        downstream_partition_key_range: Optional[PartitionKeyRange],
        downstream_partitions_def: Optional[PartitionsDefinition],
        upstream_partitions_def: PartitionsDefinition,
    ) -> PartitionKeyRange:

        raise NotImplementedError()

    def get_downstream_partitions_for_partition_range(
        self,
        upstream_partition_key_range: PartitionKeyRange,
        downstream_partitions_def: Optional[PartitionsDefinition],
        upstream_partitions_def: PartitionsDefinition,
    ) -> PartitionKeyRange:
        raise NotImplementedError()

    def get_upstream_partitions_for_partitions(
        self,
        downstream_partitions_subset: Optional[PartitionsSubset],
        upstream_partitions_def: MultiPartitionsDefinition,
        dynamic_partitions_store: Optional[DynamicPartitionsStore] = None,
    ) -> PartitionsSubset:

        upstream_partitions = set()

        for partition_key in downstream_partitions_subset.get_partition_keys():
            if not isinstance(partition_key, MultiPartitionKey):
                check.failed(
                    "Partition keys in downstream partition key subset must be MultiPartitionKeys"
                )
            # rep_date, scenario_key = partition_key.split("|")
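            rep_date = partition_key.keys_by_dimension['rep_date']
            # part_key_to_rep_date is a helper defined elsewhere (not shown in this thread);
            # it is used here as if it returns a datetime.date for the key string.
            rep_date = part_key_to_rep_date(rep_date)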
            # Use 1st of month according to dagster logic for partition key
            if self.prior_period == "year_end":
                previous_year_end = date(rep_date.year - 1, 12, 1)
                previous_year_end = previous_year_end.strftime("%Y-%m-%d")
                upstream_key = MultiPartitionKey(
                    {
                        "rep_date": previous_year_end,
                        "scenario_key": partition_key.keys_by_dimension['scenario_key'],
                    }
                )
            else:
                raise NotImplementedError("Only prior year end mapping defined")
            # upstream_partitions.add(partition_key.keys_by_dimension[self.partition_dimension_name])
            upstream_partitions.add(upstream_key)
        return upstream_partitions_def.empty_subset().with_partition_keys(upstream_partitions)
        # return upstream_partitions_def

    def get_downstream_partitions_for_partitions(
        self,
        upstream_partitions_subset: PartitionsSubset,
        downstream_partitions_def: PartitionsDefinition,
        dynamic_partitions_store: Optional[DynamicPartitionsStore] = None,
    ) -> PartitionsSubset:

        downstream_partitions = set()

        for partition_key in upstream_partitions_subset.get_partition_keys():
            if not isinstance(partition_key, MultiPartitionKey):
                check.failed(
                    "Partition keys in upstream partition key subset must be MultiPartitionKeys"
                )
            rep_date = partition_key.keys_by_dimension['rep_date']
            rep_date = part_key_to_rep_date(rep_date)
            # Use 1st of month according to dagster logic for partition key
            if self.prior_period == "year_end":
                if rep_date.month == 12:
                    # Only December has downstream dependencies:
                    # every month in the next 12 months depends on the year end
                    for next_month in range(1, 13):
                        downstream_date = date(rep_date.year + 1, next_month, 1)
                        downstream_date = downstream_date.strftime("%Y-%m-%d")
                        downstream_key = MultiPartitionKey(
                            {"rep_date": downstream_date, "scenario_key": partition_key.keys_by_dimension['scenario_key']})
                        downstream_partitions.add(downstream_key)
            else:
                raise NotImplementedError("Only prior year end mapping defined")

        return downstream_partitions_def.empty_subset().with_partition_keys(downstream_partitions)
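The two directions of the mapping above should be inverses of each other, and that can be checked without Dagster. The sketch below models a multi-partition key as a plain (rep_date, scenario_key) tuple; `part_key_to_rep_date` is a guess at what the helper used in the class might look like (it is not shown in the thread), and "base" is a made-up scenario name.

```python
from datetime import date, datetime
from typing import Set, Tuple

Key = Tuple[str, str]  # (rep_date, scenario_key), a stand-in for MultiPartitionKey


def part_key_to_rep_date(key: str) -> date:
    """Hypothetical helper: parse a 'YYYY-MM-DD' partition key into a date."""
    return datetime.strptime(key, "%Y-%m-%d").date()


def upstream_key(downstream: Key) -> Key:
    """Every month maps to December of the previous year, same scenario."""
    rep_date, scenario = downstream
    d = part_key_to_rep_date(rep_date)
    return (date(d.year - 1, 12, 1).strftime("%Y-%m-%d"), scenario)


def downstream_keys(upstream: Key) -> Set[Key]:
    """Only a December key has dependents: the 12 months of the following year."""
    rep_date, scenario = upstream
    d = part_key_to_rep_date(rep_date)
    if d.month != 12:
        return set()
    return {
        (date(d.year + 1, month, 1).strftime("%Y-%m-%d"), scenario)
        for month in range(1, 13)
    }


year_end = ("2022-12-01", "base")
dependents = downstream_keys(year_end)
# Each dependent month must map back to the same year-end key.
round_trip_ok = all(upstream_key(key) == year_end for key in dependents)
```

A check like this is a cheap way to catch a mismatch between the upstream and downstream methods before wiring the mapping into an asset.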
@Alec Koumjian this