# integration-snowflake

clay

04/19/2023, 2:38 AM
How are Snowflake sessions managed in Dagster? If I have two subsequent assets that each require a Snowflake resource that is set up with `snowflake_resource.configured`, are the subsequent asset materializations using the same Snowflake session? I'm having a problem trying to run two jobs in different warehouses simultaneously when I know I have access to both warehouses. I'm getting the `Object does not exist, or operation cannot be performed.` error when I try to call `USE WAREHOUSE ..` when I can easily use that warehouse in DBeaver, for instance. Is there a way to print the connection URL that the Snowflake connector is using?

Tim Castillo

04/19/2023, 3:15 AM
I believe resources are re-initialized for each op/asset that uses them, so it'd be a new session each time. Aaah, yeah, I don't believe you can swap warehouses in-flight. That's odd behavior though, as I'd assume Snowflake wouldn't limit you if you spun up another session. Let me see what I can dig up about this. In the meantime, I'm curious about how other tools support this. I know you can swap out warehouses for individual models in dbt.
In terms of your question about getting the connection URL, the `SnowflakeResource` uses either SQLAlchemy or the official Snowflake connector, depending on how it's configured. If you want to tinker with the parameters or just get the values from the object, here's the file that defines the resource: https://github.com/dagster-io/dagster/blob/master/python_modules/libraries/dagster-snowflake/dagster_snowflake/resources.py#L148
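Short of printing the connection URL, one option is to ask Snowflake what the session actually resolved to. This is a minimal sketch, assuming any DB-API style cursor (for example one obtained from `conn.cursor()` on the connection the resource hands out). `describe_session` is a hypothetical helper name, not part of dagster-snowflake; the SQL uses Snowflake's standard context functions.

```python
from typing import Any, Dict

# Snowflake context functions; each returns NULL when nothing is set.
SESSION_QUERY = (
    "SELECT CURRENT_ACCOUNT(), CURRENT_USER(), CURRENT_ROLE(), "
    "CURRENT_SECONDARY_ROLES(), CURRENT_WAREHOUSE(), "
    "CURRENT_DATABASE(), CURRENT_SCHEMA()"
)
SESSION_FIELDS = (
    "account",
    "user",
    "role",
    "secondary_roles",
    "warehouse",
    "database",
    "schema",
)


def describe_session(cursor: Any) -> Dict[str, Any]:
    """Return the current session context as a dict (hypothetical helper)."""
    cursor.execute(SESSION_QUERY)
    row = cursor.fetchone()
    # Pair each column of the single result row with its field name.
    return dict(zip(SESSION_FIELDS, row))
```

Logging the result from inside an op or asset, e.g. `context.log.info(describe_session(cursor))`, would show which role and warehouse the Dagster-created session is using, which can then be compared with what DBeaver reports.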

clay

04/19/2023, 7:47 PM
I figured out what was happening... We swap roles and warehouses all the time, so it's natural in our flow. Because of this, I have a bootstrapping query in DBeaver that runs `USE SECONDARY ROLES ALL` as soon as a connection is created, and I forgot that was happening. To use the relevant warehouse, I have to activate my secondary roles first, because it's a secondary role that is allowed to use the warehouse in question. So I was getting an error about an object not existing because Dagster doesn't give me a way to run bootstrapping queries when each connection is made.

Tim Castillo

04/19/2023, 7:49 PM
Aaah, I had a hunch it was a role thing when I saw the error message, but that second half (`or operation cannot be performed.`) is so ambiguous that I brushed my hunch off. From what I remember, either the Snowflake SQLAlchemy or the official Snowflake Python SDK should have a flag that lets you trigger that same `use secondary roles`; I'm just not sure if we surface it.

clay

04/19/2023, 7:50 PM
FWIW, I was trying to set up two warehouses to run Snowflake's benchmarking queries simultaneously... I have each of the 100 or so queries in its own `.sql` file. I synthesized the assets with a different Python script and then just call this function with the relevant warehouse/credentials/SQL file:
```python
from contextlib import closing

from dagster import file_relative_path


def execute_snowflake_sql(context, filename) -> None:
    """
    Executes a SQL query on Snowflake
    """
    with context.resources.marketing_analytics_snowflake.get_connection() as conn:
        context.log.info("Snowflake connection established.")
        with closing(conn.cursor()) as cursor:
            context.log.info("Snowflake cursor assigned.")

            # Activate secondary roles first; one of them grants access to the warehouse
            secondary = "USE SECONDARY ROLES ALL"
            context.log.info(secondary)
            cursor.execute(secondary)

            # These are specific to the benchmarking
            warehouse = "USE WAREHOUSE MKTG_PUB_ANLY_ETL"
            context.log.info(warehouse)
            cursor.execute(warehouse)

            schema = "USE SCHEMA SNOWFLAKE_SAMPLE_DATA.TPCDS_SF10TCL"
            context.log.info(schema)
            cursor.execute(schema)

            # Disable the result cache so every run actually hits the warehouse
            cache = "ALTER SESSION SET use_cached_result = FALSE"
            context.log.info(cache)
            cursor.execute(cache)

            # Resolve the .sql file relative to this module and run its contents
            f = file_relative_path(__file__, filename)

            with open(f, "r") as fd:
                q1 = fd.read()
                context.log.info(q1)
                cursor.execute(q1)

    return None
```
And voila
Snowflake would have you run the queries in sequence but I want to run them in parallel and let Dagster manage it all because I want to put maximum pressure on the warehouses to see what will happen
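The asset-synthesis script itself isn't shown in the thread. As a rough sketch of one way it might pair files with warehouses, assuming one `.sql` file per query in a single directory: `QuerySpec`, `build_query_specs`, and the `<warehouse>__<file stem>` naming scheme are all hypothetical, and file stems would need to be valid asset names.

```python
from itertools import product
from pathlib import Path
from typing import List, NamedTuple, Sequence


class QuerySpec(NamedTuple):
    asset_name: str  # hypothetical naming scheme: <warehouse>__<file stem>
    warehouse: str
    filename: str


def build_query_specs(sql_dir: str, warehouses: Sequence[str]) -> List[QuerySpec]:
    """Pair every .sql file in sql_dir with every warehouse."""
    # Sort so the generated specs are stable between runs.
    files = sorted(p.name for p in Path(sql_dir).glob("*.sql"))
    return [
        QuerySpec(f"{wh.lower()}__{Path(name).stem}", wh, name)
        for wh, name in product(warehouses, files)
    ]
```

Each spec could then be turned into one asset whose body calls a function like `execute_snowflake_sql` with the matching warehouse and file, leaving Dagster to schedule them concurrently.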

Tim Castillo

04/19/2023, 7:52 PM
beautiful

clay

04/19/2023, 7:52 PM
Anyhow, I should write up an example repo for you all because it's a useful thing to be able to do. 🙂

Tim Castillo

04/19/2023, 7:54 PM
If there's ever anything that you're proud of doing with Dagster, please do! And post it in #dagster-showcase; we also usually hold a bi-monthly event for community members to demo their cool tricks to each other.