I'm using the experimental API for yielding metada...
# ask-community
t
I'm using the experimental API for yielding metadata from an IO Manager. I have written two classes (decorator pattern), MetaDataDecorator and DataframeToCsvDecorator, which yield metadata and then delegate the call to an underlying IOManager. When using a decorated IOManager (return DataframeToCsvDecorator(base_dir, MetaDataDecorator(PickledObjectFilesystemIOManager(base_dir)))), the call chain works when not yielding any metadata, but when metadata is yielded only the outer decorator (DataframeToCsvDecorator) is called. Is this intended behavior? Can anyone suggest a workaround?
o
hi @Thorsten Schäfer! do you mind sharing a code snippet here? You can remove any business logic, just trying to get a better sense of the shape of what you're doing.
t
Sure.
class MetaDataDecorator(IOManager):
    def __init__(self, base_io_manger: IOManager):
        self.base_io_manager = base_io_manger

    def handle_output(self, context: OutputContext, obj: Any) -> None:
        self.base_io_manager.handle_output(context, obj)
        if isinstance(obj, DataFrame):
            asset_name = context.asset_key.path[-1]
            context.log.debug(f"Storing meta information for data frame output of asset {asset_name}")
            # TODO yielding currently not working
            # yield MetadataEntry(value=MetadataValue.int(len(obj)), label="Number of rows")
            # yield MetadataEntry(value=MetadataValue.int(len(obj.columns)), label="Number of columns")
            # yield MetadataEntry(value=MetadataValue.text(obj.to_string(max_rows=10)), label='Sample')

    def load_input(self, context: InputContext) -> Any:
        return self.base_io_manager.load_input(context)

class DataframeToCsvDecorator(IOManager):
    def __init__(self, base_dir, base_io_manger: IOManager):
        self.base_dir = base_dir
        self.base_io_manager = base_io_manger

    def _get_path(self, output_context: OutputContext) -> str:
        return os.path.join(self.base_dir, f"{output_context.asset_key.path[-1]}.csv")

    def handle_output(self, context: OutputContext, obj: Any) -> None:
        self.base_io_manager.handle_output(context, obj)
        if isinstance(obj, DataFrame):
            asset_name = context.asset_key.path[-1]
            path = self._get_path(context)
            context.log.debug(f"Storing CSV for data frame output of asset {asset_name} to path {path}")
            obj.to_csv(path, index=False)
            # TODO yielding currently not working
            # yield MetadataEntry(value=MetadataValue.path(os.path.abspath(path)), label="CSV path")

    def load_input(self, context: InputContext) -> Any:
        return self.base_io_manager.load_input(context)

@io_manager(
    config_schema={"base_dir": Field(StringSource, is_required=False)},
    description="Built-in filesystem IO manager that stores and retrieves values using pickling.",
)
def csv_io_manager(init_context):
    base_dir = init_context.resource_config.get(
        "base_dir", init_context.instance.storage_directory()
    )
    return DataframeToCsvDecorator(base_dir, MetaDataDecorator(PickledObjectFilesystemIOManager(base_dir)))
When I use the IOManager (csv_io_manager) with the decorators using the above code, it works. As soon as I uncomment the yield MetadataEntry lines in the code, only the outer decorator (DataframeToCsvDecorator) gets called, but the call does not reach the MetaDataDecorator and the PickledObjectFilesystemIOManager.
o
got it, I see what's happening here. when a function contains a yield statement, it becomes a generator function. Calling it only creates a generator object; the body does not run until something iterates over it, i.e.:
>>> def f():
...     yield 1
...     1 / 0
...     yield 2
...
>>> f()
<generator object f at 0x1071736d0>
>>>
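The same effect can be reproduced for a decorator chain in plain Python, without Dagster. This is only a minimal sketch: `Inner`, `Outer`, and the `calls` list are hypothetical stand-ins for the two decorators in the thread, and the yielded tuples stand in for metadata entries.

```python
calls = []  # records which handle_output bodies actually ran


class Inner:
    """Hypothetical stand-in for the inner decorator (it yields, so it is a generator function)."""

    def handle_output(self, obj):
        calls.append("inner")
        yield "rows", len(obj)


class Outer:
    """Hypothetical stand-in for the outer decorator."""

    def __init__(self, base):
        self.base = base

    def handle_output(self, obj):
        # This creates the inner generator but never iterates it,
        # so the body of Inner.handle_output does not run.
        self.base.handle_output(obj)
        calls.append("outer")
        yield "path", "out.csv"


# The caller iterates only the outer generator.
events = list(Outer(Inner()).handle_output([1, 2, 3]))
print(calls)   # ['outer']
print(events)  # [('path', 'out.csv')]
```

Only the outer body runs, which matches the behavior described in the question.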
to fix this, you can change the first line of handle_output to
def handle_output(self, context: OutputContext, obj: Any) -> None:
    yield from self.base_io_manager.handle_output(context, obj)
yield from will exhaust the generator returned by the inner IOManager's handle_output before moving on
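One caveat worth checking: `yield from` needs an iterable, so if the wrapped manager's handle_output is an ordinary function that returns None, `yield from None` raises a TypeError. The sketch below shows one way to guard against that in plain Python. The `delegate` helper and the `Plain`, `Middle`, and `Outer` classes are hypothetical names for illustration, not part of Dagster, and the tuples stand in for metadata entries.

```python
from typing import Any, Iterator


def delegate(result: Any) -> Iterator[Any]:
    """Re-yield everything from result, or yield nothing if result is None."""
    if result is not None:
        yield from result


calls = []  # records the order in which handle_output bodies ran


class Plain:
    """Stand-in for a manager whose handle_output does not yield (returns None)."""

    def handle_output(self, obj):
        calls.append("plain")


class Middle:
    def __init__(self, base):
        self.base = base

    def handle_output(self, obj):
        # Safe even though Plain.handle_output returns None.
        yield from delegate(self.base.handle_output(obj))
        calls.append("middle")
        yield "rows", len(obj)


class Outer:
    def __init__(self, base):
        self.base = base

    def handle_output(self, obj):
        # Runs the whole inner chain and forwards its yielded items.
        yield from delegate(self.base.handle_output(obj))
        calls.append("outer")
        yield "path", "out.csv"


events = list(Outer(Middle(Plain())).handle_output([1, 2, 3]))
print(calls)   # ['plain', 'middle', 'outer']
print(events)  # [('rows', 3), ('path', 'out.csv')]
```

With this shape every layer runs, and the items yielded by the inner layers reach the caller ahead of the outer layer's own.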
t
Thanks @owen, that's working nicely. 👍