Binoy Shah
02/22/2024, 8:54 PM
I'm using the dagster-polars library's PolarsParquetIOManager and am struggling to set up correct S3 access. Our S3 credentials are not in their standard format and I need to copy them from another place. How do I provide PolarsParquetIOManager with the correct credentials?
cc @Daniel Gafni

Daniel Gafni
02/22/2024, 8:56 PM
You can pass storage_options like this:
PolarsParquetIOManager(..., cloud_storage_options={...})
This feature is a recent addition. I think @Ion Koutsouris has this working in production.
Let me know if this works for you.

Binoy Shah
02/22/2024, 8:57 PM

Daniel Gafni
02/22/2024, 8:57 PM
Both storage_options and cloud_storage_options should work. storage_options is a Pydantic alias to cloud_storage_options, which is the original field. The storage_options alias is available for configuration from dicts/YAML.

Daniel Gafni
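The equivalence of the two spellings can be sketched in plain Python. This is an illustration only: dagster-polars actually implements it as a Pydantic field alias, and resolve_storage_options here is a hypothetical helper, not library code.

```python
def resolve_storage_options(config: dict) -> dict:
    # Hypothetical helper mirroring the alias behavior described above:
    # cloud_storage_options is the original field, storage_options the alias.
    if "cloud_storage_options" in config:
        return config["cloud_storage_options"]
    return config.get("storage_options", {})

creds = {"aws_access_key_id": "AKIA_EXAMPLE", "aws_region": "us-east-1"}
assert resolve_storage_options({"storage_options": creds}) == creds
assert resolve_storage_options({"cloud_storage_options": creds}) == creds
```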
02/22/2024, 8:58 PM

Daniel Gafni
02/22/2024, 9:14 PM
Use cloud_storage_options when instantiating the class.

Binoy Shah
02/22/2024, 9:15 PM

Binoy Shah
02/22/2024, 9:15 PM

Daniel Gafni
02/22/2024, 11:12 PM

Binoy Shah
02/23/2024, 1:33 PM

Binoy Shah
02/23/2024, 3:09 PM
polars.exceptions.ComputeError: Generic S3 error: Error after 0 retries in 300.000908139s, max_retries:2, retry_timeout:10s, source:error sending request for url (http://169.254.169.254/latest/api/token): operation timed out
File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/plan/utils.py", line 54, in op_execution_error_boundary
yield
File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/plan/inputs.py", line 831, in _load_input_with_input_manager
value = input_manager.load_input(context)
File "/usr/local/lib/python3.10/site-packages/dagster_polars/io_managers/base.py", line 219, in load_input
return self._load_single_input(path, context)
File "/usr/local/lib/python3.10/site-packages/dagster/_core/storage/upath_io_manager.py", line 239, in _load_single_input
obj = self.load_from_path(context=context, path=path)
File "/usr/local/lib/python3.10/site-packages/dagster_polars/io_managers/base.py", line 319, in load_from_path
ldf = self.scan_df_from_path(path=path, context=context) # type: ignore
File "/usr/local/lib/python3.10/site-packages/dagster_polars/io_managers/parquet.py", line 276, in scan_df_from_path
ldf = scan_parquet(path, context)
File "/usr/local/lib/python3.10/site-packages/dagster_polars/io_managers/parquet.py", line 75, in scan_parquet
return pl.scan_parquet(str(path), storage_options=storage_options, **kwargs) # type: ignore
File "/usr/local/lib/python3.10/site-packages/polars/utils/deprecation.py", line 136, in wrapper
return function(*args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/polars/utils/deprecation.py", line 136, in wrapper
return function(*args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/polars/io/parquet/functions.py", line 311, in scan_parquet
return pl.LazyFrame._scan_parquet(
File "/usr/local/lib/python3.10/site-packages/polars/lazyframe/frame.py", line 464, in _scan_parquet
self._ldf = PyLazyFrame.new_from_parquet(
@Daniel Gafni
edit: weirdness of Slack showing badly ordered snippets.

Daniel Gafni
02/23/2024, 3:18 PM
… (def scan_df_from_path) and changing how this function is used. Perhaps you can print the values available in path.storage_options.
3. I think I might rework this by directly passing self.cloud_storage_options to the data-reading code instead of relying on path.storage_options.
Also, can you try writing a parquet?

Ion Koutsouris
02/23/2024, 3:23 PM

Binoy Shah
02/23/2024, 3:24 PM
polars = { version = "^0.20.5", extras = ["numpy", "pyarrow", "connectorx", "fsspec"] }

Ion Koutsouris
02/23/2024, 3:24 PM

Ion Koutsouris
02/23/2024, 3:25 PM

Daniel Gafni
02/23/2024, 3:26 PM

Binoy Shah
02/23/2024, 3:26 PMdata_dictionary_latest: LazyFrame = pl.scan_parquet(source=survey_dict_in_s3, storage_options=_aws_secrets_from_env())
and that works
def _aws_secrets_from_env():
    return {
        "aws_access_key_id": env_or_value("ACCESS_KEY"),
        "aws_secret_access_key": env_or_value("SECRET_KEY"),
        "aws_session_token": env_or_value("AWS_SESSION_TOKEN"),
        "aws_region": "us-east-1",
    }
Binoy Shah
02/23/2024, 3:27 PM

Ion Koutsouris
02/23/2024, 3:29 PM

Ion Koutsouris
02/23/2024, 3:30 PM

Binoy Shah
02/23/2024, 3:30 PM

Ion Koutsouris
02/23/2024, 3:30 PM

Daniel Gafni
02/23/2024, 3:30 PM
UPath(uri, **storage_options).storage_options

Binoy Shah
02/23/2024, 3:40 PM
my_up: UPath = UPath("s3://bucket/dagster-io-parquet/rce_shared/SURVEY_RESPONSE_DATA_DICTIONARY.parquet", **s3_io_creds)
my_up.storage_options
Out[34]:
{'aws_access_key_id': 'AS99999999999',
 'aws_secret_access_key': 'DAu999999999St',
 'aws_session_token': 'FwoB53SI'}
So that's appropriate. It's not from the Dagster runtime; it's in my Python console ☝️

Ion Koutsouris
02/23/2024, 3:50 PM

Daniel Gafni
02/23/2024, 3:56 PM
pl.scan_parquet

Binoy Shah
02/23/2024, 4:01 PM

Daniel Gafni
02/23/2024, 4:06 PM
from dagster import materialize, asset, SourceAsset

upstream_asset = SourceAsset(["my_bucket", "my_key"], io_manager_def=PolarsParquetIOManager())

@asset
def my_asset(upstream_asset):
    ...

materialize([upstream_asset, my_asset])
Binoy Shah
02/23/2024, 4:07 PM

Daniel Gafni
02/23/2024, 4:07 PM

Binoy Shah
02/23/2024, 9:22 PM
print(context.resources.polars_io_manager_v2.storage_options)
it prints out
{}
Daniel Gafni
02/23/2024, 10:32 PM

Binoy Shah
02/23/2024, 10:35 PM

Binoy Shah
02/23/2024, 10:35 PM
The ~/.aws/credentials file is not present in the kube pod.

Binoy Shah
02/23/2024, 10:36 PM

Daniel Gafni
02/23/2024, 10:51 PM

Binoy Shah
02/23/2024, 10:52 PM

Daniel Gafni
02/23/2024, 10:54 PM

Binoy Shah
02/23/2024, 10:55 PMaws_access_key_id:
env: AWS_ACCESS_KEY_ID
aws_secret_access_key:
env: AWS_SECRET_ACCESS_KEY
aws_session_token:
env: AWS_SESSION_TOKEN
such that the _process_env_vars
evaluates it lateBinoy Shah
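What the env-keyed indirection above buys can be sketched with a hypothetical stand-in for Dagster's internal _process_env_vars (this is not Dagster's code, just an illustration of the late-binding behavior): values wrapped as {"env": "NAME"} are looked up in the environment when the config is processed, not captured at definition time.

```python
import os

def process_env_vars(config: dict) -> dict:
    # Hypothetical sketch: replace {"env": "NAME"} markers with the
    # environment variable's value at call time; pass other values through.
    resolved = {}
    for key, value in config.items():
        if isinstance(value, dict) and set(value) == {"env"}:
            resolved[key] = os.environ.get(value["env"])
        else:
            resolved[key] = value
    return resolved
```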
02/23/2024, 10:55 PM

Binoy Shah
02/23/2024, 10:56 PM

Daniel Gafni
02/23/2024, 11:12 PM

Binoy Shah
02/24/2024, 1:25 AM

Binoy Shah
02/26/2024, 8:05 PM
I couldn't make storage_options / cloud_storage_options work; not sure if it was my problem or some inherent underlying UPath or other translation issue. I finally remedied it by converting my non-conforming credential fields to AWS-recognized env variables at app startup in the __init__.py file, and it worked.
from os import environ, getenv

if getenv("IS_CI", "default") == "default":
    environ["AWS_ACCESS_KEY_ID"] = getenv("ACCESS_KEY")
    environ["AWS_SECRET_ACCESS_KEY"] = getenv("SECRET_KEY")
    environ["AWS_SESSION_TOKEN"] = getenv("AWS_SESSION_TOKEN")
    environ["AWS_REGION"] = getenv("AWS_REGION", "us-east-1")
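The same startup remapping can be factored into a pure function so it can be tested without mutating os.environ. promote_aws_env is a hypothetical name; ACCESS_KEY and SECRET_KEY are the non-standard variable names from this thread.

```python
def promote_aws_env(env: dict) -> dict:
    # Copy non-standard credential variables onto the names AWS SDKs,
    # object_store, and polars recognize; the input dict is left untouched.
    mapping = {
        "AWS_ACCESS_KEY_ID": "ACCESS_KEY",
        "AWS_SECRET_ACCESS_KEY": "SECRET_KEY",
        "AWS_SESSION_TOKEN": "AWS_SESSION_TOKEN",
    }
    out = dict(env)
    for standard, custom in mapping.items():
        if custom in env:
            out[standard] = env[custom]
    out.setdefault("AWS_REGION", "us-east-1")
    return out
```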