Yan
03/08/2021, 11:35 AM

schrockn
03/08/2021, 4:13 PM

@resource(…)
def pyspark_resource(init_context):
    pyspark = SystemPySparkResource(init_context.resource_config['spark_conf'])
    try:
        yield pyspark
    finally:
        if init_context.resource_config['stop_session']:
            pyspark.stop()
The code after the yield is executed during execution spin-down. With the in-process executor, that happens at the end of pipeline execution. For other executors it happens at process spin-down, likely on a per-solid basis.
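For a concrete picture of that spin-down, here is a minimal sketch (the resource, op, and job names are made up, and it uses the op/job APIs rather than pipelines/solids) in which the teardown line runs only after the run body has finished:

from dagster import job, op, resource

@resource
def greeting_resource(_init_context):
    # setup runs before any op that requires this resource
    print("resource setup")
    try:
        yield "hello"
    finally:
        # runs during execution spin-down, i.e. after the ops using the resource are done
        print("resource teardown")

@op(required_resource_keys={"greeting"})
def use_greeting(context):
    context.log.info(context.resources.greeting)

@job(resource_defs={"greeting": greeting_resource})
def demo_job():
    use_greeting()

if __name__ == "__main__":
    # with the default in-process executor, "resource teardown" prints once the run completes
    demo_job.execute_in_process()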
Yan
03/08/2021, 4:20 PM

Daniel Michaelis
11/03/2021, 12:17 PM

I set up a pyspark_resource that automatically spins down after execution and I had to make some adjustments for the code to work. Could someone review if the following code looks good?
from dagster import resource, Field, BoolSource
from dagster_pyspark.resources import PySparkResource
from dagster_spark.configs_spark import spark_config

@resource(config_schema={
    "spark_conf": spark_config(),
    "stop_session": Field(
        BoolSource,
        description="If true, automatically stops the Spark session after execution",
        default_value=True,
        is_required=False,
    ),
})
def pyspark_resource_stoppable(init_context):
    pyspark = PySparkResource(init_context.resource_config['spark_conf'])
    try:
        yield pyspark
    finally:
        if init_context.resource_config.get('stop_session'):
            pyspark.spark_session.stop()
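A minimal sketch of how this resource could be wired into a job (the op and job names are placeholders, spark_conf is left at its defaults, and stop_session can be flipped through run config):

from dagster import job, op

@op(required_resource_keys={"pyspark"})
def count_rows(context):
    # grab the live SparkSession from the resource defined above
    spark = context.resources.pyspark.spark_session
    context.log.info(f"rows: {spark.range(100).count()}")

@job(resource_defs={"pyspark": pyspark_resource_stoppable})
def spark_job():
    count_rows()

if __name__ == "__main__":
    # stop_session defaults to True; pass False to keep the session alive after the run
    spark_job.execute_in_process(
        run_config={"resources": {"pyspark": {"config": {"stop_session": False}}}}
    )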
I'm thinking that maybe this should even be the default behavior for the pyspark_resource provided in dagster_pyspark, such that there are no open sessions after a job is run:
@resource({"spark_conf": spark_config()})
def pyspark_resource(init_context):
    pyspark = PySparkResource(init_context.resource_config['spark_conf'])
    try:
        yield pyspark
    finally:
        pyspark.spark_session.stop()
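The "no open sessions" concern can be illustrated with plain PySpark, independent of Dagster (a rough sketch, assuming local mode):

from pyspark.sql import SparkSession

# getOrCreate() hands back whatever session is already active in the process,
# so a session left open by one run is silently reused by whatever runs next
first = SparkSession.builder.master("local[1]").getOrCreate()
second = SparkSession.builder.getOrCreate()
assert second is first

# stopping the session, as the finally block above does, shuts down the underlying
# SparkContext so nothing is left running in the background
first.stop()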
What's the Dagster team's opinion on this?

yuhan
11/03/2021, 4:49 PM

Dagster Bot
11/03/2021, 4:49 PM

yuhan
11/03/2021, 4:54 PM