jamie
02/03/2023, 4:11 PM
Install the dagster-snowflake-pyspark package and use the snowflake_pyspark_io_manager.
API docs are here, with more docs coming soon!
Here’s a full code example to get you started:
from dagster_snowflake_pyspark import snowflake_pyspark_io_manager
from pyspark import SparkFiles
from pyspark.sql import (
DataFrame,
SparkSession,
)
from pyspark.sql.types import (
DoubleType,
StringType,
StructField,
StructType,
)
from dagster import Definitions, asset
# Comma-separated package coordinates for the Snowflake JDBC driver and the
# Spark-Snowflake connector; passed to Spark as "spark.jars.packages".
SNOWFLAKE_JARS = "net.snowflake:snowflake-jdbc:3.8.0,net.snowflake:spark-snowflake_2.12:2.8.2-spark_3.0"
@asset
def iris_dataset() -> DataFrame:
    """Load the UCI Iris dataset into a PySpark DataFrame.

    Builds (or reuses) a SparkSession configured with the packages listed in
    ``SNOWFLAKE_JARS``, fetches ``iris.data`` over HTTPS, and parses it as CSV
    using an explicit schema.

    Returns:
        A DataFrame with four double-typed measurement columns and a
        string-typed ``Species`` column.
    """
    spark = SparkSession.builder.config(
        key="spark.jars.packages",
        value=SNOWFLAKE_JARS,
    ).getOrCreate()

    # iris.data has no header row, so column names and types are given here.
    schema = StructType(
        [
            StructField("Sepal length (cm)", DoubleType()),
            StructField("Sepal width (cm)", DoubleType()),
            StructField("Petal length (cm)", DoubleType()),
            StructField("Petal width (cm)", DoubleType()),
            StructField("Species", StringType()),
        ]
    )

    # Must be a bare URL: surrounding "<...>" link markup would become part of
    # the string and make the address invalid.
    url = "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data"

    # addFile registers the remote file with the Spark job; SparkFiles.get
    # then resolves the local path it was downloaded to.
    spark.sparkContext.addFile(url)
    return spark.read.schema(schema).csv("file://" + SparkFiles.get("iris.data"))
# Definitions object exposing the asset and the I/O manager used for it.
defs = Definitions(
    assets=[iris_dataset],
    resources={
        # Registered under the "io_manager" resource key.
        "io_manager": snowflake_pyspark_io_manager.configured(
            {
                "account": "abc1234.us-east-1",
                # Credentials are given as {"env": ...} references rather
                # than literal values.
                "user": {"env": "SNOWFLAKE_USER"},
                "password": {"env": "SNOWFLAKE_PASSWORD"},
                "database": "FLOWERS",
                "warehouse": "PLANTS",
                "schema": "IRIS",
            }
        ),
    },
)
As always, if you have any feedback, bug reports, or feature requests, let me know here!