Daniel Kim
03/09/2023, 9:02 PMDaniel Kim
03/09/2023, 9:12 PMdagster._core.errors.DagsterInvalidConfigError: Error in config for resource io_manager
Error 1: Received unexpected config entry "authenticator" at path root:config. Expected: "{ account: (String | { env: String }) database: (String | { env: String }) password?: (String | { env: String }) private_key?: (String | { env: String }) private_key_password?: (String | { env: String }) private_key_path?: (String | { env: String }) role?: (String | { env: String }) schema?: (String | { env: String }) user: (String | { env: String }) warehouse?: (String | { env: String }) }".
File "/home/some_user/envs/dagster_dev/lib/python3.8/site-packages/dagster/_grpc/impl.py", line 419, in get_external_execution_plan_snapshot
create_execution_plan(
File "/home/some_user/envs/dagster_dev/lib/python3.8/site-packages/dagster/_core/execution/api.py", line 956, in create_execution_plan
resolved_run_config = ResolvedRunConfig.build(pipeline_def, run_config, mode=mode)
File "/home/some_user/envs/dagster_dev/lib/python3.8/site-packages/dagster/_core/system_config/objects.py", line 209, in build
config_mapped_resource_configs = config_map_resources(resource_defs, resource_configs)
File "/home/some_user/envs/dagster_dev/lib/python3.8/site-packages/dagster/_core/system_config/objects.py", line 284, in config_map_resources
raise DagsterInvalidConfigError(
Definition that is getting environment variable values from local .env file:
defs = Definitions(
assets=[manufacturers, manufacturer_id],
resources={
"io_manager": snowflake_pandas_io_manager.configured(
{
"account": {"env": "SF_ACCOUNT"},
"user": {"env": "SF_USERNAME"},
"warehouse": {"env": "SF_WAREHOUSE"},
"database": {"env": "SF_DATABASE"},
"role": {"env": "SF_ROLE"},
"authenticator": {"env": "SF_AUTHENTICATOR"},
"schema": "my_schema",
}
)
},
)
Contents of my local config.py file which seems to indicate authenticator is a valid parameter:
from dagster import Bool, Field, IntSource, StringSource
def define_snowflake_config():
"""Snowflake configuration.
See the Snowflake documentation for reference:
<https://docs.snowflake.net/manuals/user-guide/python-connector-api.html>
"""
account = Field(
StringSource,
description="Your Snowflake account name. For more details, see <https://bit.ly/2FBL320>.",
is_required=False,
)
user = Field(StringSource, description="User login name.", is_required=True)
password = Field(StringSource, description="User password.", is_required=False)
database = Field(
StringSource,
description=(
"Name of the default database to use. After login, you can use USE DATABASE "
" to change the database."
),
is_required=False,
)
schema = Field(
StringSource,
description=(
"Name of the default schema to use. After login, you can use USE SCHEMA to "
"change the schema."
),
is_required=False,
)
role = Field(
StringSource,
description=(
"Name of the default role to use. After login, you can use USE ROLE to change "
" the role."
),
is_required=False,
)
warehouse = Field(
StringSource,
description=(
"Name of the default warehouse to use. After login, you can use USE WAREHOUSE "
"to change the role."
),
is_required=False,
)
private_key = Field(
StringSource,
description=(
"Raw private key to use. See"
" <https://docs.snowflake.com/en/user-guide/key-pair-auth.html> for details. Alternately,"
" set private_key_path and private_key_password."
),
is_required=False,
)
private_key_password = Field(
StringSource,
description=(
"Raw private key password to use. See"
" <https://docs.snowflake.com/en/user-guide/key-pair-auth.html> for details. Required for"
" both private_key and private_key_path."
),
is_required=False,
)
private_key_path = Field(
StringSource,
description=(
"Raw private key path to use. See"
" <https://docs.snowflake.com/en/user-guide/key-pair-auth.html> for details. Alternately,"
" set the raw private key as private_key."
),
is_required=False,
)
autocommit = Field(
Bool,
description=(
"None by default, which honors the Snowflake parameter AUTOCOMMIT. Set to True "
"or False to enable or disable autocommit mode in the session, respectively."
),
is_required=False,
)
client_prefetch_threads = Field(
IntSource,
description=(
"Number of threads used to download the results sets (4 by default). "
"Increasing the value improves fetch performance but requires more memory."
),
is_required=False,
)
client_session_keep_alive = Field(
StringSource,
description=(
"False by default. Set this to True to keep the session active indefinitely, "
"even if there is no activity from the user. Make certain to call the close method to "
"terminate the thread properly or the process may hang."
),
is_required=False,
)
login_timeout = Field(
IntSource,
description=(
"Timeout in seconds for login. By default, 60 seconds. The login request gives "
'up after the timeout length if the HTTP response is "success".'
),
is_required=False,
)
network_timeout = Field(
IntSource,
description=(
"Timeout in seconds for all other operations. By default, none/infinite. A general"
" request gives up after the timeout length if the HTTP response is not 'success'."
),
is_required=False,
)
ocsp_response_cache_filename = Field(
StringSource,
description=(
"URI for the OCSP response cache file. By default, the OCSP response cache "
"file is created in the cache directory."
),
is_required=False,
)
validate_default_parameters = Field(
Bool,
description=(
"False by default. Raise an exception if either one of specified database, "
"schema or warehouse doesn't exists if True."
),
is_required=False,
)
paramstyle = Field(
# TODO should validate only against permissible values for this
StringSource,
description=(
"pyformat by default for client side binding. Specify qmark or numeric to "
"change bind variable formats for server side binding."
),
is_required=False,
)
timezone = Field(
StringSource,
description=(
"None by default, which honors the Snowflake parameter TIMEZONE. Set to a "
"valid time zone (e.g. America/Los_Angeles) to set the session time zone."
),
is_required=False,
)
connector = Field(
StringSource,
description=(
"Indicate alternative database connection engine. Permissible option is "
"'sqlalchemy' otherwise defaults to use the Snowflake Connector for Python."
),
is_required=False,
)
cache_column_metadata = Field(
StringSource,
description=(
"Optional parameter when connector is set to sqlalchemy. Snowflake SQLAlchemy takes a"
" flag cache_column_metadata=True such that all of column metadata for all tables are"
' "cached"'
),
is_required=False,
)
numpy = Field(
StringSource,
description=(
"Optional parameter when connector is set to sqlalchemy. To enable fetching "
"NumPy data types, add numpy=True to the connection parameters."
),
is_required=False,
)
authenticator = Field(
StringSource,
description="Optional parameter to specify the authentication mechanism to use.",
is_required=False,
)
return {
"account": account,
"user": user,
"password": password,
"database": database,
"schema": schema,
"role": role,
"warehouse": warehouse,
"autocommit": autocommit,
"private_key": private_key,
"private_key_password": private_key_password,
"private_key_path": private_key_path,
"client_prefetch_threads": client_prefetch_threads,
"client_session_keep_alive": client_session_keep_alive,
"login_timeout": login_timeout,
"network_timeout": network_timeout,
"ocsp_response_cache_filename": ocsp_response_cache_filename,
"validate_default_parameters": validate_default_parameters,
"paramstyle": paramstyle,
"timezone": timezone,
"connector": connector,
"cache_column_metadata": cache_column_metadata,
"numpy": numpy,
"authenticator": authenticator,
}
chris
03/10/2023, 12:03 AMsnowflake_resource
but not snowflake_io_manager
. Filed an issue: https://github.com/dagster-io/dagster/issues/12868Daniel Kim
03/10/2023, 2:28 AM"authenticator": Field(
StringSource,
description="Optional parameter to specify the authentication mechanism to use.",
is_required=False,
)
would do the trick, but I am getting closer, I think? Now getting a different error message related to username or password:
sqlalchemy.exc.DatabaseError: (snowflake.connector.errors.DatabaseError) 250001 (08001): Failed to connect to DB: <http://REDACTED.snowflakecomputing.com:443|REDACTED.snowflakecomputing.com:443>. Incorrect username or password was specified.
(Background on this error at: <https://sqlalche.me/e/14/4xp6>)
So it appears I have to possibly make edits to a different .py file. So I then started to look at the resources.py
file. Again, I have no idea what I'm doing, but after adding "authenticator",
to line 80 in resources.py
seems to have done the trick now- no longer getting errors and I was able to write my asset as a Snowflake table. Now, FWIW, based on using other SQL clients to Snowflake, when using "externalbrowser" authenticator, I do not have to supply the "password" connection parameter as I think this is not applicable when using externalbrowser authenticator. But in dagster snowflake library, it appears if I don't supply the "password" connection parameter, I will get an error ( dagster._check.CheckError: Invariant failed. Description: Missing config: Password or private key authentication required for Snowflake resource.
) and the password doesn't even have to be correct btw. That's ok I guess, but just mentioning this as FYI. I'm sure there is a way to change that behavior if there is such a need.chris
03/10/2023, 11:44 PMchris
03/10/2023, 11:45 PMjamie
03/11/2023, 12:11 AMDaniel Kim
03/11/2023, 12:26 AM