# announcements
I'm trying to ensure Dagit has only X simultaneous runs of a specific pipeline, or executions of a specific solid (whichever is easier to accomplish). I'm looking to rate-limit various parts of my data pipelines which access external APIs, use finite resources, etc. So far I've found only these options:

1. …, which afaik has been disabled and is not currently working.
2. Using an external semaphore mechanism and requiring the solid to acquire it at start time.
3. Implementing a custom poll-and-wait function against … for local runs, or … for GraphQL, so a pipeline run isn't launched until enough slots are available (downside: this only works when launching via code and can't catch web-UI pipeline launches, so it's a poor solution).
4. Some config magic in Celery, which we use as the executor. I'm not sure what this config magic is, though; a pointer to some docs/examples would be awesome.

Are there any other strategies available at the moment for this?
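Option 2 can be sketched in plain Python. This is illustrative only, not a Dagster API: `threading.BoundedSemaphore` stands in for the "external semaphore mechanism", and `rate_limited_solid_body` is a made-up name for the work a solid would do. In a real multi-process deployment the semaphore would need to live in something shared across processes (e.g. Redis or a database lock), not in-process.

```python
# Sketch of option 2 (assumption: names are illustrative, not Dagster APIs):
# gate the solid's body behind a semaphore so at most N copies touch the
# rate-limited resource at once.
import threading

API_SLOTS = threading.BoundedSemaphore(2)  # at most 2 concurrent callers

def rate_limited_solid_body(payload):
    with API_SLOTS:  # blocks until one of the 2 slots is free
        return f"called external API with {payload}"

# Simulate several "solids" contending for the two slots.
results = []
threads = [
    threading.Thread(target=lambda i=i: results.append(rate_limited_solid_body(i)))
    for i in range(5)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(results))  # all 5 ran, but never more than 2 at a time
```

The limitation noted above applies here too: anything that launches the pipeline without going through this code path (e.g. the web UI) bypasses the gate.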
@daniel this is a related question to termination
> config magic in celery

`dagster-celery worker start` takes an option to specify a queue name. You can add a tag of `"dagster-celery/queue": "queue_name"` to solids to specify which queue the work gets submitted to. The docs definitely need to be better here, but some pieces are mentioned at https://docs.dagster.io/_apidocs/libraries/dagster_celery and there are examples/tests at https://github.com/dagster-io/dagster/blob/master/python_modules/libraries/dagster-celery/dagster_celery_tests/test_queues.py

The idea is then to use that queue, with a fixed and limited number of workers, to gate access to the constrained resource.
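The routing described above can be sketched as follows. This is an illustrative model, not Dagster internals: the tag key `"dagster-celery/queue"` comes from the message above, but the solid/queue names and the assumption that untagged work falls back to a default queue named `dagster` are mine.

```python
# Illustrative sketch (not Dagster internals): the "dagster-celery/queue" tag
# on a solid names the Celery queue its work is submitted to; untagged solids
# fall back to the default queue. Queue and tag-dict names are made up.
DEFAULT_QUEUE = "dagster"  # assumption: name of the default queue

def target_queue(solid_tags: dict) -> str:
    """Pick the Celery queue for a solid based on its tags."""
    return solid_tags.get("dagster-celery/queue", DEFAULT_QUEUE)

api_solid_tags = {"dagster-celery/queue": "external_api"}
print(target_queue(api_solid_tags))  # external_api
print(target_queue({}))              # dagster
```

A worker pool started for only that queue, with a small fixed concurrency, then acts as the semaphore: tagged work waits until one of those workers is free, regardless of how the run was launched.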
Thanks @alex, makes sense. Are there any plans to bake solid concurrency control into the native Dagster API in the future? Perhaps based on an instance-level semaphore, similar to how Argo did it: https://github.com/argoproj/argo/issues/2550? And are there plans to re-enable …?
Since so many parts of the system are pluggable and configurable, it's tough to have a single solution or API for concurrency control. We do want to add it back, likely scoped to the default run launcher implementation.
@alex how about adding in pipeline concurrency until solid concurrency is figured out: https://github.com/dagster-io/dagster/issues/2716
Is this a reasonable feature request or is it orthogonal to the team's current thinking on this?
Definitely a reasonable request from a user perspective, but we are a ways off from being able to support it generically like that, for the reasons mentioned above.