#ask-community

Charles

04/10/2023, 11:15 AM
I have a question about providing per-output metadata to an IO manager. Following the example below, how can I set `schema` and `table` inside the `op_1()` function so that they are available in `MyIOManager`?
from dagster import IOManager, Out, op

@op(out=Out(metadata={"schema": None, "table": None}))
def op_1(context):
    """Return a Pandas DataFrame."""
    # DOES NOT WORK: these values are still None inside the IO manager
    context.add_output_metadata({
        "schema": "some schema",
        "table": "some table",
    })

class MyIOManager(IOManager):
    def handle_output(self, context, obj):
        table_name = context.metadata["table"]  # this is None; it should be "some table"
        schema = context.metadata["schema"]  # this is None; it should be "some schema"
        write_dataframe_to_table(name=table_name, schema=schema, dataframe=obj)
@daniel would love your input here: what's the best practice for passing metadata to IO managers?

claire

04/10/2023, 7:14 PM
Hi Charles. Unfortunately this currently doesn't work. I agree it is a pain point that output metadata generated within an op isn't accessible from the IO manager. This is a known issue, documented here: https://github.com/dagster-io/dagster/discussions/6913. Please feel free to comment there with any thoughts.
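One possible way around the limitation described above is to have the op return its destination together with the data, so the IO manager reads `schema` and `table` from the output object instead of from `context.metadata`. The following is only a minimal sketch under stated assumptions, and it is not necessarily the approach from the linked discussion. `TableOutput` and the in-memory `WRITTEN` store are hypothetical, a list of dicts stands in for the Pandas DataFrame, and the classes are plain Python so the snippet runs without Dagster installed. In a real project `MyIOManager` would subclass `IOManager` and `op_1` would carry the `@op` decorator.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]


@dataclass
class TableOutput:
    """Hypothetical wrapper: the payload plus the destination chosen at runtime."""
    rows: List[Row]  # stand-in for the Pandas DataFrame
    schema: str
    table: str


# In-memory stand-in for the database, keyed by (schema, table).
WRITTEN: Dict[Tuple[str, str], List[Row]] = {}


def write_dataframe_to_table(name: str, schema: str, dataframe: List[Row]) -> None:
    """Stand-in for the helper used in the question; stores rows in memory."""
    WRITTEN[(schema, name)] = dataframe


class MyIOManager:  # assumption: would subclass dagster.IOManager in real code
    def handle_output(self, context: Any, obj: TableOutput) -> None:
        # The destination travels with the output, so context.metadata is not needed.
        write_dataframe_to_table(name=obj.table, schema=obj.schema, dataframe=obj.rows)


def op_1() -> TableOutput:  # assumption: would be decorated with @op in real code
    rows = [{"id": 1}, {"id": 2}]
    return TableOutput(rows=rows, schema="some schema", table="some table")


# Simulate what the framework does after the op finishes.
MyIOManager().handle_output(context=None, obj=op_1())
```

The trade-off is that the op's return type is now the wrapper rather than the bare DataFrame, so a matching `load_input` would have to decide what downstream ops receive.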

Charles

04/10/2023, 9:02 PM
I ended up adopting the solution in that post, which is OK but feels like a workaround. Thanks!

Andrew Grigorev

04/11/2023, 7:06 PM
This works for me:
class MetadataIOManager(IOManager):

    def load_input(self, context: InputContext) -> RawMetadataValue:
        e = context.instance.event_log_storage.get_event_records(EventRecordsFilter(
            event_type=DagsterEventType.ASSET_MATERIALIZATION,
            asset_key=context.asset_key,
            asset_partitions=[context.partition_key],
        ), limit=1)
        if len(e) == 0:
            raise Exception('Asset materialization event not found.')
        context.log.info("Using materialization event from run %s",
                         e[0].event_log_entry.run_id)
        d = e[0].event_log_entry.dagster_event.event_specific_data
        return d.materialization.metadata['value'].value

    def handle_output(self, context: OutputContext, obj: RawMetadataValue) -> None:
        context.add_output_metadata({'value': obj})


@io_manager(
    config_schema={},
    description="IO manager that stores and retrieves values from asset metadata.",
)
def metadata_io_manager(init_context: InitResourceContext):
    return MetadataIOManager()

Moulay Chihani

04/25/2023, 4:03 PM
@Audrey Kervella

Quentin Gaborit

06/12/2023, 7:59 PM
@Charles what's the workaround you mentioned? Creating a MetadataIOManager that you attach to each op and asset as another resource?