Danny
05/15/2020, 5:06 AM
@solid
def my_solid(context, num: int):
    context.log.info(f'num = {num}')
    execute_pipeline_iterator(my_pipeline, instance=DagsterInstance.get(), environment_dict={
        'solids': {
            'my_solid': {
                'inputs': {
                    'num': num + 1
                }
            }
        }
    })

@pipeline
def my_pipeline():
    my_solid()
The result is that a new pipeline run is listed in dagit but stays stuck in the "Starting..." state
- am I doing something wrong?

sashank
05/15/2020, 5:54 AM
execute_pipeline_iterator returns an iterator of pipeline events from a pipeline execution. You need to actually iterate through it to process the pipeline, or use execute_pipeline instead. (An example of this is included in the gist above as well.)
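For illustration (not from the thread), a minimal sketch of both approaches, assuming the my_pipeline definition above and reusing the environment_dict shape from that snippet:

from dagster import DagsterInstance, execute_pipeline, execute_pipeline_iterator

env = {'solids': {'my_solid': {'inputs': {'num': 2}}}}
instance = DagsterInstance.get()

# Option 1: execute_pipeline drives the run to completion and returns a result object.
result = execute_pipeline(my_pipeline, instance=instance, environment_dict=env)

# Option 2: execute_pipeline_iterator is lazy -- the run only advances as the iterator
# is consumed, which is why a run that is started but never iterated stays in Starting.
for event in execute_pipeline_iterator(my_pipeline, instance=instance, environment_dict=env):
    print(event.event_type_value)  # each item is a DagsterEvent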
Danny
05/15/2020, 6:12 AM
The _iterator part didn't clue me in... 🤦‍♂️
It works, thanks so much!

max
05/15/2020, 6:18 AM

Danny
05/15/2020, 6:47 AM

max
05/15/2020, 3:57 PM

Danny
05/15/2020, 4:13 PM
# This value will actually be stored in and queried from a database:
current_num = 1
@solid
def run_main_pipeline(context, num: int):
    context.log.info(f'num = {num}')
    execute_pipeline_iterator(main_pipeline, instance=DagsterInstance.get(), environment_dict={
        'solids': {
            'my_solid': {
                'inputs': {
                    'num': num + 1
                }
            }
        }
    })

@solid
def get_next_batch_of_items(context):
    while True:
        active_runs = DagsterInstance.get_active_runs()  # my made-up method
        if active_runs < 10:
            for i in range(10 - active_runs):
                current_num += 1
                yield Output(current_num, 'num')
        time.sleep(1)

@pipeline
def main_loop_pipeline():
    while True:
        run_main_pipeline(get_next_batch_of_items())

This is obviously non-working but gives the idea. main_pipeline is the actual pipeline I'm interested in running, whereas the "meta" main_loop_pipeline is an operator or controller, if you will, which can be started and stopped like any Dagster pipeline. I'd love to find out I'm going down the wrong path with this and there's a more straightforward solution.

I'm also wondering whether I should implement a custom dagster.core.scheduler.Scheduler for this. Dagit already has first-class support for managing schedules, and in effect what I'm trying to do is run my pipeline on a custom schedule (start a new run whenever there are fewer than N runs currently active in dagit). @max @sashank do you know of any existing implementation of something like this? If not, I can see if it's easy enough to build based off the cron scheduler's example. Any other examples of custom schedulers would be appreciated.
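(For reference, not from the thread: one way to sketch that controller is a plain Python loop outside of any pipeline. count_active_runs and load_current_num below are hypothetical placeholders rather than real Dagster APIs, and execute_pipeline blocks until each run finishes, so runs launched this way will not actually overlap; true concurrency would require launching runs out of process.)

import time

from dagster import DagsterInstance, execute_pipeline


def count_active_runs(instance):
    # Hypothetical placeholder: query however your run storage / database tracks active runs.
    raise NotImplementedError


def load_current_num():
    # Hypothetical placeholder: read the counter from the database mentioned above.
    raise NotImplementedError


def controller_loop(max_active=10):
    instance = DagsterInstance.get()
    current_num = load_current_num()
    while True:
        active = count_active_runs(instance)
        for _ in range(max(0, max_active - active)):
            current_num += 1
            # Note: execute_pipeline is synchronous and blocks until the run completes.
            execute_pipeline(
                main_pipeline,
                instance=instance,
                environment_dict={'solids': {'my_solid': {'inputs': {'num': current_num}}}},
            )
        time.sleep(1)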