Oliver
11/23/2022, 1:23 AM
```
from dagster import StaticPartitionsDefinition, asset

partition_def = StaticPartitionsDefinition(['0', '1', '2'])

@asset(partitions_def=partition_def)
def a():
    return 'a'

@asset(partitions_def=partition_def)
def b():
    return 'b'
```
Now I want to introduce a third asset, `c`, that has one partition for each combination of the other two assets' partitions (their cross product), so:
```
from dagster import MultiPartitionsDefinition

cross_part = MultiPartitionsDefinition({
    'a': partition_def,
    'b': partition_def,
})

@asset(partitions_def=cross_part)
def c(a, b):
    return a + b
```
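For intuition about which partitions `c` ends up with, here is a stdlib-only sketch that enumerates the cross product of the two dimensions. It assumes, as the `DELIMITER = '|'` in the mapping code below does, that a multi-partition key is the per-dimension keys joined with `'|'`, with dimensions ordered alphabetically by name; treat that encoding as an assumption rather than a documented guarantee. `cross_product_keys` is a hypothetical helper, not part of Dagster.

```python
from itertools import product
from typing import Dict, List

# Assumed encoding: per-dimension keys joined with '|',
# dimensions ordered alphabetically by dimension name.
DELIMITER = "|"


def cross_product_keys(dims: Dict[str, List[str]]) -> List[str]:
    """Hypothetical helper: build one multi-partition key per combination."""
    names = sorted(dims)
    return [DELIMITER.join(combo) for combo in product(*(dims[n] for n in names))]


keys = cross_product_keys({"a": ["0", "1", "2"], "b": ["0", "1", "2"]})
print(len(keys))  # 9
print(keys[:4])   # ['0|0', '0|1', '0|2', '1|0']
```

With three keys per dimension, `c` has 3 × 3 = 9 partitions, and each one depends on exactly one partition of `a` and one of `b`.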
I guess I need to implement two `PartitionMapping`s? One that takes the first part of the multi-key and another that takes the second part, and then supply something like `partition_mapping={'a': APartitionMapping(), 'b': BPartitionMapping()}`. Is that the correct approach?
```
from typing import Optional
from dagster import PartitionKeyRange, PartitionMapping, PartitionsDefinition

DELIMITER = '|'

class PartMap(PartitionMapping):
    def __init__(self, which):
        self.which = which  # dimension name to select, e.g. 'a' or 'b'

    def get_downstream_partitions_for_partition_range(
        self,
        upstream_partition_key_range: PartitionKeyRange,
        downstream_partitions_def: Optional[PartitionsDefinition],
        upstream_partitions_def: PartitionsDefinition,
    ) -> PartitionKeyRange:
        # Only the upstream direction is needed here.
        raise NotImplementedError()

    def get_upstream_partitions_for_partition_range(
        self,
        downstream_partition_key_range: Optional[PartitionKeyRange],
        downstream_partitions_def: Optional[PartitionsDefinition],
        upstream_partitions_def: PartitionsDefinition,
    ) -> PartitionKeyRange:
        # assuming only a single downstream partition key
        assert downstream_partition_key_range.start == downstream_partition_key_range.end
        multi_partition_key = downstream_partition_key_range.start
        multi_partition_dimensions = [d.name for d in downstream_partitions_def.partitions_defs]
        part_key_idx = multi_partition_dimensions.index(self.which)
        this_partition_key = multi_partition_key.split(DELIMITER)[part_key_idx]
        return PartitionKeyRange(this_partition_key, this_partition_key)
```
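The core of `get_upstream_partitions_for_partition_range` here is plain string manipulation, so it can be checked without Dagster. A minimal sketch of that step as a standalone function; `upstream_key_for_dimension` is a hypothetical name, and the `'|'` delimiter and positional dimension order are assumptions carried over from `PartMap`. It would misbehave if a dimension key itself contained `'|'`.

```python
from typing import Sequence

DELIMITER = "|"  # assumed multi-partition key delimiter, as in PartMap


def upstream_key_for_dimension(multi_key: str, dimension_names: Sequence[str], which: str) -> str:
    """Hypothetical helper mirroring PartMap: pick one dimension's key out of a multi-key."""
    parts = multi_key.split(DELIMITER)
    if len(parts) != len(dimension_names):
        raise ValueError(f"expected {len(dimension_names)} parts in {multi_key!r}")
    return parts[list(dimension_names).index(which)]


dims = ["a", "b"]
print(upstream_key_for_dimension("2|0", dims, "a"))  # 2
print(upstream_key_for_dimension("2|0", dims, "b"))  # 0
```

Under these assumptions, one mapping built with `which='a'` and one with `which='b'` send downstream key `'2|0'` to partition `'2'` of `a` and partition `'0'` of `b`.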
sandy
11/23/2022, 2:33 AM
claire
11/23/2022, 6:16 PM
The partition key here is a `MultiPartitionKey` object (which subclasses the `str` class), so you can actually do something like this:
```
from typing import Optional
from dagster import PartitionKeyRange, PartitionMapping, PartitionsDefinition

class PartMap(PartitionMapping):
    def __init__(self, partition_dimension):
        self.partition_dimension = partition_dimension  # e.g. 'a' or 'b'

    def get_downstream_partitions_for_partition_range(
        self,
        upstream_partition_key_range: PartitionKeyRange,
        downstream_partitions_def: Optional[PartitionsDefinition],
        upstream_partitions_def: PartitionsDefinition,
    ) -> PartitionKeyRange:
        # Only the upstream direction is needed here.
        raise NotImplementedError()

    def get_upstream_partitions_for_partition_range(
        self,
        downstream_partition_key_range: Optional[PartitionKeyRange],
        downstream_partitions_def: Optional[PartitionsDefinition],
        upstream_partitions_def: PartitionsDefinition,
    ) -> PartitionKeyRange:
        # assuming only a single downstream partition key
        assert downstream_partition_key_range.start == downstream_partition_key_range.end
        multi_partition_key = downstream_partition_key_range.start
        # MultiPartitionKey exposes its per-dimension keys, so no string splitting is needed
        upstream_partition_key = multi_partition_key.keys_by_dimension.get(self.partition_dimension)
        return PartitionKeyRange(upstream_partition_key, upstream_partition_key)
```
which might clean the code up a bit.
sandy
11/23/2022, 8:05 PM
Oliver
11/24/2022, 4:07 AM