I’m trying to limit concurrency on a Job which use...
# ask-community
s
I’m trying to limit concurrency on a Job which uses dagster_k8s’s
k8s_run_executor
. The documentation (https://docs.dagster.io/_apidocs/libraries/dagster-k8s#dagster_k8s.k8s_job_executor) has a suggestion but I’m having trouble understanding it. It says:
To use the k8s_job_executor, set it as the executor_def when defining a job:
Copy code
(a code example in Python)
> Then you can configure the executor with run config as follows:
Copy code
(a YAML document)
Where does that YAML run config go?
d
Hi Spencer - this section explains some of the different ways you can configure a run: https://docs.dagster.io/concepts/configuration/config-schema#specifying-runtime-configuration (The Dagit/Launchpad section is one place that this config can go)
s
Ah, thanks. Is there any way to pass it in to the Job, like through the @job decorator somehow?
d
oh yes, I thought i posted this here but instead it was just a non sequiter on an unrelated post, oops: You can also put the configuration right on the job itself in code: https://docs.dagster.io/concepts/ops-jobs-graphs/jobs#hardcoded-configuration
s
🙂 thanks!
Is the shape the same? Is a “run config” the same as the config object a Job takes?
d
same shape yeah
s
Oops, I mis-remembered, and I was using `define_asset_job`:
Copy code
nsc_extract_job = define_asset_job(
    "nsc_extract_job",
    selection=["nsc_source_dataframe"],
    config={"execution": {"config": {"max_concurrent": 5}}},
    partitions_def=nsc.PARTITIONING,
)
This produced an error to do with the config’s shape
kind of an ugly error:
Copy code
dagster._core.errors.DagsterInvalidConfigError: Error in config for job
Error 1: Received unexpected config entry "max_concurrent" at path root:execution:config. Expected: "{ in_process?: { marker_to_close?: String retries?: { disabled?: { } enabled?: { } } } multiprocess?: { max_concurrent?: Int retries?: { disabled?: { } enabled?: { } } start_method?: { forkserver?: { preload_modules?: [String] } spawn?: { } } tag_concurrency_limits?: [{ key: String limit: Int value?: (String | { applyLimitPerUniqueValue: Bool }) }] } }".
Copy code
"multiprocess": {"max_concurrent": 1}
seems like it would do something quite different, right?
d
Yeah you also need to configure the job to use the k8s job executor in code like in that example you first posted
s
Oh duhhhh
Thanks, of course