# integration-dbt


07/12/2023, 7:43 PM
is there a way to get the dbt asset schema in dagster? like with the native dagster assets behind the "table schema" link?

Gabriel Montañola

07/12/2023, 8:36 PM
@Dennis Gera and I were trying some ways around this. Metadata-wise, you can override node_info_to_metadata from DbtManifest:
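@classmethod  # the first parameter is cls, so this is meant to be a classmethod
def node_info_to_metadata(cls, node_info: Mapping[str, Any]) -> Mapping[str, Any]:
        # Start from an empty dict so there is something to add entries to.
        metadata: Dict[str, Any] = {}
        dbt_schema = node_info.get("schema")
        metadata["dbt_schema"] = MetadataValue.text(dbt_schema)

        return metadata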
Still, if you try to use an IO Manager to load this dbt asset from a given database, it will try to infer the schema through
(from DbIOManager), and this will probably be part of your AssetKey. So we tried to override
to check for
and... failed.
IOManager-wise, we went this route:
• 1 general IOManager for working with PostgreSQL
• 1 specific IOManager that clones dbt custom schema generation when trying to load dbt assets
  ◦ Concat
class DbtDBIoManager(DbIOManager):
    def _get_table_slice(
        self, context: Union[OutputContext, InputContext], output_context: OutputContext
    ) -> TableSlice:
        output_context_metadata = output_context.metadata or {}

        schema: str
        table: str
        partition_dimensions: List[TablePartitionDimension] = []

        if not self._schema:
            raise ValueError("Schema must be specified in DbtIOManager resource config")

        if context.has_asset_key:
            asset_key_path = context.asset_key.path
            table = asset_key_path[-1]

            if len(asset_key_path) > 1 and self._schema:
                schema = f"{self._schema}_{asset_key_path[-2]}"
This works for our workflow because we expect schemas in dbt like
when developing. Production IOManager for instance will look for tables in
and so on. This is more of a PostgreSQL limitation coupled with dbt usage. But it works.
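The schema concatenation in the override above can be tried out without Dagster. This is only a sketch: resolve_table_location is a hypothetical helper, the schema names are made up, and the rule (base schema, underscore, second-to-last asset key component) is taken from the snippet, which mirrors how dbt's default generate_schema_name macro joins the target schema and a custom schema. The fallback for single-component keys is an assumption, since the original snippet is cut off before that branch.

```python
from typing import Sequence, Tuple


def resolve_table_location(base_schema: str, asset_key_path: Sequence[str]) -> Tuple[str, str]:
    """Return (schema, table) for an asset key path (hypothetical helper)."""
    if not base_schema:
        raise ValueError("A base schema must be configured")
    if not asset_key_path:
        raise ValueError("Asset key path must not be empty")

    table = asset_key_path[-1]
    if len(asset_key_path) > 1:
        # Same rule as the snippet: <base schema>_<second-to-last key component>.
        schema = f"{base_schema}_{asset_key_path[-2]}"
    else:
        # Assumption: with no custom schema in the key, use the base schema as-is.
        schema = base_schema
    return schema, table


# Made-up schema names, for illustration only.
print(resolve_table_location("dev", ["analytics", "marts", "orders"]))  # ('dev_marts', 'orders')
print(resolve_table_location("prod", ["orders"]))  # ('prod', 'orders')
```

Keeping the rule in one small function like this makes it easy to check that the IO manager and dbt agree on where a table lives before wiring it into _get_table_slice.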
But I think you meant... the schema schema (columns and such). This will read your manifest and fill in what is needed. Recommended discussion
class CustomizedDbtManifest(DbtManifest):
    @classmethod
    def node_info_to_asset_key(cls, node_info: Mapping[str, Any]) -> AssetKey:
        orig_asset_key = default_asset_key_fn(node_info)
        return AssetKey(["analytics"] + orig_asset_key.path)

    @classmethod
    def node_info_to_metadata(cls, node_info: Mapping[str, Any]) -> Mapping[str, Any]:
        metadata: Dict[str, Any] = {}
        columns = node_info.get("columns", {})
        if len(columns) > 0:
            # Build a Dagster TableSchema from the columns documented in the dbt manifest.
            metadata["table_schema"] = MetadataValue.table_schema(
                TableSchema(
                    columns=[
                        TableColumn(
                            name=column_name,
                            type=column_info.get("data_type") or "?",
                        )
                        for column_name, column_info in columns.items()
                    ]
                )
            )
        return metadata
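To see what the override above reads, here is a dependency-free sketch of the same transformation on a plain dictionary. The example_node contents are made up and only mimic the shape of a dbt manifest node (a "columns" mapping from column name to a dict with an optional "data_type"); columns_from_node_info is a hypothetical helper, not part of dagster-dbt.

```python
from typing import Any, Dict, List, Mapping


def columns_from_node_info(node_info: Mapping[str, Any]) -> List[Dict[str, str]]:
    """Collect name/type pairs from a manifest-like node (hypothetical helper)."""
    columns = node_info.get("columns", {})
    return [
        # Fall back to "?" when the column has no data_type documented.
        {"name": name, "type": info.get("data_type") or "?"}
        for name, info in columns.items()
    ]


# Made-up node, shaped like an entry under "nodes" in manifest.json.
example_node = {
    "name": "orders",
    "columns": {
        "order_id": {"name": "order_id", "data_type": "integer"},
        "status": {"name": "status"},
    },
}

for column in columns_from_node_info(example_node):
    print(column)
```

Only columns that are documented in the dbt YAML files show up in the manifest, so anything undocumented is absent here, and a documented column without a data_type comes through as "?".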


07/14/2023, 9:30 AM
Thank you! The last snippet looks like what I'm looking for. I'll give this a try. Looks like I need to migrate my assets to the new model first.
@Gabriel Montañola after migrating to the new dbt_asset resource, it looks like your code is now part of dagster-dbt as default_metadata_from_dbt_resource_props. Too bad it requires defining each column in the sources.yml of dbt. Doesn't dbt provide a way to return the actually materialized columns for each run? Also, it looks like the columns are just displayed as single fields in the regular asset sidebar, not in a modal window like with the Dagster assets.
it shows like this here
Oh, I just saw that this unique_id just happens to have the same name I used for the column, and it's actually something completely different. The actual metadata section with the table schema was just collapsed. So this works, but it only shows the columns defined in the schema and not the columns that are actually materialized. One step forward, but still not what I expected: a real materialized table schema.
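One possible direction for the materialized-columns question, not confirmed anywhere in this thread: dbt docs generate writes a catalog.json artifact that lists the columns found in the warehouse, separately from the documented columns in manifest.json. The sketch below assumes that layout (each catalog node has a "columns" mapping keyed by column name) and uses made-up data. undocumented_columns is a hypothetical helper, and wiring the result into Dagster metadata is not shown.

```python
from typing import Any, List, Mapping


def undocumented_columns(
    manifest_node: Mapping[str, Any], catalog_node: Mapping[str, Any]
) -> List[str]:
    """Columns present in the catalog entry but not documented in the manifest node."""
    # Compare case-insensitively, since some warehouses report upper-case names.
    documented = {name.lower() for name in manifest_node.get("columns", {})}
    return [
        name
        for name in catalog_node.get("columns", {})
        if name.lower() not in documented
    ]


# Made-up entries shaped like manifest.json and catalog.json nodes.
manifest_node = {"columns": {"order_id": {"data_type": "integer"}}}
catalog_node = {
    "columns": {
        "order_id": {"name": "order_id", "type": "integer", "index": 1},
        "created_at": {"name": "created_at", "type": "timestamp", "index": 2},
    }
}

print(undocumented_columns(manifest_node, catalog_node))  # ['created_at']
```

A catalog is only as fresh as the last dbt docs generate run, so this would describe the table as of that run and not necessarily the latest materialization.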