Thorsten Schäfer
08/03/2022, 10:06 AM
owen
08/03/2022, 5:58 PM
Thorsten Schäfer
08/03/2022, 6:01 PM
class MetaDataDecorator(IOManager):
def __init__(self, base_io_manger: IOManager):
    """Remember the IO manager that this decorator wraps.

    Args:
        base_io_manger: The wrapped IO manager. The (misspelled) parameter
            name is kept as-is so existing keyword callers keep working.
    """
    wrapped = base_io_manger
    self.base_io_manager = wrapped
def handle_output(self, context: OutputContext, obj: Any) -> None:
    """Store ``obj`` via the wrapped IO manager and attach DataFrame metadata.

    The wrapped manager's ``handle_output`` is always called first. When
    ``obj`` is a ``DataFrame``, its row count, column count and a short text
    sample are recorded through ``context.add_output_metadata``.

    Metadata is recorded on the context instead of being yielded: a ``yield``
    anywhere in this body would turn the method into a generator, and a
    caller that invokes it as a plain function call would then never execute
    any of it.

    Args:
        context: Output context; provides the asset key, the logger and the
            metadata sink.
        obj: The value being stored.
    """
    self.base_io_manager.handle_output(context, obj)
    if not isinstance(obj, DataFrame):
        return

    asset_name = context.asset_key.path[-1]
    context.log.debug(f"Storing meta information for data frame output of asset {asset_name}")
    context.add_output_metadata(
        {
            "Number of rows": len(obj),
            "Number of columns": len(obj.columns),
            # Cap the preview so wide/long frames do not bloat the event log.
            "Sample": obj.to_string(max_rows=10),
        }
    )
def load_input(self, context: InputContext) -> Any:
    """Load an input by handing the request to the wrapped IO manager unchanged."""
    loaded = self.base_io_manager.load_input(context)
    return loaded
class DataframeToCsvDecorator(IOManager):
    """IO manager decorator that additionally writes DataFrame outputs as CSV.

    Every output is first handed to the wrapped IO manager. Outputs that are
    ``DataFrame`` instances are also written to ``<base_dir>/<asset name>.csv``
    and the absolute CSV path is attached to the output as metadata. Loading
    is delegated to the wrapped IO manager unchanged.
    """

    def __init__(self, base_dir, base_io_manger: IOManager):
        """Store the CSV target directory and the wrapped IO manager.

        Args:
            base_dir: Directory the CSV files are written to.
            base_io_manger: The wrapped IO manager. The (misspelled) parameter
                name is kept as-is so existing keyword callers keep working.
        """
        self.base_dir = base_dir
        self.base_io_manager = base_io_manger

    def _get_path(self, output_context: OutputContext) -> str:
        """Return the CSV path for the output: the last asset-key segment plus ``.csv``."""
        return os.path.join(self.base_dir, f"{output_context.asset_key.path[-1]}.csv")

    def handle_output(self, context: OutputContext, obj: Any) -> None:
        """Store ``obj`` via the wrapped IO manager and, for DataFrames, as CSV.

        Metadata is recorded through ``context.add_output_metadata`` instead
        of being yielded, so this method stays a plain function and the
        delegation on its first line always runs.
        """
        self.base_io_manager.handle_output(context, obj)
        if not isinstance(obj, DataFrame):
            return

        # Imported locally so this block does not depend on the file's import list.
        from dagster import MetadataValue

        asset_name = context.asset_key.path[-1]
        path = self._get_path(context)
        context.log.debug(f"Storing CSV for data frame output of asset {asset_name} to path {path}")
        obj.to_csv(path, index=False)
        context.add_output_metadata({"CSV path": MetadataValue.path(os.path.abspath(path))})

    def load_input(self, context: InputContext) -> Any:
        """Load an input by delegating to the wrapped IO manager."""
        return self.base_io_manager.load_input(context)
@io_manager(
    config_schema={"base_dir": Field(StringSource, is_required=False)},
    description="Built-in filesystem IO manager that stores and retrieves values using pickling.",
)
def csv_io_manager(init_context):
    """Build the IO manager chain: CSV export around metadata around pickling.

    ``base_dir`` is taken from the resource config when present, otherwise
    from the instance's storage directory. The same directory is used for
    the pickled objects and for the CSV files.
    """
    base_dir = init_context.resource_config.get(
        "base_dir", init_context.instance.storage_directory()
    )
    # Assemble the chain inside-out: innermost storage first, decorators around it.
    pickle_manager = PickledObjectFilesystemIOManager(base_dir)
    with_metadata = MetaDataDecorator(pickle_manager)
    return DataframeToCsvDecorator(base_dir, with_metadata)
When I uncomment the `yield MetadataEntry` lines in the code, only the outer decorator (`DataframeToCsvDecorator`) gets called, but the call does not go through to the `MetaDataDecorator` and the `PickledObjectFilesystemIOManager`.
owen
08/03/2022, 6:06 PM
When you put a `yield` statement inside of a function, it becomes a generator. Calling it only creates a generator object; none of the body runs until the generator is iterated, e.g.:
>>> def f():
... yield 1
... 1 / 0
... yield 2
...
>>> f()
<generator object f at 0x1071736d0>
>>>
def handle_output(self, context: OutputContext, obj: Any) -> None:
    """Delegate to the wrapped IO manager, forwarding anything it yields.

    The wrapped ``handle_output`` may be a generator (it yields metadata) or
    a plain method that returns ``None``. ``yield from None`` raises
    ``TypeError``, so the result is only delegated to when there is
    something to iterate. A generator result is exhausted before this
    method continues.
    """
    inner_result = self.base_io_manager.handle_output(context, obj)
    if inner_result is not None:
        yield from inner_result
`yield from` will exhaust the generator from the inner IOManager's `handle_output` function before moving on.
Thorsten Schäfer
08/03/2022, 6:42 PM