https://dagster.io/ logo
#integration-snowflake
Title
# integration-snowflake
r

Rohan Meringenti

05/25/2023, 4:02 PM
Question about trying to update existing tables in snowflake. I'm trying to create workflow where I read from an existing table in snowflake, enrich by adding a few column, and storing the result in another table. Using the
snowflake_pandas_io_manager
as the docs say and time partitions to do this, but it looks like an
@asset
always recreates the table. Was wondering what the suggested route was to go about updating a table on a daily basis in snowflake?
o

owen

05/25/2023, 4:40 PM
hi @Rohan Meringenti! as you mention, an
@asset(partitions_def=DailyPartitionDefinition(...))
will accomplish this for you (each day's partition will be written independently). Is the goal here to do this without a partitions def? If so, do you mind describing the desired behavior?
r

Rohan Meringenti

05/25/2023, 4:43 PM
yeah totally fine using a partition here, but using the @asset decorator seems to always create a new table. In particular when I use the daily partitions along with the snowflake pandas io manager and each partition selects say 100 rows on that partition. Dagster fails bc of CREATE TABLE cannot be executed on an existing table. I think my issue is more that @asset is always creating a new table, not sure how to update an existing one
Here's an example code snippet:
Copy code
@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2023-05-01"),
    metadata={"partition_expr": "_AIRBYTE_EMITTED_AT"},
)
def events_to_update(context, snowflake: SnowflakeResource) -> pd.DataFrame:
    partition_date_str = context.asset_partition_key_for_output()
    print("partition_date_str", partition_date_str,)

    with snowflake.get_connection() as conn:
        return (
            conn.cursor()
            .execute(
                'SELECT * FROM PAGE_VIEW_EVENTS limit 100'
            )
            .fetch_pandas_all()
        )
this fails when run on multiple partitions at a time
o

owen

05/25/2023, 4:59 PM
which part about it fails? do you have an error message?
r

Rohan Meringenti

05/25/2023, 5:12 PM
yup, the materialization: after i pick a 3 run backfill for partitions 5-22-5-24 on dagit, and hit materialize: Here's the message
Copy code
- dagster - ERROR - __ASSET_JOB_0 - 6abd0614-5360-4953-a884-4cfd00bba906 - 78568 - events_to_update - STEP_FAILURE - Execution of step "events_to_update" failed.

dagster._core.errors.DagsterExecutionHandleOutputError: Error occurred while handling output "result" of step "events_to_update"::

sqlalchemy.exc.ProgrammingError: (snowflake.connector.errors.ProgrammingError) 002002 (42710): SQL compilation error:
Object 'EVENTS_TO_UPDATE' already exists.
[SQL:
CREATE TABLE events_to_update (...
o

owen

05/25/2023, 5:58 PM
hm interesting -- @jamie have you seen this behavior before?
j

jamie

05/25/2023, 6:02 PM
huh i haven’t seen this before… What version of
dagster
dagster_snowflake
and
dagster_snowflake_pandas
are you on? Also the versions of
snowflake-connector-python
and
snowflake-sqlalchemy
would be helpful
do you have any more of the error message? that would help narrow down exactly where in the code this is happening
r

Rohan Meringenti

05/25/2023, 6:04 PM
Untitled
Copy code
dagit==1.3.5
dagster==1.3.5
dagster-graphql==1.3.5
dagster-snowflake==0.19.5
dagster-snowflake-pandas==0.19.5
snowflake-connector-python==3.0.3
snowflake-sqlalchemy==1.4.7
Here's my exact code:
Copy code
import pandas as pd
from dagster_snowflake_pandas import SnowflakePandasIOManager
from dagster_snowflake import SnowflakeResource

from dagster import AssetIn, Definitions, EnvVar, SourceAsset, asset, graph_asset, op, DailyPartitionsDefinition


pv_events = SourceAsset(key="page_view_events")
wc_events = SourceAsset(key="wallet_connect_events")


@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2023-05-01"),
    metadata={"partition_expr": "_AIRBYTE_EMITTED_AT"},
)
def events_to_update(context, snowflake: SnowflakeResource) -> pd.DataFrame:
    partition_date_str = context.asset_partition_key_for_output()
    print("partition_date_str", partition_date_str)

    with snowflake.get_connection() as conn:
        return (
            conn.cursor()
            .execute(
                'SELECT * FROM PAGE_VIEW_EVENTS limit 100'
            )
            .fetch_pandas_all()
        )


defs = Definitions(
    assets=[events_to_update, pv_events, wc_events],
    resources={
        "io_manager": SnowflakePandasIOManager(
            account="redacted1",
            user="redacted",
            password="redacted",
            database="redacted",
            role="redacted",
            schema="redacted",
        ),
        "snowflake": SnowflakeResource(
            account="redacted",
            user="redacted",
            password="redacted",
            database="redacted",
            schema="redacted",
        )
    },
)
j

jamie

05/25/2023, 6:10 PM
ok. So we are explicitly telling pandas/snowflake to append to existing tables
Copy code
with_uppercase_cols.to_sql(
            table_slice.table,
            con=connection.engine,
            if_exists="append",
            index=False,
            method=pd_writer,
        )
but based on the stack trace, that is not being respected. The first thing I would try is downgrading
snowflake-connector-python
to something below
3.0.0
. I have
2.7.12
installed right now and haven’t run into any issues with appending to tables, and the
snowflake-connector-python
library seems to always have some gnarly bugs in it so i wouldn’t be surprised if the issue is with them The other thing i would look into is if the existing
events_to_update
table has a different set of columns than what’s listed in the stack trace. The issue could be that the pandas/snowflake functions see that the columns don’t match up and is trying to recreate the table or something like that
r

Rohan Meringenti

05/25/2023, 6:11 PM
got it, let me try downgrading
j

jamie

05/25/2023, 6:15 PM
your pandas version might also be at play here, what are you using for that?
r

Rohan Meringenti

05/25/2023, 6:19 PM
1.5.3
j

jamie

05/25/2023, 6:20 PM
cool. i’ll dig around in snowflake and pandas github issues and see if anything comes up too
r

Rohan Meringenti

05/25/2023, 6:21 PM
ty!
Did some more investigation, and added a bunch of debug statements. It looks like append is getting respected, however sqlalchemy seems to think there's nothing in the table even though there is, I think. Anyway to try this with a lower version can I ask which version of dagster/dagster-snowflake and its dependencies you are using/recommend? Changing just the snowflake-connector python introduces a bunch of compatibility issues @jamie
j

jamie

05/25/2023, 8:01 PM
huh ok, i dont know why the table being empty would cause that issue. are you able to replicate the errors with other assets? These are my library versions
Copy code
dagster-snowflake                 1!0+dev      
dagster-snowflake-pandas          1!0+dev      
dagster-snowflake-pyspark         1!0+dev      
dbt-snowflake                     1.3.0
snowflake-connector-python        2.7.12
snowflake-sqlalchemy              1.2.3
pandas                            1.3.5
the 1!0+dev just means i’m on a cloned version of the dagster repo, and my code is rebased from today so the latest dagster versions should work. We have a pin on
snowflake-connector-python
in our code, but it just pins it to >2.1.0 so anything above that should theoretically work, but there may be conflicts with other non-dagster libraries. Maybe a fresh virtual env?
r

Rohan Meringenti

05/25/2023, 8:09 PM
got it will try that out
I lowered the version to what you had and still same problem: Tried to make a toy example as well: The contents of the table initially was Stephen, Oslo and Jane, Stockholm, and it got replace the second time I materialized the asset. Am i doing something obviously wrong here?
Copy code
@asset
def test_table() -> pd.DataFrame:
    df = pd.DataFrame(data=[['Stephen1', 'Oslo1'], ['Jane1', 'Stockholm1']], columns=['Name', 'City'])
    return df
I think i've narrowed this down: It looks before even handle_output is called the SnowflakeDbClient calls delete_table_slice here looking at the cleanup function it deletes from the target table to make room for the output data being written which is odd behaviour...
141 Views