Binh Pham
12/02/2022, 10:19 PM
Using `snowflake_io_manager` with partitions, but I'm getting an error saying no `partition_expr` is given, even though I pass it in the `Output` metadata.
my asset
@asset(
    io_manager_key="snowflake_io_manager",
    required_resource_keys={"eldermark"},
    partitions_def=WeeklyPartitionsDefinition(start_date="2022-11-01"),
    key_prefix=["snowflake", "eldermark_proxy"],
)
def resident(context) -> Output[DataFrame]:
    """Yield the "Resident" records that fall inside this partition's time window.

    The window bounds come from ``context.asset_partitions_time_window_for_output()``
    and are turned into a ``LastMod_Stamp >= start AND LastMod_Stamp < end`` filter
    that is handed to the ``eldermark`` resource. The fetched records are loaded
    into a DataFrame with a single string column named ``src``.

    The ``partition_expr`` is attached to the yielded ``Output`` as metadata.
    NOTE: the run reported alongside this snippet failed with "has partitions,
    but no 'partition_expr' metadata value", i.e. the IO manager did not pick
    the value up from here.
    """
    start, end = context.asset_partitions_time_window_for_output()

    # Half-open interval: lower bound inclusive, upper bound exclusive.
    filter_str = f"LastMod_Stamp >= {start.timestamp()} AND LastMod_Stamp < {end.timestamp()}"

    eldermark = context.resources.eldermark
    fetched = eldermark.fetch_obj(obj="Resident", filter=filter_str)
    frame = pd.DataFrame(list(fetched), columns=["src"], dtype="string")

    output_metadata = {"partition_expr": "PARSE_JSON(SRC):LASTMOD_STAMP::TIMESTAMP"}
    yield Output(frame, metadata=output_metadata)
error thrown
dagster._core.errors.DagsterExecutionHandleOutputError: Error occurred while handling output "result" of step "snowflake__eldermark_proxy__resident":
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_plan.py", line 266, in dagster_event_sequence_for_step
for step_event in check.generator(step_events):
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 394, in core_dagster_event_sequence_for_step
for evt in _type_check_and_store_output(step_context, user_event, input_lineage):
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 451, in _type_check_and_store_output
for evt in _store_output(step_context, step_output_handle, output, input_lineage):
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 636, in _store_output
for elt in iterate_with_context(
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/dagster/_utils/__init__.py", line 462, in iterate_with_context
return
File "/Users/binhcongpham/.pyenv/versions/3.9.15/lib/python3.9/contextlib.py", line 137, in __exit__
self.gen.throw(typ, value, traceback)
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py", line 82, in solid_execution_error_boundary
raise error_cls(
The above exception was caused by the following exception:
ValueError: Asset 'AssetKey(['snowflake', 'eldermark_proxy', 'resident'])' has partitions, but no 'partition_expr' metadata value, so we don't know what column to filter it on.
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py", line 52, in solid_execution_error_boundary
yield
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/dagster/_utils/__init__.py", line 460, in iterate_with_context
next_output = next(iterator)
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 626, in _gen_fn
gen_output = output_manager.handle_output(output_context, output.value)
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/dagster/_core/storage/db_io_manager.py", line 90, in handle_output
table_slice = self._get_table_slice(context, context)
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/dagster/_core/storage/db_io_manager.py", line 183, in _get_table_slice
raise ValueError(
Sean Lopp
12/02/2022, 10:31 PM
Binh Pham
12/02/2022, 11:02 PM
Output
?@asset(
io_manager_key="snowflake_io_manager",
required_resource_keys={"eldermark"},
partitions_def=WeeklyPartitionsDefinition(start_date="2022-11-01"),
key_prefix=["snowflake", "eldermark_proxy"],
# partition_expr is supplied as metadata on the asset definition in this version.
metadata={"partition_expr": "PARSE_JSON(SRC):LASTMOD_STAMP::TIMESTAMP"}
)
def resident(context) -> DataFrame:
# Return the "Resident" records for this partition's time window as a DataFrame
# with a single string column named "src".
# NOTE: the run reported with this snippet failed at CREATE TABLE with
# "This session does not have a current schema".
start, end = context.asset_partitions_time_window_for_output()
# Half-open filter: LastMod_Stamp >= start and < end.
# presumably start/end are datetime-like, so .timestamp() yields epoch seconds - TODO confirm
filter_str = f"LastMod_Stamp >= {start.timestamp()} AND LastMod_Stamp < {end.timestamp()}"
records = context.resources.eldermark.fetch_obj(
obj="Resident", filter=filter_str
)
# Materialize the fetched records into one "src" column typed as pandas "string".
df = pd.DataFrame(list(records), columns=["src"], dtype="string")
return df
error
dagster._core.errors.DagsterExecutionHandleOutputError: Error occurred while handling output "result" of step "snowflake__eldermark_proxy__resident":
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_plan.py", line 266, in dagster_event_sequence_for_step
for step_event in check.generator(step_events):
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 394, in core_dagster_event_sequence_for_step
for evt in _type_check_and_store_output(step_context, user_event, input_lineage):
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 451, in _type_check_and_store_output
for evt in _store_output(step_context, step_output_handle, output, input_lineage):
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 636, in _store_output
for elt in iterate_with_context(
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/dagster/_utils/__init__.py", line 462, in iterate_with_context
return
File "/Users/binhcongpham/.pyenv/versions/3.9.15/lib/python3.9/contextlib.py", line 137, in __exit__
self.gen.throw(typ, value, traceback)
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py", line 82, in solid_execution_error_boundary
raise error_cls(
The above exception was caused by the following exception:
sqlalchemy.exc.ProgrammingError: (snowflake.connector.errors.ProgrammingError) 090106 (22000): Cannot perform CREATE TABLE. This session does not have a current schema. Call 'USE SCHEMA', or use a qualified name.
[SQL:
CREATE TABLE resident (
"SRC" TEXT
)
]
(Background on this error at: <https://sqlalche.me/e/14/f405>)
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py", line 52, in solid_execution_error_boundary
yield
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/dagster/_utils/__init__.py", line 460, in iterate_with_context
next_output = next(iterator)
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 626, in _gen_fn
gen_output = output_manager.handle_output(output_context, output.value)
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/dagster/_core/storage/db_io_manager.py", line 103, in handle_output
self._handlers_by_type[obj_type].handle_output(context, table_slice, obj) or {}
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/dagster_snowflake_pandas/snowflake_pandas_type_handler.py", line 94, in handle_output
with_uppercase_cols.to_sql(
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/pandas/core/generic.py", line 2951, in to_sql
return sql.to_sql(
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/pandas/io/sql.py", line 698, in to_sql
return pandas_sql.to_sql(
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/pandas/io/sql.py", line 1732, in to_sql
table = self.prep_table(
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/pandas/io/sql.py", line 1631, in prep_table
table.create()
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/pandas/io/sql.py", line 843, in create
self._execute_create()
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/pandas/io/sql.py", line 829, in _execute_create
self.table.create(bind=self.pd_sql.connectable)
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/sqlalchemy/sql/schema.py", line 962, in create
bind._run_ddl_visitor(ddl.SchemaGenerator, self, checkfirst=checkfirst)
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 3228, in _run_ddl_visitor
conn._run_ddl_visitor(visitorcallable, element, **kwargs)
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2211, in _run_ddl_visitor
visitorcallable(self.dialect, self, **kwargs).traverse_single(element)
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/sqlalchemy/sql/visitors.py", line 524, in traverse_single
return meth(obj, **kw)
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/sqlalchemy/sql/ddl.py", line 895, in visit_table
self.connection.execute(
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1380, in execute
return meth(self, multiparams, params, _EMPTY_EXECUTION_OPTS)
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/sqlalchemy/sql/ddl.py", line 80, in _execute_on_connection
return connection._execute_ddl(
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1472, in _execute_ddl
ret = self._execute_context(
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1943, in _execute_context
self._handle_dbapi_exception(
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2124, in _handle_dbapi_exception
util.raise_(
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 210, in raise_
raise exception
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
self.dialect.do_execute(
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
cursor.execute(statement, parameters)
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/snowflake/connector/cursor.py", line 804, in execute
Error.errorhandler_wrapper(self.connection, self, error_class, errvalue)
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/snowflake/connector/errors.py", line 276, in errorhandler_wrapper
handed_over = Error.hand_to_other_handler(
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/snowflake/connector/errors.py", line 331, in hand_to_other_handler
cursor.errorhandler(connection, cursor, error_class, error_value)
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/snowflake/connector/errors.py", line 210, in default_errorhandler
raise error_class(
The above exception was caused by the following exception:
snowflake.connector.errors.ProgrammingError: 090106 (22000): Cannot perform CREATE TABLE. This session does not have a current schema. Call 'USE SCHEMA', or use a qualified name.
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
self.dialect.do_execute(
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
cursor.execute(statement, parameters)
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/snowflake/connector/cursor.py", line 804, in execute
Error.errorhandler_wrapper(self.connection, self, error_class, errvalue)
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/snowflake/connector/errors.py", line 276, in errorhandler_wrapper
handed_over = Error.hand_to_other_handler(
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/snowflake/connector/errors.py", line 331, in hand_to_other_handler
cursor.errorhandler(connection, cursor, error_class, error_value)
File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/snowflake/connector/errors.py", line 210, in default_errorhandler
raise error_class(
Sean Lopp
12/02/2022, 11:08 PM
Binh Pham
12/02/2022, 11:08 PM
snowflake_io_manager = build_snowflake_io_manager([SnowflakePandasTypeHandler()])
# Every Snowflake connection setting is read from an environment variable.
# The pairs below are (config field, environment variable name).
configured_snowflake_io_manager = snowflake_io_manager.configured(
    {
        field: {"env": env_var}
        for field, env_var in (
            ("account", "SNOWFLAKE_ACCOUNT"),
            ("user", "SNOWFLAKE_USER"),
            ("password", "SNOWFLAKE_PASSWORD"),
            ("database", "SNOWFLAKE_LIFESPRK_DATABASE"),
            ("role", "SNOWFLAKE_ROLE"),
            ("warehouse", "SNOWFLAKE_WAREHOUSE"),
        )
    }
)
Sean Lopp
12/02/2022, 11:13 PM
Try `key_prefix = "ELDERMARK_PROXY"`; sometimes capitalization gets a little wonky. I'd also double-check that the schema exists in that database (the IO manager creates tables, but it won't create the schema; it just uses it).
Binh Pham
12/02/2022, 11:14 PM