# ask-community
g
I have a multi-model IO manager for duckdb (parquet files) which can work via pandas and pyspark. For some ingest processes I know for sure that I only want to use pandas (and not the pyspark resource). Currently, I must list pyspark under the required_resource_keys. As a result, pyspark must be instantiated first (and this costs some overhead). Is there any way (without making a 2nd copy of the multi-model IO manager without the pyspark resource) to get rid of this delay? When trying to pass "pyspark": None, dagster complains. So far, it looks to me like I need to specify a dummy resource instead. Is this the right way to go?
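For context, the IO manager looks roughly like this (names simplified, not my real code):
Copy code
from dagster import IOManager, io_manager

class DuckDBParquetIOManager(IOManager):
    # hypothetical multi-model IO manager: writes outputs to parquet files
    # tracked in duckdb and loads them back as pandas or pyspark dataframes
    def handle_output(self, context, obj):
        ...  # write obj (pandas or pyspark DataFrame) to parquet

    def load_input(self, context):
        ...  # read parquet back as pandas or pyspark, depending on the consumer

# this is what forces pyspark to be initialized even for pandas-only runs
@io_manager(required_resource_keys={"pyspark"})
def duckdb_parquet_io_manager(_init_context):
    return DuckDBParquetIOManager()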
z
could you write and pass a pyspark resource that is lazily initialized? then you can initialize it only when you need it
g
But this would mean I need my own custom resource and cannot use the native one from dagster, right? Would this be any better than having a mock resource I can pass instead?
But perhaps yes - then the API to the outside stays transparent
Do you have any example of lazy instantiation?
z
The simplest thing I can think of is just having the resource be a function that returns a spark session (although not sure if resources can be functions, would be interesting to try):
Copy code
from dagster import op, resource

@resource
def lazy_pyspark_session(_init_context):
    # return a callable instead of a live session, so the SparkSession is
    # only created when an op actually calls it
    def _inner():
        from pyspark.sql import SparkSession
        return SparkSession.builder.getOrCreate()
    return _inner

# required_resource_keys takes a set of keys; the resource definition itself
# gets bound under the "pyspark" key on the job
@op(required_resource_keys={"pyspark"})
def spark_op(context):
    spark = context.resources.pyspark()
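and then bind it under the "pyspark" key at the job level, something like (untested):
Copy code
from dagster import job

@job(resource_defs={"pyspark": lazy_pyspark_session})
def spark_job():
    spark_op()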
g
But then all the documentation of the existing pyspark resource and its configuration handling no longer work / there is no auto-pass-through
z
ah what might be better is subclassing dagster_pyspark.resources.PySparkResource to have the spark_session property only instantiate the spark session when it is first accessed:
Copy code
from dagster import resource
from dagster_spark.configs_spark import spark_config
from dagster_pyspark.resources import spark_session_from_config

# duck-types dagster_pyspark's PySparkResource, but only builds the
# SparkSession on first access
class LazyPySparkResource:

    def __init__(self, spark_conf):
        self._spark_session = None
        self._spark_conf = spark_conf

    def _init_session(self):
        if self._spark_session is None:
            self._spark_session = spark_session_from_config(self._spark_conf)

    @property
    def spark_session(self):
        self._init_session()
        return self._spark_session

    @property
    def spark_context(self):
        self._init_session()
        return self._spark_session.sparkContext

@resource({"spark_conf": spark_config()})
def lazy_pyspark_resource(init_context):
    """This resource provides lazy access to a PySpark SparkSession for executing PySpark code within Dagster.

    Example:

    .. code-block:: python

        @op(required_resource_keys={"pyspark"})
        def my_op(context):
            spark_session = context.resources.pyspark.spark_session
            dataframe = spark_session.read.json("examples/src/main/resources/people.json")

        my_pyspark_resource = lazy_pyspark_resource.configured(
            {"spark_conf": {"spark.executor.memory": "2g"}}
        )

        @job(resource_defs={"pyspark": my_pyspark_resource})
        def my_spark_job():
            my_op()

    """
    return LazyPySparkResource(init_context.resource_config["spark_conf"])
no idea if that works
actually I guess you don't even really need to subclass in this case, just duck type
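it should then drop in wherever the stock pyspark_resource is used, roughly like this (untested, and assuming the same config schema):
Copy code
from dagster import job, op

@op(required_resource_keys={"pyspark"})
def spark_ingest(context):
    # the SparkSession is only built here, on first access
    spark = context.resources.pyspark.spark_session
    return spark.range(10).toPandas()

@op
def pandas_ingest():
    import pandas as pd
    return pd.DataFrame({"a": [1, 2, 3]})  # never touches pyspark, so no session is built

@job(
    resource_defs={
        "pyspark": lazy_pyspark_resource.configured(
            {"spark_conf": {"spark.executor.memory": "2g"}}
        )
    }
)
def ingest_job():
    spark_ingest()
    pandas_ingest()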
g
so you mean a mock? I.e. as I initially suggested?
But that mocked one would have no functionality at all
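i.e. something like this stub, which only exists to satisfy the resource key (illustrative sketch):
Copy code
from dagster import resource

@resource
def dummy_pyspark_resource(_init_context):
    # satisfies required_resource_keys={"pyspark"} for pandas-only jobs,
    # but fails loudly if anything actually asks for a session
    class _NoPySpark:
        @property
        def spark_session(self):
            raise RuntimeError("pyspark is not configured for this job")
    return _NoPySpark()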
I liked your lazy approach more, as it is then transparent for the user
j
cc @chris, is there a way to make resources optional?
c
There’s no first-class support for optional resources
s
I filed an issue to track adding a dagster-native lazy pyspark resource: https://github.com/dagster-io/dagster/issues/7585
🎉 1
I also filed a broader issue to consider adding lazy resource loading as a core dagster feature: https://github.com/dagster-io/dagster/issues/7586
🎉 2
d
I have a question about issue 7586: wouldn't it make sense (at least for certain executors) to always initialize resources only for the ops which require them? Some of my ops in the past were much slower because they initialized many resources defined for the entire pipeline even though they didn't require them. I can't reproduce all the details, but I think this also caused unwanted behavior, e.g. with the dagster-mlflow resource, where runs were started multiple times instead of only for the relevant ops.