Simrun Basuita
05/16/2023, 2:08 PM
SnowflakeResource, SnowflakeConnection and snowflake_resource? I've tried to set things up but am a bit lost in the documentation, and I keep getting errors no matter what I try.
Here's where I've got to, trying to get a simple one-op job running:
jobs.py
from dagster import op, job, Config, RunConfig, EnvVar
from dagster_snowflake import SnowflakeResource

class CloneConfig(Config):
    source_db: str
    target_db: str

@op
def clone_db(context, config: CloneConfig, snowflake: SnowflakeResource) -> None:
    query = f'create or replace database {config.target_db} clone {config.source_db}'
    context.log.info(snowflake.get_client().execute_query('select CURRENT_VERSION()', fetch_results=True))

@job(config=RunConfig(
    ops={'clone_db': CloneConfig(
        source_db=EnvVar('SNOWFLAKE_DATABASE'),
        target_db=EnvVar('DEPLOY_TARGET_DATABASE'),
    )},
))
def deploy():
    clone_db()
resources.py
from dagster import EnvVar
from dagster_snowflake import SnowflakeResource
from dagster_snowflake_pandas import SnowflakePandasIOManager

resources_def = {
    "snowflake": SnowflakeResource(
        account=EnvVar('SNOWFLAKE_LOCATOR'),
        user=EnvVar('SNOWFLAKE_USER'),
        password=EnvVar('SNOWFLAKE_PASSWORD'),
        warehouse=EnvVar('SNOWFLAKE_WAREHOUSE'),
    ),
    "snowflake_pandas_io_manager": SnowflakePandasIOManager(
        account=EnvVar('SNOWFLAKE_LOCATOR'),
        user=EnvVar('SNOWFLAKE_USER'),
        password=EnvVar('SNOWFLAKE_PASSWORD'),
        database=EnvVar('SNOWFLAKE_DATABASE'),
        warehouse=EnvVar('SNOWFLAKE_WAREHOUSE'),
    ),
}
Definitions
from dagster import Definitions, load_assets_from_modules
from .assets import (
    classification,
    idiosyncratic,
    ngfs,
    valuations,
    financials,
)
from .jobs import deploy
from .resources import resources_def

defs = Definitions(
    assets=(
        load_assets_from_modules([classification], group_name='classification') +
        load_assets_from_modules([idiosyncratic], group_name='idiosyncratic') +
        load_assets_from_modules([ngfs], group_name='ngfs') +
        load_assets_from_modules([valuations], group_name='valuations') +
        load_assets_from_modules([financials], group_name='financials')
    ),
    jobs=[deploy],
    resources=resources_def,
)
Currently I'm getting AttributeError: 'SnowflakeResource' object has no attribute 'execute_query'. But the documentation here has numerous examples of an op asking for snowflake: SnowflakeResource as an argument then calling an execute_query attribute.
Help?
jamie
05/16/2023, 4:32 PM
SnowflakeResource. snowflake_resource is the older version and SnowflakeConnection is an internal class that both of the resources use and you don’t need to worry about.
One of the changes we made in the new SnowflakeResource is removing some helper methods in snowflake_resource. We felt this let users interact with snowflake directly with the snowflake python api rather than having to learn how dagster wraps the api.
I believe if you change your op to be
@op
def clone_db(context, config: CloneConfig, snowflake: SnowflakeResource) -> None:
    query = f'create or replace database {config.target_db} clone {config.source_db}'
    with snowflake.get_connection() as conn:
        context.log.info(conn.cursor().execute('select CURRENT_VERSION()', fetch_results=True))
it should work
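A note on the snippet above: `fetch_results` was an argument of the older `execute_query` helper, and as far as I can tell the Snowflake connector's `cursor.execute` does not accept it. `cursor.execute` also hands back a cursor rather than rows, so the rows need an explicit fetch before logging. Below is a minimal sketch of that fetch step, not a definitive implementation. It uses the stdlib `sqlite3` module as a stand-in connection so it runs without Snowflake credentials, on the assumption that both follow the DB-API 2.0 cursor pattern. `run_query` is a hypothetical helper name, not part of Dagster or the Snowflake connector.

```python
import sqlite3
from contextlib import closing
from typing import Any, Optional


def run_query(conn: Any, sql: str, fetch_results: bool = False) -> Optional[list]:
    """Hypothetical helper: run one statement on a DB-API 2.0 connection.

    Returns the fetched rows when fetch_results is True, otherwise None.
    """
    # closing() releases the cursor even if execute() raises.
    with closing(conn.cursor()) as cur:
        cur.execute(sql)
        return cur.fetchall() if fetch_results else None


# Stand-in for `with snowflake.get_connection() as conn:` (assumption: the
# Snowflake connection exposes the same cursor()/execute()/fetchall() calls).
with closing(sqlite3.connect(":memory:")) as conn:
    rows = run_query(conn, "select sqlite_version()", fetch_results=True)
    version = rows[0][0]
    print(f"database version: {version}")
```

Inside the op, the logging line would then presumably become `context.log.info(run_query(conn, 'select CURRENT_VERSION()', fetch_results=True))`. Note that the `query` string built in the op is never executed in either version of the op; passing it to the same helper is one way to run the clone.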
there is an error in our docstring, so i’m fixing that now
Simrun Basuita
05/16/2023, 4:33 PM
jamie
05/16/2023, 4:35 PM
Simrun Basuita
05/16/2023, 4:36 PM