Charles
04/10/2023, 11:15 AM
How can I set `schema` and `table` inside of the `op_1()` function such that they are available in `MyIOManager`?
@op(out=Out(metadata={"schema": None, "table": None}))
def op_1(context):
    """Return a Pandas DataFrame.

    Before returning, the op tries to attach ``schema`` and ``table``
    metadata to its output at run time.

    NOTE(review): no ``return`` statement appears in the posted snippet;
    presumably the DataFrame construction was omitted for brevity.

    Args:
        context: Op execution context. The body needs it in order to call
            ``add_output_metadata``; the snippet as posted referenced
            ``context`` without declaring it as a parameter.
    """
    # DOES NOT WORK: inside the IO manager, context.metadata["schema"] and
    # context.metadata["table"] are still None.
    # NOTE(review): presumably the IO manager's context.metadata reflects the
    # static metadata declared on Out(...) above rather than metadata added
    # at run time -- confirm against the Dagster documentation.
    context.add_output_metadata(
        {
            "schema": "some schema",
            "table": "some table",
        }
    )
class MyIOManager(IOManager):
    """IO manager that hands an op's output to ``write_dataframe_to_table``.

    The destination is looked up under the "table" and "schema" keys of the
    output context's metadata.
    """

    def handle_output(self, context, obj):
        """Pass ``obj`` to ``write_dataframe_to_table`` as ``dataframe``.

        Args:
            context: Output context whose ``metadata`` mapping supplies the
                "table" and "schema" entries.
            obj: The value produced by the op.
        """
        # Observed by the author: both lookups come back as None here,
        # whereas "some table" and "some schema" were expected.
        destination_table = context.metadata["table"]
        destination_schema = context.metadata["schema"]
        write_dataframe_to_table(
            name=destination_table,
            schema=destination_schema,
            dataframe=obj,
        )
claire
04/10/2023, 7:14 PM
Charles
04/10/2023, 9:02 PM
Andrew Grigorev
# 04/11/2023, 7:06 PM (message timestamp)
class MetadataIOManager(IOManager):
    """IO manager that keeps a value in asset materialization metadata.

    ``handle_output`` attaches the value to the output's metadata under the
    ``'value'`` key; ``load_input`` reads that entry back from an asset
    materialization event fetched from the instance's event log storage.
    """

    def load_input(self, context: InputContext) -> RawMetadataValue:
        """Return the ``'value'`` metadata entry of a materialization event.

        Queries the event log storage for at most one ASSET_MATERIALIZATION
        record matching the context's asset key and partition key.
        NOTE(review): which record ``limit=1`` selects depends on the
        storage's default ordering -- presumably the most recent; confirm.

        Args:
            context: Input context supplying the instance, asset key,
                partition key and logger.

        Returns:
            The ``.value`` of the ``'value'`` entry in the materialization's
            metadata.

        Raises:
            Exception: If no matching materialization event is found.
        """
        records = context.instance.event_log_storage.get_event_records(
            EventRecordsFilter(
                event_type=DagsterEventType.ASSET_MATERIALIZATION,
                asset_key=context.asset_key,
                asset_partitions=[context.partition_key],
            ),
            limit=1,
        )
        if not records:
            raise Exception('Asset materialization event not found.')

        entry = records[0].event_log_entry
        # The posted snippet had chat link markup wrapped around this call;
        # it is restored here to a plain logger call.
        context.log.info("Using materialization event from run %s",
                         entry.run_id)
        event_data = entry.dagster_event.event_specific_data
        return event_data.materialization.metadata['value'].value

    def handle_output(self, context: OutputContext, obj: RawMetadataValue) -> None:
        """Record ``obj`` as the ``'value'`` entry of the output's metadata."""
        context.add_output_metadata({'value': obj})
@io_manager(
    config_schema={},
    description="IO manager that stores and retrieves values from asset metadata.",
)
def metadata_io_manager(init_context: InitResourceContext):
    """Create the ``MetadataIOManager`` instance for this resource.

    Args:
        init_context: Resource initialization context; accepted but not
            used by this factory.

    Returns:
        A new ``MetadataIOManager``.
    """
    manager = MetadataIOManager()
    return manager
Moulay Chihani
04/25/2023, 4:03 PM