# ask-community
s
Hello all! I have three jobs:

Job 1 === a multi-partitioned job with dynamic partitions part_1, part_2, ... and a partition date. It executes multiple runs, one per partition.
Job 2 === a multi-partitioned job with the same partitions as Job 1 (part_1, part_2, ... and the partition date). It executes multiple runs based on the partitions and date of Job 1.
Job 3 === a daily-partitioned job that should execute only once per date, rather than once for each partition.

I created a sensor that depends on Job 1 to trigger Job 2 for each partition, i.e. part_1, part_2, ... and the partition date. I need your help creating sensor 2, which should execute Job 3 only after verifying that all partitions of Job 1 and Job 2 have executed successfully for that date. Below is a code example:
from dagster import (
    DailyPartitionsDefinition,
    DynamicPartitionsDefinition,
    MultiPartitionsDefinition,
    asset,
)

daily_partition = DailyPartitionsDefinition(start_date="2023-07-01")
dynamic_partition = DynamicPartitionsDefinition(name="parts")

daily_multi_partition = MultiPartitionsDefinition(
    partitions_defs={"date": daily_partition, "parts": dynamic_partition}
)

@asset(
    partitions_def=daily_multi_partition
)
def Job1():
    ...  # do some task

@asset(
    partitions_def=daily_multi_partition
)
def Job2():
    ...  # do some task

@asset(
    partitions_def=daily_partition,
    non_argument_deps={"Job2"},
)
def Job3():
    ...  # do some task
Please help me create this sensor; a link to sample code or documentation would be a big help 🙏!
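One possible shape for that sensor, as a minimal sketch assuming the definitions above: compare the dynamic partitions registered on the instance against the materialized partitions of both assets. The names job3_sensor and job3_job, and the hardcoded date, are placeholders, and the get_dynamic_partitions / get_materialized_partitions calls and the serialization of MultiPartitionKey should be verified against your Dagster version:

from dagster import (
    AssetKey,
    MultiPartitionKey,
    RunRequest,
    SkipReason,
    define_asset_job,
    sensor,
)

# Hypothetical daily job wrapping Job3.
job3_job = define_asset_job(
    "job3_job", selection=["Job3"], partitions_def=daily_partition
)

@sensor(job=job3_job)
def job3_sensor(context):
    date_key = "2023-07-02"  # in practice, derive the date you are waiting on
    # All dynamic partitions currently registered for the "parts" dimension.
    parts = context.instance.get_dynamic_partitions("parts")
    # MultiPartitionKey is a str subclass, so the keys can be compared against
    # the serialized partition-key strings stored for each materialization.
    expected = {MultiPartitionKey({"date": date_key, "parts": p}) for p in parts}
    for name in ("Job1", "Job2"):
        done = context.instance.get_materialized_partitions(AssetKey(name))
        if not expected.issubset(done):
            return SkipReason(f"{name} is not fully materialized for {date_key}")
    # run_key deduplication ensures Job3 runs at most once per date.
    return RunRequest(run_key=date_key, partition_key=date_key)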
j
b
Hi Jamie, I don't think this meets our use case. What we're looking for is a way for the sensor to detect that all upstream partitions have completed. For example, if the upstream (multi-partitioned) asset is [hourly + 3 dynamic], it has 24 x 3 = 72 total partitions per day; we want to detect that all 72 upstream partitions are complete, and only then should the downstream asset start. The downstream job/asset is an aggregator of all upstream asset partitions: if we start aggregating before all 72 are gathered, we have incomplete data. So it would be beneficial to have something like an "End of Partitions" event that we can check in a sensor, triggering the downstream run only if it has been emitted.
Are there any Dagster APIs that enable us to achieve this? Essentially, we want a fan-out --> fan-in pattern:
         daily + dynamic --> daily + dynamic
daily --<                                    >-- daily
         daily + dynamic --> daily + dynamic
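As far as I know there is no built-in "End of Partitions" event, but a multi_asset_sensor can approximate the fan-in check. A minimal sketch, assuming the assets and "parts" dimension from the earlier snippet; fan_in_sensor is a hypothetical name, job3_job is the hypothetical daily job from the sketch above, and the all_partitions_materialized signature should be checked against your Dagster version:

from dagster import AssetKey, MultiPartitionKey, RunRequest, multi_asset_sensor

@multi_asset_sensor(
    monitored_assets=[AssetKey("Job1"), AssetKey("Job2")],
    job=job3_job,  # hypothetical daily job targeting Job3
)
def fan_in_sensor(context):
    date_key = "2023-07-02"  # in practice, derive the date being aggregated
    parts = context.instance.get_dynamic_partitions("parts")
    keys = [MultiPartitionKey({"date": date_key, "parts": p}) for p in parts]
    # all_partitions_materialized returns True only once every listed
    # partition key of the asset has been materialized.
    if all(
        context.all_partitions_materialized(AssetKey(a), keys)
        for a in ("Job1", "Job2")
    ):
        return RunRequest(run_key=date_key, partition_key=date_key)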
j
s
I tried it, but it gives me this error:
UserWarning: Error loading repository location workflows:dagster._core.errors.DagsterInvalidDefinitionError: Multiple partitioned assets exist in assets job
My upstream is multi-partitioned (DailyPartitionsDefinition and DynamicPartitionsDefinition) and my downstream is DailyPartitionsDefinition. Is there any Dagster API I can call from my sensor that will give me the total count of dynamic partitions for the date currently being materialized, along with the count of successfully materialized dynamic partitions?
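One possible sketch of those two counts, assuming the "parts" dimension and Job1 asset key from earlier in the thread (partition_progress is a hypothetical helper); it relies on multi-partition keys serializing their dimension values joined by "|", which is worth verifying on your Dagster version:

from dagster import AssetKey, DagsterInstance

def partition_progress(instance: DagsterInstance, date_key: str) -> tuple[int, int]:
    # Total dynamic partitions currently registered for the "parts" dimension.
    total = len(instance.get_dynamic_partitions("parts"))
    # Serialized multi-partition keys join the dimension values with "|",
    # e.g. "2023-07-01|part_1", so filter on the date component.
    done = [
        key
        for key in instance.get_materialized_partitions(AssetKey("Job1"))
        if date_key in key.split("|")
    ]
    return total, len(done)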
j
I'm not sure, maybe @claire will know?
c
@shailesh I think that error occurs when multiple asset partitions definitions are found in the same asset job -- currently, an asset job can only contain unpartitioned assets plus assets that share the same partitions definition.
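To illustrate that constraint, a minimal sketch assuming the definitions from the original snippet (the job names here are hypothetical): assets with different partitions definitions go into separate asset jobs.

from dagster import define_asset_job

# Job1 and Job2 share daily_multi_partition, so they can live in one job.
multi_partitioned_job = define_asset_job(
    "multi_partitioned_job",
    selection=["Job1", "Job2"],
    partitions_def=daily_multi_partition,
)

# Job3 uses daily_partition only, so it needs its own job.
daily_job = define_asset_job(
    "daily_job",
    selection=["Job3"],
    partitions_def=daily_partition,
)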