# dagster-feedback
r
Even with configurable executors, I'm having trouble understanding how to optimize Dagster for performance - it would be great to have a doc page with some tips! Some particular questions of mine:

1. If I have a large asset with downstream dependencies persisted to e.g. parquet, and I run the assets as a joint job, will all downstream assets reload it from disk (expensive), or does it get passed in memory? (rough sketch of the setup below)
2. In a multiprocessing execution setting, how many times are expensive resources loaded, and how are they (and intermediate asset/op results) shared? Is the process e.g. forked after they are instantiated?
3. I seem to have a large step_worker overhead in the Dagit execution logs - what does this represent and can it be minimized? (e.g. is it affected by the size of my environment?)
4. Is there significant overhead from the multiprocessing, and can it be beneficial to use an in-process executor?
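For concreteness, here's roughly the setup I mean in question 1 - a minimal sketch, all names made up:

```python
from dagster import asset, define_asset_job

@asset
def big_table():
    # expensive upstream asset; the configured IO manager persists this
    # output (e.g. to parquet)
    ...

@asset
def summary(big_table):
    # downstream asset: does big_table arrive in memory here, or is the
    # parquet file re-read from disk for this step?
    ...

# run both assets together as one job
all_assets_job = define_asset_job("all_assets_job", selection="*")
```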
g
(1) this depends on the IO manager - normally there is IO. It gets particularly interesting when using external compute, e.g. Spark: there you have to make sure to use a suitable in-memory IO manager so that only the final result is persisted, not every step. The default Databricks runner already makes this a bit easier than running OSS Spark on a Cloudera YARN cluster.
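Something along these lines, for example - just a sketch, the asset names are made up - to keep intermediates in memory for a run and only persist what you write out yourself:

```python
from dagster import asset, materialize, mem_io_manager

@asset
def raw_events():
    # imagine a large dataframe loaded / computed here
    ...

@asset
def aggregated(raw_events):
    # final result: persist it explicitly inside the asset
    # (e.g. df.to_parquet / spark .write) rather than via the IO manager
    ...

# swap the default fs_io_manager for the built-in in-memory one so
# intermediate outputs never hit disk between steps (in-process only)
materialize([raw_events, aggregated], resources={"io_manager": mem_io_manager})
```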
r
Reloading massive dataframes seems like a pretty big overhead 😬 Checked the source for the standard mem_io_manager, and it literally just stores results in a dictionary and then reads them back. Would I need to add that sort of internal registry to all my IO managers to bypass the disk (I still want to save to disk, just not load if I can avoid it)? Does storing things in a simple dictionary on the manager even work with multiple processes and workers?
z
it does not work with multiple processes and workers - to share objects between processes they have to be written to disk. That's a Python limitation, not really specific to Dagster. If you want to keep I/O between steps in memory you have to use the in-process executor. It's the same deal with resources under the multiprocess executor: they're spun up for every process, they do not get shared.

I'm not super sure about 3) - if you're not using a custom step_worker, I think all it's really doing is loading your step and associated objects.

For 4), it seems pretty dependent on your code. If you have lots of heavy resources, they'll be instantiated for every step that requires them; if you don't have highly parallel workflows that might not matter much. It may be more efficient in some cases to use an in-process executor (particularly when passing large DataFrames, like geoHeil said), but you lose any chance at step-level parallelism.
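roughly what I mean, as a sketch - the model resource is just a stand-in:

```python
from dagster import job, op, resource, in_process_executor, multiprocess_executor

@resource
def model_resource(_):
    # stand-in for an expensive-to-load resource (e.g. an ML model);
    # with the multiprocess executor this runs once per step process
    return {"weights": [0.1, 0.2, 0.3]}

@op(required_resource_keys={"model"})
def score(context):
    return sum(context.resources.model["weights"])

# multiprocess: each step runs in its own process, so the resource is
# re-instantiated per step and outputs are serialized to disk between steps
@job(resource_defs={"model": model_resource}, executor_def=multiprocess_executor)
def scoring_multiprocess():
    score()

# in-process: one process, the resource is built once, and an in-memory IO
# manager can hand outputs between steps - but no step-level parallelism
@job(resource_defs={"model": model_resource}, executor_def=in_process_executor)
def scoring_in_process():
    score()
```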
r
ah, I can see I've been living with a misapprehension about resources - I was certain there was a performance benefit, but I see now that it's just structural. If I use the in-process executor with an IO manager that stores as parquet, won't downstream assets still reload the parquet file instead of receiving it directly? I'm really starting to wish for some best practices on managing large data files and in-memory resources (e.g. ML models) in Dagster outside of a warehouse integration.
z
it depends on how you write the IO manager. You could write it so that its handle_output method saves the dataframe both to a parquet file and to an in-memory cache (like a dictionary), and the load_input method then loads it from memory instead of from parquet (of course this only works with the in-process executor). But yes, I've run into similar issues in a Spark / big data context. So far my experience has been that Dagster doesn't natively orchestrate big data pipelines well, and you're likely to need to farm out to external compute to optimize those tasks. One could theoretically get fancy with some kind of separate on-worker in-memory cache like Redis to back an IOManager with, but there'd be a number of hoops to jump through (serialization etc.).
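something like this is what I mean - untested sketch, CachingParquetIOManager and the path layout are made up, and the in-memory cache only pays off with the in-process executor:

```python
import os
import pandas as pd
from dagster import IOManager, io_manager

class CachingParquetIOManager(IOManager):
    """Writes every output to parquet, but also keeps it in a dict so
    downstream steps in the same process can skip the disk read."""

    def __init__(self, base_dir):
        self._base_dir = base_dir
        self._cache = {}  # path -> DataFrame

    def _path(self, context):
        # one parquet file per output identifier (sketchy layout)
        return os.path.join(self._base_dir, *context.get_identifier()) + ".parquet"

    def handle_output(self, context, obj: pd.DataFrame):
        path = self._path(context)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        obj.to_parquet(path)     # durable copy on disk
        self._cache[path] = obj  # hot copy for downstream steps

    def load_input(self, context) -> pd.DataFrame:
        path = self._path(context.upstream_output)
        if path in self._cache:  # cache hit: skip deserialization entirely
            return self._cache[path]
        return pd.read_parquet(path)  # miss (e.g. other process): read from disk

@io_manager(config_schema={"base_dir": str})
def caching_parquet_io_manager(init_context):
    return CachingParquetIOManager(init_context.resource_config["base_dir"])
```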
r
I guess base Dagster is exploiting that pickles load fairly fast to pass outputs around between processes? Is there a way to mix in-process and multi-process execution modes?
z
the pickle IO manager is just a default. If you use Dagster for very long you're very likely to write your own IO managers, and the framework is very much built expecting that to be the case. If you're going to use multiple processes you need some way of serializing the data between them - one of the biggest overheads incurred by Spark is serializing data to share it across executors. It's not a problem unique to Dagster.

When it comes to loading / writing huge dataframes it's kind of just the classic trade-off you face with big data pipelines: do you want to keep everything in memory in a single DAG, but not be able to recover in case of failure? Or do you want to write out intermediate steps (and slow down significantly) so you don't have to recalculate the whole DAG if a node fails? You face the same issue writing a single Spark job.

I don't think you can mix in-process and multi-process within a single job.
actually, interestingly enough, I just saw this thread about using `execute_in_process` from within a multi-processed job, which might be along the lines of what you're looking for in terms of mixing in-process and multi-process - https://dagster.slack.com/archives/C01U954MEER/p1676646777536959
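i.e. something roughly like this (my own sketch, not taken from that thread):

```python
from dagster import graph, job, op

@op
def load_frame():
    return list(range(1_000_000))  # stand-in for a big dataframe

@op
def enrich(frame):
    return [x + 1 for x in frame]

@graph
def heavy_chunk():
    return enrich(load_frame())

# the inner job runs entirely inside one process, so its steps avoid
# cross-process serialization
inner_job = heavy_chunk.to_job()

@op
def run_heavy_chunk_in_memory():
    # call execute_in_process from inside an op of the outer job
    result = inner_job.execute_in_process()
    return len(result.output_for_node("enrich"))

# the outer job keeps its normal executor (multiprocess when launched
# through the usual deployment) for everything else
@job
def outer_job():
    run_heavy_chunk_in_memory()
```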
r
yes, I've already built my own IO managers for parquet/yaml/json etc., but different formats deserialize at different speeds, so from a pipeline point of view it might be a problem to use parquet as intermediate storage, since it's compressed and slower to deserialize than a pickle? But pickles are OTOH poor for permanent storage. Do you know anything about the new caching mechanism? How does it play into this - is it maintaining an additional cached copy of the output, or just calling the IO manager to retrieve it?
z
I don't know much about the new caching mechanism - haven't really gotten a chance to check it out. I'm not completely sure what the fastest intermediate storage for a pipeline would be; it really depends on the problem. In some cases parquet might be faster, since you can load individual columns / partitions instead of reading the whole file.
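e.g. (assuming pandas and a hypothetical features.parquet file):

```python
import pandas as pd

# read only the columns a downstream step actually needs, instead of
# deserializing the whole dataframe
subset = pd.read_parquet("features.parquet", columns=["user_id", "score"])
```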