https://dagster.io/ logo
#ask-community
Limiting Concurrency in a Job
# ask-community
s

Srihari

03/12/2024, 11:28 AM

Limiting Concurrency in a Job

my code location as below defs which used k8s_job_executor
Copy code
defs = Definitions(
    assets=[the_asset], jobs=[asset_job, op_job], executor=k8s_job_executor
)
All jobs/schedules running fine with above def. Now i want to create an op based job, the op should not run in parallel, if mutiple are there then it must run in sequence. so i tried create a job with below code as in doc https://docs.dagster.io/guides/limiting-concurrency-in-data-pipelines#limiting-overall-concurrency-in-a-job Here i used multiprocess_executor bcz multiprocess works with multiprocess_executor as mentioned in the doc.
Copy code
@job(
    executor_def=multiprocess_executor,
    config={
        "execution": {
            "config": {
                "multiprocess": {
                    "max_concurrent": 1,
                "tag_concurrency_limits": [
                        {
                            "key": "database",
                            "value": "neo4j",
                            "limit": 1,
                        }
                    ],
                },
            }
        }
    },
)
def data_load_job() -> None:
But code is giving error

error

Copy code
dagster._core.errors.DagsterInvalidConfigError: Invalid default_value for Field.
    Error 1: Received unexpected config entry "multiprocess" at path root:config. Expected: "{ max_concurrent?: Int? retries?: { disabled?: { } enabled?: { } } start_method?: { forkserver?: { preload_modules?: [String] } spawn?: { } } tag_concurrency_limits?: [{ key: String limit: Int value?: (String | { applyLimitPerUniqueValue: Bool }) }] }".
can you help what is wrong in job defenition?
14 Views