Nicholas Buck
08/28/2023, 8:15 PM
@static_partitioned_config(partition_keys=companies)
def mpa_config(partition_key: str):
    """Build the run config dictionary for a single partition.

    The result has two sections:

    * ``"execution"`` -- multiprocess executor settings: at most 3 concurrent
      steps, plus per-value limits for the ``"MPA_Multi"`` tag key
      (``"Single"`` -> 1, ``"Double"`` -> 2, ``"Triple"`` -> 3).
    * ``"ops"`` -- one config entry per op (``op_a`` .. ``op_d``) carrying the
      partition key as ``"CID"``, the ordering value, and a ``"tags"`` mapping.

    Args:
        partition_key: The partition key; stored as ``"CID"`` in every op's
            config.

    Returns:
        A nested ``dict`` with ``"execution"`` and ``"ops"`` keys.
    """
    # `ordered_list` is not defined in this snippet; it is read from the
    # enclosing (module) scope.
    order = ordered_list

    # One limit entry per "MPA_Multi" tag value.
    concurrency_limits = [
        {"key": "MPA_Multi", "value": tag_value, "limit": limit}
        for tag_value, limit in (("Single", 1), ("Double", 2), ("Triple", 3))
    ]

    # Which "MPA_Multi" value each op is labelled with.
    op_tag_values = {
        "op_a": "Single",
        "op_b": "Double",
        "op_c": "Double",
        "op_d": "Double",
    }

    # NOTE(review): "tags" here is an op *config* field, not op-definition
    # tags; confirm the executor's tag_concurrency_limits actually matches
    # against values supplied this way.
    ops_section = {
        op_name: {
            "config": {
                "CID": partition_key,
                "order": order,
                "tags": {
                    "dagster/priority": str(order * -1),
                    "MPA_Multi": tag_value,
                },
            }
        }
        for op_name, tag_value in op_tag_values.items()
    }

    return {
        "execution": {
            "config": {
                "multiprocess": {
                    "max_concurrent": 3,
                    "tag_concurrency_limits": concurrency_limits,
                },
            }
        },
        "ops": ops_section,
    }
class CompanyOpConfig(Config):
    """Per-op configuration shared by ``op_a`` through ``op_d``."""

    # Filled from the partition key by `mpa_config`.
    CID: str
    # Ordering value; `mpa_config` also uses it (negated) for the
    # "dagster/priority" entry in `tags`.
    order: int
    # Mapping with "dagster/priority" and "MPA_Multi" entries as built by
    # `mpa_config`.
    tags: dict
# step 1
@op
def op_a(
    context,
    config: CompanyOpConfig,
    ins=None or {"start": In(Nothing)},
    **kwargs,
):
    """Step 1: delegate to ``do_op_a_stuff()``.

    ``context``, ``config``, ``ins`` and any extra keyword arguments are
    accepted but not used in the body.
    """
    # NOTE(review): `ins` is declared as a parameter default of the function
    # rather than passed to `@op(...)`; confirm this is intended.
    do_op_a_stuff()
# step 2
@op
def op_b(
    context,
    config: CompanyOpConfig,
    ins=None or {"start": In(Nothing)},
    **kwargs,
):
    """Step 2: delegate to ``do_op_b_stuff()``.

    ``context``, ``config``, ``ins`` and any extra keyword arguments are
    accepted but not used in the body.
    """
    # NOTE(review): `ins` is declared as a parameter default of the function
    # rather than passed to `@op(...)`; confirm this is intended.
    do_op_b_stuff()
# step 3
@op
def op_c(
    context,
    config: CompanyOpConfig,
    ins=None or {"start": In(Nothing)},
    **kwargs,
):
    """Step 3: delegate to ``do_op_c_stuff()``.

    ``context``, ``config``, ``ins`` and any extra keyword arguments are
    accepted but not used in the body.
    """
    # NOTE(review): `ins` is declared as a parameter default of the function
    # rather than passed to `@op(...)`; confirm this is intended.
    do_op_c_stuff()
# step 4
@op()
def op_d(
    context,
    config: CompanyOpConfig,
    ins=None or {"start": In(Nothing)},
    **kwargs,
):
    """Step 4: delegate to ``do_op_d_stuff()``.

    ``context``, ``config``, ``ins`` and any extra keyword arguments are
    accepted but not used in the body.
    """
    # NOTE(review): `ins` is declared as a parameter default of the function
    # rather than passed to `@op(...)`; confirm this is intended.
    do_op_d_stuff()
@job(config=mpa_config)
def MorningProcessingP05():
    """Wire the four ops into a fan-out graph, configured by ``mpa_config``.

    ``op_a`` is invoked first; ``op_b``, ``op_c`` and ``op_d`` each take its
    result as their input, so all three depend on ``op_a`` and are otherwise
    independent of one another.
    """
    # The op must be *invoked* inside the job body: binding the bare `op_a`
    # object would pass an un-invoked op definition downstream instead of an
    # output handle.
    op_a_result = op_a()

    # Fan out. The return values are not needed, so they are not collected.
    op_b(op_a_result)
    op_c(op_a_result)
    op_d(op_a_result)
The tags do show up in the Dagster UI tags section. My Dagster version is 1.3.3.
daniel
08/28/2023, 8:25 PM
Nicholas Buck
08/28/2023, 8:26 PM
daniel
08/28/2023, 8:27 PM
@op(tags={"dagster/priority":str(order*-1),"MPA_Multi":"Double"})
def op_c(
    context,
    config: CompanyOpConfig,
    ins=None or {"start": In(Nothing)},
    **kwargs,
):
    """Delegate to ``do_op_c_stuff()``.

    ``context``, ``config``, ``ins`` and any extra keyword arguments are
    accepted but not used in the body.
    """
    # NOTE(review): `ins` is declared as a parameter default of the function
    # rather than passed to `@op(...)`; confirm this is intended.
    do_op_c_stuff()
Nicholas Buck
08/28/2023, 8:28 PM
Nicholas Buck
08/28/2023, 8:32 PM