https://dagster.io/ logo
#ask-community
Title
# ask-community
j

John Smith

06/16/2023, 9:48 AM
Hi, could you provide an example code snippet for: defining a downstream partitioned asset which depends on 2 upstream assets of different partitions and outputs a cross product of the 2 upstream partitions please? the dependencies should be respected at the partition level.
j

John Smith

06/16/2023, 12:53 PM
apologies, its not clear to me from the docs how I would wire the 2 upstream dependencies.
o

owen

06/16/2023, 9:14 PM
Hi @John Smith! Justin is correct that the MultiPartitionsDefinition is the right tool for the job here, and the dependencies work similarly to other assets. i.e. if you have
Copy code
partitions_a = ...
partitions_b = ...

@asset(partitions_def=partitions_a)
def a():
    ...

@asset(partitions_def=partitions_b)
def b():
    ...

@asset(
    partitions_def=MultiPartitionsDefinition(
        {"a": partitions_a, "b": partitions_b},
    ),
)
def aXb(a, b):
    ...
then
aXb
will have one partition for each combination of partitions_a and partitions_b, and any one execution of aXb will target a single one of those combinations
🙏 1
j

John Smith

06/29/2023, 10:32 PM
Thanks for the example snippet, As a follow-up question are any of these partitioned dag definitions valid? especially in the context of eager materialization 1. define upstream without downstream:
Copy code
@asset(partitions_def=partitions_a)
def a():
    ...

@asset(partitions_def=partitions_b)
def b():
    ...

@asset(}
def aXb(a, b):
does this mean any updated upstream partition will trigger aXb essentially making aXb the product of latest a and latest b? 2. downstream without upstream
Copy code
@asset()
def a():
    ...

@asset()
def b():
    ...

@asset(
    partitions_def=MultiPartitionsDefinition(
        {"a": partitions_a, "b": partitions_b},
    ),
)
def aXb(a, b):
does this mean change in a or b will cause all partitions in aXb to rematerialize? 3. More than two upstream partitions. but not including the 3rd partition in the downstream output
Copy code
@asset(partitions_def=partitions_a)
def a():
    ...

@asset(partitions_def=partitions_b)
def b():
    ...

@asset(partitions_def=partitions_c)
def c():
    ...

@asset(
    partitions_def=MultiPartitionsDefinition(
        {"a": partitions_a, "b": partitions_b}, #but not c
    ),
)
def aXbXc(a, b, c):
None of them seem to throw an except upon loading the code location, but are they valid?
2 Views