# announcements
s
```
> dagster - ERROR - pyspark_pipeline - 1d12e4e7-15e0-49f8-a394-27dcf29ef9cb - PIPELINE_INIT_FAILURE - Pipeline failure during initialization of pipeline "pyspark_pipeline". This may be due to a failure in initializing a resource or logger.
 event_specific_data = {"error": ["py4j.protocol.Py4JError: An error occurred while calling None.org.apache.spark.api.python.PythonAccumulatorV2. Trace:\npy4j.Py4JException: Constructor org.apache.spark.api.python.PythonAccumulatorV2([class java.lang.String, class java.lang.Integer, class java.lang.String]) does not exist\n\tat py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:179)\n\tat py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:196)\n\tat py4j.Gateway.invoke(Gateway.java:237)\n\tat py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)\n\tat py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:238)\n\tat java.lang.Thread.run(Thread.java:748)\n\n\n", ["  File \"/opt/anaconda3/envs/dask_pyarrow/lib/python3.7/site-packages/dagster/core/errors.py\", line 161, in user_code_error_boundary\n    yield\n", "  File \"/opt/anaconda3/envs/dask_pyarrow/lib/python3.7/site-packages/dagster/core/execution/resources_init.py\", line 138, in single_resource_event_generator\n    resource = next(gen)\n", "  File \"/opt/anaconda3/envs/dask_pyarrow/lib/python3.7/site-packages/dagster_pyspark/resources.py\", line 66, in pyspark_resource\n    pyspark = SystemPySparkResource(init_context.resource_config['spark_conf'])\n", "  File \"/opt/anaconda3/envs/dask_pyarrow/lib/python3.7/site-packages/dagster_pyspark/resources.py\", line 30, in __init__\n    self._spark_session = spark_session_from_config(spark_conf)\n", "  File \"/opt/anaconda3/envs/dask_pyarrow/lib/python3.7/site-packages/dagster_pyspark/resources.py\", line 25, in spark_session_from_config\n    return builder.getOrCreate()\n", "  File \"/opt/anaconda3/envs/dask_pyarrow/lib/python3.7/site-packages/pyspark/sql/session.py\", line 173, in getOrCreate\n    sc = SparkContext.getOrCreate(sparkConf)\n", "  File \"/opt/anaconda3/envs/dask_pyarrow/lib/python3.7/site-packages/pyspark/context.py\", line 367, in getOrCreate\n    SparkContext(conf=conf or SparkConf())\n", "  File \"/opt/anaconda3/envs/dask_pyarrow/lib/python3.7/site-packages/pyspark/context.py\", line 136, in __init__\n    conf, jsc, profiler_cls)\n", "  File \"/opt/anaconda3/envs/dask_pyarrow/lib/python3.7/site-packages/pyspark/context.py\", line 207, in _do_init\n    self._javaAccumulator = self._jvm.PythonAccumulatorV2(host, port, auth_token)\n", "  File \"/opt/anaconda3/envs/dask_pyarrow/lib/python3.7/site-packages/py4j/java_gateway.py\", line 1525, in __call__\n    answer, self._gateway_client, None, self._fqn)\n", "  File \"/opt/anaconda3/envs/dask_pyarrow/lib/python3.7/site-packages/py4j/protocol.py\", line 332, in get_return_value\n    format(target_id, \".\", name, value))\n"], "Py4JError", null]}
```
We tried to play with `SPARK_HOME` and tried to use findspark in the `dagster_pyspark/resources.py` file but had no success. Any suggestions?
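(For context, a minimal sketch of the kind of wiring that goes through the failing code path. This assumes the legacy `ModeDefinition` / `required_resource_keys` API and the `spark_session` attribute that `dagster_pyspark` exposed at the time; the solid body and names are illustrative, not the actual pipeline.)

```python
# Minimal sketch of a pipeline that initializes the pyspark resource
# (illustrative names; assumes the legacy dagster API of that era).
from dagster import ModeDefinition, pipeline, solid
from dagster_pyspark import pyspark_resource


@solid(required_resource_keys={'pyspark'})
def count_rows(context):
    # Building the SparkSession from resource_config['spark_conf'] is the step
    # that fails in the traceback above, before this solid ever runs.
    spark = context.resources.pyspark.spark_session
    return spark.range(100).count()


@pipeline(mode_defs=[ModeDefinition(resource_defs={'pyspark': pyspark_resource})])
def pyspark_pipeline():
    count_rows()
```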
m
is the config the same in the two cases?
and same virtualenv/conda env?
n
Not sure why this is only happening with `dagster` vs. with `dagit`, but this might be related: https://stackoverflow.com/questions/53946519/pyspark-py4j-error-using-canopy-pythonaccumulatorv2class-java-lang-string-cl
is the pyspark in your conda environment the same version as in your SPARK_HOME?
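(A quick way to check that from inside the conda env. This is only a sketch: it assumes `SPARK_HOME` points at the cluster's Spark install, and the values in the trailing comments are just examples.)

```python
# Compare the pyspark package in the active environment with the Spark
# installation that SPARK_HOME points to.
import os
import subprocess

import pyspark

print("pyspark package:", pyspark.__version__)        # e.g. 2.4.5
print("SPARK_HOME:", os.environ.get("SPARK_HOME"))

# spark-submit prints its version banner to stderr.
spark_submit = os.path.join(os.environ["SPARK_HOME"], "bin", "spark-submit")
result = subprocess.run(
    [spark_submit, "--version"],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    universal_newlines=True,
)
print(result.stderr)                                   # e.g. version 2.3.0 ...
```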
s
1. The configs are exactly the same.
2. In both cases we are running with the same conda environment.
3. The pyspark version is 2.4.5.
4. Spark2 (using Cloudera Parcels) is 2.3.0.

Unfortunately I can't upgrade Spark, so we may need to try downgrading pyspark. Still, it is strange that it works with `dagit`.
n
@sephi yes we don’t depend on any specific version of pyspark anywhere, so you should be able to use pyspark 2.3.0 in dagster without any issues. But please let us know if you hit anything
s
Well, we downgraded to `pyspark 2.3.0` (which also requires `python 3.6`), but we still have the same problems. When running `dagit` there is no problem, but when running `dagster` we get a different error: `java.util.NoSuchElementException: key not found: _PYSPARK_DRIVER_CONN_INFO_PATH`.
I'm still trying to understand where you define SPARK_HOME in the conda environment.
We managed to solve the problem by adding `findspark.init(spark_home='/path/to/spark/lib/')` at the top of our `composit_solids.py` file.
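(Roughly like this. The `spark_home` path is the placeholder from above, and the imports shown after the `init` call are just examples of what has to come later in the module.)

```python
# composit_solids.py -- findspark.init() has to run before pyspark (or anything
# that imports it, such as dagster_pyspark) is imported.
import findspark

# Placeholder path: point this at the Spark install shipped by the Cloudera parcel.
findspark.init(spark_home='/path/to/spark/lib/')

# pyspark-dependent imports only after findspark.init() has set SPARK_HOME / sys.path.
from dagster import pipeline, solid  # noqa: E402
from dagster_pyspark import pyspark_resource  # noqa: E402
```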
m
@nate ^^
@sandy