https://dagster.io/ logo
#ask-community
Title
# ask-community
a

Aatish Master

06/28/2023, 11:33 PM
Hi there, new to dagster. I'm trying to query data from snowflake, output results to dataframe, turn it into a parquet file then load to adls2. I'm trying to create a custom filesystem-based io manager based on the example in the docs (I/O Managers | Dagster). I'm getting the following error: Error occurred while handling output "result" of step "{asset_name}": Exception was caused by the following exception NotImplementedError. Says it might be something to do with the handle output, dump_to_path method.Code in comments. Any help is appreciated. Thanks in advance!
class PandasParquetIOManager(UPathIOManager):
extension: str = ".parquet"
def dump_to_path(self, context: OutputContext, obj: pd.DataFrame, path: UPath):
with path.open("wb") as file:
<http://obj.to|obj.to>_parquet(file)
def load_from_path(self, context: InputContext, path: UPath) -> pd.DataFrame:
with path.open("rb") as file:
return pd.read_parquet(file)
class Adls2ParquetIOManager(ConfigurableIOManagerFactory):
base_path: str = '{storage_account_url}'
AZURE_DATA_LAKE_STORAGE_ACC = {'env': 'AZURE_DATA_LAKE_STORAGE_ACC'}
AZURE_DATA_LAKE_STORAGE_KEY = {'env': 'AZURE_DATA_LAKE_STORAGE_KEY'}
def create_io_manager(self, context) -> PandasParquetIOManager:
base_path = UPath(self.base_path)
assert str(base_path).startswith("https://"), base_path
return PandasParquetIOManager(base_path=base_path)
defs = Definitions(
assets=load_assets_from_package_module(assets),
resources={
"snowflake": SnowflakeResource(
account=EnvVar("SNOWFLAKE_ACCOUNT"),
user=EnvVar("SNOWFLAKE_USER"),
password=EnvVar("SNOWFLAKE_PASSWORD"),
warehouse=EnvVar("SNOWFLAKE_WAREHOUSE"),
database=EnvVar("SNOWFLAKE_DATABASE"),
schema=EnvVar("SNOWFLAKE_SCHEMA"),
),
"adls2_io_mgr": Adls2ParquetIOManager(),
}
)
@asset(io_manager_key='adls2_io_mgr')
def asset_name(snowflake: SnowflakeResource) -> Output[pd.DataFrame]:
with snowflake.get_connection() as conn:
df = conn.cursor().execute("select * from table").fetch_pandas_all()
return Output(
value=df,
metadata={
"num_records": len(df)
}
)
j

jamie

06/29/2023, 3:40 PM
hey @Aatish Master could you share the full error message/stack trace as well?
a

Aatish Master

06/29/2023, 8:33 PM
@jamie Sure!
dagster._core.errors.DagsterExecutionHandleOutputError: Error occurred while handling output "result" of step "asset_name":
File "C:\Users\User\Projects\volpara-dagster\venv\lib\site-packages\dagster\_core\execution\plan\execute_plan.py", line 273, in dagster_event_sequence_for_step
for step_event in check.generator(step_events):
File "C:\Users\User\Projects\volpara-dagster\venv\lib\site-packages\dagster\_core\execution\plan\execute_step.py", line 375, in core_dagster_event_sequence_for_step
for evt in _type_check_and_store_output(step_context, user_event):
File "C:\Users\User\Projects\volpara-dagster\venv\lib\site-packages\dagster\_core\execution\plan\execute_step.py", line 428, in _type_check_and_store_output
for evt in _store_output(step_context, step_output_handle, output):
File "C:\Users\User\Projects\volpara-dagster\venv\lib\site-packages\dagster\_core\execution\plan\execute_step.py", line 603, in _store_output
for elt in iterate_with_context(
File "C:\Users\User\Projects\volpara-dagster\venv\lib\site-packages\dagster\_utils\__init__.py", line 443, in iterate_with_context
with context_fn():
File "C:\Users\User\AppData\Local\Programs\Python\Python310\lib\contextlib.py", line 153, in __exit__
self.gen.throw(typ, value, traceback)
File "C:\Users\User\Projects\volpara-dagster\venv\lib\site-packages\dagster\_core\execution\plan\utils.py", line 84, in op_execution_error_boundary
raise error_cls(
The above exception was caused by the following exception:
NotImplementedError
File "C:\Users\User\Projects\volpara-dagster\venv\lib\site-packages\dagster\_core\execution\plan\utils.py", line 54, in op_execution_error_boundary
yield
File "C:\Users\User\Projects\volpara-dagster\venv\lib\site-packages\dagster\_utils\__init__.py", line 445, in iterate_with_context
next_output = next(iterator)
File "C:\Users\User\Projects\volpara-dagster\venv\lib\site-packages\dagster\_core\execution\plan\execute_step.py", line 593, in _gen_fn
gen_output = output_manager.handle_output(output_context, output.value)
File "C:\Users\User\Projects\volpara-dagster\venv\lib\site-packages\dagster\_core\storage\upath_io_manager.py", line 271, in handle_output
self.dump_to_path(context=context, obj=obj, path=path)
File "C:\Users\User\Projects\volpara-dagster\vht-project\quickstart_snowflake\__init__.py", line 32, in dump_to_path
with path.open("wb") as file:
File "C:\Users\User\Projects\volpara-dagster\venv\lib\site-packages\upath\core.py", line 251, in open
return self._accessor.open(self, *args, **kwargs)
File "C:\Users\User\Projects\volpara-dagster\venv\lib\site-packages\upath\core.py", line 47, in open
return self._fs.open(self._format_path(path), mode, *args, **kwargs)
File "C:\Users\User\Projects\volpara-dagster\venv\lib\site-packages\fsspec\spec.py", line 1241, in open
f = self._open(
File "C:\Users\User\Projects\volpara-dagster\venv\lib\site-packages\fsspec\implementations\http.py", line 351, in _open
raise NotImplementedError
j

jamie

06/29/2023, 8:37 PM
ah ok looks like that’s coming from the
ffspec
library - specifically this line https://github.com/fsspec/filesystem_spec/blob/master/fsspec/implementations/http.py#L351
a

Aatish Master

06/30/2023, 3:52 AM
@jamie Right, from my understanding the this error appears when the dump_to_path method is invoked. I just copied this from the example and naively figured it would work for my case. How would I go about resolving this for my case? I changed the filesystem to abfss and now I'm getting an azure credential error:
class Adls2ParquetIOManager(ConfigurableIOManagerFactory):
base_path: str = 'abfss://{account_name}.<http://blob.core.windows.net/{container_name}|blob.core.windows.net/{container_name}>'
storage_options = {
"account_name": EnvVar("AZURE_DATA_LAKE_STORAGE_ACC"),
"connection_string": EnvVar("AZURE_DATA_LAKE_STORAGE_KEY"),
}
def create_io_manager(self, context) -> PandasParquetIOManager:
base_path = UPath(self.base_path, **self.storage_options)
assert str(base_path).startswith("abfss://"), base_path
return PandasParquetIOManager(base_path=base_path)
azure.core.exceptions.ClientAuthenticationError: Server failed to authenticate the request. Please refer to the information in the www-authenticate header.
RequestId:2ea7331f-301e-00c1-3803-ab8bde000000
Time:2023-06-30T03:35:28.0373888Z
ErrorCode:NoAuthenticationInformation
Content: <?xml version="1.0" encoding="utf-8"?><Error><Code>NoAuthenticationInformation</Code><Message>Server failed to authenticate the request. Please refer to the information in the www-authenticate header.
RequestId:2ea7331f-301e-00c1-3803-ab8bde000000
Time:2023-06-30T03:35:28.0373888Z</Message></Error>
File "C:\Users\User\Projects\volpara-dagster\venv\lib\site-packages\adlfs\spec.py", line 2035, in _async_upload_chunk
await bc.stage_block(
File "C:\Users\User\Projects\volpara-dagster\venv\lib\site-packages\azure\core\tracing\decorator_async.py", line 77, in wrapper_use_tracer
return await func(*args, **kwargs)
File "C:\Users\User\Projects\volpara-dagster\venv\lib\site-packages\azure\storage\blob\aio\_blob_client_async.py", line 1630, in stage_block
process_storage_error(error)
File "C:\Users\User\Projects\volpara-dagster\venv\lib\site-packages\azure\storage\blob\_shared\response_handlers.py", line 189, in process_storage_error
exec("raise error from None")  # pylint: disable=exec-used # nosec
File "<string>", line 1, in <module>
File "C:\Users\User\Projects\volpara-dagster\venv\lib\site-packages\azure\storage\blob\aio\_blob_client_async.py", line 1628, in stage_block
return await self._client.block_blob.stage_block(**options)
File "C:\Users\User\Projects\volpara-dagster\venv\lib\site-packages\azure\core\tracing\decorator_async.py", line 77, in wrapper_use_tracer
return await func(*args, **kwargs)
File "C:\Users\User\Projects\volpara-dagster\venv\lib\site-packages\azure\storage\blob\_generated\aio\operations\_block_blob_operations.py", line 621, in stage_block
map_error(status_code=response.status_code, response=response, error_map=error_map)
File "C:\Users\User\Projects\volpara-dagster\venv\lib\site-packages\azure\core\exceptions.py", line 109, in map_error
raise error
How do I correctly pass credentials to the IO manager? Thanks!