Danny Steffy
03/27/2023, 3:54 AM
sean
03/27/2023, 5:13 PM
from dagster import sensor

@sensor
def my_sensor(context):
    # map each backfill ID to its current status string
    all_backfills = context.instance.get_backfills()
    statuses = {b.backfill_id: b.status.value for b in all_backfills}
Danny Steffy
03/27/2023, 6:50 PM
sean
03/27/2023, 8:08 PM
prha
03/28/2023, 5:02 PM
expected_partition_count = backfill.get_num_partitions()
runs = instance.get_runs(filters=RunsFilter.for_backfill(backfill))
successful_count = len([run for run in runs if run.status == DagsterRunStatus.SUCCESS])
Please note that the number of runs matching the backfill will probably depend on whether things get retried automatically. To get full partition status, you might want to bucket run status by partition:
from dagster._core.storage.tags import PARTITION_NAME_TAG
partitions = backfill.get_partition_names()
runs = instance.get_runs(filters=RunsFilter.for_backfill(backfill))
run_status_by_partition = {}
for run in runs:
    partition = run.tags.get(PARTITION_NAME_TAG)
    if partition and partition not in run_status_by_partition:
        # just take the first one, since runs are in descending time order (we want the last run per partition)
        run_status_by_partition[partition] = run.status
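To see why bucketing by partition matters when retries are involved, here is a small self-contained sketch that runs without a Dagster instance. `FakeRun`, `latest_status_by_partition`, `summarize`, and the `"MISSING"` label are all hypothetical names made up for illustration, and the local `PARTITION_NAME_TAG` string is assumed to match the value of Dagster's constant; in real code you would import the constant and pass actual runs from `instance.get_runs(...)`.

```python
from collections import Counter
from dataclasses import dataclass, field

# Assumed to mirror the value of dagster's PARTITION_NAME_TAG;
# import the real constant in actual sensor code.
PARTITION_NAME_TAG = "dagster/partition"


@dataclass
class FakeRun:
    """Hypothetical stand-in for a run: only .status and .tags are used."""
    status: str
    tags: dict = field(default_factory=dict)


def latest_status_by_partition(runs):
    """Keep the first status seen per partition (runs assumed newest first)."""
    latest = {}
    for run in runs:
        partition = run.tags.get(PARTITION_NAME_TAG)
        if partition and partition not in latest:
            latest[partition] = run.status
    return latest


def summarize(partition_names, runs):
    """Count partitions by latest status; partitions with no run are 'MISSING'."""
    latest = latest_status_by_partition(runs)
    return Counter(latest.get(name, "MISSING") for name in partition_names)


# Three runs, but only two partitions touched: the first partition was retried.
runs = [
    FakeRun("SUCCESS", {PARTITION_NAME_TAG: "2023-03-01"}),  # retry (newest)
    FakeRun("FAILURE", {PARTITION_NAME_TAG: "2023-03-01"}),  # original attempt
    FakeRun("FAILURE", {PARTITION_NAME_TAG: "2023-03-02"}),
]
summary = summarize(["2023-03-01", "2023-03-02", "2023-03-03"], runs)
```

Counting runs directly would report three runs for three partitions, even though one partition never ran and another was retried. Comparing the per-partition summary against the expected partition names avoids that mismatch.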
Danny Steffy
03/28/2023, 5:12 PM