Travis DePriest
04/13/2023, 3:06 AM
@io_manager(
    config_schema={
        "base_path": Field(str, is_required=True),
        "AWS_ACCESS_KEY_ID": StringSource,
        "AWS_SECRET_ACCESS_KEY": StringSource,
    }
)
def s3_parquet_io_manager(init_context: InitResourceContext) -> PandasParquetIOManager:
    # `UPath` will read boto env vars.
    # The credentials can also be taken from the config and passed to `UPath` directly.
    base_path = UPath(init_context.resource_config.get("base_path"))
    assert str(base_path).startswith("s3://"), base_path
    return PandasParquetIOManager(base_path=base_path)
They pass AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY in the config schema, but then the comments say that UPath will read the boto env vars? How is that implemented? If I want to write to a specific GCS bucket, how would I write my custom UPathIOManager? Would I need to have the GCS client in the dump_to_path and load_from_path methods? My biggest confusion so far comes from the config_schema... can I define whatever keys I want here? How do I pass the keys from the config schema to the dump_to_path and load_from_path methods?
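To make the last question concrete, here is a rough sketch of the usual pattern as I understand it: the factory function reads whatever keys the config holds and hands them to the IO manager's constructor, and the dump/load methods then read them from `self`. It uses only the standard library so it runs anywhere. `HtmlPathIOManager`, `html_io_manager`, the `token` key, and `demo` are hypothetical stand-ins, not Dagster classes, and Dagster's real `dump_to_path` and `load_from_path` on `UPathIOManager` also take a context argument that is left out here.

```python
import tempfile
from pathlib import Path
from typing import Any, Dict


class HtmlPathIOManager:
    """Hypothetical stand-in for a UPathIOManager subclass (not a Dagster class)."""

    extension = ".html"

    def __init__(self, base_path: Path, credentials: Dict[str, str]):
        # Values taken from the config are stored on the instance, so every
        # method can reach them through `self` without extra arguments.
        self.base_path = base_path
        self.credentials = credentials

    def dump_to_path(self, obj: str, path: Path) -> None:
        # A real implementation could build a storage client from
        # self.credentials here; this sketch writes to the local filesystem.
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(obj, encoding="utf-8")

    def load_from_path(self, path: Path) -> str:
        return path.read_text(encoding="utf-8")


def html_io_manager(resource_config: Dict[str, Any]) -> HtmlPathIOManager:
    """Hypothetical stand-in for the @io_manager factory function."""
    # Every key other than base_path is treated as a credential in this sketch.
    credentials = {k: v for k, v in resource_config.items() if k != "base_path"}
    return HtmlPathIOManager(Path(resource_config["base_path"]), credentials)


def demo() -> str:
    """Round-trip an HTML string through the stand-in manager."""
    with tempfile.TemporaryDirectory() as tmp:
        manager = html_io_manager({"base_path": tmp, "token": "hypothetical-secret"})
        target = manager.base_path / ("report" + manager.extension)
        manager.dump_to_path("<h1>hello</h1>", target)
        return manager.load_from_path(target)
```

In an actual Dagster setup the dict would presumably come from `init_context.resource_config`, as in the snippet above, and the base path would be a `UPath` rather than a local `Path`.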
Another part of me thinks I should just take the code from here https://docs.dagster.io/_modules/dagster_gcp/gcs/io_manager#gcs_pickle_io_manager and create a new class that inherits from PickledObjectGCSIOManager, add an extension parameter, and override the load_input and handle_output methods to write an HTML file instead of pickling the object.
Travis DePriest
04/13/2023, 4:21 AM
Tim Castillo
04/13/2023, 2:53 PM