
Sundara Moorthy

02/15/2022, 12:48 PM
Hi Team, in the pyspark step launcher file, on this line
emr_step_def = self._get_emr_step_def(run_id, step_key, step_context.solid.name)
it submits a Spark job for each solid. How does it segregate a single solid's code from the rest of the file and run it as a separate job? For example:
from dagster import graph, op
from pyspark.sql import DataFrame, Row
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

@op(required_resource_keys={"pyspark", "pyspark_step_launcher"})
def make_people(context) -> DataFrame:
    schema = StructType([StructField("name", StringType()), StructField("age", IntegerType())])
    rows = [Row(name="Thom", age=51), Row(name="Jonny", age=48), Row(name="Nigel", age=49)]
    return context.resources.pyspark.spark_session.createDataFrame(rows, schema)


@op(required_resource_keys={"pyspark_step_launcher"})
def filter_over_50(people: DataFrame) -> DataFrame:
    return people.filter(people["age"] > 50)

@graph
def make_and_filter_data():
    filter_over_50(make_people())
In this code, assuming we are using the EMR step launcher, how are filter_over_50 and make_people segregated and executed as separate jobs?

daniel

02/15/2022, 2:42 PM
Hi Sundara - the command that the step launcher runs on the EMR cluster includes instructions for which op to run. All the code is available on EMR; we don't do anything to isolate the code for a specific op - the remote process just knows which op it's supposed to run, and only runs that single op.
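Conceptually (a toy analogy in plain Python, not Dagster's actual implementation), it looks like this: the whole module is importable on the cluster, and the remote entry point is handed only the step key, so it looks up and runs just that one op:

OPS = {}

def register(fn):
    # every op in the module is importable on the remote side
    OPS[fn.__name__] = fn
    return fn

@register
def make_people():
    return [{"name": "Thom", "age": 51}, {"name": "Jonny", "age": 48}]

@register
def filter_over_50(people):
    return [p for p in people if p["age"] > 50]

def remote_entry_point(step_key, upstream_outputs):
    # only the named op executes; the rest of the module is merely imported, not run
    return OPS[step_key](*upstream_outputs)

# the launcher effectively submits something like:
#   remote_entry_point("filter_over_50", [<make_people output loaded from storage>])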

Sundara Moorthy

02/15/2022, 3:13 PM
Which means the complete Dagster code will be on the EMR cluster, and by passing the op name it will trigger that function. And intermediate data between ops will be stored in S3? Correct me if I am wrong @daniel.

daniel

02/15/2022, 3:20 PM
I believe that's correct

Sundara Moorthy

02/15/2022, 3:22 PM
Okay, can we use the EMR step launcher without storing the intermediate data between ops?

daniel

02/15/2022, 3:23 PM
I think you need some kind of IO manager to pass data between ops, since it runs each op in isolation
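For Spark DataFrames, that IO manager typically persists the data and reads it back rather than pickling the DataFrame object itself. A minimal sketch of what it could look like (the ParquetIOManager name and the bucket path are made up here; check the context attributes against your Dagster version):

from dagster import IOManager, io_manager

class ParquetIOManager(IOManager):
    # stores each op output as a Parquet dataset keyed by run id, step, and output name
    def __init__(self, path_prefix):
        self._prefix = path_prefix

    def _path(self, context):
        return f"{self._prefix}/{context.run_id}/{context.step_key}/{context.name}"

    def handle_output(self, context, obj):
        # obj is a pyspark DataFrame; the actual write happens on the executors
        obj.write.parquet(self._path(context), mode="overwrite")

    def load_input(self, context):
        spark = context.resources.pyspark.spark_session
        # lazy read of the upstream op's output
        return spark.read.parquet(self._path(context.upstream_output))

@io_manager(required_resource_keys={"pyspark"})
def parquet_io_manager(init_context):
    return ParquetIOManager("s3://my-staging-bucket/dagster-intermediates")  # placeholder path

Each op output then lands in S3 once, and the downstream op reads it back lazily in its own EMR step.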

Sundara Moorthy

02/15/2022, 3:24 PM
Because, assuming we are processing 1 TB of data, our intermediate storage will be approximately 1 TB, plus the extra reads/writes, which will affect the performance of the pipeline. @daniel
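For reference, wiring the graph above to the EMR step launcher together with an IO manager like the Parquet sketch above would look roughly like this (resource names come from dagster_aws / dagster_pyspark; the cluster id, bucket, and config field values are placeholders to verify against the current docs):

from pathlib import Path

from dagster_aws.emr import emr_pyspark_step_launcher
from dagster_pyspark import pyspark_resource

# each op runs as its own EMR step; its output is written to S3 by the
# io_manager and loaded back by the downstream step
make_and_filter_data_job = make_and_filter_data.to_job(
    resource_defs={
        "pyspark_step_launcher": emr_pyspark_step_launcher.configured(
            {
                "cluster_id": "j-EXAMPLECLUSTER",       # placeholder
                "region_name": "us-west-2",             # placeholder
                "staging_bucket": "my-staging-bucket",  # placeholder
                "deploy_local_pipeline_package": True,
                "local_pipeline_package_path": str(Path(__file__).parent),
                "wait_for_logs": True,
            }
        ),
        "pyspark": pyspark_resource,
        "io_manager": parquet_io_manager,  # the sketch above, or any DataFrame-aware IO manager
    }
)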