# ask-community
b
I am using the `dagster-polars` library for the PolarsParquetIOManager. I am struggling to set up correct S3 access. Our S3 credentials are not in their standard format and I need to copy them from another place. How do I provide the PolarsParquetIOManager with the correct credentials? cc @Daniel Gafni
d
Hey! You should be able to provide `storage_options` like this:
```python
PolarsParquetIOManager(..., cloud_storage_options={...})
```
This feature is a recent addition. I think @Ion Koutsouris has this working in production. Let me know if this works for you
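For reference, a minimal sketch of how that could look, assuming a `base_dir` pointing at your bucket and the non-standard env var names from this thread (`ACCESS_KEY`, `SECRET_KEY`) as placeholders:
```python
import os

from dagster_polars import PolarsParquetIOManager

# Hypothetical setup: credentials live under non-standard env var names
# and are passed straight through as cloud storage options.
polars_io_manager = PolarsParquetIOManager(
    base_dir="s3://my-bucket/dagster-io-parquet",
    cloud_storage_options={
        "aws_access_key_id": os.getenv("ACCESS_KEY"),
        "aws_secret_access_key": os.getenv("SECRET_KEY"),
        "aws_session_token": os.getenv("AWS_SESSION_TOKEN"),
        "aws_region": "us-east-1",
    },
)
```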
b
Cool, thanks, trying it now
d
I think both `storage_options` and `cloud_storage_options` should work. `storage_options` is a Pydantic alias to `cloud_storage_options`, which is the original field. The `storage_options` alias is available for configuration from dicts/YAML.
Code for reference
Actually, it seems like aliases only affect serialization/deserialization. You would have to use `cloud_storage_options` when instantiating the class.
b
Yup, I was going to update you with that info.. fixing it..
d
So does it work?
b
some other complications came up.. will surely update you after verification
Using the attached code, as shown in the snippet at the bottom, I am still getting errors like the one below.
```
polars.exceptions.ComputeError: Generic S3 error: Error after 0 retries in 300.000908139s, max_retries:2, retry_timeout:10s, source:error sending request for url (http://169.254.169.254/latest/api/token): operation timed out
  File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/plan/utils.py", line 54, in op_execution_error_boundary
    yield
  File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/plan/inputs.py", line 831, in _load_input_with_input_manager
    value = input_manager.load_input(context)
  File "/usr/local/lib/python3.10/site-packages/dagster_polars/io_managers/base.py", line 219, in load_input
    return self._load_single_input(path, context)
  File "/usr/local/lib/python3.10/site-packages/dagster/_core/storage/upath_io_manager.py", line 239, in _load_single_input
    obj = self.load_from_path(context=context, path=path)
  File "/usr/local/lib/python3.10/site-packages/dagster_polars/io_managers/base.py", line 319, in load_from_path
    ldf = self.scan_df_from_path(path=path, context=context)  # type: ignore
  File "/usr/local/lib/python3.10/site-packages/dagster_polars/io_managers/parquet.py", line 276, in scan_df_from_path
    ldf = scan_parquet(path, context)
  File "/usr/local/lib/python3.10/site-packages/dagster_polars/io_managers/parquet.py", line 75, in scan_parquet
    return pl.scan_parquet(str(path), storage_options=storage_options, **kwargs)  # type: ignore
  File "/usr/local/lib/python3.10/site-packages/polars/utils/deprecation.py", line 136, in wrapper
    return function(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/polars/utils/deprecation.py", line 136, in wrapper
    return function(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/polars/io/parquet/functions.py", line 311, in scan_parquet
    return pl.LazyFrame._scan_parquet(
  File "/usr/local/lib/python3.10/site-packages/polars/lazyframe/frame.py", line 464, in _scan_parquet
    self._ldf = PyLazyFrame.new_from_parquet(
```
@Daniel Gafni edit: weirdness of Slack showing badly ordered snippets.
d
I can think of 3 steps: 1. @Ion Koutsouris please let us know if you have this working in prod. Are there any errors above? 2. @Binoy Shah meanwhile, you can try to debug this by overriding this method (
def scan_df_from_path
) and changing this function usage it in. Perhaps you can print the values available in
path.storage_options
. 3. I think I might rework this by directly passing
self.cloud_storage_options
to data reading code instead of relying on
path.storage_options
. Also, can you try writing a parquet?
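A rough sketch of what the override in step 2 could look like, assuming the `scan_df_from_path(path=..., context=...)` signature from the traceback above:
```python
from dagster_polars import PolarsParquetIOManager


class DebugPolarsParquetIOManager(PolarsParquetIOManager):
    def scan_df_from_path(self, path, context):
        # path is a UPath here; log what credentials it actually carries
        context.log.info(f"path.storage_options = {getattr(path, 'storage_options', None)}")
        return super().scan_df_from_path(path=path, context=context)
```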
i
There were some issues with Polars and S3 not so long ago, something to do with the concurrency budget in Tokio, so it would be good to share which version you have.
b
```toml
polars = { version = "^0.20.5", extras = ["numpy", "pyarrow", "connectorx", "fsspec"] }
```
i
OK, on point 3, you should double-check what comes out of UPath. Because we pass the storage options into UPath, and it's UPath's responsibility to construct the filesystem and storage options, if the URI or storage options are not recognized then they are not properly passed through.
d
Yes, I suspect this may be happening here. Try to print these values or jump into a remote debugger.
b
In another piece of code I do this:
```python
data_dictionary_latest: LazyFrame = pl.scan_parquet(source=survey_dict_in_s3, storage_options=_aws_secrets_from_env())
```
and that works:
```python
def _aws_secrets_from_env():
    return {
        "aws_access_key_id": env_or_value("ACCESS_KEY"),
        "aws_secret_access_key": env_or_value("SECRET_KEY"),
        "aws_session_token": env_or_value("AWS_SESSION_TOKEN"),
        "aws_region": "us-east-1",
    }
```
@Ion Koutsouris when you say check what comes out of UPath, can you elaborate?
i
Try to manually create a UPath object: `UPath(uri, **storage_options)`
b
Oh okay.. checking now
i
Then inspect the storage options in the object instance.
d
Yeah, check what's in `UPath(uri, **storage_options).storage_options`
b
```python
my_up: UPath = UPath("s3://bucket/dagster-io-parquet/rce_shared/SURVEY_RESPONSE_DATA_DICTIONARY.parquet", **s3_io_creds)
my_up.storage_options
Out[34]:
{'aws_access_key_id': 'AS99999999999',
 'aws_secret_access_key': 'DAu999999999St',
 'aws_session_token': 'FwoB53SI'}
```
So that's appropriate. It's not from the Dagster runtime, it's from my Python console ☝️
i
Try bumping polars
d
I suggest printing values/debugging inside the actual IOManager code just to be sure these values are passed to `pl.scan_parquet`.
b
Is there a way I can directly invoke the PolarsParquetIOManager to read the parquet file?
d
Perhaps this code might be the easiest way:
```python
from dagster import SourceAsset, asset, materialize
from dagster_polars import PolarsParquetIOManager

upstream_asset = SourceAsset(["my_bucket", "my_key"], io_manager_def=PolarsParquetIOManager())

@asset
def my_asset(upstream_asset):
    ...

materialize([upstream_asset, my_asset])
```
b
Okay, checking it out
d
you can then insert a breakpoint in the PolarsParquetIOManager
b
So, some progress @Daniel Gafni: it fails only in Kubernetes, not locally. When I run it in an asset and print `context.resources.polars_io_manager_v2.storage_options`, it prints out `{}`
d
So it seems like an issue on your side? If it works locally, it should also work in K8s (given the setup is correct)
b
That's what I am assuming, but I have env variables set up a bit differently for S3 credentials. My local S3 credentials are in the provider chain, so the default AWS credential profile has the necessary credentials. Although I am injecting the variables as I described above in the thread, I suspect it's falling back to its default credential chain, whereas Kubernetes does not have the default credential chain: the `~/.aws/credentials` file is not present in the kube pod, it only has environment variables.
d
I think the first step should be figuring out how to load the Parquet file with raw Polars. Should be easy if you can exec into your pod.
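Something along these lines run inside the pod (e.g. `kubectl exec -it <pod> -- python`) should tell you; the bucket/key and env var names below are just placeholders from earlier in the thread:
```python
import os

import polars as pl

# Build the same options dict the IOManager is supposed to receive
storage_options = {
    "aws_access_key_id": os.getenv("ACCESS_KEY"),
    "aws_secret_access_key": os.getenv("SECRET_KEY"),
    "aws_session_token": os.getenv("AWS_SESSION_TOKEN"),
    "aws_region": "us-east-1",
}

# If this fails with the same ComputeError, the problem is the Polars/S3
# setup in the pod rather than the IOManager wiring.
print(
    pl.scan_parquet(
        "s3://bucket/dagster-io-parquet/some_asset.parquet",
        storage_options=storage_options,
    )
    .head(5)
    .collect()
)
```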
d
I see. In this case your goal should be seeing the same dictionary in `path.storage_options` inside the IOManager.
b
Could it be that I need to do late binding, and my dictionary should be of the form
```yaml
aws_access_key_id:
  env: AWS_ACCESS_KEY_ID
aws_secret_access_key:
  env: AWS_SECRET_ACCESS_KEY
aws_session_token:
  env: AWS_SESSION_TOKEN
```
such that `_process_env_vars` evaluates it late?
Right now the code to build the PolarsParquetIOManager (https://dagster.slack.com/files/U03AE3Z2A0H/F06LBT91B42/untitled.py) is invoked at startup of the app, but as a resource; in Dagster pod runs, I might need to do late binding by associating the env keys.
d
Can't look at the code right now, but I'm pretty sure `cloud_storage_options` is supposed to be used with EnvVar. You should pass a dict of EnvVars to it. I think my previous example was misleading.
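Roughly something like this, if `cloud_storage_options` does accept `EnvVar` values as I expect (same placeholder env var names as before):
```python
from dagster import EnvVar
from dagster_polars import PolarsParquetIOManager

# EnvVar defers resolution until the resource is initialized in the run
# process, so the values are read in the pod rather than at code-location
# startup.
polars_io_manager = PolarsParquetIOManager(
    base_dir="s3://my-bucket/dagster-io-parquet",
    cloud_storage_options={
        "aws_access_key_id": EnvVar("ACCESS_KEY"),
        "aws_secret_access_key": EnvVar("SECRET_KEY"),
        "aws_session_token": EnvVar("AWS_SESSION_TOKEN"),
        "aws_region": "us-east-1",
    },
)
```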
b
No worries, thanks for the help and have a great weekend.
Hey @Daniel Gafni, just to update you: I couldn't make `storage_options` / `cloud_storage_options` work, not sure if it was my problem or some inherent underlying UPath or other translation issue. I finally remedied it by converting my non-conforming credential fields to AWS-recognized env variables at app startup in the `__init__.py` file, and it worked.
```python
from os import environ, getenv

# Map the non-standard credential env vars to the names AWS tooling expects
if getenv("IS_CI", "default") == "default":
    environ["AWS_ACCESS_KEY_ID"] = getenv("ACCESS_KEY")
    environ["AWS_SECRET_ACCESS_KEY"] = getenv("SECRET_KEY")
    environ["AWS_SESSION_TOKEN"] = getenv("AWS_SESSION_TOKEN")
    environ["AWS_REGION"] = getenv("AWS_REGION", "us-east-1")
```