Rohan Ahire
08/02/2021, 6:11 PM

sandy
08/02/2021, 6:48 PM

Rohan Ahire
08/02/2021, 8:21 PM

sandy
08/02/2021, 8:34 PM

Rohan Ahire
08/02/2021, 8:41 PM

sandy
08/02/2021, 8:46 PM

Rohan Ahire
08/02/2021, 8:49 PM

sandy
08/02/2021, 8:51 PM

Rohan Ahire
08/02/2021, 8:51 PM
cluster_mode = ModeDefinition(
    name="cluster",
    resource_defs={
        "pyspark_step_launcher": no_step_launcher,
        "pyspark": pyspark_resource.configured(
            {
                "spark_conf": {
                    "spark.master": "yarn",
                    "spark.deployMode": "cluster",
                    "spark.pyspark.driver.python": "./envs/dagster_python_37/bin/python",
                    "spark.pyspark.python": "./envs/dagster_python_37/bin/python",
                    "spark.default.parallelism": 1,
                    "spark.archives": "/raid/u/u409754/dagster_python_37.tar.gz#envs",
                }
            }
        ),
    },
)
@solid(required_resource_keys={"pyspark", "pyspark_step_launcher"})
def get_table_row_count_from_datalake(context) -> None:
    # Need to export this in the environment:
    # export SPARK_HOME=/usr/hdp/current/spark2-client
    spark = context.resources.pyspark.spark_session
    database_name = context.solid_config["database_name"]
    table_name = context.solid_config["table_name"]
    context.log.info(f"The database and table name are {database_name} and {table_name}")
    table_df = spark.read.table(f"{database_name}.{table_name}")
    table_df.registerTempTable(table_name)
    context.log.info(
        f"Registered the hive table - {database_name}.{table_name} as a spark table that can be queried in spark sql"
    )
    count_df = spark.sql(f"select count(*) from {table_name}")
    count = count_df.collect()
    context.log.info(f"The row count is {count}")


@pipeline(mode_defs=[cluster_mode])
def read_table_row_count_from_datalake():
    get_table_row_count_from_datalake()
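
The solid above reads database_name and table_name from context.solid_config, but the snippet doesn't show a config_schema, so the following is only a hypothetical sketch of how the pipeline might be configured and launched; the config_schema and the database/table names are assumptions, not something stated in this thread.

# Hypothetical sketch: supplying the solid config and executing the pipeline.
# Assumes get_table_row_count_from_datalake also declares a matching config_schema,
# e.g. @solid(config_schema={"database_name": str, "table_name": str}, ...).
from dagster import execute_pipeline

result = execute_pipeline(
    read_table_row_count_from_datalake,
    run_config={
        "solids": {
            "get_table_row_count_from_datalake": {
                "config": {
                    "database_name": "my_database",  # placeholder
                    "table_name": "my_table",  # placeholder
                }
            }
        }
    },
)
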
sandy
08/02/2021, 9:25 PM
if you call SparkSession.builder.getOrCreate() with your conf, does the output that you're looking for get printed out to the console?
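
A standalone check along those lines might look like the sketch below; this is not from the thread, and the conf values are simply copied from the ModeDefinition above.

# check_spark_conf.py - hypothetical standalone script to try the same spark_conf
# outside of Dagster and see what gets printed to the console.
from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf().setAll(
    [
        ("spark.master", "yarn"),
        ("spark.pyspark.driver.python", "./envs/dagster_python_37/bin/python"),
        ("spark.pyspark.python", "./envs/dagster_python_37/bin/python"),
        ("spark.archives", "/raid/u/u409754/dagster_python_37.tar.gz#envs"),
    ]
)
spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()
print(spark.sql("select 1").collect())
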
Rohan Ahire
08/02/2021, 9:28 PM

sandy
08/02/2021, 9:31 PM

Rohan Ahire
08/02/2021, 9:50 PM
PYSPARK_DRIVER_PYTHON=./envs/dagster_python_37/bin/python \
PYSPARK_PYTHON=./envs/dagster_python_37/bin/python \
spark-submit \
--master yarn \
--deploy-mode cluster \
--archives /raid/u/u409754/dagster_python_37.tar.gz#envs \
/raid/u/u409754/projects/dagster_test/scripts/pyspark_dagster_using_spark_submit.py
from pyspark.sql import Row, SparkSession, SQLContext
from pyspark.sql.functions import *
# Create Spark session with Hive supported.
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("INFO")
log4jLogger = sc._jvm.org.apache.log4j
log = log4jLogger.LogManager.getLogger(__name__)
log.warn("Hello World!")
sandy
08/02/2021, 11:19 PM
@solid
def do_spark_stuff():
    from pyspark.sql import Row, SparkSession, SQLContext

    # Create Spark session with Hive supported.
    spark = SparkSession.builder.enableHiveSupport().getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("INFO")
    log4jLogger = sc._jvm.org.apache.log4j
    log = log4jLogger.LogManager.getLogger(__name__)
    log.warn("Hello World!")


@pipeline
def my_pipeline():
    do_spark_stuff()
when I ran it, I got the below output. is that what you would expect?

Rohan Ahire
08/02/2021, 11:34 PM

sandy
08/04/2021, 2:54 PM
the pyspark resource doesn't run spark-submit, it just creates a SparkSession inside the Python process.
if you want to run spark-submit, you have a couple options:
• use dagster-shell to create a solid that runs spark-submit on the CLI (see the sketch after this list): https://docs.dagster.io/_apidocs/libraries/dagster-shell
• write a step launcher: https://github.com/dagster-io/dagster/discussions/3201
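
A rough sketch of the first option, wrapping the spark-submit command from earlier in the thread with dagster-shell's create_shell_command_solid; the solid and pipeline names here are made up, and the command should match whatever worked when run manually.

# Hypothetical sketch of the dagster-shell option; assumes dagster-shell is installed
# and spark-submit is available on the PATH of the machine running the solid.
from dagster import pipeline
from dagster_shell import create_shell_command_solid

spark_submit_solid = create_shell_command_solid(
    "PYSPARK_DRIVER_PYTHON=./envs/dagster_python_37/bin/python "
    "PYSPARK_PYTHON=./envs/dagster_python_37/bin/python "
    "spark-submit --master yarn --deploy-mode cluster "
    "--archives /raid/u/u409754/dagster_python_37.tar.gz#envs "
    "/raid/u/u409754/projects/dagster_test/scripts/pyspark_dagster_using_spark_submit.py",
    name="run_spark_submit",
)


@pipeline
def spark_submit_pipeline():
    spark_submit_solid()
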
Rohan Ahire
08/04/2021, 3:02 PM

sandy
08/04/2021, 3:13 PM