Andrzej Lewandowski
04/12/2023, 9:43 AM
from {{ source('X', 'Y') }}
asset with the same keys, but Airbyte doesn't know anything about this name. So I wrote a node_info_to_asset_key function, used when loading assets from dbt projects, that takes the information from the yaml file (if defined), and this already works: Dagster tries to find the asset by database schema and table name (Airbyte knows where it is loading the data). Next, I want to use connection_to_asset_key_fn to map connections to the database/schema/table name format, so assets from Airbyte can be connected with assets from dbt. Unfortunately, AirbyteConnectionMetadata, which is passed as a parameter to connection_to_asset_key_fn, doesn't have information about the source/destination (which is available in the REST API payload). Only 4 properties from this payload are available: https://github.com/dagster-io/dagster/blob/43fc9b8948c8bc7e159fdfaefc15848ee61d447[…]modules/libraries/dagster-airbyte/dagster_airbyte/asset_defs.py
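A minimal sketch of the matching idea described above, assuming a dbt source node carries `schema` and `identifier` fields and an Airbyte stream entry carries `namespace` and `name` (as in the `stream_data` items used later in this thread). The helper names `dbt_source_key_path` and `airbyte_stream_key_path` are hypothetical; both return plain lists of key components, which in Dagster would be wrapped in `AssetKey(...)`. The point is only that both sides build the same path, so the assets line up.

```python
from typing import Any, List, Mapping


def dbt_source_key_path(node_info: Mapping[str, Any]) -> List[str]:
    # Hypothetical helper: key a dbt source by the physical table its yaml
    # definition points at (schema + identifier), falling back to the
    # source's logical name when no identifier is set.
    table = node_info.get("identifier") or node_info["name"]
    return [node_info["schema"].upper(), table.upper()]


def airbyte_stream_key_path(stream: Mapping[str, Any]) -> List[str]:
    # Hypothetical helper: key an Airbyte stream by the namespace
    # (destination schema) and the stream name it is written to.
    return [stream["namespace"].upper(), stream["name"].upper()]


if __name__ == "__main__":
    # Illustrative inputs only; the values are made up.
    dbt_source = {
        "resource_type": "source",
        "source_name": "X",
        "name": "Y",
        "schema": "raw_shop",
        "identifier": "orders",
    }
    airbyte_stream = {"namespace": "raw_shop", "name": "orders"}
    print(dbt_source_key_path(dbt_source))          # ['RAW_SHOP', 'ORDERS']
    print(airbyte_stream_key_path(airbyte_stream))  # ['RAW_SHOP', 'ORDERS']
```

Upper-casing both sides is a choice made here to mirror Snowflake's handling of unquoted identifiers; any normalization works as long as both sides apply the same one.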
And finally, my questions 🙂
1. Is there a better way to connect dbt assets with Airbyte assets, using names from dbt that Airbyte doesn't understand?
2. Is it possible to get information about the source/destination from the Airbyte connection response, or should I create a PR for this?
Andrzej Lewandowski
04/12/2023, 10:13 AM
Andrzej Lewandowski
04/12/2023, 10:22 AM
Andrzej Lewandowski
04/12/2023, 10:41 AM
Adam Bloom
04/12/2023, 2:11 PM
Andrzej Lewandowski
04/12/2023, 4:13 PM
Adam Bloom
04/12/2023, 4:28 PM
_get_node_asset_key. You can handle sources separately from your dbt models with something like:
def _get_node_asset_key(node_info: Mapping[str, Any]) -> AssetKey:
    if node_info["resource_type"] == "source":
        ...
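A sketch of what the branching in such a callback might look like, under stated assumptions: dbt sources are keyed by schema and table (so they can match Airbyte namespace/stream keys), while models keep a dbt-side key (here just the node name). The function name `node_key_path` and this key layout are illustrations, not Dagster's default behavior. It returns a list of components; in Dagster you would return `AssetKey(...)` of that list and pass the function as `node_info_to_asset_key`.

```python
from typing import Any, List, Mapping


def node_key_path(node_info: Mapping[str, Any]) -> List[str]:
    # Sources are tables loaded by something outside dbt (here: Airbyte),
    # so key them by where the data physically lives.
    if node_info["resource_type"] == "source":
        table = node_info.get("identifier") or node_info["name"]
        return [node_info["schema"].upper(), table.upper()]
    # Models (and any other resource type) keep a dbt-side key: the node name.
    return [node_info["name"]]
```

Keeping the two branches separate means only the source keys have to agree with whatever the Airbyte side produces; model keys can follow any convention.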
Andrzej Lewandowski
04/12/2023, 4:44 PM
connection_to_asset_key_fn when loading Airbyte assets, to get the namespace and stream name and use them as the AssetKey, and on the dbt side I used node_info_to_asset_key, and it partially worked. My problem is that I have 3 destinations configured (the same Snowflake connection but different databases). Because of that, I have schemas/tables with the same names in Airbyte, located in different databases (and the data is not the same). So my idea was to add a database prefix to the asset key in dbt (which is possible and works as well), but on the Airbyte side it's impossible because I don't have access to the destination database property (it is returned by the Airbyte API but not accessible from AirbyteConnectionMetadata).
Adam Bloom
04/12/2023, 4:58 PM
node_info_to_asset_key. I'm using an asset factory that has additional context (e.g. your example of the destination database). Example of what I'm doing:
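from typing import Any, Mapping
from dagster import AssetKey

def dbt_asset_factory(
    param,  # ...other factory parameters
):
    def _get_node_asset_key(node_info: Mapping[str, Any]) -> AssetKey:
        if node_info["resource_type"] == "source":
            if node_info["source_name"] == "<same name special case>":
                # use context passed into the factory instead of the dbt source name
                source_name = param
            ...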
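A runnable sketch of the factory pattern, assuming the extra context is the Snowflake database that a given set of dbt sources reads from. `make_node_key_fn` and its `database` parameter are hypothetical names. The closure bakes that database into every source key, so identically named schemas/tables in different databases get distinct keys. (dbt source nodes often carry a `database` field as well; the factory parameter is shown because it is the pattern described here, and it also covers context dbt doesn't have.)

```python
from typing import Any, Callable, List, Mapping


def make_node_key_fn(database: str) -> Callable[[Mapping[str, Any]], List[str]]:
    # Hypothetical factory: `database` is context supplied by the caller,
    # e.g. which Airbyte destination database these sources were loaded into.
    def node_key_path(node_info: Mapping[str, Any]) -> List[str]:
        if node_info["resource_type"] == "source":
            table = node_info.get("identifier") or node_info["name"]
            # database / schema / table, upper-cased like unquoted Snowflake names
            return [database.upper(), node_info["schema"].upper(), table.upper()]
        # Models keep a dbt-side key.
        return [node_info["name"]]

    return node_key_path


# One key function per target database (illustrative names).
prod_key_fn = make_node_key_fn("analytics_prod")
dev_key_fn = make_node_key_fn("analytics_dev")
```

In Dagster each of these would be wrapped to return `AssetKey(...)` and passed to the dbt asset loader for the matching project or target.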
Andrzej Lewandowski
04/12/2023, 4:59 PM
Andrzej Lewandowski
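04/12/2023, 4:59 PM
from dagster import AssetKey

def connection_to_asset_key(meta, name):
    # find this stream's entry in the connection's catalog (None if absent)
    stream_data = next((item for item in meta.stream_data if item['stream']['name'] == name), None)
    if stream_data and "namespace" in stream_data['stream']:
        # key by namespace (destination schema) + table, upper-cased for Snowflake
        return AssetKey([stream_data['stream']['namespace'].upper(), name.upper()])
    # otherwise fall back to connection name + stream name
    return AssetKey([meta.name, name])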
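Since `AirbyteConnectionMetadata` does not expose the destination database, one workaround is to supply it yourself, keyed by connection name, and close over that mapping. This is a sketch: `make_connection_key_fn` and `db_by_connection` are hypothetical, and `meta` is assumed to have the `name` and `stream_data` attributes used in the snippet above. `types.SimpleNamespace` stands in for the metadata object in the demo. It returns key components; in Dagster you would wrap them in `AssetKey`.

```python
from types import SimpleNamespace
from typing import Any, Callable, List, Mapping


def make_connection_key_fn(
    db_by_connection: Mapping[str, str],
) -> Callable[[Any, str], List[str]]:
    # Hypothetical factory: `db_by_connection` maps an Airbyte connection name
    # to the Snowflake database its destination writes into, which the
    # connection metadata does not tell us. Unknown connections raise KeyError.
    def connection_key_path(meta: Any, name: str) -> List[str]:
        database = db_by_connection[meta.name]
        stream_data = next(
            (item for item in meta.stream_data if item["stream"]["name"] == name),
            None,
        )
        # Prefer the stream's namespace (destination schema) when present,
        # otherwise fall back to the connection name, as in the snippet above.
        if stream_data and "namespace" in stream_data["stream"]:
            schema = stream_data["stream"]["namespace"]
        else:
            schema = meta.name
        return [database.upper(), schema.upper(), name.upper()]

    return connection_key_path


if __name__ == "__main__":
    # Stand-in for AirbyteConnectionMetadata with only the fields used above.
    meta = SimpleNamespace(
        name="shop_to_prod",
        stream_data=[{"stream": {"name": "orders", "namespace": "raw_shop"}}],
    )
    key_fn = make_connection_key_fn({"shop_to_prod": "analytics_prod"})
    print(key_fn(meta, "orders"))  # ['ANALYTICS_PROD', 'RAW_SHOP', 'ORDERS']
```

Paired with a dbt-side key that also prefixes the database, both sides would agree on DATABASE/SCHEMA/TABLE keys. The cost is keeping the mapping in sync with the Airbyte configuration by hand.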
Andrzej Lewandowski
04/12/2023, 5:00 PM
Adam Bloom
04/12/2023, 5:00 PM
Andrzej Lewandowski
04/12/2023, 5:02 PM
Adam Bloom
04/12/2023, 5:04 PM
Adam Bloom
04/12/2023, 5:09 PM
Adam Bloom
04/12/2023, 5:10 PM
connection_filter
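One way `connection_filter` might help with the three-databases problem, sketched under assumptions: load Airbyte assets once per destination database, each time keeping only that database's connections and using a key function that prefixes that database. `make_connection_filter` and `db_by_connection` are hypothetical; the predicate takes an object with a `name` attribute, standing in for `AirbyteConnectionMetadata`.

```python
from types import SimpleNamespace
from typing import Any, Callable, Mapping


def make_connection_filter(
    database: str, db_by_connection: Mapping[str, str]
) -> Callable[[Any], bool]:
    # Hypothetical helper: keep only connections whose destination database
    # (supplied by us, not by Airbyte metadata) equals `database`.
    # Connections missing from the mapping are filtered out.
    def connection_filter(meta: Any) -> bool:
        return db_by_connection.get(meta.name) == database

    return connection_filter


if __name__ == "__main__":
    db_by_connection = {"shop_to_prod": "analytics_prod", "shop_to_dev": "analytics_dev"}
    connections = [SimpleNamespace(name=n) for n in db_by_connection]
    for database in sorted(set(db_by_connection.values())):
        keep = make_connection_filter(database, db_by_connection)
        print(database, [c.name for c in connections if keep(c)])
    # analytics_dev ['shop_to_dev']
    # analytics_prod ['shop_to_prod']
```

Each (database, filter) pair would then feed one `load_assets_from_airbyte_instance(..., connection_filter=..., connection_to_asset_key_fn=...)` call whose key function prefixes that database; treat that wiring as an assumption to check against your dagster-airbyte version.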
Andrzej Lewandowski
04/12/2023, 5:14 PM
Andrzej Lewandowski
04/12/2023, 5:17 PM