Daniel Michaelis

04/30/2021, 1:35 PM
Hi all, I'm working on a project where we want to run Spark jobs on Kubernetes, read from S3 and save intermediate results there (especially Parquet), and orchestrate everything via Dagster. My experience with all these tools is still fairly basic, and I'm trying to figure out the best way to combine them effectively without overcomplicating things. So far I have managed to run Spark on a local Kubernetes cluster (MicroK8s) and read a CSV file from S3 into a Spark DataFrame (with Dagster). I have also managed to deploy Dagster to Kubernetes and run pipelines from there. I am currently planning my next steps and was wondering if anyone could give me feedback on my planned approaches:
1. As a next step I would like to combine what I have so far and read the CSV from S3 from within the Dagster deployment on Kubernetes. Locally I log in manually via `aws sso login`, which creates temporary credentials. I created a custom `pyspark_s3_resource` which accesses these credentials via `boto3.Session().get_credentials()` and adjusts the `pyspark_resource`'s Hadoop config so that it can read from S3. However, I am unsure how I can access these temporary credentials from within the Dagster Pod on Kubernetes. It was suggested that I mount the folder with the credentials into the Dagster user code Pod via hostPath, but I'm unsure how to do that and whether it's a valid solution. Any thoughts on that? (I'm only interested in a quick workaround for my local cluster, as AWS authentication will be solved differently in our production cluster on EKS.)
2. Are there any best practices on how to run Spark jobs efficiently with Dagster? A naive approach would be to save all intermediate results of each solid (especially DataFrames as Parquet) on S3. However, saving ALL intermediates and starting a new Spark session in every solid effectively negates the advantages of Spark, i.e. lazy evaluation, caching, etc. This could be avoided by combining several solids into one monolithic solid, but that would contradict the single-responsibility principle (each step only does one thing). Is it possible to share a single Spark session across several consecutive solids within a pipeline, and e.g. pass the results from one solid to another via a custom IOManager that caches the results instead of saving them, or only passes them along without doing anything?
3. As my pipeline will contain several steps that don't depend on one another, I would like to run solids in parallel as well. This means running independent Spark jobs in parallel. As I'm not a Spark expert, I don't know what the best approach is, especially on Dagster and on Kubernetes. Is this something the celery-kubernetes executor can solve? (Is it recommended to combine Spark and Celery at all?)
I know this is a lot at once but even partial help on any of these questions would be greatly appreciated as the entire framework is starting to get a bit overwhelming, based on the more and more complex infrastructure requirements from our core developer and DevOps team.
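For question 1, the credential hand-off described above might look roughly like the sketch below. It assumes the Hadoop S3A connector (`hadoop-aws`) is on the Spark classpath. The helper names `s3a_spark_conf` and `conf_from_boto3_session` are hypothetical and are not the actual `pyspark_s3_resource`; the `spark.hadoop.fs.s3a.*` keys are the S3A options for temporary (session-token) credentials.

```python
from typing import Dict, Optional

# S3A credential provider for temporary (session-token) credentials.
TEMPORARY_PROVIDER = "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider"


def s3a_spark_conf(
    access_key: str, secret_key: str, token: Optional[str] = None
) -> Dict[str, str]:
    """Map AWS credentials to Spark config entries for the S3A connector."""
    conf = {
        "spark.hadoop.fs.s3a.access.key": access_key,
        "spark.hadoop.fs.s3a.secret.key": secret_key,
    }
    if token:
        # Temporary credentials (e.g. from `aws sso login`) also need the
        # session token and the matching credentials provider.
        conf["spark.hadoop.fs.s3a.session.token"] = token
        conf["spark.hadoop.fs.s3a.aws.credentials.provider"] = TEMPORARY_PROVIDER
    return conf


def conf_from_boto3_session() -> Dict[str, str]:
    """Resolve credentials the way boto3 does (SSO cache, env vars, ...)."""
    import boto3  # imported lazily so s3a_spark_conf has no dependencies

    credentials = boto3.Session().get_credentials()
    if credentials is None:
        raise RuntimeError("boto3 could not find any AWS credentials")
    frozen = credentials.get_frozen_credentials()
    return s3a_spark_conf(frozen.access_key, frozen.secret_key, frozen.token)
```

Each key/value pair can then be applied with `SparkSession.builder.config(key, value)` before the session is created. Inside a Pod, `conf_from_boto3_session()` only works if boto3 can find credentials there, for example through environment variables.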

johann

04/30/2021, 2:16 PM
Hi Daniel, thanks for the in-depth questions! I’ll try my best:
1. There are a few ways to do this. I think you could solve it by loading the env vars into the pods: https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-envvars.html. These can be passed to a k8s pod by creating a generic k8s secret and then setting `env_secrets: ['secret-name']` in your celery-k8s executor config, per https://github.com/dagster-io/dagster/blob/master/python_modules/libraries/dagster-k8s/dagster_k8s/job.py#L210. Happy to provide more details here; looking at it, I’m noticing that it could definitely be better documented on our side.
2. cc @sandy
3. Yes, solids that don’t depend on each other, e.g.
from dagster import pipeline

@pipeline
def my_pipeline():
    # solid_a and solid_b are independent of each other
    solid_a()
    solid_b()
will execute in parallel with the celery-k8s executor. I think the only executor that wouldn’t is the in-process executor.
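As a rough sketch of the secret-based approach: the snippet below builds a generic Kubernetes Secret manifest from the standard AWS environment variables, plus a run config fragment that references the secret through `env_secrets`. The secret name `aws-sso-credentials` and both helper functions are made up for illustration, and the exact run config layout is an assumption that may differ between Dagster versions.

```python
import json
import os
from typing import Any, Dict, Mapping

# Environment variables read by the AWS CLI and SDKs.
AWS_ENV_VARS = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_SESSION_TOKEN")


def aws_secret_manifest(name: str, env: Mapping[str, str]) -> Dict[str, Any]:
    """Build a generic (Opaque) Secret holding the AWS variables found in env."""
    return {
        "apiVersion": "v1",
        "kind": "Secret",
        "metadata": {"name": name},
        "type": "Opaque",
        # stringData takes plain strings; Kubernetes does the base64 encoding.
        "stringData": {key: env[key] for key in AWS_ENV_VARS if key in env},
    }


def celery_k8s_run_config(secret_name: str) -> Dict[str, Any]:
    """Run config fragment (assumed layout) exposing the secret to step pods."""
    return {"execution": {"celery-k8s": {"config": {"env_secrets": [secret_name]}}}}


# Example: serialize the manifest so it can be applied to the cluster.
manifest_json = json.dumps(aws_secret_manifest("aws-sso-credentials", os.environ), indent=2)
```

Since `kubectl apply -f` accepts JSON as well as YAML, the serialized manifest can be written to a file and applied directly. Temporary SSO credentials expire, so the secret has to be recreated after each new login.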

sandy

04/30/2021, 4:35 PM
Regarding your second question, it's highly related to which executor you're using. You can choose either the in-process executor, which executes all solids in a pipeline in the same process, or an executor like the multiprocess executor, which executes each solid in a separate process. If you're using a multiprocess executor, you are essentially stuck with saving all the intermediate results on S3 (or other persistent storage), because Spark provides no way to serialize a DataFrame object without serializing out its full contents. If you're using the in-process executor, then the approach you described of sharing a single Spark session between many solids in the same pipeline is definitely possible and advantageous for the reasons you described. I think both approaches are "good practice"; it just depends on what exactly you're trying to do.
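To illustrate the in-process option, here is a minimal plain-Python sketch of a pass-through IO manager that keeps object references in memory instead of writing them out. It only mirrors the shape of Dagster's IOManager interface (`handle_output(context, obj)` and `load_input(context)`); the class name is hypothetical, and the `SimpleNamespace` contexts are stand-ins for the real context objects. In an actual pipeline the class would subclass `dagster.IOManager` and be registered as a resource.

```python
from types import SimpleNamespace
from typing import Any, Dict, Tuple


class PassThroughIOManager:
    """Holds solid outputs by reference; nothing is serialized or persisted."""

    def __init__(self) -> None:
        self._values: Dict[Tuple[str, str], Any] = {}

    def handle_output(self, context: Any, obj: Any) -> None:
        # Key each output by the step that produced it and the output name.
        self._values[(context.step_key, context.name)] = obj

    def load_input(self, context: Any) -> Any:
        # Hand the downstream solid the very same object, e.g. a DataFrame
        # that is still lazy and still bound to the shared Spark session.
        upstream = context.upstream_output
        return self._values[(upstream.step_key, upstream.name)]


manager = PassThroughIOManager()
lazy_df = object()  # stand-in for a lazily evaluated Spark DataFrame
output_context = SimpleNamespace(step_key="solid_a", name="result")
manager.handle_output(output_context, lazy_df)
loaded = manager.load_input(SimpleNamespace(upstream_output=output_context))
```

This only works when all solids run in one process, because the stored references cannot cross process boundaries. Dagster also ships a built-in `mem_io_manager` that follows the same idea.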

johann

04/30/2021, 4:38 PM
To add to Sandy’s response: the celery-k8s executor that you are using counts as a ‘multiprocess’ executor, since the steps run in isolation. It is also possible to use the in-process executor on k8s if you use the K8sRunLauncher (all steps will then run within one k8s pod). There are tradeoffs with either approach.
Most production use cases will want a multiprocess executor for isolation and parallelism, but, as you point out, it does incur overhead.