Sundara Moorthy
02/15/2022, 12:48 PM
emr_step_def = self._get_emr_step_def(run_id, step_key, step_context.solid.name)
This submits a Spark job for each solid. How does it segregate one solid's code from the whole file so it runs as a separate job? For example:
from pyspark.sql import DataFrame, Row
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

from dagster import graph, op

@op(required_resource_keys={"pyspark", "pyspark_step_launcher"})
def make_people(context) -> DataFrame:
    schema = StructType([StructField("name", StringType()), StructField("age", IntegerType())])
    rows = [Row(name="Thom", age=51), Row(name="Jonny", age=48), Row(name="Nigel", age=49)]
    return context.resources.pyspark.spark_session.createDataFrame(rows, schema)

@op(required_resource_keys={"pyspark_step_launcher"})
def filter_over_50(people: DataFrame) -> DataFrame:
    return people.filter(people["age"] > 50)

@graph
def make_and_filter_data():
    filter_over_50(make_people())
In this code, assuming we are using the EMR step launcher, how are filter_over_50 and make_people segregated and executed as separate jobs?
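A rough sketch of the pattern in play may help frame the question. The names below (StepRunRef, launch_step, remote_step_main.py, the S3 key layout) are illustrative assumptions, not Dagster's actual internal API: a step launcher like this serializes a reference to a single step, stages it on S3, and submits an EMR step whose spark-submit entrypoint re-executes only that step key on the cluster.

import pickle

# Illustrative sketch only -- class, function, and file names here are
# assumptions, not Dagster's real internals.

class StepRunRef:
    """Everything the remote process needs to re-run exactly one step."""

    def __init__(self, run_config, job_origin, run_id, step_key):
        self.run_config = run_config  # full run config for the job
        self.job_origin = job_origin  # how to reload the user code (module/file)
        self.run_id = run_id
        self.step_key = step_key      # e.g. "filter_over_50"

def launch_step(step_context, s3_client, bucket):
    # 1. Serialize a reference to just this step and stage it on S3.
    ref = StepRunRef(
        run_config=step_context.run_config,
        job_origin=step_context.job_origin,
        run_id=step_context.run_id,
        step_key=step_context.step.key,
    )
    key = f"dagster/{ref.run_id}/{ref.step_key}/step_run_ref.pkl"
    s3_client.put_object(Bucket=bucket, Key=key, Body=pickle.dumps(ref))

    # 2. Submit an EMR step whose spark-submit entrypoint downloads the
    #    pickle, reloads the job, and executes only ref.step_key.
    emr_step_def = {
        "Name": f"dagster-step-{ref.step_key}",
        "ActionOnFailure": "CANCEL_AND_WAIT",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",  # standard EMR wrapper for spark-submit
            "Args": [
                "spark-submit",
                "--deploy-mode", "cluster",
                f"s3://{bucket}/remote_step_main.py",  # hypothetical entrypoint script
                "--step-ref", f"s3://{bucket}/{key}",
            ],
        },
    }
    return emr_step_def

Under this pattern, both ops ship to the cluster in the same code artifact; the segregation happens at execution time, because the remote entrypoint reloads the job, looks up the single step named by step_key, and runs only that one, reading any upstream outputs (such as the people DataFrame) back through the configured IO manager.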