# ask-community
I have a multi-modal IO manager for DuckDB (Parquet files) which can work via pandas and PySpark. For some ingest processes I know for sure that I only want to use pandas (and not the pyspark resource). Currently, I must list pyspark under the
. As a result, pyspark must be instantiated first (and this costs some overhead). Is there any way (without making a second copy of the multi-modal IO manager without the pyspark resource) to get rid of this delay? When trying to pass `"pyspark": None`, dagster complains. So far, it looks to me like I need to specify a dummy resource instead. Is this the right way to go?
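If you do go the dummy-resource route, a placeholder that fails loudly is safer than one that silently does nothing: if a Spark code path is hit unexpectedly, you get a clear message instead of a confusing error deep inside the IO manager. A minimal plain-Python sketch; `UnavailableResource` is a hypothetical name, not part of Dagster:

```python
class UnavailableResource:
    """Hypothetical placeholder: fills a resource key but fails loudly if used."""

    def __init__(self, name: str):
        self._name = name

    def __getattr__(self, attr: str):
        # __getattr__ only runs for attributes not found the normal way.
        # Let private/dunder probes (copy, pickle, hasattr) fail normally.
        if attr.startswith("_"):
            raise AttributeError(attr)
        raise RuntimeError(
            f"resource {self._name!r} is only a placeholder in this job; "
            f"attempted to access {attr!r}"
        )
```

In Dagster, an object like this could presumably be supplied under the `pyspark` key via `ResourceDefinition.hardcoded_resource(UnavailableResource("pyspark"))`; `ResourceDefinition.none_resource()` and `ResourceDefinition.mock_resource()` cover the simpler "provide nothing" cases. Check these helpers against the Dagster version you run.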
Could you write and pass a pyspark resource that is lazily initialized? Then you can initialize it only when you need it.
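The lazy-initialization idea in its most generic form, as a plain-Python sketch that assumes nothing about Dagster: `Lazy` is a hypothetical wrapper that holds a factory and only calls it on first access, caching the result with `functools.cached_property`.

```python
from functools import cached_property
from typing import Callable, Generic, TypeVar

T = TypeVar("T")


class Lazy(Generic[T]):
    """Hypothetical wrapper: defers an expensive factory until first access."""

    def __init__(self, factory: Callable[[], T]):
        self._factory = factory

    @cached_property
    def value(self) -> T:
        # Runs the factory once on first access; the result is stored in the
        # instance __dict__ and returned directly on later accesses.
        return self._factory()

    @property
    def initialized(self) -> bool:
        return "value" in self.__dict__
```

A resource could hand out `Lazy(build_spark_session)` (where `build_spark_session` is whatever creates your session), and ops that never touch `.value` never pay the startup cost. Note that since Python 3.12 `cached_property` no longer takes a lock, so two threads could race on the first access.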
But this would mean I need my own custom resource and cannot use the native one from dagster, right? Would this be any better than having a mock resource I can pass instead?
But perhaps yes, as then the API to the outside is transparent.
Do you have an example of lazy instantiation?
The simplest thing I can think of is having the resource be a function that returns a Spark session (although I'm not sure whether resources can be functions; it would be interesting to try):
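```python
from dagster import job, op, resource


@resource
def lazy_pyspark_session(_init_context):
    # The resource value is a function; nothing Spark-related happens at init time.
    def _inner():
        from pyspark.sql import SparkSession  # imported only when actually needed
        return SparkSession.builder.getOrCreate()

    return _inner


@op(required_resource_keys={"pyspark"})
def spark_op(context):
    spark = context.resources.pyspark()  # the session is created on this call
    # ... use `spark` as usual from here


@job(resource_defs={"pyspark": lazy_pyspark_session})
def spark_job():
    spark_op()
```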
But then the documentation of the existing resource and its configuration handling no longer apply; there is no automatic pass-through of the Spark config.
Ah, what might be better is subclassing `dagster_pyspark.resources.PySparkResource` so that `spark_session` actually instantiates the Spark session, but only when it is first accessed:
```python
from dagster import resource
from dagster_pyspark.resources import spark_session_from_config
from dagster_spark.configs_spark import spark_config


class LazyPySparkResource:
    """Same interface as PySparkResource, but builds the SparkSession on first use."""

    def __init__(self, spark_conf):
        self._spark_session = None
        self._spark_conf = spark_conf

    def _init_session(self):
        if self._spark_session is None:
            self._spark_session = spark_session_from_config(self._spark_conf)

    @property
    def spark_session(self):
        self._init_session()  # the session is created here, not in __init__
        return self._spark_session

    @property
    def spark_context(self):
        return self.spark_session.sparkContext


@resource({"spark_conf": spark_config()})
def lazy_pyspark_resource(init_context):
    """Provides a lazily created PySpark SparkSession for executing PySpark code within Dagster.

    .. code-block:: python

        @op(required_resource_keys={"pyspark"})
        def my_op(context):
            spark_session = context.resources.pyspark.spark_session
            dataframe = spark_session.read.json("examples/src/main/resources/people.json")

        my_pyspark_resource = lazy_pyspark_resource.configured(
            {"spark_conf": {"spark.executor.memory": "2g"}}
        )

        @job(resource_defs={"pyspark": my_pyspark_resource})
        def my_spark_job():
            my_op()
    """
    return LazyPySparkResource(init_context.resource_config["spark_conf"])
```
No idea if that works, though.
Actually, I guess you don't even need to subclass in this case; just duck-type it.
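Duck typing here means consumers only rely on the attribute name (`spark_session`), so any object exposing it works, with or without a shared base class. A plain-Python sketch with stand-in objects instead of real Spark sessions; `EagerSession`, `LazySession` and `describe` are hypothetical names for illustration:

```python
from typing import Any, Callable, Optional


class EagerSession:
    """Stand-in for an already-initialized resource (like the stock one)."""

    def __init__(self, session: Any):
        self.spark_session = session


class LazySession:
    """Duck-typed stand-in: same attribute name, no common base class.

    The session is only built on the first access to `spark_session`.
    """

    def __init__(self, factory: Callable[[], Any]):
        self._factory = factory
        self._session: Optional[Any] = None

    @property
    def spark_session(self) -> Any:
        if self._session is None:
            self._session = self._factory()
        return self._session


def describe(resource: Any) -> str:
    # Only relies on the attribute name, so either class works here.
    return f"using {resource.spark_session}"
```

Because the IO manager and ops only ever read `.spark_session`, swapping the eager resource for the lazy one is invisible to them, which is the transparency discussed in this thread.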
So you mean a mock, i.e. what I initially suggested?
But that mocked one would have no functionality at all.
I like your lazy approach more, as it is then transparent to the user.
cc @chris, is there a way to make resources optional?
There’s no first-class support for optional resources
I filed an issue to track adding a dagster-native lazy pyspark resource: https://github.com/dagster-io/dagster/issues/7585
I also filed a broader issue to consider adding lazy resource loading as a core dagster feature: https://github.com/dagster-io/dagster/issues/7586
I have a question on issue 7586: wouldn't it make sense (at least for certain executors) to always initialize resources only for the ops that require them? Some of my ops in the past were much slower because they initialized many resources for the entire pipeline that they didn't require. I can't reproduce all the details, but I think this also caused unwanted behavior, e.g. with the dagster-mlflow resource, where runs were started multiple times instead of only for the relevant ops.
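To make the proposal concrete, here is a hypothetical sketch (not how Dagster is implemented) of per-step initialization: given the resource keys a single op declares and a map of resource factories, only the declared resources are built. `init_resources_for_step` is an illustrative name.

```python
from typing import Any, Callable, Dict, Iterable


def init_resources_for_step(
    required_keys: Iterable[str],
    resource_factories: Dict[str, Callable[[], Any]],
) -> Dict[str, Any]:
    """Hypothetical helper: build only the resources one step declares."""
    keys = list(required_keys)  # allow generators; we iterate twice
    missing = set(keys) - set(resource_factories)
    if missing:
        raise KeyError(f"no definition for resource(s): {sorted(missing)}")
    # Factories for undeclared keys are never called.
    return {key: resource_factories[key]() for key in keys}
```

Under this scheme, an executor that runs one op per process would only start an mlflow run in steps that actually declare the mlflow resource.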