
Tadas Barzdžius

12/08/2021, 12:04 PM
How can I limit how many ops are running concurrently? In this case I’m talking about DynamicOut ops.

Daniel Suissa

12/08/2021, 12:56 PM
+1 @Amit Arie
👀 1

Aleksandr

12/08/2021, 2:21 PM
Hi! You can set "max_concurrent" in the job decorator:
import dagster


@dagster.job(
    config={
        "execution": {
            "config": {
                "multiprocess": {
                    "max_concurrent": 5
                }
            }
        }
    }
)
def some_job():
    ...
or set it manually when launching the run from the Launchpad in the Dagit UI:
execution:
  config:
    multiprocess:
      max_concurrent: 5
https://docs.dagster.io/_apidocs/execution#dagster.multiprocess_executor
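To tie this back to the DynamicOut question, here is a minimal sketch of how that config caps fan-out parallelism; the op and job names are just illustrative:
from dagster import DynamicOut, DynamicOutput, job, op


@op(out=DynamicOut())
def fan_out():
    # Each dynamic output becomes one mapped copy of `process`.
    for i in range(10):
        yield DynamicOutput(i, mapping_key=str(i))


@op
def process(item):
    return item * 2


# The default multiprocess executor reads max_concurrent from run config,
# so at most 5 of the mapped `process` ops execute at the same time.
@job(config={"execution": {"config": {"multiprocess": {"max_concurrent": 5}}}})
def fan_out_job():
    fan_out().map(process)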

Amit Arie

12/08/2021, 2:22 PM
Hi @Aleksandr, in what file should I place this YAML configuration, and how do I load it via the CLI?

Tadas Barzdžius

12/08/2021, 2:23 PM
I’m using the k8s_job_executor, so the multiprocess executor doesn’t work for me 😞

daniel

12/08/2021, 3:21 PM
I don't think the k8s_job_executor currently has a way to limit the number of concurrent ops within a single job, unfortunately. We do have a way to use Celery to apply a global limit (across all running jobs) on the number of ops running at once; that's described here: https://docs.dagster.io/deployment/guides/kubernetes/deploying-with-helm-advanced - would that possibly work for your use case, or are you specifically hoping to limit the number of ops running within a single job?
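Roughly speaking, the global cap in that guide comes from how many Celery workers serve the queue. A sketch of the kind of Helm values involved, with key names quoted from memory (double-check them against the linked guide for your chart version):
runLauncher:
  type: CeleryK8sRunLauncher
  config:
    celeryK8sRunLauncher:
      workerQueues:
        # The number of worker replicas serving the queue bounds how many
        # ops can execute at once across all runs using that queue.
        - name: "dagster"
          replicaCount: 5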

Tadas Barzdžius

12/08/2021, 3:22 PM
I think that would work; I was looking for a global limit per op. I’ll test it tomorrow.
👍 1

daniel

12/08/2021, 3:24 PM
@Amit Arie to your question about run config, you can read about the different ways to apply config to your jobs here (it can either be a file passed with the --config arg in the CLI, or the Launchpad UI in Dagit): https://docs.dagster.io/concepts/configuration/config-schema#run-configuration
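For example (the file and job names below are just placeholders), the run config can live in a YAML file that the CLI points at:
# run_config.yaml - limits the default multiprocess executor to 5 concurrent ops
execution:
  config:
    multiprocess:
      max_concurrent: 5

# then execute the job with that config file:
dagster job execute -f jobs.py -j some_job -c run_config.yaml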

Tadas Barzdžius

12/09/2021, 8:42 AM
@daniel I’m trying to use dagster-celery-k8s with volumes. Where should I add that configuration? I’ve tried setting it via the dagster-k8s tag, but I got an error:
dagster.core.errors.DagsterSubprocessError: During celery execution errors occurred in workers:
[split_ids_file]: TypeError: __init__() got an unexpected keyword argument 'volumes'

Stack Trace:
  File "/app/.venv/lib/python3.9/site-packages/dagster_celery/core_execution_loop.py", line 81, in core_celery_execution_loop
    step_events = result.get()
  File "/app/.venv/lib/python3.9/site-packages/celery/result.py", line 220, in get
    self.maybe_throw(callback=callback)
  File "/app/.venv/lib/python3.9/site-packages/celery/result.py", line 336, in maybe_throw
    self.throw(value, self._to_remote_traceback(tb))
  File "/app/.venv/lib/python3.9/site-packages/celery/result.py", line 329, in throw
    self.on_ready.throw(*args, **kwargs)
  File "/app/.venv/lib/python3.9/site-packages/vine/promises.py", line 234, in throw
    reraise(type(exc), exc, tb)
  File "/app/.venv/lib/python3.9/site-packages/vine/utils.py", line 30, in reraise
    raise value
  File "/app/.venv/lib/python3.9/site-packages/dagster/core/execution/api.py", line 775, in pipeline_execution_iterator
    for event in pipeline_context.executor.execute(pipeline_context, execution_plan):
  File "/app/.venv/lib/python3.9/site-packages/dagster_celery/core_execution_loop.py", line 162, in core_celery_execution_loop
    raise DagsterSubprocessError(
I’m looking at defining it in YAML via the execution block, but I think it gives me a bad schema:
/* A list of volumes to include in the Job's Pod. Default: ``[]``. For the many possible volume source types that can be included, see: <https://v1-18.docs.kubernetes.io/docs/reference/generated/kubernetes-api/v1.18/#volume-v1-core> */
  volumes?: [{
    name: String
  }]

daniel

12/09/2021, 3:13 PM
This is a place where celery-k8s is annoyingly inconsistent with vanilla k8s - we can look into cleaning that up. In the meantime, defining it in YAML via the execution block should work. Would you mind sharing the execution YAML you used so I can take a look?

Tadas Barzdžius

12/09/2021, 3:26 PM
I’ve tried defining it via the execution YAML, but it didn’t work. I got an error when trying to define volumes via YAML. I’ve tried adding this in the execution block:
volumes:
  - name: storage
    persistentVolumeClaim: {claimName: dagster-cephfs.mount}
I can’t currently access the error 😞 I can paste it here tomorrow.

daniel

12/09/2021, 3:30 PM
got it - will take a look then. I believe that should be possible; hopefully it's just a small tweak we need to make.

Tadas Barzdžius

12/09/2021, 3:42 PM
@daniel I just took out my work PC and tried the same stuff again. It worked… Sorry for the disturbance, I have no idea what I did wrong a couple of hours ago… 🙂

daniel

12/09/2021, 3:42 PM
no prob 🙂
we should really make it so that you can put it in the run launcher config just like with the non-celery run launcher (so that switching is easy); we can look into that

Tadas Barzdžius

12/09/2021, 3:44 PM
Yeah, it would be cool. Mounting secret storage is a bit strange with celery too. I had to add secrets to the execution block as well; it didn’t work when I tried setting it via envSecrets. But now I doubt myself, so I think I should double-check it.
Is there a reason why you can’t set volumes via tags, like you can with volumeMounts?

daniel

12/09/2021, 3:52 PM
I believe you can do that as well, actually - it's just a little tricky since the volumes go on one tag (for the pod spec) and the volumeMounts go on another tag (for the container). Let me see if I can pull up an example.
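Roughly, with the dagster-k8s/config tag it looks something like the sketch below (field casing is from memory and may need adjusting for your dagster-k8s version; the op name and mount path are placeholders):
from dagster import op


@op(
    tags={
        "dagster-k8s/config": {
            # Volumes are declared on the pod spec...
            "pod_spec_config": {
                "volumes": [
                    {
                        "name": "storage",
                        "persistent_volume_claim": {"claim_name": "dagster-cephfs.mount"},
                    }
                ]
            },
            # ...while the mounts are declared on the container.
            "container_config": {
                "volume_mounts": [{"name": "storage", "mount_path": "/data"}]
            },
        }
    }
)
def my_op():
    ...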

Tadas Barzdžius

12/10/2021, 7:37 AM
Ohhhh, gotcha.

daniel

12/10/2021, 3:33 PM
Just landed a change that lets you set volumes and volumeMounts for the run launcher when using celery-k8s, just like you can when using k8s
That will go out next Thursday.
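For anyone finding this thread later: once that change is released, the celery-k8s run launcher section of the Helm values should accept something shaped like the existing k8s run launcher config - roughly the sketch below (key names are illustrative and worth checking against the released chart; the mount path is a placeholder):
runLauncher:
  type: CeleryK8sRunLauncher
  config:
    celeryK8sRunLauncher:
      volumes:
        - name: storage
          persistentVolumeClaim:
            claimName: dagster-cephfs.mount
      volumeMounts:
        - name: storage
          mountPath: /data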

Tadas Barzdžius

12/10/2021, 3:59 PM
Thanks!