Daniel
07/12/2023, 7:43 PM
Gabriel Montañola
07/12/2023, 8:36 PM
node_info_to_metadata (from DbtManifest):
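from typing import Any, Dict, Mapping
from dagster import MetadataValue
@classmethod  # defined on a DbtManifest subclass
def node_info_to_metadata(cls, node_info: Mapping[str, Any]) -> Mapping[str, Any]:
    metadata: Dict[str, Any] = {}
    dbt_schema = node_info.get("schema")  # schema dbt resolved for this node
    metadata["dbt_schema"] = MetadataValue.text(dbt_schema)
    return metadata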
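A minimal sketch of what that override does, using plain strings in place of dagster's MetadataValue.text so it runs without dagster installed. The helper name dbt_schema_metadata is hypothetical, and skipping nodes without a "schema" key is an assumption, not something from the thread.

```python
from typing import Any, Dict, Mapping


def dbt_schema_metadata(node_info: Mapping[str, Any]) -> Dict[str, str]:
    """Hypothetical helper: copy a dbt manifest node's schema into a metadata dict."""
    metadata: Dict[str, str] = {}
    dbt_schema = node_info.get("schema")
    # Assumption: only record the schema when the node actually has one.
    if dbt_schema:
        metadata["dbt_schema"] = str(dbt_schema)
    return metadata


print(dbt_schema_metadata({"name": "orders", "schema": "gabriel_sales"}))  # {'dbt_schema': 'gabriel_sales'}
print(dbt_schema_metadata({"name": "orders"}))  # {}
```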
Gabriel Montañola
07/12/2023, 8:38 PM
_get_table_slice (from DbIOManager), and this will probably be part of your AssetKey.
So we tried to override _get_table_slice to check for dbt_schema and... failed.
Gabriel Montañola
07/12/2023, 8:43 PM
dbt custom schema generation when trying to load dbt assets
◦ Concat user_defined_schema + dbt_schema
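The concatenation rule above can be sketched in plain Python, independent of dagster. It assumes, as the override does, that the last asset-key component is the table and the one before it is the dbt custom schema. The function name resolve_table_location is hypothetical, and the fallback for a single-component key is an assumption, since the original override elides that branch.

```python
from typing import Optional, Sequence, Tuple


def resolve_table_location(
    base_schema: Optional[str], asset_key_path: Sequence[str]
) -> Tuple[str, str]:
    """Hypothetical sketch: map an asset key path to (schema, table)."""
    if not base_schema:
        raise ValueError("Schema must be specified in the IO manager config")
    # Last component is the table name.
    table = asset_key_path[-1]
    if len(asset_key_path) > 1:
        # Second-to-last component is the dbt custom schema: "<base>_<custom>".
        schema = f"{base_schema}_{asset_key_path[-2]}"
    else:
        # Assumption: with no custom-schema segment, use the base schema as-is.
        schema = base_schema
    return schema, table


print(resolve_table_location("gabriel", ["analytics", "sales", "orders"]))  # ('gabriel_sales', 'orders')
print(resolve_table_location("analytics", ["analytics", "sales", "orders"]))  # ('analytics_sales', 'orders')
```

The same asset key resolves to a per-developer schema or to the production schema depending only on the configured base schema.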
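from typing import List, Union

from dagster import InputContext, OutputContext
from dagster._core.storage.db_io_manager import DbIOManager, TablePartitionDimension, TableSlice


class DbtDBIoManager(DbIOManager):
    def _get_table_slice(
        self, context: Union[OutputContext, InputContext], output_context: OutputContext
    ) -> TableSlice:
        output_context_metadata = output_context.metadata or {}
        schema: str
        table: str
        partition_dimensions: List[TablePartitionDimension] = []
        # The configured schema is the per-environment prefix (e.g. "gabriel" or "analytics").
        if not self._schema:
            raise ValueError("Schema must be specified in DbtIOManager resource config")
        if context.has_asset_key:
            asset_key_path = context.asset_key.path
            # Last asset-key component is the table name.
            table = asset_key_path[-1]
            # Second-to-last component is the dbt custom schema: "<prefix>_<custom_schema>".
            if len(asset_key_path) > 1 and self._schema:
                schema = f"{self._schema}_{asset_key_path[-2]}"
        ...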
This works for our workflow because, when developing, we expect dbt schemas like gabriel_sales or daniel_finance.
The production IOManager, for instance, will look for tables in analytics_sales and so on. This is more of a PostgreSQL limitation coupled with how we use dbt, but it works.
Gabriel Montañola
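07/12/2023, 8:56 PM
from typing import Any, Dict, Mapping

from dagster import AssetKey, MetadataValue, TableColumn, TableSchema
from dagster_dbt import DbtManifest
from dagster_dbt.asset_utils import default_asset_key_fn


class CustomizedDbtManifest(DbtManifest):
    @classmethod
    def node_info_to_asset_key(cls, node_info: Mapping[str, Any]) -> AssetKey:
        # Prefix every dbt asset key with "analytics".
        orig_asset_key = default_asset_key_fn(node_info)
        return AssetKey(["analytics"] + orig_asset_key.path)

    @classmethod
    def node_info_to_metadata(cls, node_info: Mapping[str, Any]) -> Mapping[str, Any]:
        metadata: Dict[str, Any] = {}
        # Expose the dbt column definitions as a Dagster table schema.
        columns = node_info.get("columns", {})
        if len(columns) > 0:
            metadata["table_schema"] = MetadataValue.table_schema(
                TableSchema(
                    columns=[
                        TableColumn(
                            name=column_name,
                            # "?" marks columns whose data_type is not declared in dbt.
                            type=column_info.get("data_type") or "?",
                            description=column_info.get("description"),
                        )
                        for column_name, column_info in columns.items()
                    ]
                )
            )
        return metadata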
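The two overrides above boil down to simple data transformations. This plain-Python sketch mirrors them with lists and tuples instead of AssetKey and TableColumn, so it runs without dagster. The function names and the sample node_info are hypothetical, and the ["sales", "orders"] input assumes the default asset key for a model with a custom schema looks like that.

```python
from typing import Any, List, Mapping, Optional, Sequence, Tuple


def prefixed_asset_key_path(orig_path: Sequence[str], prefix: str = "analytics") -> List[str]:
    """Plain-list version of AssetKey(["analytics"] + orig_asset_key.path)."""
    return [prefix] + list(orig_path)


def columns_to_schema(node_info: Mapping[str, Any]) -> List[Tuple[str, str, Optional[str]]]:
    """(name, type, description) triples, mirroring the TableColumn list."""
    columns = node_info.get("columns", {})
    return [
        # A missing or empty data_type falls back to "?", as in the override.
        (name, info.get("data_type") or "?", info.get("description"))
        for name, info in columns.items()
    ]


node_info = {
    "columns": {
        "id": {"data_type": "integer", "description": "Primary key"},
        "amount": {"data_type": None, "description": "Order total"},
    }
}
print(prefixed_asset_key_path(["sales", "orders"]))  # ['analytics', 'sales', 'orders']
print(columns_to_schema(node_info))
# [('id', 'integer', 'Primary key'), ('amount', '?', 'Order total')]
```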
Daniel
07/14/2023, 9:30 AM
Daniel
08/16/2023, 6:53 AM
Daniel
08/16/2023, 6:53 AM
Daniel
08/16/2023, 11:52 AM