Oliver

11/23/2022, 1:23 AM
Hi all 🙂 I'm having a go with multi-dimensional partitions and I'm a little lost on how to achieve something. I have two assets:
from dagster import StaticPartitionsDefinition, asset

partition_def = StaticPartitionsDefinition(['0', '1', '2'])

@asset(partitions_def=partition_def)
def a():
    return 'a'

@asset(partitions_def=partition_def)
def b():
    return 'b'
Now I want to introduce a third asset `c` with one partition for each combination in the cross product of the other two assets' partitions, so:
from dagster import MultiPartitionsDefinition

cross_part = MultiPartitionsDefinition({
    'a': partition_def,
    'b': partition_def,
})

@asset(partitions_def=cross_part)
def c(a, b):
    return a + b
I guess I need to implement two `PartitionMapping`s? One that takes the first part of the multi-partition key and another that takes the second part, and then pass something like
partition_mapping={'a': APartitionMapping(), 'b': BPartitionMapping()}
Is that the correct approach?
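To make the shape of `c` concrete: each partition of `c` pairs one partition of `a` with one partition of `b`, so three keys per dimension give nine partitions, and each of the two mappings only has to read back one coordinate. The plain-Python sketch below enumerates those keys without using Dagster; the `'|'` separator and the alphabetical ordering of dimensions are assumptions about how multi-partition keys are rendered, not a documented guarantee.

```python
from itertools import product

# Hypothetical stand-ins for the partition keys of assets a and b
a_keys = ['0', '1', '2']
b_keys = ['0', '1', '2']

# Assumption: a multi-partition key joins one key per dimension with '|',
# with dimensions ordered by name ('a' before 'b')
DELIMITER = '|'

def cross_product_keys(keys_by_dimension):
    """Return every multi-partition key for a {dimension name: keys} mapping."""
    names = sorted(keys_by_dimension)
    return [
        DELIMITER.join(combo)
        for combo in product(*(keys_by_dimension[name] for name in names))
    ]

c_keys = cross_product_keys({'a': a_keys, 'b': b_keys})
print(len(c_keys))  # 9
print(c_keys[:3])   # ['0|0', '0|1', '0|2']
```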
This was my solution; any thoughts?
from typing import Optional

from dagster import PartitionKeyRange, PartitionMapping, PartitionsDefinition

# assumed separator between the per-dimension keys of a multi-partition key, e.g. '1|2'
DELIMITER = '|'

class PartMap(PartitionMapping):
    """Maps a multi-partition key of c to its key in one dimension ('which')."""

    def __init__(self, which):
        self.which = which  # name of the dimension to extract, e.g. 'a' or 'b'

    def get_downstream_partitions_for_partition_range(
        self,
        upstream_partition_key_range: PartitionKeyRange,
        downstream_partitions_def: Optional[PartitionsDefinition],
        upstream_partitions_def: PartitionsDefinition,
    ) -> PartitionKeyRange:
        # left unimplemented in this solution
        raise NotImplementedError()

    def get_upstream_partitions_for_partition_range(
        self,
        downstream_partition_key_range: Optional[PartitionKeyRange],
        downstream_partitions_def: Optional[PartitionsDefinition],
        upstream_partitions_def: PartitionsDefinition,
    ) -> PartitionKeyRange:
        # assuming only a single multi-partition key is requested
        assert downstream_partition_key_range.start == downstream_partition_key_range.end
        multi_partition_key = downstream_partition_key_range.start
        # assumes the keys in the string appear in the same order as partitions_defs
        multi_partition_dimensions = [d.name for d in downstream_partitions_def.partitions_defs]

        part_key_idx = multi_partition_dimensions.index(self.which)
        this_partition_key = multi_partition_key.split(DELIMITER)[part_key_idx]

        return PartitionKeyRange(this_partition_key, this_partition_key)
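The downstream direction is left as `NotImplementedError` in the solution. Conceptually it is the inverse, one-to-many lookup: one partition of `a` (or `b`) feeds every partition of `c` that shares that coordinate. The plain-Python sketch below computes that set; `downstream_keys_for` is a hypothetical helper, and the `'|'` separator and name-sorted dimension order are assumptions, as above.

```python
from itertools import product

DELIMITER = '|'  # assumed separator between per-dimension keys

def downstream_keys_for(dimension, upstream_key, keys_by_dimension):
    """All multi-partition keys whose `dimension` coordinate equals upstream_key.

    keys_by_dimension is a hypothetical {dimension name: list of keys} mapping
    describing the downstream multi-partitioned asset.
    """
    names = sorted(keys_by_dimension)  # assumed key order: dimensions sorted by name
    idx = names.index(dimension)
    result = []
    for combo in product(*(keys_by_dimension[name] for name in names)):
        if combo[idx] == upstream_key:
            result.append(DELIMITER.join(combo))
    return result

dims = {'a': ['0', '1', '2'], 'b': ['0', '1', '2']}
print(downstream_keys_for('a', '1', dims))  # ['1|0', '1|1', '1|2']
print(downstream_keys_for('b', '1', dims))  # ['0|1', '1|1', '2|1']
```

Under this assumed ordering, the keys matching a given `b` are not adjacent, so a single start/end range may not be a natural fit for the downstream direction.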

sandy

11/23/2022, 2:33 AM
@claire - does this look right to you?

claire

11/23/2022, 6:16 PM
Hi Oliver, yep, that looks right! Note, though, that context methods and partition key ranges return multi-dimensional partition keys as a `MultiPartitionKey` object (which subclasses `str`), so you can actually do something like this:
from typing import Optional

from dagster import PartitionKeyRange, PartitionMapping, PartitionsDefinition

class PartMap(PartitionMapping):
    def __init__(self, partition_dimension):
        # name of the dimension whose key is passed to the upstream asset, e.g. 'a'
        self.partition_dimension = partition_dimension

    def get_downstream_partitions_for_partition_range(
        self,
        upstream_partition_key_range: PartitionKeyRange,
        downstream_partitions_def: Optional[PartitionsDefinition],
        upstream_partitions_def: PartitionsDefinition,
    ) -> PartitionKeyRange:
        raise NotImplementedError()

    def get_upstream_partitions_for_partition_range(
        self,
        downstream_partition_key_range: Optional[PartitionKeyRange],
        downstream_partitions_def: Optional[PartitionsDefinition],
        upstream_partitions_def: PartitionsDefinition,
    ) -> PartitionKeyRange:
        # assuming only a single value
        assert downstream_partition_key_range.start == downstream_partition_key_range.end
        multi_partition_key = downstream_partition_key_range.start
        # MultiPartitionKey maps each dimension name to its key, so no manual splitting is needed
        upstream_partition_key = multi_partition_key.keys_by_dimension.get(self.partition_dimension)
        return PartitionKeyRange(upstream_partition_key, upstream_partition_key)
which might clean the code up a bit
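The point here is that a `MultiPartitionKey` can be used anywhere a plain string key is expected while also exposing `keys_by_dimension`. The toy class below mimics that pattern in plain Python to show why no manual splitting is needed; `ToyMultiPartitionKey` is hypothetical and is not how Dagster implements `MultiPartitionKey`, and the `'|'`-joined, name-sorted string form is an assumption.

```python
class ToyMultiPartitionKey(str):
    """Hypothetical stand-in for MultiPartitionKey: a str that also remembers
    its key in each dimension. Not Dagster's actual implementation."""

    def __new__(cls, keys_by_dimension):
        # Assumption: the string form joins keys with '|' in dimension-name order
        names = sorted(keys_by_dimension)
        obj = super().__new__(cls, '|'.join(keys_by_dimension[n] for n in names))
        obj._keys_by_dimension = dict(keys_by_dimension)
        return obj

    @property
    def keys_by_dimension(self):
        # Return a copy so callers cannot mutate the key's internal state
        return dict(self._keys_by_dimension)

key = ToyMultiPartitionKey({'b': '2', 'a': '1'})
print(key)                             # 1|2
print(key.keys_by_dimension.get('b'))  # 2
print(isinstance(key, str))            # True
```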

sandy

11/23/2022, 8:05 PM
I filed an issue to track adding a built-in version of this: https://github.com/dagster-io/dagster/issues/10722

Oliver

11/24/2022, 4:07 AM
awesome, thanks 🙂