Stephen Bailey
01/03/2023, 2:57 PMimport time
@asset
def upstream_short():
time.sleep(1)
return 1
@asset
def upstream_long():
time.sleep(600)
return 2
@asset
def downstream(upstream_short, upstream_long):
return 3
reconciliation_snesor = build_asset_reconciliation_sensor(
asset_selection=AssetSelection.keys("downstream"),
)
I would expect that reconciliation_sensor
will trigger a run of downstream
if upstream_short
is materialized, or if upstream_long
is materialized. What if upstream_short
is materialized, while upstream_long
is running?claire
01/03/2023, 8:57 PMStephen Bailey
01/04/2023, 2:06 AMclaire
01/04/2023, 2:41 AMZachary Bluhm
01/04/2023, 2:30 PMupstream_short
and upstream_long
above called upstream_main
. In this case downstream
would materialize twice for each partitionclaire
01/04/2023, 5:51 PMsandy
01/05/2023, 10:00 PMI'm thinking about a scenario where a large backfill is taking place for an upstream of bothIn this case, the existing asset reconciliation sensor should already work fine Afterandupstream_short
above calledupstream_long
. In this caseupstream_main
would materialize twice for each partitiondownstream
upstream_main
completes, upstream_long
will be considered stale. The reconciliation sensor won't materialize any assets until all their parents are fresh. I.e. it won't run downstream
until upstream_short
and upstream_long
have both incorporated the change to upstream_main
.