I am trying to use `snowflake_io_manager` with partitions
# ask-community
b
I am trying to use `snowflake_io_manager` with partitions, but I'm getting an error saying no `partition_expr` is given even though I pass it in the `Output` metadata. My asset:
Copy code
@asset(
    io_manager_key="snowflake_io_manager",
    required_resource_keys={"eldermark"},
    partitions_def=WeeklyPartitionsDefinition(start_date="2022-11-01"),
    key_prefix=["snowflake", "eldermark_proxy"]
)
def resident(context) -> Output[DataFrame]:
    start, end = context.asset_partitions_time_window_for_output()
    filter_str = f"LastMod_Stamp >= {start.timestamp()} AND LastMod_Stamp < {end.timestamp()}"

    records = context.resources.eldermark.fetch_obj(
        obj="Resident", filter=filter_str
    )

    df = pd.DataFrame(list(records), columns=["src"], dtype="string")

    yield Output(
        df,
        metadata={
            "partition_expr": "PARSE_JSON(SRC):LASTMOD_STAMP::TIMESTAMP"
        }
    )
error thrown
Copy code
dagster._core.errors.DagsterExecutionHandleOutputError: Error occurred while handling output "result" of step "snowflake__eldermark_proxy__resident":

  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_plan.py", line 266, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 394, in core_dagster_event_sequence_for_step
    for evt in _type_check_and_store_output(step_context, user_event, input_lineage):
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 451, in _type_check_and_store_output
    for evt in _store_output(step_context, step_output_handle, output, input_lineage):
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 636, in _store_output
    for elt in iterate_with_context(
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/dagster/_utils/__init__.py", line 462, in iterate_with_context
    return
  File "/Users/binhcongpham/.pyenv/versions/3.9.15/lib/python3.9/contextlib.py", line 137, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py", line 82, in solid_execution_error_boundary
    raise error_cls(

The above exception was caused by the following exception:
ValueError: Asset 'AssetKey(['snowflake', 'eldermark_proxy', 'resident'])' has partitions, but no 'partition_expr' metadata value, so we don't know what column to filter it on.

  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py", line 52, in solid_execution_error_boundary
    yield
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/dagster/_utils/__init__.py", line 460, in iterate_with_context
    next_output = next(iterator)
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 626, in _gen_fn
    gen_output = output_manager.handle_output(output_context, output.value)
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/dagster/_core/storage/db_io_manager.py", line 90, in handle_output
    table_slice = self._get_table_slice(context, context)
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/dagster/_core/storage/db_io_manager.py", line 183, in _get_table_slice
    raise ValueError(
s
Have you tried adding the metadata with the `partition_expr` to the asset decorator (not the output)?
b
Thank you, that worked. But why would the metadata not be recognized in `Output`?
Now I'm getting a schema error from SQLAlchemy. The connection is saying that no schema was specified. I have defined my asset with a `key_prefix`. Updated asset:
Copy code
@asset(
    io_manager_key="snowflake_io_manager",
    required_resource_keys={"eldermark"},
    partitions_def=WeeklyPartitionsDefinition(start_date="2022-11-01"),
    key_prefix=["snowflake", "eldermark_proxy"],
    metadata={"partition_expr": "PARSE_JSON(SRC):LASTMOD_STAMP::TIMESTAMP"}
)
def resident(context) -> DataFrame:
    start, end = context.asset_partitions_time_window_for_output()
    filter_str = f"LastMod_Stamp >= {start.timestamp()} AND LastMod_Stamp < {end.timestamp()}"

    records = context.resources.eldermark.fetch_obj(
        obj="Resident", filter=filter_str
    )

    df = pd.DataFrame(list(records), columns=["src"], dtype="string")

    return df
error
Copy code
dagster._core.errors.DagsterExecutionHandleOutputError: Error occurred while handling output "result" of step "snowflake__eldermark_proxy__resident":

  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_plan.py", line 266, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 394, in core_dagster_event_sequence_for_step
    for evt in _type_check_and_store_output(step_context, user_event, input_lineage):
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 451, in _type_check_and_store_output
    for evt in _store_output(step_context, step_output_handle, output, input_lineage):
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 636, in _store_output
    for elt in iterate_with_context(
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/dagster/_utils/__init__.py", line 462, in iterate_with_context
    return
  File "/Users/binhcongpham/.pyenv/versions/3.9.15/lib/python3.9/contextlib.py", line 137, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py", line 82, in solid_execution_error_boundary
    raise error_cls(

The above exception was caused by the following exception:
sqlalchemy.exc.ProgrammingError: (snowflake.connector.errors.ProgrammingError) 090106 (22000): Cannot perform CREATE TABLE. This session does not have a current schema. Call 'USE SCHEMA', or use a qualified name.
[SQL:
CREATE TABLE resident (
"SRC" TEXT
)

]
(Background on this error at: <https://sqlalche.me/e/14/f405>)

  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py", line 52, in solid_execution_error_boundary
    yield
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/dagster/_utils/__init__.py", line 460, in iterate_with_context
    next_output = next(iterator)
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 626, in _gen_fn
    gen_output = output_manager.handle_output(output_context, output.value)
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/dagster/_core/storage/db_io_manager.py", line 103, in handle_output
    self._handlers_by_type[obj_type].handle_output(context, table_slice, obj) or {}
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/dagster_snowflake_pandas/snowflake_pandas_type_handler.py", line 94, in handle_output
    with_uppercase_cols.to_sql(
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/pandas/core/generic.py", line 2951, in to_sql
    return sql.to_sql(
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/pandas/io/sql.py", line 698, in to_sql
    return pandas_sql.to_sql(
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/pandas/io/sql.py", line 1732, in to_sql
    table = self.prep_table(
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/pandas/io/sql.py", line 1631, in prep_table
    table.create()
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/pandas/io/sql.py", line 843, in create
    self._execute_create()
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/pandas/io/sql.py", line 829, in _execute_create
    self.table.create(bind=self.pd_sql.connectable)
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/sqlalchemy/sql/schema.py", line 962, in create
    bind._run_ddl_visitor(ddl.SchemaGenerator, self, checkfirst=checkfirst)
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 3228, in _run_ddl_visitor
    conn._run_ddl_visitor(visitorcallable, element, **kwargs)
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2211, in _run_ddl_visitor
    visitorcallable(self.dialect, self, **kwargs).traverse_single(element)
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/sqlalchemy/sql/visitors.py", line 524, in traverse_single
    return meth(obj, **kw)
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/sqlalchemy/sql/ddl.py", line 895, in visit_table
    self.connection.execute(
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1380, in execute
    return meth(self, multiparams, params, _EMPTY_EXECUTION_OPTS)
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/sqlalchemy/sql/ddl.py", line 80, in _execute_on_connection
    return connection._execute_ddl(
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1472, in _execute_ddl
    ret = self._execute_context(
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1943, in _execute_context
    self._handle_dbapi_exception(
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2124, in _handle_dbapi_exception
    util.raise_(
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 210, in raise_
    raise exception
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
    self.dialect.do_execute(
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/snowflake/connector/cursor.py", line 804, in execute
    Error.errorhandler_wrapper(self.connection, self, error_class, errvalue)
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/snowflake/connector/errors.py", line 276, in errorhandler_wrapper
    handed_over = Error.hand_to_other_handler(
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/snowflake/connector/errors.py", line 331, in hand_to_other_handler
    cursor.errorhandler(connection, cursor, error_class, error_value)
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/snowflake/connector/errors.py", line 210, in default_errorhandler
    raise error_class(

The above exception was caused by the following exception:
snowflake.connector.errors.ProgrammingError: 090106 (22000): Cannot perform CREATE TABLE. This session does not have a current schema. Call 'USE SCHEMA', or use a qualified name.

  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
    self.dialect.do_execute(
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/snowflake/connector/cursor.py", line 804, in execute
    Error.errorhandler_wrapper(self.connection, self, error_class, errvalue)
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/snowflake/connector/errors.py", line 276, in errorhandler_wrapper
    handed_over = Error.hand_to_other_handler(
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/snowflake/connector/errors.py", line 331, in hand_to_other_handler
    cursor.errorhandler(connection, cursor, error_class, error_value)
  File "/Users/binhcongpham/Repos/data_operations/.venv/lib/python3.9/site-packages/snowflake/connector/errors.py", line 210, in default_errorhandler
    raise error_class(
s
What does your `configured` call look like for the Snowflake IO manager?
b
Copy code
snowflake_io_manager = build_snowflake_io_manager([SnowflakePandasTypeHandler()])
configured_snowflake_io_manager = snowflake_io_manager.configured({
    "account": {"env": "SNOWFLAKE_ACCOUNT"},
    "user": {"env": "SNOWFLAKE_USER"},
    "password": {"env": "SNOWFLAKE_PASSWORD"},
    "database": {"env": "SNOWFLAKE_LIFESPRK_DATABASE"},
    "role": {"env": "SNOWFLAKE_ROLE"},
    "warehouse": {"env": "SNOWFLAKE_WAREHOUSE"},
})
s
That is odd; it all looks correct. What versions of the packages are you using? FWIW, this code snippet is very similar and I just confirmed it works: https://gist.github.com/slopp/6b2a6a6342630fe945bc1e5498d01bff You might try `key_prefix = "ELDERMARK_PROXY"`; sometimes capitalization gets a little wonky. I'd also double-check that the schema exists in that database (the IO manager creates tables, but it won't create the schema; it just uses it).
b
I see, that schema doesn't exist yet. Thanks for the help!