Hello! I am starting to use the snowflake integra...
# ask-community
d
Hello! I am starting to use the snowflake integration, but apparently, the "authenticator" connection parameter isn't implemented? My current setup unfortunately is using "externalbrowser" authenticator. Thought Id seen a PR to add this last year.
dagit==1.1.21 dagster==1.1.21 dagster-graphql==1.1.21 dagster-postgres==0.17.21 dagster-snowflake==0.17.21 dagster-snowflake-pandas==0.17.21
Copy code
dagster._core.errors.DagsterInvalidConfigError: Error in config for resource io_manager
Error 1: Received unexpected config entry "authenticator" at path root:config. Expected: "{ account: (String | { env: String }) database: (String | { env: String }) password?: (String | { env: String }) private_key?: (String | { env: String }) private_key_password?: (String | { env: String }) private_key_path?: (String | { env: String }) role?: (String | { env: String }) schema?: (String | { env: String }) user: (String | { env: String }) warehouse?: (String | { env: String }) }".

  File "/home/some_user/envs/dagster_dev/lib/python3.8/site-packages/dagster/_grpc/impl.py", line 419, in get_external_execution_plan_snapshot
    create_execution_plan(
  File "/home/some_user/envs/dagster_dev/lib/python3.8/site-packages/dagster/_core/execution/api.py", line 956, in create_execution_plan
    resolved_run_config = ResolvedRunConfig.build(pipeline_def, run_config, mode=mode)
  File "/home/some_user/envs/dagster_dev/lib/python3.8/site-packages/dagster/_core/system_config/objects.py", line 209, in build
    config_mapped_resource_configs = config_map_resources(resource_defs, resource_configs)
  File "/home/some_user/envs/dagster_dev/lib/python3.8/site-packages/dagster/_core/system_config/objects.py", line 284, in config_map_resources
    raise DagsterInvalidConfigError(
Definition that is getting environment variable values from local .env file:
Copy code
defs = Definitions(
    assets=[manufacturers, manufacturer_id],
    resources={
        "io_manager": snowflake_pandas_io_manager.configured(
            {
                "account": {"env": "SF_ACCOUNT"},
                "user": {"env": "SF_USERNAME"},
                "warehouse": {"env": "SF_WAREHOUSE"},
                "database": {"env": "SF_DATABASE"},
                "role": {"env": "SF_ROLE"},
                "authenticator": {"env": "SF_AUTHENTICATOR"},
                "schema": "my_schema",
            }
        )
    },
)
Contents of my local config.py file which seems to indicate authenticator is a valid parameter:
Copy code
from dagster import Bool, Field, IntSource, StringSource


def define_snowflake_config():
    """Snowflake configuration.

    See the Snowflake documentation for reference:
        <https://docs.snowflake.net/manuals/user-guide/python-connector-api.html>
    """
    account = Field(
        StringSource,
        description="Your Snowflake account name. For more details, see  <https://bit.ly/2FBL320>.",
        is_required=False,
    )

    user = Field(StringSource, description="User login name.", is_required=True)

    password = Field(StringSource, description="User password.", is_required=False)

    database = Field(
        StringSource,
        description=(
            "Name of the default database to use. After login, you can use USE DATABASE "
            " to change the database."
        ),
        is_required=False,
    )

    schema = Field(
        StringSource,
        description=(
            "Name of the default schema to use. After login, you can use USE SCHEMA to "
            "change the schema."
        ),
        is_required=False,
    )

    role = Field(
        StringSource,
        description=(
            "Name of the default role to use. After login, you can use USE ROLE to change "
            " the role."
        ),
        is_required=False,
    )

    warehouse = Field(
        StringSource,
        description=(
            "Name of the default warehouse to use. After login, you can use USE WAREHOUSE "
            "to change the role."
        ),
        is_required=False,
    )

    private_key = Field(
        StringSource,
        description=(
            "Raw private key to use. See"
            " <https://docs.snowflake.com/en/user-guide/key-pair-auth.html> for details. Alternately,"
            " set private_key_path and private_key_password."
        ),
        is_required=False,
    )

    private_key_password = Field(
        StringSource,
        description=(
            "Raw private key password to use. See"
            " <https://docs.snowflake.com/en/user-guide/key-pair-auth.html> for details. Required for"
            " both private_key and private_key_path."
        ),
        is_required=False,
    )

    private_key_path = Field(
        StringSource,
        description=(
            "Raw private key path to use. See"
            " <https://docs.snowflake.com/en/user-guide/key-pair-auth.html> for details. Alternately,"
            " set the raw private key as private_key."
        ),
        is_required=False,
    )

    autocommit = Field(
        Bool,
        description=(
            "None by default, which honors the Snowflake parameter AUTOCOMMIT. Set to True "
            "or False to enable or disable autocommit mode in the session, respectively."
        ),
        is_required=False,
    )

    client_prefetch_threads = Field(
        IntSource,
        description=(
            "Number of threads used to download the results sets (4 by default). "
            "Increasing the value improves fetch performance but requires more memory."
        ),
        is_required=False,
    )

    client_session_keep_alive = Field(
        StringSource,
        description=(
            "False by default. Set this to True to keep the session active indefinitely, "
            "even if there is no activity from the user. Make certain to call the close method to "
            "terminate the thread properly or the process may hang."
        ),
        is_required=False,
    )

    login_timeout = Field(
        IntSource,
        description=(
            "Timeout in seconds for login. By default, 60 seconds. The login request gives "
            'up after the timeout length if the HTTP response is "success".'
        ),
        is_required=False,
    )

    network_timeout = Field(
        IntSource,
        description=(
            "Timeout in seconds for all other operations. By default, none/infinite. A general"
            " request gives up after the timeout length if the HTTP response is not 'success'."
        ),
        is_required=False,
    )

    ocsp_response_cache_filename = Field(
        StringSource,
        description=(
            "URI for the OCSP response cache file.  By default, the OCSP response cache "
            "file is created in the cache directory."
        ),
        is_required=False,
    )

    validate_default_parameters = Field(
        Bool,
        description=(
            "False by default. Raise an exception if either one of specified database, "
            "schema or warehouse doesn't exists if True."
        ),
        is_required=False,
    )

    paramstyle = Field(
        # TODO should validate only against permissible values for this
        StringSource,
        description=(
            "pyformat by default for client side binding. Specify qmark or numeric to "
            "change bind variable formats for server side binding."
        ),
        is_required=False,
    )

    timezone = Field(
        StringSource,
        description=(
            "None by default, which honors the Snowflake parameter TIMEZONE. Set to a "
            "valid time zone (e.g. America/Los_Angeles) to set the session time zone."
        ),
        is_required=False,
    )

    connector = Field(
        StringSource,
        description=(
            "Indicate alternative database connection engine. Permissible option is "
            "'sqlalchemy' otherwise defaults to use the Snowflake Connector for Python."
        ),
        is_required=False,
    )

    cache_column_metadata = Field(
        StringSource,
        description=(
            "Optional parameter when connector is set to sqlalchemy. Snowflake SQLAlchemy takes a"
            " flag cache_column_metadata=True such that all of column metadata for all tables are"
            ' "cached"'
        ),
        is_required=False,
    )

    numpy = Field(
        StringSource,
        description=(
            "Optional parameter when connector is set to sqlalchemy. To enable fetching "
            "NumPy data types, add numpy=True to the connection parameters."
        ),
        is_required=False,
    )

    authenticator = Field(
        StringSource,
        description="Optional parameter to specify the authentication mechanism to use.",
        is_required=False,
    )

    return {
        "account": account,
        "user": user,
        "password": password,
        "database": database,
        "schema": schema,
        "role": role,
        "warehouse": warehouse,
        "autocommit": autocommit,
        "private_key": private_key,
        "private_key_password": private_key_password,
        "private_key_path": private_key_path,
        "client_prefetch_threads": client_prefetch_threads,
        "client_session_keep_alive": client_session_keep_alive,
        "login_timeout": login_timeout,
        "network_timeout": network_timeout,
        "ocsp_response_cache_filename": ocsp_response_cache_filename,
        "validate_default_parameters": validate_default_parameters,
        "paramstyle": paramstyle,
        "timezone": timezone,
        "connector": connector,
        "cache_column_metadata": cache_column_metadata,
        "numpy": numpy,
        "authenticator": authenticator,
    }
c
hrm looks like it’s implemented for the
snowflake_resource
but not
snowflake_io_manager
. Filed an issue: https://github.com/dagster-io/dagster/issues/12868
d
Thanks @chris! I will make updates to my local source files to try to get it to work, although big DISCLAIMER, I am not an expert, so it'll be trial and error. I thought just adding the following to snowflake_io_manager.py:
Copy code
"authenticator": Field(
    StringSource,
    description="Optional parameter to specify the authentication mechanism to use.",
    is_required=False,
)
would do the trick, but I am getting closer, I think? Now getting a different error message related to username or password:
Copy code
sqlalchemy.exc.DatabaseError: (snowflake.connector.errors.DatabaseError) 250001 (08001): Failed to connect to DB: <http://REDACTED.snowflakecomputing.com:443|REDACTED.snowflakecomputing.com:443>. Incorrect username or password was specified.
(Background on this error at: <https://sqlalche.me/e/14/4xp6>)
So it appears I have to possibly make edits to a different .py file. So I then started to look at the
resources.py
file. Again, I have no idea what I'm doing, but after adding
"authenticator",
to line 80 in
resources.py
seems to have done the trick now- no longer getting errors and I was able to write my asset as a Snowflake table. Now, FWIW, based on using other SQL clients to Snowflake, when using "externalbrowser" authenticator, I do not have to supply the "password" connection parameter as I think this is not applicable when using externalbrowser authenticator. But in dagster snowflake library, it appears if I don't supply the "password" connection parameter, I will get an error (
dagster._check.CheckError: Invariant failed. Description: Missing config: Password or private key authentication required for Snowflake resource.
) and the password doesn't even have to be correct btw. That's ok I guess, but just mentioning this as FYI. I'm sure there is a way to change that behavior if there is such a need.
c
Yea I think that last bit you mentioned about the password is just a design flaw of our config schema for that resource - should definitely be something we can fix.
I think @jamie is looking into this further - hopefully we can get a solution merged into the library
j
hey, yeah I’ll be looking into this! I’m on vacation next week, but it’s on my list for when i get back
d
@jamie Have a nice vaycay!