Eric Marriott
08/03/2023, 4:41 AM
Tim Castillo
08/03/2023, 2:11 PM
Eric Marriott
08/03/2023, 3:01 PM
partitions_def_a = DynamicPartitionsDefinition(name="a")
# Second dynamic partition set, registered under the name "b".
partitions_def_b = DynamicPartitionsDefinition(name="b")
# Upstream asset, multi-partitioned along dimensions "a" and "b".
# The dimensions use the partition definitions declared in this snippet
# (partitions_def_a / partitions_def_b) so the example is self-consistent
# with the downstream assets below.
@asset(
    partitions_def=MultiPartitionsDefinition(
        {
            "a": partitions_def_a,
            "b": partitions_def_b,
        }
    ),
)
def upstream(context):
    # ...
    pass
# Downstream asset sharing the same two-dimensional ("a" x "b") partitioning
# as `upstream`; no explicit partition mapping is given for the input.
@asset(
    partitions_def=MultiPartitionsDefinition(
        {"a": partitions_def_a, "b": partitions_def_b}
    ),
)
def downstream_a(context, upstream):
    # ...
    pass
# Downstream asset partitioned only by dimension "a".
# The mapping below is a placeholder: no arguments are supplied yet.
@asset(
    partitions_def=partitions_def_a,
    ins={
        "upstream": AssetIn(
            partition_mapping=MultiPartitionMapping(
                # Desired behaviour:
                #   - depend on every partition of partitions_def_a
                #   - but on only one partition of partitions_def_b
            )
        )
    },
)
def downstream_b(upstream):
    # ...
    pass
Eric Marriott
08/03/2023, 3:01 PM
owen
08/03/2023, 6:23 PM
from dagster import (
DynamicPartitionsDefinition,
MultiPartitionsDefinition,
asset,
AssetIn,
MultiPartitionMapping,
)
from dagster._core.definitions.partition import StaticPartitionsDefinition
from dagster._core.definitions.partition_mapping import (
DimensionPartitionMapping,
IdentityPartitionMapping,
SpecificPartitionsPartitionMapping,
)
# Two independent dynamic partition sets, registered under the names "a" and "b".
# They are used as the dimensions of the multi-partitioned assets below.
partitions_def_a = DynamicPartitionsDefinition(name="a")
partitions_def_b = DynamicPartitionsDefinition(name="b")
# Upstream asset, multi-partitioned along dimensions "a" and "b".
@asset(
    partitions_def=MultiPartitionsDefinition(
        {"a": partitions_def_a, "b": partitions_def_b}
    ),
)
def upstream(context):
    # ...
    pass
# Downstream asset with the same ("a" x "b") partitioning as `upstream`;
# no explicit partition mapping is declared for its input.
@asset(
    partitions_def=MultiPartitionsDefinition(
        {"a": partitions_def_a, "b": partitions_def_b}
    ),
)
def downstream_a(context, upstream):
    # ...
    pass
# Downstream asset that follows dimension "a" of `upstream` but is pinned to a
# single "b" partition. Its own "b" dimension is a static definition containing
# only "b1", so the asset stays multi-partitioned for the mapping below.
@asset(
    partitions_def=MultiPartitionsDefinition(
        {
            "a": partitions_def_a,
            "b": StaticPartitionsDefinition(["b1"]),
        }
    ),
    ins={
        "upstream": AssetIn(
            partition_mapping=MultiPartitionMapping(
                # Keys are upstream dimension names; each value names the
                # downstream dimension and the mapping applied between them.
                downstream_mappings_by_upstream_dimension={
                    # "a" is passed through with an identity mapping.
                    "a": DimensionPartitionMapping(
                        dimension_name="a",
                        partition_mapping=IdentityPartitionMapping(),
                    ),
                    # "b" is restricted to the specific partition "b1".
                    "b": DimensionPartitionMapping(
                        dimension_name="b",
                        partition_mapping=SpecificPartitionsPartitionMapping(["b1"]),
                    ),
                }
            )
        )
    },
)
def downstream_b(upstream):
    # ...
    pass
Eric Marriott
08/03/2023, 7:21 PM