is op- tag concurrency limit intended to work with...
# ask-community
n
Is the op-level tag concurrency limit intended to work with a partitioned job? If so, could you provide a simple toy example? I was able to get max_concurrent = 3 to work, but the tag concurrency limits are being ignored. For example, ops tagged MPA_Multi: Double should be limited to 2, but 3 will run at the same time. My project looks something like this:
@static_partitioned_config(partition_keys=companies)
def mpa_config(partition_key: str):
    order = ordered_list
    return {
        "execution": {
            "config": {
                "multiprocess": {
                    "max_concurrent": 3,
                    "tag_concurrency_limits": [
                        {"key": "MPA_Multi", "value": "Single", "limit": 1},
                        {"key": "MPA_Multi", "value": "Double", "limit": 2},
                        {"key": "MPA_Multi", "value": "Triple", "limit": 3},
                    ],
                },
            }
        },
        "ops": {
            "op_a": {
                "config": {
                    "CID": partition_key,
                    "order": order,
                    "tags": {"dagster/priority": str(order * -1), "MPA_Multi": "Single"},
                }
            },
            "op_b": {
                "config": {
                    "CID": partition_key,
                    "order": order,
                    "tags": {"dagster/priority": str(order * -1), "MPA_Multi": "Double"},
                }
            },
            "op_c": {
                "config": {
                    "CID": partition_key,
                    "order": order,
                    "tags": {"dagster/priority": str(order * -1), "MPA_Multi": "Double"},
                }
            },
            "op_d": {
                "config": {
                    "CID": partition_key,
                    "order": order,
                    "tags": {"dagster/priority": str(order * -1), "MPA_Multi": "Double"},
                }
            },
        },
    }


class CompanyOpConfig(Config):
    CID: str
    order: int
    tags: dict


# step 1
@op
def op_a(context, config: CompanyOpConfig, ins=None or {"start": In(Nothing)}, **kwargs):
    do_op_a_stuff()


# step 2
@op
def op_b(context, config: CompanyOpConfig, ins=None or {"start": In(Nothing)}, **kwargs):
    do_op_b_stuff()


# step 3
@op
def op_c(context, config: CompanyOpConfig, ins=None or {"start": In(Nothing)}, **kwargs):
    do_op_c_stuff()


# step 4
@op
def op_d(context, config: CompanyOpConfig, ins=None or {"start": In(Nothing)}, **kwargs):
    do_op_d_stuff()


@job(config=mpa_config)
def MorningProcessingP05():
    # op_a must be invoked inside the job body so its output can be wired downstream
    op_a_result = op_a()
    finalresult = [op_b(op_a_result), op_c(op_a_result), op_d(op_a_result)]
The tags do show up in the tags section of the Dagster UI. My Dagster version is 1.3.3.
d
Hi Nicholas - I don't think setting tags programmatically on ops in this manner at runtime is currently supported. The 'tags' key on your config object is just a config field; that's not the same thing as setting a tag on the op when it's declared in code, which is what tag_concurrency_limits looks at.
n
thanks for the tip - know of any workaround?
d
The only workaround that I'm aware of would be setting the tags when the op is defined:
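# Tags are fixed when the op is defined, so `order` must already have a value at this point
@op(tags={"dagster/priority": str(order * -1), "MPA_Multi": "Double"})
def op_c(context, config: CompanyOpConfig, ins=None or {"start": In(Nothing)}, **kwargs):
    do_op_c_stuff()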
n
huh. iirc it was giving me an error when i had that, let me try...
tried modifying one op, no errors, so far so good!