André Augusto
02/14/2024, 6:50 PMAndré Augusto
02/14/2024, 6:55 PM@schedule(
job=my_job,
cron_schedule=["0 23 * * 1-5"],
)
def my_schedule(context: ScheduleEvaluationContext):
runs = context.instance.get_runs(
filters=RunsFilter(
job_name=my_job.name,
statuses=[DagsterRunStatus.SUCCESS],
),
ascending=False,
)
if runs:
last_execution_date = None
for run in runs:
if execution_date := run.tags.get("execution_date", None):
execution_date = datetime.strptime(execution_date, "%Y-%m-%d")
if not last_execution_date:
last_execution_date = execution_date
elif execution_date > last_execution_date:
last_execution_date = execution_date
filter_date = last_execution_date
if filter_date:
... # do something about it to my job to run it "incrementally"
run_key = context.scheduled_execution_time.strftime("%Y%m%d") # type: ignore
return RunRequest(
run_key=run_key,
tags={
"execution_date": datetime.strftime(datetime.now(), "%Y-%m-%d"),
},
)