Is there a way to have a sensor that checks the st...
# ask-community
d
Is there a way to have a sensor that checks the status of any backfills that are currently running? would I need to search the dagster instance logs in the body of the sensor?
🤖 1
s
Hi Danny, So I’m not sure if there’s an official means in the public API (looking into it), but I think this should work:
Copy code
@sensor
def my_sensor(context):
    all_backfills = context.instance.get_backfills()
    statuses = {b.backfill_id: b.status.value for b in all_backfills}
d
thank you!
What would that status value be? would a backfill be in progress and have information on the partitions that have run and the partitions that still need to run?
So no fine-grained info on individual partitions
Still waiting to hear if we have a pubilc API for this-- if not then we should probably open an issue
p
Hi Danny. This is slightly complicated because there is a status of the backfill, but that is mostly to track whether the backfill runs have been scheduled or not (helps us track daemon progress). You probably care the most about whether the runs have completed or not. To fetch that status, you will need to query the status of the individual runs that compose that backfill. Disclaimer: How to do this accurately depends on whether or not you are doing an asset backfill or a job backfill. I’ve included some snippets that queries for job backfills:
Copy code
expected_partition_count = backfill.get_num_partitions()
runs = instance.get_runs(filters=RunsFilter.for_backfill(backfill))
successful_count = len([run for run in runs where run.status == DagsterRunStatus.SUCCESS])
Please note that the number of runs matching the backfill will probably depend on whether things get retried automatically. To get full partition status, you might want to bucket run status by partition:
Copy code
from dagster._core.storage.tags import PARTITION_NAME_TAG

partitions = backfill.get_partition_names()
runs = instance.get_runs(filters=RunsFilter.for_backfill(backfill))
run_status_by_partition = {}
for run in runs:
    partition = run.tags.get(PARTITION_NAME_TAG)
    if partition and partition not in run_status_by_partition:
        # just take the first one, since runs are in descending time order (we want the last run per partition)
        run_status_by_partition[partition] = run.status
d
thank you so much for the help!