Timo Klockow
02/06/2023, 5:28 PM

sean
02/06/2023, 7:52 PM
ScheduleEvaluationContext gives you access to a DagsterInstance, which you can use to query for the previous day's runs:
from typing import Sequence

from dagster import RunRequest, RunsFilter, ScheduleEvaluationContext, schedule

@schedule(cron_schedule="0 0 * * *", job_name="some_asset_job")  # daily cadence is illustrative
def foo_schedule(context: ScheduleEvaluationContext) -> Sequence[RunRequest]:
    # Fetch the most recent run record for the job, newest first.
    records = context.instance.get_run_records(
        filters=RunsFilter(job_name="some_asset_job"),
        order_by="create_timestamp",
        ascending=False,
        limit=1,
    )
    # Only request a new run if the latest run succeeded.
    if records and records[0].dagster_run.is_success:
        return [RunRequest(...)]
    else:
        return []
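For local testing, a minimal sketch that evaluates the schedule by direct invocation, assuming an ephemeral DagsterInstance (the setup below is an illustration, not something prescribed in the thread):

from dagster import DagsterInstance, build_schedule_context

# Ephemeral instance for illustration; a deployed schedule would query the
# persistent instance's run storage instead.
instance = DagsterInstance.ephemeral()
context = build_schedule_context(instance=instance)

# Direct invocation runs the schedule body and returns its run requests.
run_requests = foo_schedule(context)
print(run_requests)  # [] on a fresh instance, since there are no prior runs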
Timo Klockow
02/06/2023, 7:54 PM

sean
02/06/2023, 7:55 PM

Timo Klockow
02/06/2023, 7:59 PM
Trigger Rules
By default, Airflow will wait for all upstream (direct parents) tasks for a task to be successful before it runs that task.
However, this is just the default behaviour, and you can control it using the `trigger_rule` argument to a Task. The options for `trigger_rule` are (a usage sketch follows below):
• `all_success` (default): All upstream tasks have succeeded
• `all_failed`: All upstream tasks are in a `failed` or `upstream_failed` state
• `all_done`: All upstream tasks are done with their execution
• `all_skipped`: All upstream tasks are in a `skipped` state
• `one_failed`: At least one upstream task has failed (does not wait for all upstream tasks to be done)
• `one_success`: At least one upstream task has succeeded (does not wait for all upstream tasks to be done)
• `one_done`: At least one upstream task succeeded or failed
• `none_failed`: All upstream tasks have not `failed` or `upstream_failed` - that is, all upstream tasks have succeeded or been skipped
• `none_failed_min_one_success`: All upstream tasks have not `failed` or `upstream_failed`, and at least one upstream task has succeeded.
• `none_skipped`: No upstream task is in a `skipped` state - that is, all upstream tasks are in a `success`, `failed`, or `upstream_failed` state
• `always`: No dependencies at all, run this task at any time
You can also combine this with the Depends On Past functionality if you wish.
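A minimal sketch of setting a trigger rule on a task (the DAG and task names here are hypothetical, and EmptyOperator assumes Airflow 2.3+):

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(dag_id="trigger_rule_example", start_date=datetime(2023, 1, 1), schedule_interval=None) as dag:
    extract = EmptyOperator(task_id="extract")
    load = EmptyOperator(task_id="load")
    # Runs once both upstream tasks are done, regardless of success or failure.
    cleanup = EmptyOperator(task_id="cleanup", trigger_rule="all_done")
    [extract, load] >> cleanup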
sean
02/06/2023, 8:09 PM

Timo Klockow
02/06/2023, 8:46 PM
@schedule(cron_schedule="0 0 * * *", job_name="some_asset_job")
def foo_schedule(context: ScheduleEvaluationContext) -> Sequence[RunRequest]:
    records = context.instance.get_run_records(
        filters=RunsFilter(job_name="some_asset_job"),
        order_by="create_timestamp",
        ascending=False,
        limit=1,
    )
    if records and records[0].dagster_run.is_success:
        return [RunRequest(...)]
    else:
        return []
Also, how would this work with partitioned asset jobs, anyway?

sean
02/07/2023, 3:31 PM