
Jonathan Lamiel

11/23/2022, 12:15 PM
Hello there, hoping this is the right place to ask about it. I'm new to Dagster, so I might not be aware of a lot of the work going on, but so far I really like what has been done here. I'm looking at the architecture of Dagster and its scalability. I'm surprised that the Dagster daemon can't (per the docs) handle replicas for the moment. As it runs sensors / schedules etc., I was wondering: 1. Are there any benchmarks of the workload it can handle in parallel? 2. Any plan to make it scalable? (I didn't find any issues mentioning it.)

prha

11/23/2022, 10:02 PM
Hi Jonathan. We don't have any benchmarks for the sensor/schedule daemon, but we are operating it in production for our Dagster Cloud customers. As for making it horizontally scalable, that's something we've discussed so that we can be more resilient, but we haven't prioritized it yet. We have made improvements to the existing sensor/schedule daemons to enable threading, so that there's a bit of isolation between different sensors (e.g. one sensor timing out should not affect the evaluation of other sensors). The daemon process should also be isolated from the actual sensor/schedule code being evaluated, since it makes API calls to a gRPC server that hosts the repository definition.

Tomas H

12/12/2022, 2:03 PM
Hello @prha, I'm glad I found a relevant discussion regarding Dagster performance. I'm currently running my own performance/stress tests and I hit several weak points. The first is already fixed and merged into master (https://github.com/dagster-io/dagster/pull/10886). However, there are several other issues related to Dagster scalability which are already captured in the following issues: https://github.com/dagster-io/dagster/issues/7763, https://github.com/dagster-io/dagster/issues/4311, https://github.com/dagster-io/dagster/issues/9623, and https://github.com/dagster-io/dagster/issues/4333. Although we enabled useThreads for both sensors and schedules, sensors scale pretty badly (I have numbers to share). The same is true for the QueuedRunCoordinator, which seems to submit runs sequentially, limiting the total number of jobs submittable to k8s. My question is whether these issues are on the roadmap, and if so, when? We really adore Dagster, but with plenty of small, latency-sensitive jobs it seems the execution model is not suitable for us.

Pieter Custers

12/14/2022, 3:40 PM
Subscribing myself to this thread.

daniel

12/14/2022, 4:45 PM
Hey Tomas - I'd definitely be curious what your numbers are on sensors - I'm also curious about what your performance requirements are and what the underlying use case is, if that's something you're able to share
Of the issues you linked I think https://github.com/dagster-io/dagster/issues/7763 (and https://github.com/dagster-io/dagster/issues/4311 which is similar) are next on our list to tackle
Actually some concurrency improvements for those two issues are shipping in the release tomorrow

Tomas H

12/15/2022, 3:42 PM
Hi Daniel, let me try the new release first and then I'll happily share my sensors benchmark with you. I can't wait for the new release. Our use cases are quite common: we are collecting plenty of data of various formats from various providers, usually fundamental data for ML models. However, we need to collect the data at high frequency from some providers, and these actions are rather time-sensitive; the faster we get the data, the better. So far the common approach was to download the data at regular intervals, but sensors seemed to be a great way to just check whether there is actually something new to download. I can imagine different patterns, but in the end it almost always ends up with a large number of rather quick jobs. Next to robustness, the throughput of the orchestration tool, and hence its scalability, is our primary criterion. My observations so far are that we are limited by the low throughput of the QueuedRunCoordinator (used to protect the k8s cluster from an unbounded number of requests), by insufficiently scaling sensor processing (I believe on the daemon side), and by quickly filling event_logs and run_tags tables (just a hypothesis as to why dagit is becoming unresponsive; I do not know whether there is some impact on the daemon). I was able to partly work around the QueuedRunCoordinator issue with the following snippet in my benchmark: the schedule is simply skipped if the previous run of the job has not finished.
import functools
from datetime import datetime
from typing import Iterator, Union

from dagster import (
    DagsterRunStatus,
    RunRequest,
    RunsFilter,
    ScheduleDefinition,
    ScheduleEvaluationContext,
    SkipReason,
)


def execute_if_prev_job_finished(context: ScheduleEvaluationContext,
                                 job_name: str) -> Iterator[Union[SkipReason, RunRequest]]:
    logger = context.log
    logger.debug(f"Checking job run {job_name}")
    start_timestamp = datetime.now()
    # Look for any in-flight run of this job (not started, queued, starting, or started).
    run_records = context.instance.get_run_records(
        RunsFilter(job_name=job_name,
                   statuses=[DagsterRunStatus.STARTED,
                             DagsterRunStatus.QUEUED,
                             DagsterRunStatus.STARTING,
                             DagsterRunStatus.NOT_STARTED])
    )
    checked_in = str(datetime.now() - start_timestamp)
    logger.debug(f"Previous run of job {job_name} checked in {checked_in}")
    if len(run_records) == 0:
        logger.debug(f"Submitting run request for job {job_name}")
        yield RunRequest(tags={"run_requested_at": str(datetime.utcnow())})
    else:
        yield SkipReason(f"The previous run of job {job_name} has not finished yet.")


# sleep_jobs_perm is the list of benchmark jobs defined elsewhere in the test setup.
sleep_schedules_perm = [ScheduleDefinition(
    job=job,
    cron_schedule="* * * * *",
    execution_fn=functools.partial(execute_if_prev_job_finished, job_name=job.name))
    for job in sleep_jobs_perm]
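
A slightly cheaper variant of the in-flight check, as a sketch: since the guard only needs to know whether any matching run exists, passing limit=1 to get_run_records keeps the query small even as the runs table grows (limit is a standard get_run_records argument; the other names follow the snippet above).

from dagster import DagsterRunStatus, RunsFilter, ScheduleEvaluationContext


def any_in_flight_run(context: ScheduleEvaluationContext, job_name: str) -> bool:
    """Return True if any run of `job_name` is not yet finished."""
    records = context.instance.get_run_records(
        RunsFilter(job_name=job_name,
                   statuses=[DagsterRunStatus.STARTED,
                             DagsterRunStatus.QUEUED,
                             DagsterRunStatus.STARTING,
                             DagsterRunStatus.NOT_STARTED]),
        limit=1,  # a single matching record is enough to decide to skip
    )
    return len(records) > 0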

daniel

12/15/2022, 3:45 PM
If there's any way to put more concrete numbers to what you're imagining for "large number of rather quick jobs" and what your latency requirements are - that would be really helpful for us as we think about future benchmarks for potential daemon improvements. I don't think we have any immediate plans to get to, say, p99 sub-second latency like a webserver, but we'd definitely like to improve the overall latency and throughput of the system, so knowing what kind of real data use cases are out there that require that is really useful
Oh, for the "quickly filling up the DB" thing - that's very plausible, I'd be curious what kind of DB size you're using

Tomas H

12/15/2022, 4:18 PM
I see, I can give some numbers we are working with. Currently, the rather obsolete tool we use runs 200-400 concurrent threads (depending on the time of day), each doing some kind of collection (files, REST APIs) and transformation job. It usually takes from a second to dozens of seconds to finish a single pipeline, depending on its complexity. It actually supports cron with one-second resolution. We do not expect the orchestration tool to handle sub-second start-up times, and we are planning to solve that in a different way, as we are aware of the latency introduced by the k8s scheduler (but that part scales pretty well). What do you mean exactly by DB size? For the benchmark I spun up a smaller-sized instance with 2 CPUs and 4 GB RAM. However, I did a complete database dump, restored it on a local instance with 8 CPUs, and ran the following query I had captured (the scheduler status check); the response time was about 20% lower.
SELECT runs.id, runs.run_body, runs.status, runs.create_timestamp, runs.update_timestamp, runs.start_time, runs.end_time
FROM runs
WHERE runs.run_id IN (
    SELECT run_tags.run_id FROM run_tags
    WHERE run_tags.key = 'dagster/schedule_name' AND run_tags.value = 'sleep_job_perm19_schedule'
    INTERSECT
    SELECT run_tags.run_id FROM run_tags
    WHERE run_tags.key = '.dagster/repository' AND run_tags.value = 'load_repo@code-load'
)
ORDER BY runs.id DESC
LIMIT 1
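
For reference, a rough Python-level equivalent of that captured query, using the instance API with the same tag values (a sketch for illustration, not the daemon's actual code path):

from dagster import DagsterInstance, RunsFilter

# Loads the instance from DAGSTER_HOME / dagster.yaml.
instance = DagsterInstance.get()

# Latest run for one schedule in one repository, filtered via run tags;
# get_run_records returns newest-first by default, so limit=1 gives the latest run.
records = instance.get_run_records(
    RunsFilter(tags={
        "dagster/schedule_name": "sleep_job_perm19_schedule",
        ".dagster/repository": "load_repo@code-load",
    }),
    limit=1,
)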

daniel

12/15/2022, 4:20 PM
Yeah, I meant the size of the instance where the DB was running

Tomas H

12/15/2022, 4:24 PM
In case we could help with anything regarding performance testing/tuning, I am open to it.

alex

12/15/2022, 5:52 PM
"insufficiently scaling sensor processing"
Did you put all the sensors in the same code location / user code server? The daemon reaches out to this/these servers to evaluate the sensor functions. Splitting these out and/or tuning the --max_workers flag on dagster api grpc would likely yield higher throughput.
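
A minimal sketch of that kind of split, assuming the sensors can be grouped into two importable lists (my_project.sensors, fast_sensors, and slow_sensors are hypothetical names); each repository would then be served by its own dagster api grpc process, which is where the --max_workers flag above applies, and listed as a separate location in workspace.yaml.

from dagster import repository

# Hypothetical modules exposing lists of SensorDefinitions.
from my_project.sensors import fast_sensors, slow_sensors


@repository
def fast_sensor_repo():
    # Served from its own gRPC server so a slow or timing-out sensor in the
    # other location cannot delay these evaluations.
    return [*fast_sensors]


@repository
def slow_sensor_repo():
    return [*slow_sensors]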

Tomas H

12/29/2022, 5:28 PM
Hi all, it took me some time, but I finished my benchmarks. You can find my comments and findings on the relevant issues, starting with this one: https://github.com/dagster-io/dagster/issues/4311#issuecomment-1367428270

daniel

12/29/2022, 5:42 PM
Thanks Tomas - what were the resource limits (# of vCPUs / memory / etc.) when you were running your tests?