#integration-airbyte

Andrzej Lewandowski

04/12/2023, 9:43 AM
Hi there. I'm trying to connect assets from dbt with assets from Airbyte. I've already configured both integrations and the assets are available in Dagster, but I can't connect them using the proper names. I saw that dbt tries to find assets for a source, e.g. from {{ source('X', 'Y') }}, as assets with the same keys, but Airbyte doesn't know anything about this name. So I wrote a node_info_to_asset_key function, which is used when loading assets from dbt projects, to get the information from the YAML file (if defined), and this already works: Dagster tries to find the asset using the database, schema and table name (Airbyte knows where it is loading the data). After that I want to use connection_to_asset_key_fn to map connections to the format database/schema/table name, so that assets from Airbyte can be connected with assets from dbt. Unfortunately, AirbyteConnectionMetadata, which is passed as a param to connection_to_asset_key_fn, doesn't have information about the source/destination (which is available in the payload of the REST API). Only 4 properties are available from this payload. https://github.com/dagster-io/dagster/blob/43fc9b8948c8bc7e159fdfaefc15848ee61d447[…]modules/libraries/dagster-airbyte/dagster_airbyte/asset_defs.py
And finally the questions 🙂
1. Is there a better way to connect assets from dbt with assets from Airbyte using names from dbt which Airbyte doesn't understand?
2. Is it possible to get information about the source/destination from the Airbyte connection response, or should I create a PR for this?
tl;dr: what do I need to do to get the destination namespace in connection_to_asset_key_fn when importing assets from Airbyte instances?
I saw that the metadata has stream_data with a list of tables/streams, so I can iterate over it and find the proper namespace, but I would need to iterate over it for every asset, so I don't think it's an optimal solution
and in stream_data I don't have information about the destination database

Adam Bloom

04/12/2023, 2:11 PM
is airbyte upstream of dbt or the other way around? we have airbyte->dbt, and were able to get things going with dbt sources matching the airbyte asset names

Andrzej Lewandowski

04/12/2023, 4:13 PM
Yes, airbyte is upstream of dbt

Adam Bloom

04/12/2023, 4:28 PM
do your sources in dbt match the airbyte asset keys? if not, you'll need to put a mapping function in _get_node_asset_key. you can handle sources separately from your dbt models with something like:
def _get_node_asset_key(node_info: Mapping[str, Any]) -> AssetKey:
    if node_info["resource_type"] == "source":
        ...

Andrzej Lewandowski

04/12/2023, 4:44 PM
I did it exactly as you wrote. I used connection_to_asset_key_fn when loading the Airbyte assets to get the namespace and stream name and use them as an AssetKey, and on the dbt side I used node_info_to_asset_key, and it partially worked. My problem is that I have 3 destinations configured (same Snowflake connection but different databases). I have schemas/tables with the same names in Airbyte but located in different databases (the data are not the same). So my idea was to add a database prefix to the asset key in dbt (which is possible and works as well), but on the Airbyte side it's impossible because I don't have access to the destination database property (which is returned by the Airbyte API but not accessible from AirbyteConnectionMetadata)

Adam Bloom

04/12/2023, 4:58 PM
ah - I had something similar, but I handled it by using additional context in node_info_to_asset_key. I'm using an asset factory that has additional context (i.e. your example of the destination database). example of what I'm doing:
def dbt_asset_factory(
    params...
):

    def _get_node_asset_key(node_info: Mapping[str, Any]) -> AssetKey:
        if node_info["resource_type"] == "source":
            if node_info["source_name"] == "<same name special case>":
                source_name = param
    ...
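A rough sketch of the factory pattern described above, under some assumptions: `make_node_info_to_asset_key` and `destination_database` are hypothetical names, the `schema` and `name` fields are assumed to be present in the dbt node info, and the function returns a plain list of key parts so that it runs without Dagster installed. In real code the list would be wrapped in `AssetKey(...)`.

```python
from typing import Any, Callable, List, Mapping


def make_node_info_to_asset_key(
    destination_database: str,
) -> Callable[[Mapping[str, Any]], List[str]]:
    """Hypothetical factory: closes over the database this dbt project reads from."""

    def _get_node_asset_key(node_info: Mapping[str, Any]) -> List[str]:
        if node_info["resource_type"] == "source":
            # Sources are the tables Airbyte loads, so prefix them with the
            # database to keep same-named schemas/tables apart.
            return [
                destination_database.upper(),
                node_info["schema"].upper(),
                node_info["name"].upper(),
            ]
        # Models keep a simple key based on their name.
        return [node_info["name"]]

    return _get_node_asset_key


# Example with made-up node info dictionaries.
key_fn = make_node_info_to_asset_key("raw_db")
source_key = key_fn({"resource_type": "source", "schema": "public", "name": "users"})
model_key = key_fn({"resource_type": "model", "schema": "analytics", "name": "dim_users"})
```

The returned function could then be passed as node_info_to_asset_key when loading the dbt project, one factory call per destination database.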

Andrzej Lewandowski

04/12/2023, 4:59 PM
It's not a problem on the dbt side - I saw that I have more possibilities to deal with it there 🙂 My problem is how to get database + namespace + table as an asset key on the Airbyte side. I have this code:
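from dagster import AssetKey

def connection_to_asset_key(meta, name):
    # Find the entry for this stream; default to None so that a missing
    # stream reaches the fallback key instead of raising StopIteration.
    stream_data = next(
        (item for item in meta.stream_data if item["stream"]["name"] == name), None
    )
    if stream_data and "namespace" in stream_data["stream"]:
        return AssetKey([stream_data["stream"]["namespace"].upper(), name.upper()])

    # Fallback: connection name plus stream name.
    return AssetKey([meta.name, name])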
which adds the namespace (schema) to the asset key, and about 80% of my cases work right now

Adam Bloom

04/12/2023, 5:00 PM
you could do something similar if you use an asset factory for airbyte assets

Andrzej Lewandowski

04/12/2023, 5:02 PM
do you have some documentation for this?

Adam Bloom

04/12/2023, 5:04 PM
no - not aware of docs. can share an incomplete example if that would help. (I can't share the complete example since our user code can't be shared in full)
the docs you'd want concern the filtering arguments for loading assets from airbyte (i.e. connection_filter), since using those would be required. https://docs.dagster.io/_apidocs/libraries/dagster-airbyte#assets
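An incomplete sketch of what an asset factory for the Airbyte side might look like, assuming you maintain by hand the list of connection names that write to each destination database (the connection metadata does not expose it). `make_airbyte_key_fns` and its arguments are hypothetical, the key function returns a plain list rather than an `AssetKey`, and `meta` is a stand-in object, so the snippet runs without Dagster.

```python
from types import SimpleNamespace
from typing import Callable, Iterable, List, Tuple


def make_airbyte_key_fns(
    connection_names: Iterable[str],
    destination_database: str,
) -> Tuple[Callable[..., bool], Callable[..., List[str]]]:
    """Hypothetical factory: one call per destination database."""
    names = set(connection_names)

    def connection_filter(meta) -> bool:
        # Keep only the connections known to write to this database.
        return meta.name in names

    def connection_to_key_path(meta, name: str) -> List[str]:
        stream = next(
            (i["stream"] for i in meta.stream_data if i["stream"]["name"] == name),
            None,
        )
        if stream and stream.get("namespace"):
            # database / schema / table, upper-cased as in the code above.
            return [destination_database.upper(), stream["namespace"].upper(), name.upper()]
        return [destination_database.upper(), meta.name, name]

    return connection_filter, connection_to_key_path


# Made-up connection and database names.
keep, to_key = make_airbyte_key_fns(["sales_sync"], "sales_db")
meta = SimpleNamespace(
    name="sales_sync",
    stream_data=[{"stream": {"name": "orders", "namespace": "public"}}],
)
orders_key = to_key(meta, "orders")
```

With Dagster installed, the two functions would be passed to load_assets_from_airbyte_instance as connection_filter and connection_to_asset_key_fn (the latter wrapping the list in AssetKey), with one load call per database.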

Andrzej Lewandowski

04/12/2023, 5:14 PM
thanks, I got an idea how to solve it
I really appreciate your help