Rohan Meringenti
05/25/2023, 4:02 PM
`snowflake_pandas_io_manager` as the docs say and time partitions to do this, but it looks like an `@asset` always recreates the table. What is the suggested route for updating a table in Snowflake on a daily basis?
owen
05/25/2023, 4:40 PM
`@asset(partitions_def=DailyPartitionsDefinition(...))` will accomplish this for you (each day's partition will be written independently). Is the goal here to do this without a partitions def? If so, do you mind describing the desired behavior?
Rohan Meringenti
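05/25/2023, 4:43 PM
```python
import pandas as pd
from dagster import DailyPartitionsDefinition, asset
from dagster_snowflake import SnowflakeResource


@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2023-05-01"),
    # Column the IO manager uses to scope each partition's rows.
    metadata={"partition_expr": "_AIRBYTE_EMITTED_AT"},
)
def events_to_update(context, snowflake: SnowflakeResource) -> pd.DataFrame:
    # Daily partition key, e.g. "2023-05-25".
    partition_date_str = context.asset_partition_key_for_output()
    print("partition_date_str", partition_date_str)
    with snowflake.get_connection() as conn:
        # Note: this query is not filtered by partition_date_str.
        return (
            conn.cursor()
            .execute("SELECT * FROM PAGE_VIEW_EVENTS LIMIT 100")
            .fetch_pandas_all()
        )
```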
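The query above ignores `partition_date_str`, so every daily partition would receive the same 100 rows. A minimal sketch of scoping the query to the partition's day, assuming the partition key is a `YYYY-MM-DD` string (the `DailyPartitionsDefinition` default) and that `_AIRBYTE_EMITTED_AT` is a timestamp column; `daily_partition_window` and `partition_query` are hypothetical helpers, not Dagster APIs:

```python
from datetime import date, datetime, timedelta


def daily_partition_window(partition_key: str) -> tuple[datetime, datetime]:
    """Half-open [start, end) window for a daily partition key like "2023-05-25"."""
    day = date.fromisoformat(partition_key)
    start = datetime(day.year, day.month, day.day)
    return start, start + timedelta(days=1)


def partition_query(partition_key: str) -> tuple[str, tuple[str, str]]:
    """SELECT restricted to one day of _AIRBYTE_EMITTED_AT (hypothetical helper)."""
    start, end = daily_partition_window(partition_key)
    sql = (
        "SELECT * FROM PAGE_VIEW_EVENTS "
        "WHERE _AIRBYTE_EMITTED_AT >= %s AND _AIRBYTE_EMITTED_AT < %s"
    )
    # Bounds are returned separately so the driver can bind them as parameters.
    return sql, (start.isoformat(sep=" "), end.isoformat(sep=" "))


sql, params = partition_query("2023-05-25")
print(sql)
print(params)  # ('2023-05-25 00:00:00', '2023-05-26 00:00:00')
```

Inside the asset, the pair could then be used as `conn.cursor().execute(sql, params).fetch_pandas_all()`. The Snowflake connector's default `pyformat` parameter style accepts `%s` placeholders, but it is worth confirming against the connector docs for the installed version.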
owen
05/25/2023, 4:59 PM
Rohan Meringenti
05/25/2023, 5:12 PM
```
- dagster - ERROR - __ASSET_JOB_0 - 6abd0614-5360-4953-a884-4cfd00bba906 - 78568 - events_to_update - STEP_FAILURE - Execution of step "events_to_update" failed.
dagster._core.errors.DagsterExecutionHandleOutputError: Error occurred while handling output "result" of step "events_to_update"::
sqlalchemy.exc.ProgrammingError: (snowflake.connector.errors.ProgrammingError) 002002 (42710): SQL compilation error:
Object 'EVENTS_TO_UPDATE' already exists.
[SQL:
CREATE TABLE events_to_update (...
```
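The traceback shows a plain `CREATE TABLE` being issued against a table that already exists. For a partitioned asset, the behavior the thread is after is: keep the table, clear only the rows in the partition's time window, then append. This appears to be roughly what the Snowflake IO manager does for time-partitioned assets, but treat that as an assumption rather than a description of its code. The sketch below illustrates the pattern with the standard-library `sqlite3` module standing in for Snowflake; `write_partition` and the two-column schema are invented for the example:

```python
import sqlite3


def write_partition(conn, table, partition_col, start, end, rows):
    """Overwrite one time-window partition of a toy two-column table.

    Creates the table only if it is missing, deletes rows already in
    [start, end), then appends the new rows.
    """
    # Identifiers cannot be bound as parameters, so they are interpolated here;
    # only do this with trusted names.
    conn.execute(f"CREATE TABLE IF NOT EXISTS {table} ({partition_col} TEXT, page TEXT)")
    conn.execute(
        f"DELETE FROM {table} WHERE {partition_col} >= ? AND {partition_col} < ?",
        (start, end),
    )
    conn.executemany(f"INSERT INTO {table} VALUES (?, ?)", rows)
    conn.commit()


conn = sqlite3.connect(":memory:")
day1 = [("2023-05-24 10:00:00", "/home"), ("2023-05-24 11:00:00", "/docs")]
day2 = [("2023-05-25 09:00:00", "/home")]
write_partition(conn, "events_to_update", "emitted_at", "2023-05-24", "2023-05-25", day1)
write_partition(conn, "events_to_update", "emitted_at", "2023-05-25", "2023-05-26", day2)
# Re-running day 2 replaces its rows instead of failing or duplicating them.
write_partition(conn, "events_to_update", "emitted_at", "2023-05-25", "2023-05-26", day2)
print(conn.execute("SELECT COUNT(*) FROM events_to_update").fetchone()[0])  # 3
```

The key design point is that re-materializing a partition is idempotent: the table is created once, and each run touches only its own window.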
owen
05/25/2023, 5:58 PM
jamie
05/25/2023, 6:02 PM
Which versions of `dagster`, `dagster_snowflake`, and `dagster_snowflake_pandas` are you on? Also, the versions of `snowflake-connector-python` and `snowflake-sqlalchemy` would be helpful.
Rohan Meringenti
05/25/2023, 6:04 PM
```
dagit==1.3.5
dagster==1.3.5
dagster-graphql==1.3.5
dagster-snowflake==0.19.5
dagster-snowflake-pandas==0.19.5
snowflake-connector-python==3.0.3
snowflake-sqlalchemy==1.4.7
```
```python
import pandas as pd
from dagster_snowflake_pandas import SnowflakePandasIOManager
from dagster_snowflake import SnowflakeResource
from dagster import AssetIn, Definitions, EnvVar, SourceAsset, asset, graph_asset, op, DailyPartitionsDefinition

pv_events = SourceAsset(key="page_view_events")
wc_events = SourceAsset(key="wallet_connect_events")


@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2023-05-01"),
    metadata={"partition_expr": "_AIRBYTE_EMITTED_AT"},
)
def events_to_update(context, snowflake: SnowflakeResource) -> pd.DataFrame:
    partition_date_str = context.asset_partition_key_for_output()
    print("partition_date_str", partition_date_str)
    with snowflake.get_connection() as conn:
        return (
            conn.cursor()
            .execute(
                'SELECT * FROM PAGE_VIEW_EVENTS limit 100'
            )
            .fetch_pandas_all()
        )


defs = Definitions(
    assets=[events_to_update, pv_events, wc_events],
    resources={
        "io_manager": SnowflakePandasIOManager(
            account="redacted1",
            user="redacted",
            password="redacted",
            database="redacted",
            role="redacted",
            schema="redacted",
        ),
        "snowflake": SnowflakeResource(
            account="redacted",
            user="redacted",
            password="redacted",
            database="redacted",
            schema="redacted",
        ),
    },
)
```
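The versions jamie asked for can also be read from the active environment with the standard library's `importlib.metadata` (Python 3.8+), which gives the same information as `pip list` for the named packages. A small sketch; `installed_versions` is a hypothetical helper, and the package list simply mirrors the distributions named in the thread:

```python
from importlib.metadata import PackageNotFoundError, version

# Distribution names as pip reports them.
PACKAGES = [
    "dagster",
    "dagster-snowflake",
    "dagster-snowflake-pandas",
    "snowflake-connector-python",
    "snowflake-sqlalchemy",
]


def installed_versions(names):
    """Map each distribution name to its installed version, or None if absent."""
    found = {}
    for name in names:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


for name, ver in installed_versions(PACKAGES).items():
    print(f"{name}=={ver}" if ver else f"{name}: not installed")
```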
jamie
05/25/2023, 6:10 PM
```python
with_uppercase_cols.to_sql(
    table_slice.table,
    con=connection.engine,
    if_exists="append",
    index=False,
    method=pd_writer,
)
```
but based on the stack trace, that is not being respected. The first thing I would try is downgrading `snowflake-connector-python` to something below `3.0.0`. I have `2.7.12` installed right now and haven’t run into any issues with appending to tables, and the `snowflake-connector-python` library seems to always have some gnarly bugs in it, so I wouldn’t be surprised if the issue is with them.
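jamie's suggestion amounts to a version ceiling. For installation, a pip requirement such as `snowflake-connector-python<3.0.0` expresses it; to check an existing environment, a simplified comparison like the one below works for plain numeric versions. `release_tuple` and `is_below` are hypothetical helpers; `packaging.version.Version` is the robust choice for full PEP 440 versions:

```python
import re


def release_tuple(ver: str) -> tuple[int, ...]:
    """Leading numeric release segment, e.g. "2.7.12" -> (2, 7, 12).

    Simplified on purpose: epochs ("1!"), pre-release tags ("rc1") and local
    tags ("+dev") are not interpreted.
    """
    match = re.match(r"\d+(?:\.\d+)*", ver)
    if match is None:
        raise ValueError(f"unrecognised version: {ver!r}")
    return tuple(int(part) for part in match.group(0).split("."))


def is_below(ver: str, ceiling: str = "3.0.0") -> bool:
    """True if ver's release segment sorts before ceiling's."""
    a, b = release_tuple(ver), release_tuple(ceiling)
    # Pad with zeros so that "3.0" and "3.0.0" compare as equal.
    width = max(len(a), len(b))
    a += (0,) * (width - len(a))
    b += (0,) * (width - len(b))
    return a < b


print(is_below("3.0.3"))   # False: the version in Rohan's environment
print(is_below("2.7.12"))  # True: the version jamie has installed
```

Combined with `importlib.metadata.version("snowflake-connector-python")`, this gives a quick sanity check before rerunning the job.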
The other thing I would look into is whether the existing `events_to_update` table has a different set of columns than what’s listed in the stack trace. The issue could be that the pandas/Snowflake functions see that the columns don’t match up and are trying to recreate the table, or something like that.
Rohan Meringenti
05/25/2023, 6:11 PM
jamie
05/25/2023, 6:15 PM
Rohan Meringenti
05/25/2023, 6:19 PM
jamie
05/25/2023, 6:20 PM
Rohan Meringenti
05/25/2023, 6:21 PM
jamie
05/25/2023, 8:01 PM
```
dagster-snowflake 1!0+dev
dagster-snowflake-pandas 1!0+dev
dagster-snowflake-pyspark 1!0+dev
dbt-snowflake 1.3.0
snowflake-connector-python 2.7.12
snowflake-sqlalchemy 1.2.3
pandas 1.3.5
```
The `1!0+dev` just means I’m on a cloned version of the dagster repo, and my code is rebased from today, so the latest dagster versions should work.
We have a pin on `snowflake-connector-python` in our code, but it just pins it to `>2.1.0`, so anything above that should theoretically work, though there may be conflicts with other non-dagster libraries. Maybe try a fresh virtual env?
Rohan Meringenti
05/25/2023, 8:09 PM
```python
import pandas as pd
from dagster import asset

@asset
def test_table() -> pd.DataFrame:
    df = pd.DataFrame(data=[['Stephen1', 'Oslo1'], ['Jane1', 'Stockholm1']], columns=['Name', 'City'])
    return df
```
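To test jamie's other hypothesis (the existing table's columns differ from the DataFrame's), a quick column comparison helps. The name `with_uppercase_cols` in the IO manager snippet suggests columns are upper-cased before writing, and Snowflake folds unquoted identifiers to upper case, so this sketch upper-cases both sides. `column_diff` is a hypothetical helper; the table's column list would have to be fetched separately, for example from `INFORMATION_SCHEMA.COLUMNS`, and the `COUNTRY` column below is made up for illustration:

```python
def column_diff(frame_columns, table_columns):
    """Compare DataFrame columns with an existing table's columns, case-insensitively."""
    frame = {c.upper() for c in frame_columns}
    table = {c.upper() for c in table_columns}
    return {
        "missing_in_table": sorted(frame - table),
        "extra_in_table": sorted(table - frame),
    }


# Columns from test_table vs. a hypothetical existing table definition.
print(column_diff(["Name", "City"], ["NAME", "CITY", "COUNTRY"]))
# {'missing_in_table': [], 'extra_in_table': ['COUNTRY']}
```

Both lists coming back empty would suggest the column-mismatch theory is not the cause.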