
Rohan Ahire

08/02/2021, 6:11 PM
Hi, I am running a pyspark job as a solid, by defining and configuring a spark resource with all the configurations, and the job runs fine as expected. However, I am not able to see the spark logs that I would normally see if I ran the job via spark-submit without dagster. For example, I do not see the spark application ID that I could use to look up the job on the spark cluster. Please suggest a way to see all logs generated by the spark application.

sandy

08/02/2021, 6:48 PM
Hi Rohan - are you executing the job from Dagit? If so, the Spark logs will likely show up in the raw text output for the step. If you're in the run view looking at logs, you can get to that by clicking the right button of the two buttons in the top left of the log view:

Rohan Ahire

08/02/2021, 8:21 PM
Yes, I am using dagit and then running the pipeline from the Playground. In the attached screenshot, I do not see the application ID that can help me trace the job back to the hadoop resource manager.

sandy

08/02/2021, 8:34 PM
Hey Rohan - what you want to get to is the second of the two images I sent. To get there, you need to click the button to see the "Raw Console Output". That button is the second from the left in the image that I sent

Rohan Ahire

08/02/2021, 8:41 PM
The stdout is blank. The stderr has the same information as the structured event logs. I do not see detailed logs from the yarn or spark engine.

sandy

08/02/2021, 8:46 PM
Ah - I suspect that what's happening is that the Spark job is not actually running on the cluster and is just running inside the local process. What kind of cluster are you intending it to run against? E.g. EMR, Databricks?

Rohan Ahire

08/02/2021, 8:49 PM
The spark job is running on the cluster (an on-premise cluster), because I can see the application being submitted and completed in the resource manager UI.

sandy

08/02/2021, 8:51 PM
Hmm - that is unexpected. Any chance you have a minimal example we could try to reproduce with?

Rohan Ahire

08/02/2021, 8:51 PM
cluster_mode = ModeDefinition(
    name="cluster",
    resource_defs={
        "pyspark_step_launcher": no_step_launcher,
        "pyspark": pyspark_resource.configured(
            {
                "spark_conf": {
                    "spark.master": "yarn",
                    "spark.deployMode": "cluster",
                    "spark.pyspark.driver.python": "./envs/dagster_python_37/bin/python",
                    "spark.pyspark.python": "./envs/dagster_python_37/bin/python",
                    "spark.default.parallelism": 1,
                    "spark.archives": "/raid/u/u409754/dagster_python_37.tar.gz#envs",
                }
            }
        ),
    },
)


@solid(required_resource_keys={"pyspark", "pyspark_step_launcher"})
def get_table_row_count_from_datalake(context) -> None:
    # Need to export this in the environment:
    # export SPARK_HOME=/usr/hdp/current/spark2-client
    spark = context.resources.pyspark.spark_session
    database_name = context.solid_config["database_name"]
    table_name = context.solid_config["table_name"]
    context.log.info(f"The database and table name are {database_name} and {table_name}")
    table_df = spark.read.table(f"{database_name}.{table_name}")
    table_df.registerTempTable(table_name)
    context.log.info(
        f"Registered the hive table - {database_name}.{table_name} as a spark table that can be queried in spark sql"
    )
    count_df = spark.sql(f"select count(*) from {table_name}")
    count = count_df.collect()
    context.log.info(f"The row count is {count}")


@pipeline(mode_defs=[cluster_mode])
def read_table_row_count_from_datalake():
    get_table_row_count_from_datalake()
So this is all the code I am using. One more thing: I am using dagster version 0.12.2 with Python 3.7 and Spark version 2.3.
The same issue persists if I run from the dagster CLI.

sandy

08/02/2021, 9:25 PM
I spent a little time trying to reproduce this, but I'm having some trouble, because I don't have a yarn cluster to run against. If you run your code outside of Dagster - i.e. construct the SparkSession directly with SparkSession.builder.config(conf=your_conf).getOrCreate() - does the output that you're looking for get printed to the console?
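Something like this is what I mean - just a rough sketch, with config values borrowed from your resource definition above:

# Rough sketch (untested): run with plain `python script.py`, no spark-submit.
# The conf values are placeholders copied from the resource config shared above.
from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf().setAll(
    [
        ("spark.master", "yarn"),
        ("spark.default.parallelism", "1"),
    ]
)
spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()
spark.sql("select 1 as smoke_test").show()

If the Spark/yarn output you're looking for does show up on the console there, that would suggest the logs are being produced but not surfaced by Dagster.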

Rohan Ahire

08/02/2021, 9:28 PM
Yes, if I run my code using spark-submit without dagster, then I get my desired output. Spark jobs are usually very verbose; it seems all that content is being turned off by dagster somewhere.

sandy

08/02/2021, 9:31 PM
What if you execute a python file with code that sets up the spark session? That should behave differently from spark-submit. When you launch a job via spark-submit, it looks at the CLI flags and requests that the remote cluster manager launch a remote driver on its behalf. When you instantiate a SparkSession inside a Python process (which is what happens when you run the code you included), the cluster manager does not start a new remote driver, and the process that instantiated the SparkSession is used as the driver instead.

Rohan Ahire

08/02/2021, 9:50 PM
PYSPARK_DRIVER_PYTHON=./envs/dagster_python_37/bin/python \
PYSPARK_PYTHON=./envs/dagster_python_37/bin/python \
spark-submit \
--master yarn \
--deploy-mode cluster \
--archives /raid/u/u409754/dagster_python_37.tar.gz#envs \
/raid/u/u409754/projects/dagster_test/scripts/pyspark_dagster_using_spark_submit.py
This works for me. I am able to get the logs that I want. But again, this is using spark-submit - though still with yarn and running on the cluster.
from pyspark.sql import Row, SparkSession, SQLContext
from pyspark.sql.functions import *

# Create Spark session with Hive supported.
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("INFO")
log4jLogger = sc._jvm.org.apache.log4j
log = log4jLogger.LogManager.getLogger(__name__)
log.warn("Hello World!")
I was also able to run this code just using python and see the desired logs.

sandy

08/02/2021, 11:19 PM
I tried wrapping that code inside of a spark solid, like this:
@solid
def do_spark_stuff():
    from pyspark.sql import Row, SparkSession, SQLContext

    # Create Spark session with Hive supported.
    spark = SparkSession.builder.enableHiveSupport().getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("INFO")
    log4jLogger = sc._jvm.org.apache.log4j
    log = log4jLogger.LogManager.getLogger(__name__)
    log.warn("Hello World!")

@pipeline
def my_pipeline():
    do_spark_stuff()
When I ran it, I got the output below. Is that what you would expect?

Rohan Ahire

08/02/2021, 11:34 PM
Yes, this output is what I am expecting.
But this is not the way we should use spark with dagster, right?
And why is everything showing up in stderr? It should be in stdout, right?

sandy

08/04/2021, 2:54 PM
The way that the example you provided runs is essentially similar to running all the code in the body of the solid, like the example I provided above - i.e. it doesn't invoke spark-submit, it just creates a SparkSession inside the Python process. If you want to run spark-submit, you have a couple of options:
• use dagster-shell to create a solid that runs spark-submit on the CLI (rough sketch below): https://docs.dagster.io/_apidocs/libraries/dagster-shell
• write a step launcher: https://github.com/dagster-io/dagster/discussions/3201
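For the first option, something like this is roughly what I have in mind - an untested sketch, reusing the flags and script path from your spark-submit command above:

from dagster import pipeline
from dagster_shell import create_shell_command_solid

# Untested sketch: the spark-submit flags and script path are copied from the
# command shared earlier in this thread and would need adjusting for your environment.
run_spark_submit = create_shell_command_solid(
    "spark-submit --master yarn --deploy-mode cluster "
    "--archives /raid/u/u409754/dagster_python_37.tar.gz#envs "
    "/raid/u/u409754/projects/dagster_test/scripts/pyspark_dagster_using_spark_submit.py",
    name="run_spark_submit",
)


@pipeline
def spark_submit_pipeline():
    run_spark_submit()

Since that solid just shells out to spark-submit, you should get the same client-side output (application ID, yarn tracking URL, etc.) that you see today, captured in the solid's console output.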

Rohan Ahire

08/04/2021, 3:02 PM
Got it, thanks.
In your example and in mine, why does the output show up in stderr?

sandy

08/04/2021, 3:13 PM
I believe it's because that's where spark puts them: https://stackoverflow.com/questions/24571922/apache-spark-stderr-and-stdout
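If you do want them on stdout, one thing you could try (I haven't tested this) is pointing Spark at a custom log4j.properties whose console appender targets System.out instead of System.err, e.g. via the driver's extra Java options in the pyspark resource config:

# Untested sketch: the log4j.properties path is a placeholder; that file would need
# log4j.appender.console.target=System.out set for Spark's ConsoleAppender.
from dagster_pyspark import pyspark_resource

pyspark_stdout_resource = pyspark_resource.configured(
    {
        "spark_conf": {
            "spark.driver.extraJavaOptions": "-Dlog4j.configuration=file:/path/to/log4j.properties",
        }
    }
)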