https://dagster.io/ logo
#ask-community
Title
# ask-community
d

Daniel Galea

12/05/2022, 10:06 AM
Hi, good morning everyone! I am trying to build an pipeline in Dagster which does the following: 1. Launch an EMR cluster using the EmrJobRunner class, by using its run_job_flow function. 2. Add one or more steps to that cluster to process data in PySpark by using the emr_pyspark_step_launcher resource. 3. Shut down the cluster once all steps are finished. I followed this tutorial first, which assumes that you have an EMR cluster running and you hard code the EMR cluster ID as part of the Job specification. This way worked, as I could see my steps being run on EMR. However, when I try to automate the process I noticed that PySpark was running locally and not on EMR. I tried to wrap the emr_pyspark_step_launcher as a Resource which sets the cluster ID as part of the pipeline. The cluster ID can be obtained by using a function in the EmrJobRunner class which returns a cluster ID when providing a cluster name. I am trying to dynamically add the cluster ID during the job after launching the cluster but this isn't working as expected. Has someone tried to setup this type of pipeline and did it successfully? If so, how did you do it? I posted my code here, any help would be appreciated! Thank you 🙂