How can I create a sensor that will run only after...
# ask-community
r
How can I create a sensor that will run only after all partitions from a partitioned job finish? So, after running a backfill, the sensor's job will run only once.
m
@Rafael Gomes Working on this as well. Let me know if you’ve had any learnings since posting this. I will also share how I approach this one.
r
Will do. No clue so far
Maybe a job sensor
m
Haha, same. The job sensor I am using right now looks at all runs for a job and fires off run requests using the `run_id`. If you backfill 100 partitions, you are going to trigger 100 downstream jobs. This is the one I need to modify to look at all runs for the parent backfill and only trigger a run request if they have all completed.
Copy code
from dagster import DagsterRunStatus, RunRequest, RunsFilter, sensor

@sensor(job=some_job)
def some_job_sensor(context):
    # look at every successful run of the monitored job, most recent first
    run_records = context.instance.get_run_records(
        filters=RunsFilter(
            job_name="job_name_to_monitor",
            statuses=[DagsterRunStatus.SUCCESS],
        ),
        order_by="update_timestamp",
        ascending=False,
    )
    for run_record in run_records:
        yield RunRequest(
            run_key=run_record.pipeline_run.run_id,  # avoid double firing for the same run
        )
r
I got into the same situation 😅 I did something very similar, but like you mentioned, it will trigger for all of them
@sandy Any idea or workaround to help us?
m
An idea:
Copy code
from dagster import DagsterRunStatus, RunRequest, RunsFilter, sensor

def get_backfill_statuses(instance, backfill_id):
    # map each partition in the backfill to the status of its run
    run_partition_data = instance.get_run_partition_data(
        runs_filter=RunsFilter(tags={"dagster/backfill": backfill_id})
    )
    statuses = {}
    for item in run_partition_data:
        statuses[item.partition] = item.status
    return statuses

@sensor(job=some_job)
def some_job_sensor(context):
    run_records = context.instance.get_run_records(
        filters=RunsFilter(
            job_name="job_name_to_monitor",
            statuses=[DagsterRunStatus.SUCCESS],
        ),
        order_by="update_timestamp",
        ascending=False,
    )
    for run_record in run_records:
        backfill_id = run_record.pipeline_run.tags.get("dagster/backfill")
        if backfill_id:
            # check to see if all partitions have succeeded; if so, trigger a
            # single run request keyed on the backfill id
            statuses = get_backfill_statuses(context.instance, backfill_id)
            if all(s == DagsterRunStatus.SUCCESS for s in statuses.values()):
                yield RunRequest(run_key=backfill_id)
        else:
            # job is not part of a backfill, trigger a run request with run id
            yield RunRequest(
                run_key=run_record.pipeline_run.run_id,  # avoid double firing for the same run
            )
Likely some typos in there as I have not built it out yet
CC @daniel
s
`context.instance` has `get_backfill` and `get_backfills` methods that allow you to fetch `PartitionBackfill` objects. Each `PartitionBackfill` has a list of `partition_names`. You could iterate through all the partition names and query the instance for runs to see if there's a successful run for each partition (filtering on the backfill tag and on `dagster/partition=<partition_name>`), then yield a RunRequest if all the partitions have a successful run
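A minimal sketch of the approach sandy describes, reusing the placeholder `some_job` and `job_name_to_monitor` names from the snippets above; `get_backfill` and `partition_names` are used as described, though the exact `PartitionBackfill` fields can vary by Dagster version:
Copy code
from dagster import DagsterRunStatus, RunRequest, RunsFilter, sensor

@sensor(job=some_job)  # some_job is the downstream job to trigger
def backfill_completion_sensor(context):
    run_records = context.instance.get_run_records(
        filters=RunsFilter(
            job_name="job_name_to_monitor",
            statuses=[DagsterRunStatus.SUCCESS],
        ),
    )
    # collect the distinct backfill ids seen among successful runs
    backfill_ids = {
        record.pipeline_run.tags.get("dagster/backfill") for record in run_records
    } - {None}
    for backfill_id in backfill_ids:
        backfill = context.instance.get_backfill(backfill_id)
        if backfill is None:
            continue

        def partition_succeeded(partition_name):
            # a partition is done if some successful run carries both the
            # backfill tag and that partition's tag
            return bool(
                context.instance.get_run_records(
                    filters=RunsFilter(
                        tags={
                            "dagster/backfill": backfill_id,
                            "dagster/partition": partition_name,
                        },
                        statuses=[DagsterRunStatus.SUCCESS],
                    ),
                    limit=1,
                )
            )

        if all(partition_succeeded(p) for p in backfill.partition_names):
            # run_key is the backfill id, so the sensor fires at most once
            # per completed backfill across evaluations
            yield RunRequest(run_key=backfill_id)
Keying the RunRequest on the backfill id means later sensor evaluations that see the same completed backfill get skipped rather than launching duplicate downstream runs.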