Manga Dhatrika
05/20/2022, 3:46 PMjamie
05/20/2022, 5:46 PM@job
def my_job():
pull_1000_rows.alias("pull_0_999")(first=0, last=999)
pull_1000_rows.alias("pull_1000_1999)(first=1000, last=1999)
...
(you could also put this op generation in a loop)
@job
def my_job():
for i in range(0, 3000, 1000):
first = i
last = i + 9999
pull_1000_rows.alias(f"pull_{first}_{last}")(first=first, last=last)
Manga Dhatrika
05/20/2022, 5:50 PMjamie
05/20/2022, 5:53 PMManga Dhatrika
05/20/2022, 6:14 PMjamie
05/20/2022, 6:18 PMManga Dhatrika
05/20/2022, 6:27 PMjamie
05/20/2022, 7:01 PM@run_failure_sensor(
job_selection=[fetch_db_rows], # so the sensor only runs when this job fails
)
def rerun_db_fetch_sensor(context: RunFailureSensorContext):
# logic to get the rows to start on
yield RunRequest(
run_config={<your run config here>},
job_name="fetch_db_rows"
)
Manga Dhatrika
05/20/2022, 9:03 PM