# announcements

Yan

03/08/2021, 11:35 AM
Hi! What is the best way to destroy a resource (e.g. close a Spark session) after a pipeline finishes or fails?

schrockn

03/08/2021, 4:13 PM
Hey Yan, thanks for the question. This depends on the executor and the resources involved. If you want a pyspark resource that spins down the context explicitly, the code would look like this:
@resource(…)
def pyspark_resource(init_context):
    pyspark = SystemPySparkResource(init_context.resource_config['spark_conf'])
    try:
        yield pyspark
    finally:
        if init_context.resource_config['stop_session']:
            pyspark.stop()
The code after the yield is executed during execution spin-down. With the in-process executor, that happens at the end of pipeline execution. For other executors it happens at process spin-down, likely on a per-solid basis.
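For reference, a minimal sketch (not from the thread) of how a resource like this might be attached to a pipeline with the legacy pipeline/solid API of that era. The solid and pipeline names and the run config values are made up, the config keys are inferred from the resource body above (its schema is elided), and it assumes the resource object exposes a spark_session attribute as dagster_pyspark's PySparkResource does:

from dagster import ModeDefinition, execute_pipeline, pipeline, solid

@solid(required_resource_keys={"pyspark"})
def count_rows(context):
    # Assumes the resource exposes .spark_session, like dagster_pyspark's PySparkResource.
    return context.resources.pyspark.spark_session.range(100).count()

@pipeline(mode_defs=[ModeDefinition(resource_defs={"pyspark": pyspark_resource})])
def spark_pipeline():
    count_rows()

# With the in-process executor, the code after the resource's yield runs when the
# run spins down, so the Spark session is stopped once the pipeline finishes.
execute_pipeline(
    spark_pipeline,
    run_config={
        "resources": {
            "pyspark": {
                "config": {
                    # Hypothetical values; the exact keys depend on the elided
                    # config schema of the resource above.
                    "spark_conf": {"spark": {"app": {"name": "demo"}}},
                    "stop_session": True,
                }
            }
        }
    },
)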

Yan

03/08/2021, 4:20 PM
Great, thanks! Exactly what I need.
👍 1

Daniel Michaelis

11/03/2021, 12:17 PM
Hi, I tried to make use of the pyspark_resource that automatically spins down after execution, and I had to make some adjustments for the code to work. Could someone review if the following code looks good?
from dagster import resource, Field, BoolSource
from dagster_pyspark.resources import PySparkResource
from dagster_spark.configs_spark import spark_config

@resource(config_schema={"spark_conf": spark_config(),
                         "stop_session": Field(BoolSource,
                                               description="If true, automatically stops the Spark session after execution",
                                               default_value=True,
                                               is_required=False)})
def pyspark_resource_stoppable(init_context):
    pyspark = PySparkResource(init_context.resource_config['spark_conf'])
    try:
        yield pyspark
    finally:
        if init_context.resource_config.get('stop_session'):
            pyspark.spark_session.stop()
I'm thinking that maybe this should even be the default behavior for the pyspark_resource provided in dagster_pyspark, such that there are no open sessions after a job is run:
@resource(config_schema={"spark_conf": spark_config()})
def pyspark_resource(init_context):
    pyspark = PySparkResource(init_context.resource_config['spark_conf'])
    try:
        yield pyspark
    finally:
        pyspark.spark_session.stop()
What's the Dagster team's opinion on this?
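For context, a rough sketch (not from the thread) of how pyspark_resource_stoppable could be exercised with the op/job API to show the teardown behavior. The op and job names and the spark_conf values are illustrative, and the nested spark_conf layout is assumed to follow dagster_spark's spark_config:

from dagster import job, op

@op(required_resource_keys={"pyspark"})
def sum_range(context):
    spark = context.resources.pyspark.spark_session
    return spark.range(10).groupBy().sum().collect()[0][0]

@job(resource_defs={"pyspark": pyspark_resource_stoppable})
def spark_job():
    sum_range()

if __name__ == "__main__":
    # stop_session defaults to True, so the session is stopped in the resource's
    # finally block once the run completes; set stop_session to False in run
    # config to keep it open.
    spark_job.execute_in_process(
        run_config={
            "resources": {
                "pyspark": {
                    "config": {"spark_conf": {"spark": {"app": {"name": "demo"}}}}
                }
            }
        }
    )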

yuhan

11/03/2021, 4:49 PM
@Dagster Bot issue tear down pyspark resource after a run ends

Dagster Bot

11/03/2021, 4:49 PM

yuhan

11/03/2021, 4:54 PM
Hi @Daniel Michaelis, the code looks good. In terms of the default behavior, I think you're right that it should spin down the session after execution. I'm filing an issue and will prioritize it.