# announcements
f
hello, i'm trying to figure out how to use dagster-pyspark. i run spark on k8s so i have to set a lot of spark config variables (k8s serviceaccount, ...). i tried `pyspark_resource.configured({"spark_conf":{}})` but it seems to ignore a lot of my variables, e.g. `spark.kubernetes.authenticate.driver.serviceAccountName`
c
Cc @bob @johann
b
@Frank Dekervel currently, dagster doesn’t recognize k8s config that’s specified in `spark_conf`. that’s why your `spark.kubernetes.*` variables are being ignored
alternatively, a solution would be to specify user-defined k8s config on your solid or pipeline that uses the `pyspark_resource`. this is done with the `tags` argument in the `@solid` and `@pipeline` decorators
i made a quick example below for specifying the k8s ServiceAccountName:
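from dagster import pipeline


@pipeline(
    # ...other parameters,
    # user-defined k8s config, read by dagster-k8s when it launches the pod
    tags={
        'dagster-k8s/config': {
            'pod_spec_config': {
                'serviceAccountName': 'my-k8s-service-account',
            },
        },
    },
)
def my_pipeline_that_uses_pyspark():
    pass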
if you have additional variables that usually go in `spark.kubernetes.*`, you might be able to fit them in `tags['dagster-k8s/config']`
another example can be found in the changelog announcement of “user-defined k8s config”
f
is there a reason why it doesn't simply pass through all `spark_conf` variables? because i can imagine that it will be hard to be complete here ... other users might want to set `spark.hadoop.*` variables too, e.g. for S3 access
i think the above solution is more for dagster-k8s ... i'm not using dagster-k8s, the k8s configuration is needed by the spark driver to launch worker nodes and access hadoop
so dagster doesn't interact directly with k8s.. dagster spins up a spark session and spark uses k8s as a cluster manager
b
i see. for reference, here is the Python script that generates the schema of “spark_conf”. it lists out the config variables that would be recognized
f
so in order to support the spark k8s vars they would need to be added to CONFIG_TYPES ?
i think that will be impossible to support in a static way ... for instance, on openshift, the following config is needed:
so that the spark docker image can be taken from an openshift image stream
`spark.kubernetes.executor.annotation.[blabla]` is supported as a variable name, where the last part can be anything. so it's impossible to statically check the variable name here
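To make the point about annotation keys concrete, here is a minimal sketch of why a fixed list of names cannot cover them and why a prefix match would be needed instead. `STATIC_KEYS`, `DYNAMIC_PREFIXES` and `is_known_key` are hypothetical names for illustration only; they are not part of dagster or Spark.

```python
# Hypothetical illustration; none of these names come from dagster or Spark.
STATIC_KEYS = {
    "spark.master",
    "spark.executor.memory",
    "spark.kubernetes.authenticate.driver.serviceAccountName",
}

# Some Spark keys are documented as templates, for example
# spark.kubernetes.executor.annotation.[AnnotationName], so only the prefix is fixed.
DYNAMIC_PREFIXES = (
    "spark.kubernetes.executor.annotation.",
    "spark.kubernetes.driver.annotation.",
    "spark.hadoop.",
)


def is_known_key(key: str) -> bool:
    """Return True if key is listed exactly or extends an open-ended prefix."""
    if key in STATIC_KEYS:
        return True
    # Require at least one character after the prefix.
    return any(key.startswith(p) and len(key) > len(p) for p in DYNAMIC_PREFIXES)
```

A checker built only on `STATIC_KEYS` would reject every annotation key, which is the limitation described above.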
b
regarding your question about adding spark k8s vars to `CONFIG_TYPES`, that wouldn't be sufficient 😓
`parse_spark_configs.py` fetches the markdown for the Spark config doc website and generates the Dagster config, but the config variables for Spark on kubernetes are on a different page :///
`CONFIG_TYPES` is there for typing non-string config variables. by default, all the Spark config variables will be `str` typed
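The "string unless listed" rule described here can be sketched as a lookup with a default. This is only an illustration of the idea: `TYPE_OVERRIDES` and `type_for` are hypothetical names, and the real `CONFIG_TYPES` table may be shaped differently.

```python
# Hypothetical stand-in for the idea behind CONFIG_TYPES:
# only keys that are NOT strings need an entry.
TYPE_OVERRIDES = {
    "spark.executor.cores": int,
    "spark.dynamicAllocation.enabled": bool,
}


def type_for(key: str) -> type:
    """Return the declared type for a Spark key, falling back to str."""
    return TYPE_OVERRIDES.get(key, str)
```

Under this scheme an unlisted key such as a `spark.kubernetes.*` variable would simply be treated as a string.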
f
ok, fetching and parsing another page wouldn't be too hard ... but that doesn't solve the problem with k8s annotations
b
altho, many of the fields in `CONFIG_TYPES` are still `ConfigType.String` 🤔 hmm
f
wouldn't it make more sense to just pass thru all key/values that cannot be typechecked ?
the typechecking in my case didn't result in an error either, just in the values being ignored
j
cc @sandy
b
yeah, it looks like the key/values of `spark_conf` end up being passed to `SparkSession.builder`, which should recognize annotations or whatnot
s
hey @Frank Dekervel - if you do the following, "spark.some.config" should end up in the configuration for the SparkSession instantiated by the pyspark resource, even if "spark.some.config" isn't on the list we scraped from the spark website
from dagster_pyspark import pyspark_resource

pyspark_resource.configured({"spark_conf": {"spark.some.config": "some_value"}})
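A minimal sketch of what "passing the key/values through" amounts to, assuming the resource does nothing more than hand each pair to a builder. `build_session` is a hypothetical helper, not dagster-pyspark code. With PySpark the `builder` argument would be `SparkSession.builder`, whose `config(key, value)` and `getOrCreate()` methods are the only calls used.

```python
def build_session(builder, spark_conf):
    """Apply every key/value pair to a SparkSession-style builder, then create the session.

    No key is filtered or validated here, so names such as
    spark.kubernetes.* or spark.hadoop.* pass through unchanged.
    """
    for key, value in spark_conf.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```

Calling this by hand with the same dictionary is one way to check whether a problem comes from dagster or from Spark itself.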
f
and dagster is complaining about wrong service account ...
but apparently it saw the spark.master variable (since it tried to connect to kubernetes, just with the wrong account)
(ow and i have a duplicate var in my conf, removing it, spark.executor.memory)
but i'm also puzzled why it doesn't work ... there are Permissive everywhere in the configs_spark.py
s
if you generate a SparkSession on your own with that config, do you encounter that same error?
f
well i use the same set of variables on spark-submit and with polynote and there it works
it's hard to debug on openshift, i'll try to create a spark_conf object locally and see what's in there
hmm can i introspect a ResourceDefinition object that is the result of .configured ?
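For the spark-submit comparison mentioned above, one option is to render the same dictionary as `--conf key=value` flags so both launch paths use identical settings. `to_spark_submit_args` is a hypothetical helper written for this note; only the `--conf PROP=VALUE` flag format comes from spark-submit.

```python
def to_spark_submit_args(spark_conf):
    """Render a conf dict as spark-submit arguments, sorted by key for easy diffing."""
    args = []
    for key, value in sorted(spark_conf.items()):
        args += ["--conf", f"{key}={value}"]
    return args
```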
s
using the pyspark resource should be exactly equivalent to instantiating a SparkSession inside your solid and providing it with the configuration you're describing
(it's possible that it's not working as expected, but trying out the SparkSession would determine that)
even if that config is making it inside the spark_conf, it's possible that it's not making it to the SparkSession
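One way to check whether the values actually reached the session is to compare the intended configuration with what the running session reports. The sketch below assumes the live values are obtained as a list of `(key, value)` pairs, which is the shape PySpark's `spark.sparkContext.getConf().getAll()` returns; `conf_mismatches` itself is a hypothetical helper.

```python
def conf_mismatches(expected, actual_pairs):
    """Return {key: (expected_value, actual_value)} for keys that are missing or differ.

    actual_pairs is a list of (key, value) tuples. Spark stores values as
    strings, so expected values are compared after str() conversion.
    """
    actual = dict(actual_pairs)
    return {
        key: (value, actual.get(key))
        for key, value in expected.items()
        if actual.get(key) != str(value)
    }
```

An empty result means every expected key arrived with the expected value; a `None` in the second position means the key never reached the session.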
f
yeah it's making it into the spark_conf .. i just did `resource._configured_config_mapping_fn({})` and i still see the serviceaccount config there in the result
ok got it
it was my fault 😞
i also need to make sure the pod running dagster also has access to that service account
s
spark + k8s is a very complex beast
f
yeah ... after fixing this spark launches the worker nodes ... which fail to connect to the driver node, so that's my next problem
but at least the problem isn't dagster related
tx for the help
s
glad you're making progress!
f
ok, i have the EMR example now working on an on-premises openshift cluster