Hi all, I've been playing around with adding metad...
# ask-community
m
Hi all, I've been playing around with adding metadata to the output of my IO manager and got things working: I now have nice graphs and markdown showing in Dagit! While testing, I created a few test metadata entries that are no longer produced on new asset materializations, but I still see the old values in Dagit (correctly, they are no longer updated for new runs). Is there an easy way to remove metadata that is no longer being generated? If that means losing my historic runs, I'm perfectly fine with that.
Found it! In Dagit, go to Assets, select the assets, and wipe their materializations. This erases the history, including the old metadata.
r
This is lovely! Do you have any examples one can copy?
m
I'm afraid I may have set the expectations a bit high, but I simply adjusted the default io manager by copying its entire contents and adding:
context.add_output_metadata(
    {
        "Rows": obj.shape[0],
        "Columns": obj.shape[1],
        "Sample": MarkdownMetadataValue(obj.head(10).to_markdown()),
    }
)
to the output. I also tried the CSV IO manager from https://github.com/dagster-io/dagster/tree/master/examples/assets_pandas_type_metadata, but it expects an AssetKey on every input/output, which I couldn't get to work when not working with assets. The full code of my IO manager is the following (a direct copy-paste of the default one, with the above code block added, plus an import for MarkdownMetadataValue):
import pickle
from typing import Any

from upath import UPath

import dagster._check as check
from dagster import DagsterInvariantViolationError
from dagster._core.definitions.metadata import (
    MarkdownMetadataValue,
)
from dagster._core.execution.context.input import InputContext
from dagster._core.execution.context.output import OutputContext
from dagster._core.storage.upath_io_manager import UPathIOManager
from dagster._utils import PICKLE_PROTOCOL


class PickledObjectFilesystemIOManager(UPathIOManager):
    """Built-in filesystem IO manager that stores and retrieves values using pickling.
    Is compatible with local and remote filesystems via `universal-pathlib` and `fsspec`.
    Learn more about how to use remote filesystems here: <https://github.com/fsspec/universal_pathlib>

    Args:
        base_dir (Optional[str]): base directory where all the step outputs which use this object
            manager will be stored in.
        **kwargs: additional keyword arguments for `universal_pathlib.UPath`.
    """

    extension: str = (
        ""  # TODO: maybe change this to .pickle? Leaving blank for compatibility.
    )

    def __init__(self, base_dir=None, **kwargs):
        self.base_dir = check.opt_str_param(base_dir, "base_dir")

        super().__init__(base_path=UPath(base_dir, **kwargs))

    def dump_to_path(self, context: OutputContext, obj: Any, path: UPath):

        try:
            with path.open("wb") as file:
                pickle.dump(obj, file, PICKLE_PROTOCOL)

        except (AttributeError, RecursionError, ImportError, pickle.PicklingError) as e:
            executor = context.step_context.pipeline_def.mode_definitions[
                0
            ].executor_defs[0]

            if isinstance(e, RecursionError):
                # if obj can't be pickled because of RecursionError then __str__() will also
                # throw a RecursionError
                obj_repr = f"{obj.__class__} exceeds recursion limit and"
            else:
                obj_repr = obj.__str__()

            raise DagsterInvariantViolationError(
                f"Object {obj_repr} is not picklable. You are currently using the "
                f"fs_io_manager and the {executor.name}. You will need to use a different "
                "io manager to continue using this output. For example, you can use the "
                "mem_io_manager with the in_process_executor.\n"
                "For more information on io managers, visit "
                "<https://docs.dagster.io/concepts/io-management/io-managers> \n"
                "For more information on executors, visit "
                "<https://docs.dagster.io/deployment/executors#overview>"
            ) from e

        # Assumes obj is a pandas DataFrame; to_markdown() needs the optional `tabulate` package.
        context.add_output_metadata(
            {
                "Rows": obj.shape[0],
                "Columns": obj.shape[1],
                "Sample": MarkdownMetadataValue(obj.head(10).to_markdown()),
            }
        )

    def load_from_path(self, context: InputContext, path: UPath) -> Any:
        with path.open("rb") as file:
            return pickle.load(file)
There may be easier/better solutions, but hey, it works. 🙂
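One caveat with the IO manager above: `obj.shape` and `obj.head(10).to_markdown()` assume every output is a pandas DataFrame, so any other output type would raise inside `dump_to_path`. A minimal sketch of a guard follows, assuming a hypothetical helper named `build_table_metadata` (not part of Dagster) that returns an empty dict for anything that is not table-like:

```python
from typing import Any, Dict


def build_table_metadata(obj: Any, sample_rows: int = 10) -> Dict[str, Any]:
    """Hypothetical helper: row/column counts plus a markdown sample.

    Returns an empty dict for anything without a 2-D ``shape``, so the
    IO manager can still store outputs that are not DataFrames.
    """
    shape = getattr(obj, "shape", None)
    if not isinstance(shape, tuple) or len(shape) != 2:
        return {}

    metadata: Dict[str, Any] = {"Rows": int(shape[0]), "Columns": int(shape[1])}

    # Only DataFrame-like objects offer both head() and to_markdown().
    if hasattr(obj, "head") and hasattr(obj, "to_markdown"):
        try:
            metadata["Sample"] = obj.head(sample_rows).to_markdown()
        except ImportError:
            # pandas needs the optional `tabulate` package for to_markdown();
            # skip the sample rather than fail the whole output.
            pass
    return metadata


# Possible use inside dump_to_path (an assumption, not tested against Dagster):
#     metadata = build_table_metadata(obj)
#     if "Sample" in metadata:
#         metadata["Sample"] = MarkdownMetadataValue(metadata["Sample"])
#     if metadata:
#         context.add_output_metadata(metadata)
```

The helper keeps the same "Rows", "Columns", and "Sample" keys as the original snippet, so existing graphs in Dagit would keep receiving the same entries for DataFrame outputs.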