jamie
12/09/2022, 7:07 PM
dagster-snowflake API docs

Vinnie
12/13/2022, 3:47 PM
We use a COPY INTO command to materialize the assets. Would love to see what other approaches people have found.

Slackbot
12/13/2022, 8:04 PM

Jason
12/13/2022, 11:58 PM
We call write_pandas somewhere in the body of the func, and we have some legacy S3-to-staging stuff. However, as a data eng, for anything new I build (that's an asset) I prefer to use the Snowflake IO manager and write directly to the table.
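(A minimal sketch of the two patterns Jason describes; the asset names, table, and credentials are made-up placeholders:)

import pandas as pd
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas
from dagster import asset

# Pattern 1: call write_pandas yourself in the body of the asset.
@asset
def orders_loaded_manually() -> None:
    df = pd.DataFrame({"ORDER_ID": [1, 2], "AMOUNT": [9.99, 19.99]})
    conn = snowflake.connector.connect(
        account="abc1234.us-east-1", user="MY_USER", password="MY_PASSWORD"
    )
    # Table must already exist, or pass auto_create_table=True
    write_pandas(conn, df, table_name="ORDERS")
    conn.close()

# Pattern 2: return the DataFrame and let the Snowflake IO manager
# configured on the Definitions write it to the table.
@asset
def orders() -> pd.DataFrame:
    return pd.DataFrame({"ORDER_ID": [1, 2], "AMOUNT": [9.99, 19.99]})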
Vinnie
12/14/2022, 8:49 AM

Stephen Bailey
12/14/2022, 4:06 PM

Jason
12/14/2022, 9:07 PM

Stephen Bailey
12/15/2022, 1:37 AM
pandas.DataFrame -> snowflake_table is pretty easy to grok, but it's a little less clear for some other types of assets: dbt Cloud job, SageMaker training job, Rockset query lambda, etc. A good comparison here is dbt -- they essentially offer a single IO manager per project, and you get a ton of mileage out of it.

Vinnie
12/15/2022, 9:47 AM
As long as users hand back a pd.DataFrame or JSON object, I can ensure everything integrates with the rest of the data lake in S3, but I'm curious to see what kinds of conversations I'll have.
Binoy Shah
12/19/2022, 4:26 PM

Graham Wetzler
01/09/2023, 4:08 AM
I'm trying to get dagster_snowflake_pandas working on my local machine but I keep getting this error:
snowflake.connector.errors.OperationalError: 250001: Could not connect to Snowflake backend after 0 attempt(s). Aborting
I'm able to successfully connect using snowflake.connector in a REPL using the same account, user, and password, so I really don't know what to do next.
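(For reference, a minimal REPL check like the one Graham describes, with placeholder credentials; if this works, the same account/user/password values are what the dagster-snowflake-pandas IO manager config expects:)

import snowflake.connector

conn = snowflake.connector.connect(
    account="abc1234.us-east-1",  # account identifier, including region suffix if needed
    user="MY_USER",
    password="MY_PASSWORD",
)
print(conn.cursor().execute("SELECT CURRENT_VERSION()").fetchone())
conn.close()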
Sam Werbalowsky
01/13/2023, 8:21 PM

Binoy Shah
01/18/2023, 5:17 PM
With dagster-snowflake-pandas, when an @asset function returns a DataFrame, does Dagster download the data into a DataFrame and hold it in memory?
If yes, what mechanism does it use to download and build the pandas DataFrame?
Would it be possible to convert it into a Polars DataFrame instead?
Graham Wetzler
02/01/2023, 11:58 PM
@asset
def dagster_df_dates():
    return pd.DataFrame(pd.date_range("2023-01-01", "2023-01-31"), columns=["A"])
In Snowflake this column is created as a varchar. What am I doing wrong?
clay
02/02/2023, 7:29 PM
Is define_asset_job() deprecated? I can't find it in the documentation. I'm wondering how the "all_assets_job" is created on that line...

clay
02/02/2023, 7:35 PM
define_asset_job()?
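(For what it's worth, a minimal sketch of building an "all_assets_job" with define_asset_job, which is still available in current Dagster releases; the asset below is a made-up placeholder:)

from dagster import Definitions, asset, define_asset_job

@asset
def my_asset():
    return 1

# Select every asset in the code location into one job
all_assets_job = define_asset_job(name="all_assets_job", selection="*")

defs = Definitions(assets=[my_asset], jobs=[all_assets_job])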
clay
02/03/2023, 3:41 PM
Is there a way to get the snowflake_resource to run bootstrap queries each time it connects? For example, each time I connect, I need to run USE SECONDARY ROLES ALL; because it provides me the read access I need for pulling data from schemas that are not my default schema.
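(Not an official snowflake_resource feature, but one workaround sketch is to send the bootstrap statement and the real query in the same execute_queries call, so they run over the same connection; the op name and table are placeholders:)

from dagster import op

@op(required_resource_keys={"snowflake"})
def pull_from_other_schema(context):
    # Both statements run over one connection, so the secondary roles
    # are active when the SELECT executes.
    results = context.resources.snowflake.execute_queries(
        [
            "USE SECONDARY ROLES ALL;",
            "SELECT * FROM OTHER_SCHEMA.SOME_TABLE;",  # placeholder table
        ],
        fetch_results=True,
    )
    return results[-1]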
jamie
02/03/2023, 4:11 PM
You can now install the dagster-snowflake-pyspark package and use the snowflake_pyspark_io_manager. API docs are here, with more docs coming soon!
Here's a full code example to get you started:
from dagster_snowflake_pyspark import snowflake_pyspark_io_manager
from pyspark import SparkFiles
from pyspark.sql import (
    DataFrame,
    SparkSession,
)
from pyspark.sql.types import (
    DoubleType,
    StringType,
    StructField,
    StructType,
)

from dagster import Definitions, asset

SNOWFLAKE_JARS = "net.snowflake:snowflake-jdbc:3.8.0,net.snowflake:spark-snowflake_2.12:2.8.2-spark_3.0"

@asset
def iris_dataset() -> DataFrame:
    spark = SparkSession.builder.config(
        key="spark.jars.packages",
        value=SNOWFLAKE_JARS,
    ).getOrCreate()

    schema = StructType(
        [
            StructField("Sepal length (cm)", DoubleType()),
            StructField("Sepal width (cm)", DoubleType()),
            StructField("Petal length (cm)", DoubleType()),
            StructField("Petal width (cm)", DoubleType()),
            StructField("Species", StringType()),
        ]
    )

    url = "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data"
    spark.sparkContext.addFile(url)

    return spark.read.schema(schema).csv("file://" + SparkFiles.get("iris.data"))

defs = Definitions(
    assets=[iris_dataset],
    resources={
        "io_manager": snowflake_pyspark_io_manager.configured(
            {
                "account": "abc1234.us-east-1",
                "user": {"env": "SNOWFLAKE_USER"},
                "password": {"env": "SNOWFLAKE_PASSWORD"},
                "database": "FLOWERS",
                "warehouse": "PLANTS",
                "schema": "IRIS",
            }
        )
    },
)
As always, if you have any feedback, bug reports, or feature requests, let me know here!

clay
02/03/2023, 7:02 PM
There seems to be something off in dagster-snowflake for things like context.resources.snowflake.execute_queries() when the use_pandas_result flag is True, as you can see here: https://docs.dagster.io/_modules/dagster_snowflake/resources#SnowflakeConnection.execute_queries
clay
02/03/2023, 7:04 PM
execute_queries is a bit problematic when you're appending to the same data frame. That might be a bug? It seems to assume that each query will return a separate dataframe and that those will be appended to a list -- which is similar to how it works when use_pandas_result is False. However, result is initialized as a DataFrame and not as a list when the flag is True.

clay
02/03/2023, 7:05 PM

clay
02/03/2023, 7:06 PM
When use_pandas_result is True, it's probably best to just return a list of DataFrames.
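(In the meantime, a caller-side workaround sketch, assuming execute_query accepts the same use_pandas_result flag as its execute_queries sibling: run each statement separately and collect the DataFrames yourself; the op name and queries are placeholders:)

from dagster import op

@op(required_resource_keys={"snowflake"})
def run_queries_separately(context):
    queries = [
        "SELECT * FROM SCHEMA_A.TABLE_ONE;",  # placeholder queries
        "SELECT * FROM SCHEMA_B.TABLE_TWO;",
    ]
    # One DataFrame per query, instead of relying on execute_queries
    # to append them for us.
    frames = [
        context.resources.snowflake.execute_query(q, use_pandas_result=True)
        for q in queries
    ]
    return frames  # or concatenate them yourself if the schemas match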
clay
02/03/2023, 7:25 PM

Seth Kimmel
02/06/2023, 5:57 PM
Vinnie
02/08/2023, 9:40 AM
@usable_as_dagster_type
class UtilsSnowInput(pydantic.BaseModel):
    class Config:
        arbitrary_types_allowed = True

    dest_namespace: str
    data: pd.DataFrame

# s3_io_manager enhanced:
def handle_output(self, context: OutputContext, obj):
    ...  # s3 load specific logic
    if isinstance(obj, UtilsSnowInput):
        context.log.debug("Attempting snowflake upload")
        parquet_path = self._upload_df(obj)  # loads obj.data as parquet
        yield MetadataEntry(
            "S3 parquet storage path",
            value=MetadataValue.path(f"s3://{self.bucket}/{parquet_path}"),
        )
        # utils_snow is instantiated with a snowflake stage path as an optional
        # parameter, runs a COPY INTO command into dest_namespace
        yield from self.utils_snow.copy_into_landing_area(
            context,
            parquet_path,
        )
        # utils_snow.copy_into_landing_area wraps a COPY INTO command, cleans up
        # the landing area, and yields some more metadata such as number of rows

def _get_copy_into_statement(
    self,
    remote_filepath: str,
    dest_namespace: str,
):
    return (
        f"COPY INTO {dest_namespace}(RAW) FROM @{self.stage}\n"
        f"FILES =('{remote_filepath}')\n"
        f"FILE_FORMAT = (type = '{remote_filepath.split('.')[-1]}');"
    )
This effectively means the users are free to write logic as they see fit; the only requirement is returning a UtilsSnowInput with the required parameters.
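(So a producing asset in that setup would look roughly like this; the asset name, namespace, and data are made up:)

import pandas as pd
from dagster import asset

@asset
def raw_orders() -> UtilsSnowInput:
    df = pd.DataFrame({"ORDER_ID": [1, 2], "AMOUNT": [9.99, 19.99]})
    # The enhanced s3_io_manager sees a UtilsSnowInput and runs COPY INTO dest_namespace
    return UtilsSnowInput(dest_namespace="LANDING.RAW_ORDERS", data=df)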
clay
02/08/2023, 7:19 PM
I used key_prefix to try to specify a schema for storing data in Snowflake. That probably should be mentioned in the documentation: https://docs.dagster.io/integrations/dbt/reference#adding-a-prefix-to-asset-keys

clay
02/08/2023, 7:20 PM

clay
02/08/2023, 7:20 PM
dagster_ prefix in the schema I already had specified in my connection parameters