Hello, Is there a way to map static partitions to...
# ask-community
p
Hello, is there a way to map static partitions to “entire” assets? Schematically, we have the following diagram (attached picture): 3 assets are computed and share the same data definition. We need to process these assets downstream in an identical (but independent) fashion. In practice this “downstream asset” is actually a set of dbt models, so it would be complicated to split this downstream asset into individual assets for each upstream asset, and it’s easier for us to map partitions to variables. Therefore we are trying to create a mapping of partition to asset. I’ve managed to implement this with the following hack, but it’s not ideal, as it forces a single-value partition definition on each upstream asset solely for the sake of mapping it to a downstream partition:
from dagster import AssetIn, StaticPartitionMapping, StaticPartitionsDefinition, asset

supply_chains_partition = StaticPartitionsDefinition(
    ["supply_chain_1", "supply_chain_2", "supply_chain_3"]
)

sc1_partition = StaticPartitionsDefinition(["supply_chain_1"])
sc2_partition = StaticPartitionsDefinition(["supply_chain_2"])
sc3_partition = StaticPartitionsDefinition(["supply_chain_3"])

@asset(partitions_def=sc1_partition)
def sc1():
    return "supply_chain_1_value"

@asset(partitions_def=sc2_partition)
def sc2():
    return "supply_chain_2_value"

@asset(partitions_def=sc3_partition)
def sc3():
    return "supply_chain_3_value"

@asset(
    partitions_def=supply_chains_partition,
    ins={
        "sc1": AssetIn(
            "sc1",
            partition_mapping=StaticPartitionMapping(
                {"supply_chain_1": "supply_chain_1"}
            ),
        ),
        "sc2": AssetIn(
            "sc2",
            partition_mapping=StaticPartitionMapping(
                {"supply_chain_2": "supply_chain_2"}
            ),
        ),
        "sc3": AssetIn(
            "sc3",
            partition_mapping=StaticPartitionMapping(
                {"supply_chain_3": "supply_chain_3"}
            ),
        ),
    },
)
def fan_in(context, sc1, sc2, sc3):
    if context.partition_key == "supply_chain_1":
        return sc1
    elif context.partition_key == "supply_chain_2":
        return sc2
    elif context.partition_key == "supply_chain_3":
        return sc3

@asset(
    partitions_def=supply_chains_partition,
)
def common_downstream_asset(context, fan_in):
    context.log.info(f"fan_in: {fan_in}")
    return "downstream"
Did anybody encounter a similar situation and find a good solution? Is there something obvious we are doing wrong we should change instead? Thanks a lot 🙂
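A small simplification of the `fan_in` body above, sketched in plain Python so it runs without Dagster: the `if`/`elif` chain can become a dictionary lookup keyed by partition key. `select_upstream` is a hypothetical helper, not a Dagster API, and the string values stand in for whatever `sc1`, `sc2` and `sc3` actually return.

```python
def select_upstream(partition_key, inputs):
    """Return the upstream value whose key matches the current partition."""
    try:
        return inputs[partition_key]
    except KeyError:
        # Fail loudly instead of silently returning None for an unknown key.
        raise ValueError(
            f"no upstream asset mapped to partition {partition_key!r}"
        ) from None


# Stand-ins for the values loaded from sc1, sc2 and sc3.
inputs = {
    "supply_chain_1": "supply_chain_1_value",
    "supply_chain_2": "supply_chain_2_value",
    "supply_chain_3": "supply_chain_3_value",
}
selected = select_upstream("supply_chain_2", inputs)
```

Inside the asset this would presumably be called as `select_upstream(context.partition_key, {"supply_chain_1": sc1, ...})`. Unlike the `if`/`elif` version, an unexpected partition key raises an error rather than returning `None`.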
d
Hi @Pierre Cadman! We want to achieve a similar thing, as we do think it is part of normal DWH modeling that at some point you might combine the partition-definitions of multiple upstream assets. We created an issue for that, would be great to have you comment your use case as well and bring some attention to it: https://github.com/dagster-io/dagster/issues/19789
i
You could do this by using MultiPartitionsDefinition, couldn't you?
p
Hmm, do you mean having the downstream have a multi-partitions def over the upstreams? Wouldn’t that end up making the cross product of all the upstream partitions?
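To make the cross-product concern concrete, here is a plain-Python illustration (not Dagster code) of how the number of keys grows when two dimensions are combined. The `file` dimension and its values are made up for the example, and `cross_product_keys` is a hypothetical helper.

```python
from itertools import product


def cross_product_keys(dimensions):
    """List every combination of keys across the named dimensions."""
    names = list(dimensions)
    return [
        dict(zip(names, combo))
        for combo in product(*(dimensions[name] for name in names))
    ]


dimensions = {
    "supply_chain": ["supply_chain_1", "supply_chain_2", "supply_chain_3"],
    "file": ["file_a", "file_b"],  # hypothetical second dimension
}
keys = cross_product_keys(dimensions)
print(len(keys))  # 3 supply chains x 2 files = 6 combinations
```

Every supply chain gets paired with every file, including pairs that may not exist in the real data, which is the situation being asked about here.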
i
Yeah, but I'm assuming you are using an entity | date partition
you would have 1 asset that would process all your entities for a given date
p
Hmm, actually our upstream partitions can vary, and usually correspond to individual files. The downstream asset needs to load the entire upstream asset (1, 2 or 3) and process it in one go: for example, for partition 1 it needs to read the entirety of asset 1, for partition 2 the entirety of asset 2, etc. What we’re trying to do is find a mapping that says “partition 1 corresponds to all of asset 1”.
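The mapping described here ("partition 1 corresponds to all of asset 1") can be written down as a small lookup, sketched in plain Python under stated assumptions: the file names are invented, and `upstream_partitions_for` is a hypothetical helper rather than anything Dagster provides.

```python
def upstream_partitions_for(downstream_key, asset_for_partition, partitions_by_asset):
    """Resolve a downstream partition to its upstream asset and all of that asset's partitions."""
    asset = asset_for_partition[downstream_key]
    return asset, list(partitions_by_asset[asset])


# Downstream partition key -> the single upstream asset it stands for.
asset_for_partition = {
    "supply_chain_1": "sc1",
    "supply_chain_2": "sc2",
    "supply_chain_3": "sc3",
}
# Hypothetical per-file partitions of each upstream asset.
partitions_by_asset = {
    "sc1": ["file_a.csv", "file_b.csv"],
    "sc2": ["file_c.csv"],
    "sc3": ["file_d.csv", "file_e.csv", "file_f.csv"],
}
asset, partitions = upstream_partitions_for(
    "supply_chain_1", asset_for_partition, partitions_by_asset
)
```

This is only the logic a partition mapping would need to express; how to plug it into Dagster is what the rest of the thread discusses.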
i
I understand, so you can take a look at this class MultiToSingleDimensionPartitionMapping or https://dagster.slack.com/archives/C066HKS7EG1/p1709303806704509?thread_ts=1709232738.519749&cid=C066HKS7EG1
The last resort you can use is a CustomPartitionMapping
p
CustomPartitionMapping would be ideal, it doesn’t seem like it’s implemented yet though 😢
Thanks for your input though @Ismael Rodrigues 🙏
i
You implement the custom partition mapping yourself, Pierre, haha, I forgot to say
Ask #ask-ai to create an example for you, but don't trust it, because it usually generates the wrong types, lmao. You'll extend the PartitionMapping abstract class and implement its methods
p
From the docs it seems using a partition mapping beyond what Dagster provides isn’t supported yet:
Overriding PartitionMapping outside of Dagster is not supported. The abstract methods of this class may change at any time.
oh wow ok!
I’ll give that a shot
i
Try a basic example then. If you get this error, then it's up to the Dagster team