Peter Davidson
02/16/2023, 7:03 AMowen
02/16/2023, 10:45 PMowen
02/16/2023, 10:52 PMend_offset
parameter:
@asset(
    partitions_def=MonthlyPartitionsDefinition(start_date=..., end_offset=12),
)
def step1():
    """Example asset whose monthly partitions extend 12 months ahead (body elided)."""
    ...
@asset(
    partitions_def=MonthlyPartitionsDefinition(start_date=...),
)
def step2(step1):
    """Example monthly-partitioned asset that consumes ``step1`` (body elided)."""
    ...
This way, at the beginning of the year, step1 will have partitions available for the next 12 months, so it can write out all of those partitions. Then, those partitions will be available to read from in step2 each month.
Peter Davidson
02/17/2023, 6:55 AMPeter Davidson
02/17/2023, 6:57 AMPeter Davidson
02/17/2023, 7:15 AM
@asset(partitions_def=MonthlyPartitionsDefinition(start_date=...))
def step0():
    """Summarize all the year-end information (body elided in this example)."""
    ...
@asset(
    partitions_def=MonthlyPartitionsDefinition(start_date=..., end_offset=12),
)
def step1(step0):
    """Produce a file for each of the following 12 months from December information.

    The body is elided in this example.
    """
    ...
@asset(
    partitions_def=MonthlyPartitionsDefinition(start_date=...),
)
def step2(step1):
    """Build the monthly reports from the information produced at the previous year end.

    The body is elided in this example.
    """
    ...
One idea would be to use LastPartitionMapping() in step1, but this fails since the Jan 2023 partition does exist for step0 (even if it was never materialized).
If there were an AnnualPartitionsDefinition this would work (I opened a feature request for this: https://github.com/dagster-io/dagster/issues/12415).
---
I also have the complication of using MultiPartitionsDefinitions (static & monthly).
Peter Davidson
02/27/2023, 9:51 AM
from datetime import date
from typing import Optional
import dagster._check as check
from dagster._annotations import experimental
from dagster._core.definitions.partition import PartitionsDefinition, PartitionsSubset
from dagster._core.definitions.partition_key_range import PartitionKeyRange
from dagster._core.definitions.partition_mapping import PartitionMapping
from dagster._core.instance import DynamicPartitionsStore
from dagster import (
MultiPartitionKey,
MultiPartitionsDefinition,
MultiDependencyDefinition
)
@experimental
class MultiDimensionPriorPeriodMapping(PartitionMapping):
    """Map multi-partitioned asset partitions to an upstream partition at a prior period.

    Both sides are expected to use ``MultiPartitionKey`` partition keys with a
    ``rep_date`` dimension and a ``scenario_key`` dimension. The scenario is
    carried over unchanged; only the ``rep_date`` dimension is shifted.

    Use case:
        - refer to an asset at the previous year end (every month of year
          ``Y`` depends on the December partition of year ``Y - 1``).

    Args:
        prior_period (str): The name of the previous period to map to,
            e.g. ``"year_end"``. Only ``"year_end"`` is currently implemented.
    """

    # Dimension names of the multi-partition keys handled by this mapping.
    _DATE_DIMENSION = "rep_date"
    _SCENARIO_DIMENSION = "scenario_key"
    # Time-window partition keys are written as the first day of the month.
    _DATE_KEY_FORMAT = "%Y-%m-%d"

    def __init__(self, prior_period: str):
        self.prior_period = prior_period

    def get_upstream_partitions_for_partition_range(
        self,
        downstream_partition_key_range: Optional[PartitionKeyRange],
        downstream_partitions_def: Optional[PartitionsDefinition],
        upstream_partitions_def: PartitionsDefinition,
    ) -> PartitionKeyRange:
        """Range-based mapping is not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError()

    def get_downstream_partitions_for_partition_range(
        self,
        upstream_partition_key_range: PartitionKeyRange,
        downstream_partitions_def: Optional[PartitionsDefinition],
        upstream_partitions_def: PartitionsDefinition,
    ) -> PartitionKeyRange:
        """Range-based mapping is not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError()

    def _shifted_key(self, source_key: MultiPartitionKey, year: int, month: int) -> MultiPartitionKey:
        """Return a key with ``source_key``'s scenario and the 1st of ``year``/``month`` as date."""
        return MultiPartitionKey(
            {
                self._DATE_DIMENSION: date(year, month, 1).strftime(self._DATE_KEY_FORMAT),
                self._SCENARIO_DIMENSION: source_key.keys_by_dimension[self._SCENARIO_DIMENSION],
            }
        )

    def get_upstream_partitions_for_partitions(
        self,
        downstream_partitions_subset: Optional[PartitionsSubset],
        upstream_partitions_def: MultiPartitionsDefinition,
        dynamic_partitions_store: Optional[DynamicPartitionsStore] = None,
    ) -> PartitionsSubset:
        """Return the upstream partitions that the given downstream partitions depend on.

        For ``prior_period == "year_end"`` every downstream key maps to the
        December partition of the preceding year, for the same scenario.

        Args:
            downstream_partitions_subset: Downstream partitions to map; must
                not be ``None`` and must contain only ``MultiPartitionKey``s.
            upstream_partitions_def: Definition used to build the result subset.
            dynamic_partitions_store: Unused.

        Raises:
            NotImplementedError: If ``prior_period`` is not ``"year_end"``.
        """
        if downstream_partitions_subset is None:
            # The signature allows None, but there is nothing to map in that case.
            check.failed("downstream asset is not partitioned")

        upstream_keys = set()
        for partition_key in downstream_partitions_subset.get_partition_keys():
            if not isinstance(partition_key, MultiPartitionKey):
                check.failed(
                    "Partition keys in downstream partition key subset must be MultiPartitionKeys"
                )
            # part_key_to_rep_date is defined outside this class; it is
            # assumed to turn the key string into a date-like object
            # exposing ``.year`` and ``.month`` -- TODO confirm.
            rep_date = part_key_to_rep_date(partition_key.keys_by_dimension[self._DATE_DIMENSION])
            if self.prior_period != "year_end":
                raise NotImplementedError("Only prior year end mapping defined")
            upstream_keys.add(self._shifted_key(partition_key, rep_date.year - 1, 12))

        return upstream_partitions_def.empty_subset().with_partition_keys(upstream_keys)

    def get_downstream_partitions_for_partitions(
        self,
        upstream_partitions_subset: PartitionsSubset,
        downstream_partitions_def: PartitionsDefinition,
        dynamic_partitions_store: Optional[DynamicPartitionsStore] = None,
    ) -> PartitionsSubset:
        """Return the downstream partitions that depend on the given upstream partitions.

        For ``prior_period == "year_end"`` only December upstream partitions
        have dependents: the twelve monthly partitions of the following year,
        for the same scenario. Other months contribute nothing.

        Args:
            upstream_partitions_subset: Upstream partitions to map; must
                contain only ``MultiPartitionKey``s.
            downstream_partitions_def: Definition used to build the result subset.
            dynamic_partitions_store: Unused.

        Raises:
            NotImplementedError: If ``prior_period`` is not ``"year_end"``.
        """
        downstream_keys = set()
        for partition_key in upstream_partitions_subset.get_partition_keys():
            if not isinstance(partition_key, MultiPartitionKey):
                check.failed(
                    "Partition keys in upstream partition key subset must be MultiPartitionKeys"
                )
            # See get_upstream_partitions_for_partitions for the assumption
            # about the externally defined part_key_to_rep_date helper.
            rep_date = part_key_to_rep_date(partition_key.keys_by_dimension[self._DATE_DIMENSION])
            if self.prior_period != "year_end":
                raise NotImplementedError("Only prior year end mapping defined")
            if rep_date.month != 12:
                # Only the December partition has downstream dependencies.
                continue
            # NOTE(review): the generated keys are not checked against
            # downstream_partitions_def; months beyond its last partition may
            # be included -- confirm this is acceptable to callers.
            downstream_keys.update(
                self._shifted_key(partition_key, rep_date.year + 1, month)
                for month in range(1, 13)
            )

        return downstream_partitions_def.empty_subset().with_partition_keys(downstream_keys)
Peter Davidson
03/01/2023, 8:13 AM