#integration-airbyte

Ohad

04/10/2023, 10:46 PM
Hi there, when I try to run `dagster dev` I get the following error:
/home/byteuser/venv/lib/python3.10/site-packages/dagster/_core/workspace/context.py:591: UserWarning: Error loading repository location hillsg_dagster:dagster._core.errors.DagsterInvalidDefinitionError: "src_airtable__hg_data_entry/sub_learning_areas" is not a valid name in Dagster. Names must be in regex ^[A-Za-z0-9_]+$.
Looking into it, the stream name for an Airbyte connection with Airtable is `"src_airtable__hg_data_entry/sub_learning_areas"`. I can't control the stream name; this is how Airbyte resolves the Base/Table name with the Airbyte connector. When the table is created in the DB, it changes "/" to "_". Any idea how I can work around this issue?
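The check that fails here can be reproduced outside Dagster. This is a minimal sketch that uses only the regex quoted in the error message; the helper name `is_valid_dagster_name` is made up for illustration and is not part of Dagster.

```python
import re

# Pattern quoted in the DagsterInvalidDefinitionError message above.
VALID_NAME = re.compile(r"^[A-Za-z0-9_]+$")


def is_valid_dagster_name(name: str) -> bool:
    """Return True if `name` contains only letters, digits, and underscores."""
    return VALID_NAME.fullmatch(name) is not None


stream = "src_airtable__hg_data_entry/sub_learning_areas"
print(is_valid_dagster_name(stream))                    # False: "/" is rejected
print(is_valid_dagster_name(stream.replace("/", "_")))  # True
```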

Jon Erik Kemi Warghed

04/12/2023, 6:01 AM
Ran into something similar: dagster-airbyte doesn't respect the changes Airbyte makes to the naming of special characters in field names either.

Adam Bloom

04/12/2023, 2:12 PM
You might want to pass a custom `connection_to_asset_key_fn` when loading assets?

Ohad

04/12/2023, 8:33 PM
Hi @Adam Bloom, how would the custom function look? The table DB names are `src_airtable__hg_dat__ry_sub_learning_areas` and `src_airtable__hg_data_entry_absence_groups`. This is what I've got for now:
```python
airbyte_assets = load_assets_from_airbyte_instance(
    AIRBYTE_INSTANCE,
    key_prefix=['sources'],
    connection_to_asset_key_fn=...,  # what should go here?
)
```

Hiroto Yamakawa

07/04/2023, 1:16 PM
Hi @Ohad, were you able to find a solution to this issue?

Ohad

07/08/2023, 12:16 AM
Unfortunately not, @Hiroto Yamakawa. Please let me know if you do find a solution for it 🙏

Adam Bloom

07/08/2023, 12:21 AM
Did you explore adding a custom mapping function? I don't see why that wouldn't work for your case. I'm not using one myself, so I don't have an example, but I'll link you to the default one if you haven't looked at that already
The default just uses the table: https://github.com/dagster-io/dagster/blob/master/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_defs.py#L505-L507. If you just wanted to replace the `/` with another valid character (e.g. `_`), you could use `lambda _, table: AssetKey(path=[table.replace('/','_')])`. The first argument to that lambda is additional metadata if you want to use it in your asset keys.
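A slightly more general version of that lambda, as a sketch: it replaces every character outside the allowed set rather than only `/`. The names `to_valid_dagster_name` and `connection_to_asset_key` are hypothetical, and whether a key function alone is enough is not confirmed in this thread (a later message reports also needing a change inside the package).

```python
import re


def to_valid_dagster_name(table: str) -> str:
    """Replace each character outside [A-Za-z0-9_] with an underscore."""
    return re.sub(r"[^A-Za-z0-9_]", "_", table)


def connection_to_asset_key(_connection_meta, table: str):
    """Hypothetical key function with the (metadata, table) signature described above."""
    # Imported lazily so the sanitizer above can be tried without Dagster installed.
    from dagster import AssetKey

    return AssetKey(path=[to_valid_dagster_name(table)])


print(to_valid_dagster_name("src_airtable__hg_data_entry/sub_learning_areas"))
# src_airtable__hg_data_entry_sub_learning_areas

# Wiring it in, reusing the call from the question earlier in the thread:
# airbyte_assets = load_assets_from_airbyte_instance(
#     AIRBYTE_INSTANCE,
#     key_prefix=["sources"],
#     connection_to_asset_key_fn=connection_to_asset_key,
# )
```

Note that the sanitized name need not equal the table name Airbyte writes to the database; the thread quotes `src_airtable__hg_dat__ry_sub_learning_areas` for this stream.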

Ohad

07/08/2023, 12:55 AM
Great idea. Thanks, I’ll give it a go.

Hiroto Yamakawa

07/11/2023, 2:07 PM
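@Ohad the only way I was able to do it was as follows:
```python
from dagster import AssetKey
from dagster_airbyte.asset_defs import _clean_name  # private helper, may change between releases

def connection_to_asset_key_function(meta, name):
    # Sanitize every stream name in the connection metadata in place
    for item in meta.stream_data:
        item['stream']['name'] = _clean_name(item['stream']['name'])
    # Key the asset by connection name and table name
    return AssetKey([meta.name, name])
```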
and then in `dagster_airbyte/asset_defs.py` (a file from the package):
```python
def parse_stream_tables(
    self, return_normalization_tables: bool = False
) -> Mapping[str, AirbyteTableMetadata]:
    [...]

    for stream in enabled_streams:
        name = _clean_name(cast(str, stream.get("stream", {}).get("name")))
    [...]
```
That way, the streams with rejected characters were loaded into Dagster.
The modification in `dagster_airbyte/asset_defs.py` is actually enough, since stream parsing happens first. I guess this would need a PR. I would be happy to open it, my first OSS contribution.
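To try the stream-name cleaning without touching the installed package, here is a self-contained sketch. `clean_name` is a stand-in for the package's private `_clean_name`; its behavior (lowercase, then collapse each run of characters outside `a-z0-9` into one underscore) is an assumption and may differ from the real helper in your version. The `stream_data` shape mirrors the `item['stream']['name']` access in the snippet above.

```python
import re
from typing import Any, Dict, List


def clean_name(name: str) -> str:
    """Stand-in for `_clean_name` (assumed behavior, not the package's code)."""
    return re.sub(r"[^a-z0-9]+", "_", name.lower())


def clean_stream_names(stream_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return a copy of `stream_data` with every stream name cleaned."""
    cleaned = []
    for item in stream_data:
        # Build new dicts instead of mutating the caller's metadata.
        stream = {**item["stream"], "name": clean_name(item["stream"]["name"])}
        cleaned.append({**item, "stream": stream})
    return cleaned


streams = [{"stream": {"name": "src_airtable__hg_data_entry/sub_learning_areas"}}]
print(clean_stream_names(streams)[0]["stream"]["name"])
# src_airtable_hg_data_entry_sub_learning_areas
```

Under this assumed regex, the double underscore after `src_airtable` also collapses to a single one, so the cleaned name differs from a plain `/` to `_` replacement.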

Ohad

07/21/2023, 8:57 PM
Thank you @Hiroto Yamakawa for sharing this, and it would be great if this could be merged into dagster_airbyte 🙏