# ask-community
t
Hey Guys, How can I make sure that a daily scheduled asset job only runs if the previous day was successful?
🤖 1
s
Hi Timo,
`ScheduleEvaluationContext` gives you access to a `DagsterInstance`, which you can use to query for the previous day's runs:
Copy code
from typing import Sequence

from dagster import RunRequest, RunsFilter, ScheduleEvaluationContext, schedule


# cron_schedule added so the snippet runs as-is; adjust to your own cadence
@schedule(cron_schedule="0 0 * * *", job_name="some_asset_job")
def foo_schedule(context: ScheduleEvaluationContext) -> Sequence[RunRequest]:
    # Look up the most recent run of the job on this instance
    records = context.instance.get_run_records(
        filters=RunsFilter(job_name="some_asset_job"),
        order_by="create_timestamp",
        ascending=False,
        limit=1,
    )
    # Only request a new run if the latest run finished successfully
    if records and records[0].dagster_run.is_success:
        return [RunRequest(...)]
    else:
        return []
t
Hey Sean, ok I see, you’d have to check somehow. So the default behavior is to run every tick no matter what, correct?
s
Correct, but if you provide a schedule evaluation function like I did above, then you can have pretty much arbitrary logic to determine the run(s) that will be launched each tick.
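For example, here's a rough variant of the snippet above (names are just illustrative) that returns a `SkipReason` instead of an empty list, so skipped ticks show up with an explanation in the UI:
Copy code
from typing import Union

from dagster import (
    RunRequest,
    RunsFilter,
    ScheduleEvaluationContext,
    SkipReason,
    schedule,
)


@schedule(cron_schedule="0 0 * * *", job_name="some_asset_job")
def guarded_schedule(context: ScheduleEvaluationContext) -> Union[RunRequest, SkipReason]:
    records = context.instance.get_run_records(
        filters=RunsFilter(job_name="some_asset_job"),
        order_by="create_timestamp",
        ascending=False,
        limit=1,
    )
    if not records or records[0].dagster_run.is_success:
        # No history yet, or the last run succeeded: launch today's run
        return RunRequest()
    # Otherwise record why the tick was skipped instead of returning nothing
    return SkipReason("Previous run of some_asset_job did not succeed")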
t
Alright, thanks. Do you see this as a future feature for schedules? I know e.g. Airflow back then pretty much had this out of the box. I believe they called it trigger rules https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html#trigger-rules
Trigger Rules
By default, Airflow will wait for all upstream (direct parents) tasks for a task to be successful before it runs that task.
However, this is just the default behaviour, and you can control it using the `trigger_rule` argument to a Task. The options for `trigger_rule` are:
• `all_success` (default): All upstream tasks have succeeded
• `all_failed`: All upstream tasks are in a `failed` or `upstream_failed` state
• `all_done`: All upstream tasks are done with their execution
• `all_skipped`: All upstream tasks are in a `skipped` state
• `one_failed`: At least one upstream task has failed (does not wait for all upstream tasks to be done)
• `one_success`: At least one upstream task has succeeded (does not wait for all upstream tasks to be done)
• `one_done`: At least one upstream task succeeded or failed
• `none_failed`: All upstream tasks have not `failed` or `upstream_failed` - that is, all upstream tasks have succeeded or been skipped
• `none_failed_min_one_success`: All upstream tasks have not `failed` or `upstream_failed`, and at least one upstream task has succeeded
• `none_skipped`: No upstream task is in a `skipped` state - that is, all upstream tasks are in a `success`, `failed`, or `upstream_failed` state
• `always`: No dependencies at all, run this task at any time
You can also combine this with the Depends On Past functionality if you wish.
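For reference, a minimal sketch of how you'd use `trigger_rule` there (made-up DAG and task names, assuming a recent Airflow 2.x):
Copy code
import pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="trigger_rule_example",  # made-up name, just for illustration
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule="@daily",
    catchup=False,
):
    extract = EmptyOperator(task_id="extract")
    load = EmptyOperator(task_id="load")
    # cleanup runs once both upstreams are done, whether they succeeded or failed
    cleanup = EmptyOperator(task_id="cleanup", trigger_rule="all_done")
    [extract, load] >> cleanup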
s
I don't think we have any plans to add a higher-level API to only launch the next schedule run if the previous one succeeded. I'm personally not that familiar with Airflow, but my impression is that "task" maps roughly to Dagster's "op" concept -- in which case, IIUC, "trigger rules" wouldn't really map onto successive runs in a schedule, but rather onto ops in a graph -- and yes, if you execute a Dagster graph, downstream ops will not execute by default if an upstream one fails.
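For instance, in a toy job like this (made-up op names), `transform` won't execute if `fetch_data` fails:
Copy code
from dagster import Failure, job, op


@op
def fetch_data():
    # Simulate an upstream failure
    raise Failure("upstream failed")


@op
def transform(data):
    return data


@job
def example_job():
    # transform depends on fetch_data, so by default it is skipped when fetch_data fails
    transform(fetch_data())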
t
Alright, thanks for clarifying 🙂 A follow-up question to your proposal, assuming this schedule is supposed to run a partitioned asset job daily: is it theoretically possible to launch an asset RunRequest via the API with all the failed partition keys, including the latest one, and pass that partition range to a single run as you can in the UI?
Or what happens behind the scenes when I check “Pass partition ranges to single run”? That would help.
Copy code
@schedule(job_name="some_asset_job")
def foo_schedule(context: ScheduleEvaluationContext) -> Sequence[RunRequest]:
    records = context.instance.get_run_records(
        filters=RunsFilter(job_name="some_asset_job"),
        order_by="create_timestamp",
        ascending=False,
        limit=1
    )
    if records and records[0].dagster_run.is_success:
        return [RunRequest(...)]
    else:
        return []
Also how would this work with partitioned asset jobs, anyways?
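My rough guess (not verified) is that the checkbox just sets the partition-range tags on the run, so a RunRequest along these lines might do the same thing from a schedule:
Copy code
from dagster import RunRequest

# Unverified sketch: I believe the "dagster/asset_partition_range_*" tags are
# what the UI sets when "Pass partition ranges to single run" is checked;
# the partition keys below are made up
run_request = RunRequest(
    tags={
        "dagster/asset_partition_range_start": "2024-01-01",
        "dagster/asset_partition_range_end": "2024-01-07",
    },
)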
s
(Marking this resolved since the convo is going in the other thread you opened)
👍 1