# ask-community

Mark Fickett

05/03/2022, 8:35 PM
I'm running a job with something like ~20k steps. Should that perform well? I'm using the local multiprocessing executor with `max_concurrent=36`, and the default (SQLite?) event storage. I think the bulk of the logging messages are Dagster events. I'm finding that it takes at least twice as long as the implementation I'm porting into Dagster, which used `concurrent.futures`. The Dagit UI is also very laggy, in terms of loading the run view, showing which ops are executing/preparing, and loading logs.

Many of the ops are very lightweight: just read 1 string input and make 1 API call and then return nothing; but they do benefit from being run in parallel. I'm wondering if trying to reduce the number of ops would help, but while that could reduce the number of steps somewhat, I really do want a big fan-out. I could do some chunking though if op startup/shutdown is likely taking a lot of time, so each op does 10-100 work items. Is the default event storage likely slowing things down? I could set up a local postgres but don't want to bother with that if it's unlikely to improve performance. I'll also be moving this to Dagster Cloud with Kubernetes, so I want to prepare for good performance / resource usage there, too.

daniel

05/03/2022, 8:37 PM
Hi Mark - I would expect Postgres to perform a lot better than SQLite here, yeah. Especially with lots of concurrent work happening at once.

Mark Fickett

05/03/2022, 8:38 PM
OK, I'll give that a go.

daniel

05/03/2022, 8:38 PM
I can't promise it will immediately address everything you've listed here, and I'm happy to dig more into those as needed, but it's definitely the first thing I would try.

Mark Fickett

05/06/2022, 2:26 PM
Switching to Postgres definitely improved performance. A job processing a few hundred tests (maybe a thousand steps) runs in ~20m, similar to the previous implementation. But running a full-scale job (~6k tests) is still very slow: an early stage of the pipeline that usually takes < 1h hadn't completed in 4+ hours. As a gut check, is a job with 20k steps (mostly from DynamicOuts) something people typically run in Dagster?

daniel

05/06/2022, 2:28 PM
That's definitely within the range of what we support, yes. Happy to help dig into the source of the slowdown with you. Are there any indications from the event log about which steps are now taking longer? Like if you compare the job with 1k steps to the one with 20k steps, is there a clear piece of the puzzle that gets incrementally slower per test?

Mark Fickett

05/06/2022, 2:33 PM
Good to know it's not unreasonable. I'll see if I can find some patterns. I may also see if I can stub out the real pipeline work and replicate the slowdown with something I can share.

alex

05/06/2022, 2:33 PM
What are the resources on the machine you are running this on? You could send over a debug file too, so we can see what the timings look like.
I forget if we discussed this in another thread, but have you found the `start_method: forkserver` config option on the multiprocess executor?

Mark Fickett

05/06/2022, 2:36 PM
Nope, tell me more?
Currently it's running on a Mac Mini, 64GB RAM, i7 3.2GHz, 2TB SSD.

alex

05/06/2022, 2:39 PM
Today is the day I put aside other work and finally write some docs for this. Here is the description, which should be viewable in the Dagit playground as well: https://github.com/dagster-io/dagster/blame/master/python_modules/dagster/dagster/core/definitions/executor_definition.py#L276-L299. Basically it's a tool to reduce the per-process init costs for the multiprocess executor. We can't default it to on since it can sometimes cause weird bugs in some libraries.
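(A sketch of what enabling this might look like when configuring the executor in code; the exact config shape is best confirmed in the Dagit playground or the source linked above, and the op/job names are hypothetical.)

```python
# Sketch: assumes the multiprocess executor's config exposes a "start_method"
# selector with a "forkserver" option, per the linked executor_definition.py.
from dagster import job, multiprocess_executor, op


@op
def do_work() -> None:
    ...


forkserver_executor = multiprocess_executor.configured(
    {
        "max_concurrent": 36,
        # Reuse a pre-imported forkserver process instead of spawning a fresh
        # interpreter per step, cutting per-process init cost.
        "start_method": {"forkserver": {}},
    }
)


@job(executor_def=forkserver_executor)
def my_job():
    do_work()
```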

Mark Fickett

05/06/2022, 2:48 PM
I tried just adding `start_method: forkserver: {}` and it crashed, but glad to experiment more if that seems useful. Here's the debug download.
Though running with `forkserver` on my Linux dev machine is chugging along merrily, and looks faster (I don't get time to click on a preparing op before it's already gone, whereas usually I have a second or two).

alex

05/06/2022, 2:54 PM
Was the Mac crash some Objective-C code with `requests` in the call stack?
Chrome is having trouble trying to view this run, so I would say you are definitely stressing our software. The per-op overhead (which is per-process in multiprocess) is non-zero, so a batching approach may be the best bet. It will also reduce overall event volume, which should make Dagit more usable. I would also say to be careful with `context.log` volume for this much fan-out.

In the hopefully not-too-distant future, an executor with in-process parallelism (likely async/await) would probably be a great fit given your description; tracking issue here: https://github.com/dagster-io/dagster/issues/4041. Today, op functions can be `async` and we will drive them to completion, so you can use asyncio tools to manage concurrency within an op for your batched work.
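(To illustrate that last point, a sketch with made-up names and limits: an `async` op body that Dagster drives to completion, using asyncio to fan out the API calls within a single batched step.)

```python
# Sketch: the per-item call is a placeholder. Dagster drives the coroutine from
# an async op body to completion, so ordinary asyncio tools work inside the op.
import asyncio

from dagster import op


@op
async def process_batch(items: list) -> None:
    limit = asyncio.Semaphore(50)  # assumed cap on in-flight API calls

    async def call_one(item) -> None:
        async with limit:
            await asyncio.sleep(0)  # stand-in for the real async API call

    await asyncio.gather(*(call_one(item) for item in items))
```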

Mark Fickett

05/06/2022, 4:05 PM
Thanks, I'll see about batching and reduced logging.
Here's the Python crash report:
I tried running the job with pretty much all logging removed. (I had been pulling the root Python logger into Dagster's managed logging, and I removed that, which removed logging from the highly parallelized part of the job.) That still ran very slowly; attached is the debug output. I'm trying the approach of running in chunks instead of fully parallelizing, and that seems to work better. Hopefully I can add back in logging and maintain performance. For now, reducing parallelism is OK, but it would be really nice to be able to throw thousands of DynamicOutputs at Dagster and have it deal with the parallelism, instead of having to chunk based on some heuristics and hope the chunks finish at close to the same time.
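(The chunked fan-out being described, as a sketch with illustrative names and chunk size: emit one `DynamicOutput` per chunk of work items rather than one per item, so a ~20k-item fan-out becomes a few hundred steps.)

```python
# Sketch only: the chunk size is a heuristic, and the upstream list of work
# items is assumed to come from an earlier op.
from dagster import DynamicOut, DynamicOutput, op

CHUNK_SIZE = 100  # guess; ideally sized so chunks finish at about the same time


@op(out=DynamicOut(list))
def emit_chunks(work_items: list):
    for start in range(0, len(work_items), CHUNK_SIZE):
        chunk = work_items[start : start + CHUNK_SIZE]
        yield DynamicOutput(chunk, mapping_key=str(start // CHUNK_SIZE))
```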
Running in chunks did turn out to work much better. I ended up with 1728 steps executed after my job ran successfully. I tried turning logs back on, and had 51972 log messages. Having the log messages didn't seem to affect job execution time. However, it made the Dagit UI load really slowly whenever I opened the job / while it was running; on the other hand, it does eventually load, and then I can filter logs by each step. For completeness, attached is a debug dump from that.

Chunking is a fine workaround for now, but it would be really nice to be able to do the full fan-out of dynamic outputs in Dagster, and to understand where the issue stems from. That async executor does sound interesting; we were wondering whether doing some lightweight parallelism inside of heavyweight k8s ops would be a good long-term approach.

Less importantly, it'd be nice to have a clearer sense of how much logging Dagit can handle. Being able to browse/filter logs by op in the Dagit UI is one of Dagster's shining features, I think. Is the limit the overall number of log messages (like it's loading all logs into memory before rendering the Dagit UI), or logs per op (about 25/op in this case, though definitely some more than others), or something else? Looking to get a sense of whether there's a way to work around the limit other than just reducing the total log volume.

alex

05/09/2022, 10:16 PM
it’s loading all logs into memory before rendering the Dagit UI
ya this is the current implementation, so total volume is what effects user experience
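(Given that, one way to keep total volume down, sketched here with hypothetical names: log a per-chunk summary via `context.log` rather than one line per work item.)

```python
# Sketch: keep per-item details out of the event log (or log them only on
# failure) and send one summary line per chunk to context.log.
from dagster import OpExecutionContext, op


@op
def process_chunk(context: OpExecutionContext, chunk: list) -> None:
    failures = []
    for item in chunk:
        try:
            ...  # real per-item API call goes here
        except Exception as exc:
            failures.append((item, exc))
    context.log.info(f"processed {len(chunk)} items, {len(failures)} failures")
```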

Mark Fickett

05/10/2022, 3:10 AM
Thanks Alex, good to know. I filed https://github.com/dagster-io/dagster/issues/7821 as an FR for handling larger log volumes, but I'll pay attention to our highest-volume logs and see what I can pare down.