# ask-community

Timo Klockow

08/22/2022, 9:36 AM
Hey Dagster guys, I have the following problem I need to solve:
from dagster import (
    DefaultScheduleStatus,
    OpExecutionContext,
    RunRequest,
    ScheduleEvaluationContext,
    job,
    op,
    repository,
    schedule,
)

@op
def op_1(context: OpExecutionContext):
    print('external API call')


@job
def job_1():
    op_1()


@schedule(
    name='scheduled_job_1',
    job=job_1,
    cron_schedule='*/15 * * * *',  # Every 15 minutes
    default_status=DefaultScheduleStatus.RUNNING,
)
def scheduled_job_1(context: ScheduleEvaluationContext) -> RunRequest:
    return RunRequest(run_key=None, run_config={'ops': {'op_1': {}}})


@op
def op_2(context: OpExecutionContext):
    print('external API call')


@job
def job_2():
    op_2()


@schedule(
    name='scheduled_job_2',
    job=job_2,
    cron_schedule='15 * * * *',  # At 15 minutes past the hour
    default_status=DefaultScheduleStatus.RUNNING,
)
def scheduled_job_2(context: ScheduleEvaluationContext) -> RunRequest:
    return RunRequest(run_key=None, run_config={'ops': {'op_2': {}}})


@op
def op_3(context: OpExecutionContext):
    print('external API call')


@job
def job_3():
    op_3()


@schedule(
    name='scheduled_job_3',
    job=job_3,
    cron_schedule='15 * * * *',  # At 15 minutes past the hour
    default_status=DefaultScheduleStatus.RUNNING,
)
def scheduled_job_3(context: ScheduleEvaluationContext) -> RunRequest:
    # TODO: Wait for job_1's run at 15 minutes past the hour to finish first <--
    # TODO: Wait for job_2's run at 15 minutes past the hour to finish first <--
    # TODO: Only then kick off this run <--
    return RunRequest(run_key=None, run_config={'ops': {'op_3': {}}})

@repository
def repo():
    return [scheduled_job_1, scheduled_job_2, scheduled_job_3]
My questions are the open TODOs! Note I don’t produce any Assets here since I only need Dagster to orchestrate API calls.
In Airflow it was as simple as
task_a >> task_b >> [task_c, task_d, ...]

chris

08/22/2022, 8:30 PM
If I understand correctly, the Airflow code above expects all of those tasks to run on the same schedule, which doesn't match the code I see here, right? You would need cross-DAG dependencies, I believe.
In any case, I think what you probably want here is to use the `context.instance.get_run_records` method to check for runs of the previous two jobs that match the expected tick, and wait for their success before kicking off the run. A caveat here is that the kickoff might not happen exactly at the execution time specified by the tick, because now you're waiting until those jobs actually finish before running your downstream dependency.