https://dagster.io/ logo
#ask-community
Title
# ask-community
e

Eric Marriott

08/03/2023, 4:41 AM
Is it possible to create a PartitionMapping for a downstream asset referencing a DynamicPartitionDefinition inside a MultiPartitionDefinition of an upstream asset?
t

Tim Castillo

08/03/2023, 2:11 PM
Hmm, not too familiar with deconstructing multipartitions, but if you're depending on one of partition of a multip, is there a particular reason why you need the multipartitioned asset and not directly depend on the dynamically partitioned asset?
e

Eric Marriott

08/03/2023, 3:01 PM
hopefully this makes more sense
Copy code
partitions_def_a = DynamicPartitionsDefinition(name="a")
partitions_def_b = DynamicPartitionsDefinition(name="b")
@asset(
    partitions_def=MultiPartitionsDefinition(
        {
            "a": full_table_settings_partitions_def,
            "b": filtering_partitions_def,
        }
    ),
)
def upstream(context):
    #...
    pass


@asset(
    partitions_def=MultiPartitionsDefinition(
        {
            "a": partitions_def_a,
            "b": partitions_def_b,
        }
    ),
)
def downstream_a(context, upstream) :
    # ...
    pass

@asset(
    partitions_def=partitions_def_a,
    ins={
        "upstream": AssetIn(
            partition_mapping=MultiPartitionMapping(
                # depends partitions_def_a
                # but only on 1 partition of partitions_def_b
            )
        )
    }
)
def downstream_b(upstream):
    # ...
    pass
I feel like I perhaps am missing a core concept
o

owen

08/03/2023, 6:23 PM
hi @Eric Marriott -- I think something like this might be what you're looking for:
Copy code
from dagster import (
    DynamicPartitionsDefinition,
    MultiPartitionsDefinition,
    asset,
    AssetIn,
    MultiPartitionMapping,
)
from dagster._core.definitions.partition import StaticPartitionsDefinition
from dagster._core.definitions.partition_mapping import (
    DimensionPartitionMapping,
    IdentityPartitionMapping,
    SpecificPartitionsPartitionMapping,
)

partitions_def_a = DynamicPartitionsDefinition(name="a")
partitions_def_b = DynamicPartitionsDefinition(name="b")


@asset(
    partitions_def=MultiPartitionsDefinition(
        {
            "a": partitions_def_a,
            "b": partitions_def_b,
        }
    ),
)
def upstream(context):
    # ...
    pass


@asset(
    partitions_def=MultiPartitionsDefinition(
        {
            "a": partitions_def_a,
            "b": partitions_def_b,
        }
    ),
)
def downstream_a(context, upstream):
    # ...
    pass


@asset(
    partitions_def=MultiPartitionsDefinition(
        {
            "a": partitions_def_a,
            "b": StaticPartitionsDefinition(["b1"]),
        }
    ),
    ins={
        "upstream": AssetIn(
            partition_mapping=MultiPartitionMapping(
                downstream_mappings_by_upstream_dimension={
                    "a": DimensionPartitionMapping("a", IdentityPartitionMapping()),
                    "b": DimensionPartitionMapping("b", SpecificPartitionsPartitionMapping(["b1"])),
                }
            )
        )
    },
)
def downstream_b(upstream):
    # ...
    pass
e

Eric Marriott

08/03/2023, 7:21 PM
I'll try, thanks!
5 Views