Timo Klockow
08/22/2022, 9:36 AM
```python
from dagster import (
    DefaultScheduleStatus,
    OpExecutionContext,
    RunRequest,
    ScheduleEvaluationContext,
    job,
    op,
    repository,
    schedule,
)

@op
def op_1(context: OpExecutionContext):
    print('external API call')

@job
def job_1():
    op_1()

@schedule(
    name='scheduled_job_1',
    job=job_1,
    cron_schedule='*/15 * * * *',  # Every 15 minutes
    default_status=DefaultScheduleStatus.RUNNING,
)
def scheduled_job_1(context: ScheduleEvaluationContext) -> RunRequest:
    return RunRequest(run_key=None, run_config={'ops': {'op_1': {}}})

@op
def op_2(context: OpExecutionContext):
    print('external API call')

@job
def job_2():
    op_2()

@schedule(
    name='scheduled_job_2',
    job=job_2,
    cron_schedule='15 * * * *',  # At 15 minutes past the hour
    default_status=DefaultScheduleStatus.RUNNING,
)
def scheduled_job_2(context: ScheduleEvaluationContext) -> RunRequest:
    return RunRequest(run_key=None, run_config={'ops': {'op_2': {}}})

@op
def op_3(context: OpExecutionContext):
    print('external API call')

@job
def job_3():
    op_3()

@schedule(
    name='scheduled_job_3',
    job=job_3,
    cron_schedule='15 * * * *',  # At 15 minutes past the hour
    default_status=DefaultScheduleStatus.RUNNING,
)
def scheduled_job_3(context: ScheduleEvaluationContext) -> RunRequest:
    # TODO: Wait for job_1's run at 15 minutes past the hour to finish first <--
    # TODO: Wait for job_2's run at 15 minutes past the hour to finish first <--
    # TODO: Only then run this schedule <--
    return RunRequest(run_key=None, run_config={'ops': {'op_3': {}}})

@repository
def repo():
    return [scheduled_job_1, scheduled_job_2, scheduled_job_3]
```
My questions are the open TODOs!
Note: I don’t produce any Assets here, since I only need Dagster to orchestrate API calls.
```
task_a >> task_b >> [task_c, task_d, ...]
```
chris
08/22/2022, 8:30 PM
You could use the `context.instance.get_run_records` method to check for runs of the previous two jobs that match the expected tick, and wait for those runs to succeed before kicking off said run. A caveat here is that the job kickoff might not happen exactly at the execution time specified by the tick, because you're now waiting until those jobs actually finish before running your downstream dependency.
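A minimal sketch of the decision described above, written in plain Python so it runs without Dagster: only let job_3 start once job_1 and job_2 both have a successful run for the same tick, otherwise skip. `RunRecordStub`, `upstream_done` and `evaluate_job_3` are hypothetical names, not Dagster APIs. In a real schedule the records would come from `context.instance.get_run_records`, and how you match a run to its tick (tags, timestamps) depends on your Dagster version; that mapping is assumed here, not shown.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Iterable, Optional


@dataclass(frozen=True)
class RunRecordStub:
    # Hypothetical stand-in for the few fields we need from a Dagster run
    # record; NOT Dagster's own RunRecord class.
    job_name: str
    status: str          # e.g. "SUCCESS", "FAILURE", "STARTED"
    tick_time: datetime  # assumed: the scheduled tick this run belongs to


def upstream_done(
    records: Iterable[RunRecordStub],
    upstream_jobs: Iterable[str],
    tick_time: datetime,
) -> bool:
    """True only if every upstream job has a SUCCESS run for this exact tick."""
    succeeded = {
        r.job_name
        for r in records
        if r.status == "SUCCESS" and r.tick_time == tick_time
    }
    return all(job in succeeded for job in upstream_jobs)


def evaluate_job_3(
    records: Iterable[RunRecordStub], tick_time: datetime
) -> Optional[str]:
    """Return a run key for job_3 if it may start, or None to skip this tick."""
    if upstream_done(records, ["job_1", "job_2"], tick_time):
        # One key per tick, so repeated checks map to the same logical run.
        return f"job_3:{tick_time.isoformat()}"
    return None


if __name__ == "__main__":
    tick = datetime(2022, 8, 22, 10, 15)
    runs = [
        RunRecordStub("job_1", "SUCCESS", tick),
        RunRecordStub("job_2", "STARTED", tick),  # still running
    ]
    print(evaluate_job_3(runs, tick))  # None: job_2 has not finished yet
    runs.append(RunRecordStub("job_2", "SUCCESS", tick))
    print(evaluate_job_3(runs, tick))  # job_3:2022-08-22T10:15:00
```

Returning `None` stands in for skipping the tick (a Dagster schedule can return a `SkipReason` instead of a `RunRequest`). Since each tick is evaluated once, "waiting" would in practice mean re-checking on later evaluations (e.g. a more frequent cron, or a sensor), which is where the start-time caveat comes from. Keying the run on the tick time is meant to keep those repeated checks from launching job_3 more than once per tick.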