https://dagster.io/ logo
#ask-community
Title
# ask-community
i

Ilya Pogorelsky

08/31/2023, 2:51 PM
I have a scenario where a sensor (ftp) might get several new files, we need to make sure that we process them in a specific order, sequentially. I was thinking of setting the concurrency limit via executer: mulitprocessing setting on the job that does the file processing as described here: https://docs.dagster.io/guides/limiting-concurrency-in-data-pipelines#op-based-jobs-1 but the sample provided on the page does not seem to work as-is, I am doing something like this
Copy code
SYNC_JOB_CONFIG = {
    "execution": {
        "config": {
            "multiprocess": {
                "max_concurrent": 1,
            },
        }
    }
}
@job(name="sync", config=SYNC_JOB_CONFIG)
def sync_job():
   op3(op2(op1()))
but getting errors from dagster on startup:
Copy code
re.errors.DagsterInvalidConfigError: Error in config when building job 'sync' 
    Error 1: Missing required config entry "ops" at the root. Sample config for missing entry: {'ops': {'op1': {'config': {'s3_bucket': '...'}}, 'op2': {'config': {'filename': '...', 's3_bucket': '...'}}}}
I don’t want to specify any configuration for these ops, as the config will be injected by the sensor
RunRequest
may be it’s addressed in a similar thread from a month ago; will try that approach
z

Zach

08/31/2023, 3:43 PM
You could also use the in_process executor and then not have to mess with concurrency settings
🙏 1
i

Ilya Pogorelsky

08/31/2023, 4:07 PM
in production i am actually running on kubernetes, and we set executor to
k8s_job_executor
we actually wanted to have
in-process
like behavior for some of the jobs we launch inside a pod. need to experiment a bit.
So it looks like neither
in_process
nor
multiprocess_executor.configured({"max_concurrent": 1})
is constraining the number of job runs that happen concurrently. may be these are meant to constraint how many ops/assets can run within a given job run instance, but not across ALL job run requests.
z

Zach

08/31/2023, 5:58 PM
Ah I see, are you trying to constrict the number of instances of a specific op being run across all jobs? Or the number of instances of a specific job?
i

Ilya Pogorelsky

08/31/2023, 5:59 PM
number of instances of a given job. only one instance of a job should be in-flight at any given time
z

Zach

08/31/2023, 6:00 PM
Hmm yeah I don't know if there's a way to constrain that for a specific job. I know you can limit the number of runs within a Dagster instance via the QueuedRunCoordinator config, but it sounds like you need to restrict the number of jobs for a specific job.
i

Ilya Pogorelsky

08/31/2023, 6:02 PM
yeah odd - you would think a “job” scheduler should have this as a core api; seems like dagster focused on a lot of flexibility within a job, but not so much at job level.
z

Zach

08/31/2023, 6:02 PM
Actually there's this blurb about limiting specific runs using tags
i

Ilya Pogorelsky

08/31/2023, 6:02 PM
thanks the link above looks useful - i will review
z

Zach

08/31/2023, 6:03 PM
Hopefully that's what you're looking for
1
2 Views