geoHeil
04/20/2022, 9:57 AMrequired_resource_keys
. As a result, pyspark must be instantiated first (and this costs some overhead). Is there any way (without making a 2nd copy of the multimodal IO manager without the pyspark resource) to get rid of this delay? When trying to pass "pyspark": None,
dagster complains. So far, it looks for me like I need to specify a dummy resource instead. Is this the right way to go?Zach
04/20/2022, 12:34 PMgeoHeil
04/20/2022, 12:35 PMgeoHeil
04/20/2022, 12:36 PMgeoHeil
04/20/2022, 12:36 PMZach
04/20/2022, 12:43 PM@resource
def lazy_pyspark_session():
def _inner():
from pyspark.sql import SparkSession
return SparkSession.builder.getOrCreate()
return _inner
@op(
    # required_resource_keys is a set of resource-key strings, not a mapping
    # to resource definitions; the actual resource is bound via the job's
    # resource_defs (e.g. @job(resource_defs={"pyspark": lazy_pyspark_session})).
    required_resource_keys={"pyspark"},
)
def spark_op(context):
    """Op that obtains a SparkSession by calling the lazy pyspark resource.

    The resource value is a factory callable, so the session is only
    created here, when the op actually runs.
    """
    spark = context.resources.pyspark()
geoHeil
04/20/2022, 1:09 PMZach
04/20/2022, 1:20 PMfrom dagster_spark.configs_spark import spark_config
from dagster_pyspark.resources import spark_session_from_config
class LazyPySparkResource:
    """PySpark resource whose SparkSession is built lazily on first access.

    The session is created only when ``spark_session`` or ``spark_context``
    is first read, so runs that never touch Spark skip the startup cost.
    """

    def __init__(self, spark_conf):
        # Defer session creation; hold only the config until it is needed.
        self._spark_conf = spark_conf
        self._spark_session = None

    def _session(self):
        # Build the session exactly once; later reads reuse the cached one.
        if self._spark_session is None:
            self._spark_session = spark_session_from_config(self._spark_conf)
        return self._spark_session

    @property
    def spark_session(self):
        return self._session()

    @property
    def spark_context(self):
        return self._session().sparkContext
@resource({"spark_conf": spark_config()})
def lazy_pyspark_resource(init_context):
"""This resource provides access to a PySpark SparkSession for executing PySpark code within Dagster.
Example:
.. code-block:: python
@op(required_resource_keys={"pyspark"})
def my_op(context):
spark_session = context.resources.pyspark.spark_session
dataframe = spark_session.read.json("examples/src/main/resources/people.json")
my_pyspark_resource = pyspark_resource.configured(
{"spark_conf": {"spark.executor.memory": "2g"}}
)
@job(resource_defs={"pyspark": my_pyspark_resource})
def my_spark_job():
my_op()
"""
return LazyPySparkResource(init_context.resource_config['spark_conf'])
Zach
04/20/2022, 1:21 PMZach
04/20/2022, 1:22 PMgeoHeil
04/20/2022, 1:26 PMgeoHeil
04/20/2022, 1:26 PMgeoHeil
04/20/2022, 1:26 PMjohann
04/20/2022, 2:08 PMchris
04/20/2022, 3:08 PMsandy
04/26/2022, 3:56 PMsandy
04/26/2022, 3:58 PMDaniel Michaelis
05/03/2022, 1:24 PM