
Antoine

03/31/2020, 9:32 AM
Hi everyone! I’m new to this Slack, so please point me to the proper channel if this isn’t the appropriate one for this question. I’m trying to link two pipelines via scheduling. For example, imagine my two pipelines have two different cron cycles: pipeline A runs once a month and pipeline B runs every day. But pipeline B depends on the last run of pipeline A. So, on the day A runs, I would like to run pipeline B only if A has finished and succeeded, and postpone any subsequent runs of B until A is fixed (if A fails on a weekend, for instance). Is this possible with the current scheduler?

sashank

03/31/2020, 1:22 PM
Hey @Antoine, you could potentially do this via the `should_execute` function on the `ScheduleDefinition`. It is passed a context, which is an instance of `ScheduleExecutionContext`. On this we can access `context.instance`, which lets us query the run storage using the `get_runs` method.
Here’s a basic example that would prevent the pipeline B schedule from executing if the last run of pipeline A didn’t succeed:
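from dagster.core.storage.pipeline_run import PipelineRunsFilter, PipelineRunStatus

def should_execute(context):
    # Placeholder name: use pipeline A's actual name here.
    run_filter = PipelineRunsFilter(pipeline_name="pipeline_a")
    # limit=1 fetches only the most recent run of pipeline A.
    runs = context.instance.get_runs(run_filter, limit=1)
    if not runs:
        # Pipeline A has never run, so don't run B yet.
        return False
    latest_run = runs[0]
    # Only let B's schedule fire if A's latest run succeeded.
    return latest_run.status == PipelineRunStatus.SUCCESS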

Antoine

03/31/2020, 1:24 PM
I was wondering about using that, but it seemed to me that `return False` in `should_execute` is final, in that it would skip that pipeline execution entirely.

sashank

03/31/2020, 1:29 PM
That is correct. To make sure pipeline B eventually runs once pipeline A has a successful run, you could adopt this backfilling schedule pattern:
What you would do is create a daily partition set for pipeline B, but set the schedule to run at a short interval (say, every hour). Then, in the `backfilling_partition_selector` and `should_execute` functions, you can define the logic that returns the appropriate partitions when they are ready to run.
You should return `False` from `should_execute` if the requirements to run any daily partition for schedule B are not satisfied, but `True` if at least one daily partition can be run. Then, in `backfilling_partition_selector`, you should return that partition.
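To make the split between `should_execute` and `backfilling_partition_selector` more concrete, here is a minimal, Dagster-independent sketch of the readiness logic. It assumes pipeline A runs on the 1st of each month, and that you already know which monthly runs of A succeeded and which daily partitions of B have run (in practice presumably by querying run storage, as in the `should_execute` example earlier). `runnable_partitions`, `should_run_b`, `select_b_partition`, and the dict/set inputs are hypothetical names, not Dagster API.

```python
from datetime import date, timedelta
from typing import Dict, List, Optional, Set


def month_start(day: date) -> date:
    # Assumption: monthly pipeline A runs on the 1st, so this is the A run
    # that a given B partition depends on.
    return day.replace(day=1)


def runnable_partitions(
    today: date,
    first_partition: date,
    a_succeeded: Dict[date, bool],
    b_done: Set[date],
) -> List[date]:
    """Daily B partitions from first_partition through today that can run now.

    A partition is ready if B has not run it yet and the A run for that
    partition's month is recorded as successful. A missing or failed A run
    keeps the partition waiting instead of dropping it.
    """
    ready = []
    day = first_partition
    while day <= today:
        if day not in b_done and a_succeeded.get(month_start(day), False):
            ready.append(day)
        day += timedelta(days=1)
    return ready


def should_run_b(today, first_partition, a_succeeded, b_done) -> bool:
    # Analogue of should_execute: fire only if some partition is ready.
    return bool(runnable_partitions(today, first_partition, a_succeeded, b_done))


def select_b_partition(today, first_partition, a_succeeded, b_done) -> Optional[date]:
    # Analogue of backfilling_partition_selector: oldest ready partition first,
    # so a backlog built up while A was broken drains in date order.
    ready = runnable_partitions(today, first_partition, a_succeeded, b_done)
    return ready[0] if ready else None
```

For example, if March's run of A succeeded but April 1's run failed, then on April 1 only the unrun March partitions of B are selectable; April's partitions stay pending (rather than being skipped) until A's April run is recorded as successful.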
You wouldn’t want to include these lines, however, which stop the schedule when there are no partitions left to run, since you want this schedule to run forever and keep checking for partitions that are available to run: https://github.com/dagster-io/dagster/blob/master/examples/dagster_examples/schedules.py#L58-L63
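The point about never stopping the schedule can be illustrated with a small simulation. This is a plain-Python sketch, not Dagster code: `run_hourly_ticks`, `example_select`, and the scenario (A's run on Saturday 2020-02-01 fails and is fixed on Monday 2020-02-03 at 09:00) are hypothetical. A tick with nothing ready is skipped rather than ending the loop, so B's postponed partitions still launch once A is fixed.

```python
from datetime import datetime, timedelta
from typing import Callable, List, Optional, Set, Tuple

# Hypothetical selector signature: (tick time, partitions already launched)
# -> partition key to launch, or None if nothing is ready yet.
Selector = Callable[[datetime, Set[str]], Optional[str]]


def run_hourly_ticks(start: datetime, hours: int, select: Selector) -> List[Tuple[datetime, str]]:
    """Simulate an hourly schedule that never stops itself.

    An empty tick is just skipped (like should_execute returning False);
    the loop keeps going so later ticks can pick up postponed partitions.
    """
    launched: Set[str] = set()
    launches: List[Tuple[datetime, str]] = []
    for i in range(hours):
        tick = start + timedelta(hours=i)
        partition = select(tick, launched)
        if partition is None:
            continue  # nothing ready: skip this tick, check again next hour
        launched.add(partition)
        launches.append((tick, partition))
    return launches


# Made-up scenario: A's run on Saturday 2020-02-01 fails and is only fixed
# on Monday 2020-02-03 at 09:00.
A_FIXED_AT = datetime(2020, 2, 3, 9)
FIRST_PARTITION = datetime(2020, 2, 1)


def example_select(tick: datetime, launched: Set[str]) -> Optional[str]:
    if tick < A_FIXED_AT:
        return None  # A has not succeeded yet, so postpone every B partition
    day = FIRST_PARTITION
    while day.date() <= tick.date():
        key = day.strftime("%Y-%m-%d")
        if key not in launched:
            return key  # oldest partition not launched yet
        day += timedelta(days=1)
    return None
```

Running `run_hourly_ticks(datetime(2020, 2, 1), 72, example_select)` skips every tick before Monday 09:00, then launches the three postponed partitions (2020-02-01, 2020-02-02, 2020-02-03) on consecutive hourly ticks, one run per tick, oldest first.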

Antoine

03/31/2020, 1:46 PM
ok I see, thanks a lot for your answer!
BTW, really loving dagster :)

sashank

03/31/2020, 1:53 PM
That’s awesome! Definitely let us know if we can help in any way