Rafael Gomes
12/08/2022, 12:32 AM
marcos
12/08/2022, 6:06 PM
Rafael Gomes
12/08/2022, 6:07 PM
marcos
12/08/2022, 6:09 PM
`run_id`. If you backfill 100 partitions, you are going to trigger 100 downstream jobs. This is the one I need to modify to look at all runs for the parent backfill and only trigger a run request if they have all completed.
from dagster import DagsterRunStatus, RunRequest, RunsFilter, sensor


@sensor(job=some_job)
def some_job_sensor(context):
    # most recently updated successful runs of the monitored job
    run_records = context.instance.get_run_records(
        filters=RunsFilter(
            job_name="job_name_to_monitor",
            statuses=[DagsterRunStatus.SUCCESS],
        ),
        order_by="update_timestamp",
        ascending=False,
    )
    for run_record in run_records:
        yield RunRequest(
            run_key=run_record.pipeline_run.run_id,  # avoid double firing for the same run
        )
Rafael Gomes
12/08/2022, 6:10 PM
marcos
12/08/2022, 6:18 PM
from dagster import DagsterRunStatus, RunRequest, RunsFilter, sensor


def backfill_statuses(instance, backfill_id):
    # map each partition of the backfill to the status of its run
    # (only covers partitions whose runs already exist)
    run_partition_data = instance.get_run_partition_data(
        runs_filter=RunsFilter(tags={"dagster/backfill": backfill_id})
    )
    statuses = {}
    for item in run_partition_data:
        statuses[item.partition] = item.status
    return statuses


@sensor(job=some_job)
def some_job_sensor(context):
    run_records = context.instance.get_run_records(
        filters=RunsFilter(
            job_name="job_name_to_monitor",
            statuses=[DagsterRunStatus.SUCCESS],
        ),
        order_by="update_timestamp",
        ascending=False,
    )
    seen_backfills = set()
    for run_record in run_records:
        run = run_record.pipeline_run
        backfill_id = run.tags.get("dagster/backfill")
        if backfill_id is None:
            # job is not part of a backfill: trigger a run request with run id
            yield RunRequest(run_key=run.run_id)  # avoid double firing for the same run
            continue
        if backfill_id in seen_backfills:
            continue  # this backfill was already checked in this tick
        seen_backfills.add(backfill_id)
        # check to see if all partitions have succeeded;
        # if so, trigger one run request with backfill id
        statuses = backfill_statuses(context.instance, backfill_id)
        if all(s == DagsterRunStatus.SUCCESS for s in statuses.values()):
            yield RunRequest(run_key=backfill_id)
sandy
12/08/2022, 10:45 PM
`context.instance` has `get_backfill` and `get_backfills` methods that allow you to fetch `PartitionBackfill` objects. each `PartitionBackfill` has a list of `partition_names`. you could iterate through all the partition names and query the instance for runs to see if there's a successful run for that partition (filtering on the backfill tag and on `dagster/partition=<partition_name>`), then yield a `RunRequest` if all the partitions have a successful run
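A rough sketch of the check described above, in case it helps. The helper names `all_partitions_succeeded` and `make_run_lookup` are made up for illustration and are not part of Dagster. The sketch assumes `get_run_records` accepts a `limit` argument and that backfill runs carry the `dagster/backfill` and `dagster/partition` tags mentioned in this thread.

```python
from typing import Callable, Iterable


def all_partitions_succeeded(
    partition_names: Iterable[str],
    has_successful_run: Callable[[str], bool],
) -> bool:
    """Return True only if every partition has at least one successful run."""
    names = list(partition_names)
    # An empty list counts as "not complete" so nothing fires by accident.
    return bool(names) and all(has_successful_run(name) for name in names)


def make_run_lookup(instance, backfill_id: str) -> Callable[[str], bool]:
    """Build the per-partition lookup against a Dagster instance (hypothetical helper)."""
    # Imported lazily so the pure helper above can be exercised without Dagster.
    from dagster import DagsterRunStatus, RunsFilter

    def has_successful_run(partition_name: str) -> bool:
        # Ask for at most one successful run tagged with this backfill and partition.
        records = instance.get_run_records(
            filters=RunsFilter(
                tags={
                    "dagster/backfill": backfill_id,
                    "dagster/partition": partition_name,
                },
                statuses=[DagsterRunStatus.SUCCESS],
            ),
            limit=1,
        )
        return len(records) > 0

    return has_successful_run
```

Inside the sensor this could be wired up as `backfill = context.instance.get_backfill(backfill_id)`, then `all_partitions_succeeded(backfill.partition_names, make_run_lookup(context.instance, backfill_id))`, yielding `RunRequest(run_key=backfill_id)` only when it returns True. Because it starts from the backfill's full partition list instead of the runs that already exist, partitions whose runs have not been launched yet still count as incomplete.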