# announcements
d
Hi! I'm trying to get a solid to start a new run of its own pipeline, asynchronously, in a recursive fashion. My code:
@solid
def my_solid(context, num: int):

    context.log.info(f'num = {num}')

    execute_pipeline_iterator(my_pipeline, instance=DagsterInstance.get(), environment_dict={
        'solids': {
            'my_solid': {
                'inputs': {
                    'num': num + 1
                }
            }
        }
    })

@pipeline
def my_pipeline():
    my_solid()
The result is that a new pipeline run is listed in dagit but is stuck in the Starting... state. Am I doing something wrong?
s
execute_pipeline_iterator returns an iterator of pipeline events from a pipeline execution. You need to actually iterate through the iterator to process the pipeline.
If you don’t care about processing the events of the recursive pipeline execution within your solid, you may want to just use execute_pipeline instead. (An example of this is included in the gist above as well.)
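For readers following along, here is a minimal plain-Python sketch of why an un-iterated call can leave a run stuck. It assumes the call registers the run eagerly and only advances it as events are consumed; fake_execute_iterator and run_status are hypothetical stand-ins, not Dagster API.

```python
from typing import Dict, Iterator

# Hypothetical stand-in for the run storage that a UI such as dagit would read.
run_status: Dict[str, str] = {}


def fake_execute_iterator(run_id: str) -> Iterator[str]:
    """Hypothetical stand-in, not Dagster API: registers a run, returns lazy events."""
    run_status[run_id] = "Starting"  # happens eagerly, at call time

    def events() -> Iterator[str]:
        yield "step_start"
        yield "step_success"
        run_status[run_id] = "Success"  # reached only if the caller keeps iterating

    return events()


# Calling without iterating: the run is registered but never progresses.
fake_execute_iterator("run_1")
assert run_status["run_1"] == "Starting"

# Draining the iterator is what drives the run to completion.
seen = list(fake_execute_iterator("run_2"))
assert run_status["run_2"] == "Success"
```

In the solid above, the equivalent fix would be to loop over the returned events, or to call the non-iterator variant as suggested.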
d
Guess the _iterator part didn't clue me in...🤦‍♂️ It works, thanks so much!
m
very interested in what you're doing with the recursive runs 🙂
d
I have a pipeline that takes a single input and whose solids use web services to enrich it and store the result in a db. There are a very large number of these inputs and I need to process them in parallel under these constraints:

1. I always want N pipelines to be running at the same time. I would tune N according to the hardware available in the cluster and according to the observed resource usage by a single pipeline run. I see I can use something like this https://docs.dagster.io/docs/deploying/instance#dagit to control max concurrent runs, but it's not clear how I should be invoking the pipeline. It's not ideal to schedule thousands of runs right from the start and let that max_concurrent_runs dagit setting pace the rate of processing, since now I've cluttered the UI with jobs that may only run weeks from now, and there's no easy way to delete all the pending runs if I need to stop everything. Instead, something needs to monitor dagit and start a new pipeline run when it sees that a previous pipeline finished. That something should ideally be stoppable itself, and I'm thinking to make it be a meta-pipeline that invokes this main pipeline. This is why I'm experimenting with execute_pipeline_iterator, but so far it looks like the code needed to make these invocations efficiently is non-trivial.

2. The web services have rate limits, so I'll need to find a way to set the max concurrent executions of individual solids, or ideally a class of solids (e.g. all solids that call the github api should share a concurrent execution limit across the cluster). I've seen that Prefect allows tagging tasks and then setting task concurrency controls per tag https://docs.prefect.io/orchestration/concepts/task-concurrency-limiting.html, which is what I'm looking to replicate with dagster.

Any tips would be hugely appreciated!
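The per-tag limit in point 2 can be sketched in plain Python with one semaphore per tag. This is only an in-process illustration under stated assumptions: the tag name, the limit of 2, and the helpers limited and call_github are all hypothetical, and a cluster-wide limit would need a shared store rather than in-process semaphores.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List

# Hypothetical per-tag limits; the tag name and number are illustrative only.
TAG_LIMITS: Dict[str, int] = {"github_api": 2}
_semaphores = {tag: threading.BoundedSemaphore(n) for tag, n in TAG_LIMITS.items()}

# Bookkeeping used only to observe how many calls overlap.
_lock = threading.Lock()
_in_flight = 0
peak_in_flight = 0


def limited(tag: str, fn: Callable[[int], int]) -> Callable[[int], int]:
    """Wrap fn so that at most TAG_LIMITS[tag] calls run at the same time."""
    def wrapper(item: int) -> int:
        with _semaphores[tag]:
            return fn(item)
    return wrapper


def call_github(item: int) -> int:
    """Hypothetical stand-in for a rate-limited web service call."""
    global _in_flight, peak_in_flight
    with _lock:
        _in_flight += 1
        peak_in_flight = max(peak_in_flight, _in_flight)
    time.sleep(0.01)  # simulate network latency
    with _lock:
        _in_flight -= 1
    return item * 2


guarded = limited("github_api", call_github)

# Eight workers are available, but the tag's semaphore caps overlap at two.
with ThreadPoolExecutor(max_workers=8) as pool:
    results: List[int] = list(pool.map(guarded, range(10)))
```

Every function wrapped with the same tag shares one semaphore, which is the "class of solids" behaviour described above.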
m
that’s fascinating. what environment are you deploying to?
d
@max for now playing locally to put together a POC, eventually it'll go on k8s, likely with celery
Here's some pseudocode of what I'm thinking the "meta-pipeline" I described above in #1 would look like:
# This value will actually be stored in and queried from a database:
current_num = 1

@solid
def run_main_pipeline(context, num: int):
    context.log.info(f'num = {num}')
    execute_pipeline_iterator(main_pipeline, instance=DagsterInstance.get(), environment_dict={
        'solids': {
            'my_solid': {
                'inputs': {
                    'num': num + 1
                }
            }
        }
    })

@solid
def get_next_batch_of_items(context):

    while True:
        active_runs = DagsterInstance.get_active_runs()  # my made-up method
        if active_runs < 10:
            for i in range(10 - active_runs):
                current_num += 1
                yield Output(current_num, 'num')
        time.sleep(1)

@pipeline
def main_loop_pipeline():

    while True:
        run_main_pipeline(get_next_batch_of_items())
This is obviously non-working but gives an idea. main_pipeline is the actual pipeline I'm interested in running, whereas the "meta" main_loop_pipeline is an operator or controller, if you will, which can be started and stopped like any dagster pipeline. I'd love to find out I'm going down the wrong path with this and there's a more straightforward solution.
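Outside of Dagster, the controller idea (keep at most N runs active, start another when one finishes, and stay stoppable) can be sketched with the standard library. This is not a Dagster implementation: run_bounded, fake_run, and the stop event are hypothetical names, and a thread pool stands in for whatever actually launches pipeline runs.

```python
import threading
import time
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from typing import Callable, Iterable, List, Set


def run_bounded(
    items: Iterable[int],
    launch: Callable[[int], int],
    max_active: int,
    stop: threading.Event,
) -> List[int]:
    """Keep up to max_active launches in flight until items run out or stop is set."""
    results: List[int] = []
    pending: Set[Future] = set()
    source = iter(items)
    exhausted = False
    with ThreadPoolExecutor(max_workers=max_active) as pool:
        while True:
            # Top up to the limit, unless we were asked to stop or ran out of items.
            while not exhausted and not stop.is_set() and len(pending) < max_active:
                try:
                    item = next(source)
                except StopIteration:
                    exhausted = True
                    break
                pending.add(pool.submit(launch, item))
            if not pending:
                break
            # Block until at least one run finishes, then loop to start replacements.
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            results.extend(f.result() for f in done)
    return results


def fake_run(num: int) -> int:
    """Hypothetical stand-in for one main_pipeline run."""
    time.sleep(0.01)
    return num


stop = threading.Event()
finished = run_bounded(range(1, 21), fake_run, max_active=4, stop=stop)

# Once stop is set, no new runs are started; in-flight ones are allowed to finish.
stop.set()
after_stop = run_bounded(range(100), fake_run, max_active=4, stop=stop)
```

Because items are pulled lazily from the source, nothing is queued ahead of time, which avoids the thousands of pending runs mentioned earlier.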
Another approach I’m looking into is creating a new subclass of dagster.core.scheduler.Scheduler for this. Dagit already has first-class support for managing schedules, and in effect what I’m trying to do is run my pipeline on a custom schedule (start a new run whenever there are fewer than N runs currently active in dagit). @max @sashank do you know of any existing implementation of something like this? If not I can see if it’s easy enough to build based off the cron scheduler’s example. Any other examples of custom schedulers would be appreciated.