cvb

08/16/2021, 10:20 AM
Hi guys, I have a pipeline that processes N different classes of input, and I'm trying to limit concurrency per (pipeline, input-class-N) pair. I don't mind if runs for (pipeline, input-class-I) and (pipeline, input-class-J) run in parallel, but there should be only one (pipeline, input-class-I) run at any time. I can't really figure out how to do that. I can do it with the QueuedRunCoordinator by specifying tagConcurrencyLimits for each input class, but it looks ugly. Wouldn't it be much better if I could define something like
type: QueuedRunCoordinator
config:
  queuedRunCoordinator:
    tagConcurrencyLimits:
      - key: non-concurrent
        limit: 1
and then define tags for each run like tags={"non-concurrent": "pipeline-input-class-I"}?
It looks like this wouldn't be that hard to do in _TagConcurrencyLimitsCounter, but unfortunately that would change the API, so maybe it would be better to allow a custom RunCoordinatorDaemon? What do you think, are there any plans for that? Does any of this sound good enough for me to make a pull request?
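A minimal Python sketch of the behavior being asked for, assuming that each distinct value of the `non-concurrent` tag gets its own limit. The function `pick_runs_to_launch` and its arguments are hypothetical; this models the proposal and is not how Dagster's `_TagConcurrencyLimitsCounter` is actually written.

```python
from collections import Counter
from typing import Dict, List, Tuple

# Hypothetical model of the *proposed* semantics: every distinct value of
# the tag key is its own bucket, and each bucket gets the same limit.
# Not Dagster's code; all names here are illustrative.

Tags = Dict[str, str]


def pick_runs_to_launch(
    queued: List[Tuple[str, Tags]],
    in_progress: List[Tags],
    key: str,
    limit: int,
) -> List[str]:
    """Return the ids of queued runs that may launch now, in queue order."""
    # Count in-progress runs per value of `key`.
    active = Counter(tags[key] for tags in in_progress if key in tags)
    launched = []
    for run_id, tags in queued:
        value = tags.get(key)
        if value is None:
            # Runs without the tag are not constrained by this limit.
            launched.append(run_id)
            continue
        if active[value] < limit:
            active[value] += 1
            launched.append(run_id)
    return launched


queued = [
    ("run-1", {"non-concurrent": "pipeline-input-class-I"}),
    ("run-2", {"non-concurrent": "pipeline-input-class-J"}),
    ("run-3", {"non-concurrent": "pipeline-input-class-I"}),
    ("run-4", {}),
]
print(pick_runs_to_launch(queued, in_progress=[], key="non-concurrent", limit=1))
# ['run-1', 'run-2', 'run-4']
```

Runs for class I and class J launch together, the second class-I run waits, and untagged runs are unaffected.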

alex

08/16/2021, 5:08 PM
cc @johann

johann

08/16/2021, 6:54 PM
@cvb just to be sure I understand the question: you could do
tagConcurrencyLimits:
  - key: non-concurrent
    value: pipeline-input-class-I
    limit: 1
  - key: non-concurrent
    value: pipeline-input-class-II
    limit: 1
but you don’t want to have to enumerate all the possible values (or they’re unbounded)?
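To make the difference between the two existing forms concrete, here is a simplified Python model (an assumption for illustration, not Dagster's implementation) of which runs a `tagConcurrencyLimits` entry counts: with only `key`, every run carrying the tag shares one limit; with `key` and `value`, only runs with that exact value are counted.

```python
from typing import Dict, Optional

# Simplified model, not Dagster's implementation: decides whether a
# tagConcurrencyLimits entry counts a run with the given tags.

Tags = Dict[str, str]


def rule_matches(rule: Dict[str, object], tags: Tags) -> bool:
    key = rule["key"]
    if key not in tags:
        return False
    value: Optional[object] = rule.get("value")
    # Key-only entries match any value; key/value entries need an exact match.
    return value is None or tags[key] == value


runs = [
    {"non-concurrent": "pipeline-input-class-I"},
    {"non-concurrent": "pipeline-input-class-II"},
]

key_only = {"key": "non-concurrent", "limit": 1}
per_value = {"key": "non-concurrent", "value": "pipeline-input-class-I", "limit": 1}

# The key-only entry counts both runs against one shared limit of 1,
# so classes I and II could not run in parallel.
print(sum(rule_matches(key_only, t) for t in runs))   # 2
# The enumerated entry only counts the class-I run.
print(sum(rule_matches(per_value, t) for t in runs))  # 1
```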

cvb

08/16/2021, 7:04 PM
Hi @johann, yes, that's correct. Enumerating these values is possible but inconvenient: there could be a lot of them, and I'd have to keep them in the Dagster config and update the whole Dagster deployment whenever I want to add a new value, while Dagster and the pipelines live in different projects. Now imagine adding a new pipeline that requires this kind of restriction; that would be a nightmare.

johann

08/16/2021, 7:08 PM
That makes sense. I’d be happy to look at a PR. My biggest concern is the API for specifying these constraints. We need something different from your example above, since that config already means "apply the limit regardless of the value". Nothing great is jumping out at me, but maybe something like
type: QueuedRunCoordinator
config:
  queuedRunCoordinator:
    tagConcurrencyLimits:
      - key: non-concurrent
        matchByValue: true
        limit: 1
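A sketch of how the suggested `matchByValue: true` entry could be interpreted alongside the existing forms. `matchByValue` is only a proposal from this thread, and `bucket` and `can_launch` are hypothetical helpers; the idea is that the flag changes the counter's grouping key from the tag key alone to the (key, value) pair, while key-only and key/value entries keep their current meaning.

```python
from typing import Dict, Hashable, List, Optional

# Illustration only: `matchByValue` is a hypothetical flag proposed in this
# thread, not an existing Dagster option, and this is not Dagster's code.

Tags = Dict[str, str]
Rule = Dict[str, object]


def bucket(rule: Rule, tags: Tags) -> Optional[Hashable]:
    """Return the counter bucket a run occupies under `rule`, or None."""
    key = rule["key"]
    if key not in tags:
        return None  # the rule does not constrain runs without the tag
    if rule.get("matchByValue"):
        return (key, tags[key])  # proposed: one bucket per distinct value
    if "value" in rule:
        # existing behavior: only runs with exactly this value are counted
        return (key, rule["value"]) if tags[key] == rule["value"] else None
    return (key,)  # existing behavior: one shared bucket for all values


def can_launch(rules: List[Rule], in_progress: List[Tags], tags: Tags) -> bool:
    """True if a run with `tags` stays within every rule's limit."""
    for rule in rules:
        b = bucket(rule, tags)
        if b is None:
            continue
        used = sum(1 for t in in_progress if bucket(rule, t) == b)
        if used >= rule["limit"]:
            return False
    return True


rules = [{"key": "non-concurrent", "matchByValue": True, "limit": 1}]
running = [{"non-concurrent": "pipeline-input-class-I"}]
print(can_launch(rules, running, {"non-concurrent": "pipeline-input-class-I"}))  # False
print(can_launch(rules, running, {"non-concurrent": "pipeline-input-class-J"}))  # True
```

Because the new behavior is opt-in per entry, existing configs keep their meaning. A real implementation would also need to decide whether `matchByValue` may be combined with `value`; this sketch simply lets `matchByValue` take precedence.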

cvb

08/16/2021, 7:11 PM
No great ideas for the API here either 😞, but the API could still change; maybe other people will share their experience. I don't believe I'm the only one who needs this.
Anyway, I'll try to find some time to implement a prototype.