# ask-community
t
hey, I have some issues with the `dagster-airbyte` integration plus some feature ideas (what I can add/contribute). is anyone here aware of the airbyte integration? I'd explain in detail what hurts me
in short, airbyte's `build_airbyte_assets` does not support assets generated by the Normalized tabular data transformation, so after I refresh my connections, I get errors like:
```
op 'airbyte_sync_280a1' did not fire outputs {'src_workflows_customstatuses', 'src_tasks_customfields'}
```
while these tables were updated by airbyte as part of the built-in transformation stage, I cannot tell dagster to assume those tables refreshed. if I was unclear, I can show things in more detail.
j
@sandy who else is familiar with the airbyte integration, with Owen OOO?
🙏 1
t
I'd also like to make a contribution if possible: making the table list optional in `build_airbyte_assets`. if that parameter is empty, dagster should refresh all tables in the specified connection
if you don't hate the idea I can drop a PR
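roughly, the signature could become something like this (just a sketch of the idea, not the current API; the `None` default and the "sync everything" behavior are the proposed parts):
```python
from typing import List, Optional

from dagster._core.definitions import AssetsDefinition


# hypothetical signature for the proposal: destination_tables becomes optional,
# and None/empty means "create assets for every table in the connection"
def build_airbyte_assets(
    connection_id: str,
    destination_tables: Optional[List[str]] = None,
    asset_key_prefix: Optional[List[str]] = None,
) -> List[AssetsDefinition]:
    ...
```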
s
> while these tables were updated by airbyte as part of the built-in transformation stage, I cannot tell dagster to assume those tables refreshed.
do you mind clarifying what you mean by this a little further?
`build_airbyte_assets` needs the table list to know what software-defined assets to create. if the table list is shifting around often, you might want to consider using `airbyte_sync_op` instead?
do you know the full list of tables that might be refreshed, but not every one of them will be refreshed every time? if that's the case, one option would be modifying `build_airbyte_assets` with `is_required=False` on the `AssetOut`s. that would allow it to not fire every output every time
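i.e. roughly this for the outs, just a sketch using the table names from your error above:
```python
from dagster import AssetKey, AssetOut

asset_key_prefix = []  # or whatever prefix you pass to build_airbyte_assets
destination_tables = ["src_workflows_customstatuses", "src_tasks_customfields"]

# mark each output as not required, so a run can skip tables the sync didn't report
outs = {
    table: AssetOut(key=AssetKey(asset_key_prefix + [table]), is_required=False)
    for table in destination_tables
}
```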
t
OK, let me explain these built-in transformations:
let’s say I have a few streams defined in my connection:
these are JSON-based web APIs, so I have an option in airbyte to normalize these streams:
as part of the normalization process I will get more output tables in my destination: tables that are not part of the original stream definitions:
the reason is that airbyte flattens some of the JSON hierarchies into separate tables.
an example is `src_tasks_customfields`
now if I specify all tables in `build_airbyte_assets`, like `src_tasks_customfields`, then I will get this error:
if I do not specify `src_tasks_customfields` in `build_airbyte_assets`, then my lineage will be broken, as some of the downstream tables are using it as an input:
does this explain my issue?
s
that makes sense. can I ask you to try something? instead of using the `build_airbyte_assets` in the dagster-airbyte library, define your own:
```python
from typing import List, Optional

from dagster_airbyte.utils import generate_materializations

from dagster import AssetKey, AssetOut, Output
from dagster import _check as check
from dagster._annotations import experimental
from dagster._core.definitions import AssetsDefinition, multi_asset


@experimental
def build_airbyte_assets(
    connection_id: str,
    destination_tables: List[str],
    asset_key_prefix: Optional[List[str]] = None,
) -> List[AssetsDefinition]:
    """
    Builds a set of assets representing the tables created by an Airbyte sync operation.

    Args:
        connection_id (str): The Airbyte Connection ID that this op will sync. You can retrieve this
            value from the "Connections" tab of a given connector in the Airbyte UI.
        destination_tables (List[str]): The names of the tables that you want to be represented
            in the Dagster asset graph for this sync. This will generally map to the name of the
            stream in Airbyte, unless a stream prefix has been specified in Airbyte.
        asset_key_prefix (Optional[List[str]]): A prefix for the asset keys inside this asset.
            If left blank, assets will have a key of `AssetKey([table_name])`.
    """

    asset_key_prefix = check.opt_list_param(asset_key_prefix, "asset_key_prefix", of_type=str)

    @multi_asset(
        name=f"airbyte_sync_{connection_id[:5]}",
        outs={
            table: AssetOut(key=AssetKey(asset_key_prefix + [table]), is_required=False)
            for table in destination_tables
        },
        required_resource_keys={"airbyte"},
        compute_kind="airbyte",
    )
    def _assets(context):
        ab_output = context.resources.airbyte.sync_and_poll(connection_id=connection_id)
        for materialization in generate_materializations(ab_output, asset_key_prefix):
            table_name = materialization.asset_key.path[-1]
            if table_name in destination_tables:
                yield Output(
                    value=None,
                    output_name=table_name,
                    metadata={
                        entry.label: entry.entry_data for entry in materialization.metadata_entries
                    },
                )
            else:
                yield materialization

    return [_assets]
```
this is exactly the same as the one in the dagster-airbyte library, except it marks the outputs as non-required. if this works, we can make this change in the dagster-airbyte library itself
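you'd instantiate it the same way as before, e.g. (connection ID and table names taken from this thread, purely as an example):
```python
airbyte_assets = build_airbyte_assets(
    connection_id="280a16a9-64c4-4d90-b2ed-0bd9882b89ff",
    destination_tables=[
        "src_tasks",
        "src_workflows",
        "src_tasks_customfields",
        "src_workflows_customstatuses",
    ],
)
```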
t
thanks for getting back to me this early! I will try it out in a few mins and let you know
OK, it did help, now it succeeds:
(thank you)
but it does not solve the real issue, that airbyte in fact materialized this table but dagster does not know about it:
but at least it does not fail, so it is better than it was
another question: why do we have to specify the table list for `build_airbyte_assets`? Shouldn't we support retrieving all streams by default (like in the case of `load_assets_from_dbt_project`, for instance)? I feel specifying `destination_tables` is unnecessary 90% of the time, as you sync the entire connection.
OK, I have an idea how to make this work, let me quickly code it and I'll share. We should create two parameters for the function: one, `destination_tables`, for stream-emitted tables, and one new parameter for transformation/normalized tables. On successful completion we can also mark the normalized ones materialized.
we can make this new optional parameter a map, where the key is the normalized table and the value is the source table(s), and if the source succeeded, then mark it materialized.
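something like this, just to sketch the shape (the parameter name is a placeholder):
```python
# proposed optional parameter: normalized/transformed table -> source table(s)
normalized_to_source_tables = {
    "src_tasks_customfields": ["src_tasks"],
    "src_workflows_customstatuses": ["src_workflows"],
}
```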
I know it is suboptimal, but what do you think: https://github.com/dagster-io/dagster/pull/9561/files
s
are you able to help me understand a little better why we're not registering that `src_tasks_customfields` is being updated? is there some way of detecting this with Airbyte's APIs?
> another question: why do we have to specify the table list for `build_airbyte_assets`?
it's because we need to construct the asset definitions ahead of when we trigger the syncs. otherwise, we wouldn't know what to display in the asset graph until someone launches a run. that said, we've explored an approach that would query the airbyte API at the time `build_airbyte_assets` is invoked. we're interested in adding that soon.
👍 1
t
I did not find any APIs; it's just mentioned in the text logs
if you have a few mins later today I can show it or record a quick video
well I might have an idea
from the stream metadata we can guess the generated table names
what I can do is check if normalization is enabled and which fields are array- or object-typed
there is an API where you can see if basic normalization is enabled:
```
❯ curl -X POST "http://airbyte-webapp-svc/api/v1/operations/list" --data '{"connectionId": "280a16a9-64c4-4d90-b2ed-0bd9882b89ff"}' -H "Content-Type: application/json" | jq
{
  "operations": [
    {
      "workspaceId": "c2b8be7c-aad2-45a3-b07e-8b3d1ab550fe",
      "operationId": "6758d834-80f8-4a92-9a1f-bdab3ea1d8bd",
      "name": "Normalization",
      "operatorConfiguration": {
        "operatorType": "normalization",
        "normalization": {
          "option": "basic"
        }
      }
    }
  ]
}
```
if normalization is basic, we know that airbyte will make more tables. Now, to find out which tables are generated, we have to interpret the schema from `api/v1/connections/get`: if that returns columns with array/object types, then we know that a new table will be created in addition to the stream tables
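roughly, the detection could be sketched like this (it assumes `connections/get` exposes the configured catalog as `syncCatalog.streams[].stream.jsonSchema`, and it only guesses `<stream>_<field>` names, so it ignores airbyte's truncation and naming edge cases):
```python
import requests

AIRBYTE_API = "http://airbyte-webapp-svc/api/v1"  # same host as in the curl above


def guess_normalized_tables(connection_id: str) -> dict:
    """Guess the extra tables basic normalization will create for a connection.

    Returns {stream_name: {child_table_name, ...}}, using the heuristic that
    airbyte unnests each object/array-typed top-level field into <stream>_<field>.
    """
    # 1. is basic normalization enabled on this connection?
    ops = requests.post(
        f"{AIRBYTE_API}/operations/list", json={"connectionId": connection_id}
    ).json()["operations"]
    if not any(
        op.get("operatorConfiguration", {}).get("normalization", {}).get("option") == "basic"
        for op in ops
    ):
        return {}

    # 2. look for object/array-typed fields in each stream's JSON schema
    connection = requests.post(
        f"{AIRBYTE_API}/connections/get", json={"connectionId": connection_id}
    ).json()
    guesses = {}
    for configured_stream in connection["syncCatalog"]["streams"]:
        stream = configured_stream["stream"]
        properties = stream.get("jsonSchema", {}).get("properties", {})
        children = {
            f"{stream['name']}_{field}"
            for field, field_schema in properties.items()
            # "type" can be a string or a list like ["null", "object"]
            if "object" in str(field_schema.get("type", ""))
            or "array" in str(field_schema.get("type", ""))
        }
        if children:
            guesses[stream["name"]] = children
    return guesses
```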
s
hey Tamas - our dagster-airbyte expert (Owen) is out on vacation this week, but will be back next week on Tuesday. I'm going to ask him to look into this. feel free to ping us if we don't get to it after then
t
sounds good, thanks!
o
hi @Tamas Foldi! looked into this a bit, and this is definitely not ideal behavior, but it's a tricky problem to solve, especially because airbyte does not provide a way to query what tables will be produced by a given normalization operation (nor does it provide a way to query what tables were produced by the operation after the fact). It's definitely possible to come up with algorithms that do a pretty good job at guessing output table names based off of the connection schema, but there are definitely edge cases (see: https://docs.airbyte.com/understanding-airbyte/basic-normalization/#naming-limitations--truncation), which I don't think we would want to maintain handling for. For these cases, maybe the conceptually simplest (if somewhat annoying) solution would be to have a separate `destination_table_to_unnested_tables` mapping parameter (bad name, but that sort of idea), which maps the base table to a set of transformed, unnested tables. so for your case that would look like
```python
{
    "src_tasks": {"src_tasks_customfields"}, 
    "src_workflows": {"src_workflows_customstatuses"},
}
```
then adding those values as additional `destination_tables`. Finally, you could change the logic inside the asset to look like
```python
def _assets(context):
    ab_output = context.resources.airbyte.sync_and_poll(connection_id=connection_id)
    for materialization in generate_materializations(ab_output, asset_key_prefix):
        table_name = materialization.asset_key.path[-1]
        if table_name in destination_tables:
            yield Output(
                value=None,
                output_name=table_name,
                metadata={
                    entry.label: entry.entry_data for entry in materialization.metadata_entries
                },
            )
            for unnested_table_name in destination_table_to_unnested_tables.get(table_name, []):
                yield Output(value=None, output_name=unnested_table_name)
        else:
            yield materialization
```
this is pretty similar to your proposal, but is a little "more accurate", as it will only yield an output for the transformed tables if the untransformed table is created. this is nice in cases where (for example) you remove one of the source tables from the connection but don't update the dagster code
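to spell out the "adding those values as additional destination_tables" part, the wiring would be roughly (just a sketch):
```python
# pass both the base tables and their unnested tables as destination_tables
destination_table_to_unnested_tables = {
    "src_tasks": {"src_tasks_customfields"},
    "src_workflows": {"src_workflows_customstatuses"},
}
destination_tables = list(destination_table_to_unnested_tables) + [
    table
    for unnested in destination_table_to_unnested_tables.values()
    for table in unnested
]
# -> ["src_tasks", "src_workflows", "src_tasks_customfields", "src_workflows_customstatuses"]
```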
t
yes, actually I already implemented something similar a few days back https://github.com/dagster-io/dagster/pull/9561/files
yours looks a bit better as it passes a map and could keep the dependency, I guess
w/r/t naming, you can run your own custom transformations in airbyte, so I'd prefer something like “transformation tables”, as these may not just be unnested tables
also, when I execute airbyte jobs on kubernetes, I do not see the logs in the dagster console. might be a config issue on my side, but while I can see the logs with the multiprocess executor, I cannot in k8s
o
yeah, your solution seems totally reasonable, and if that feels more ergonomic there's definitely nothing wrong w/ it. re: the logs issue, you may need to set up a compute log manager to get the unstructured logs from stdout/stderr (https://docs.dagster.io/deployment/dagster-instance#compute-log-storage)
t
thanks. regarding next steps: if I add tests and documentation, any chance it can be merged into the main code base?
to avoid maintaining our own version
o
hi @Tamas Foldi! We're actually looking to include this functionality in next week's release (pr: https://github.com/dagster-io/dagster/pull/9655/files)
t
looks awesome, really appreciate it
maybe a parameter to add the generated assets to an asset group would be my last wish
o
we're adding a general utility function `with_group` in this week's release, so you'll be able to do `with_group(build_airbyte_assets(...), "mygroup")` 😄
t
you guys are good at mind reading
b
Piggybacking on this thread because it seems closely related. I'm getting the "not fired" error on Airbyte, even though the materialization works and the names seem to match my dbt setup. Airbyte is using raw JSON output. I've got a very similar issue with S3 as an output as well. The materialization looks right, Airbyte returns successfully, and it is updating in the destination database. When I run the dbt models alone, they also execute
o
hey @Bit Unwise! My guess is that you're calling something to the effect of `build_airbyte_assets(connection_id=..., destination_tables=["volleyball_volleyball"], asset_key_prefix=["airbyte_volleyball_volleyball"])`, is that accurate? If so, I think you'd want to swap that out for `destination_tables=["airbyte_volleyball_details"]`
this error message is pretty unhelpful in providing that information, but the basic idea is that when you run a sync, airbyte provides some metadata telling us what tables it actually produced. If a table name matches one of the table names you provided, then we create an output for it (as well as an asset materialization event), but if it doesn't match one of the destination table names, we just create the materialization, which is what's happening here
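so the call would look something like this (connection ID elided; this assumes the stream really is named airbyte_volleyball_details):
```python
# destination_tables must match the stream names airbyte reports after a sync
build_airbyte_assets(
    connection_id=...,  # your connection ID
    destination_tables=["airbyte_volleyball_details"],
    asset_key_prefix=["airbyte_volleyball_volleyball"],
)
```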
b
I see, so I need to tweak e.g. DBT for the downstream pieces? I had it listed like this:
```yaml
version: 2

sources:
  # VOLLEYBALL
  - name: airbyte_volleyball_volleyball
    schema: airbyte_volleyball_volleyball
    tables:
      - name: volleyball_volleyball
        identifier: _airbyte_raw_airbyte_volleyball_details
```
in `sources.yml`
I did try `_airbyte_raw_airbyte_volleyball_details` as well earlier (the output from the final table in airbyte), and thought I got the same error. rerunning now
o
tweaking the dbt source name to match what airbyte's generating (so table name = "airbyte_volleball_details") would probably be the most straightforward route. alternatively, you can customize how dagster generates asset keys from your sources.yml file using the `node_info_to_asset_key` parameter, if you'd prefer not to have to edit your dbt project
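a rough sketch of that second option, assuming dagster-dbt's `load_assets_from_dbt_project` and its `node_info_to_asset_key` hook (the prefix and paths here are placeholders):
```python
from typing import Any, Mapping

from dagster import AssetKey
from dagster_dbt import load_assets_from_dbt_project


def node_info_to_asset_key(node_info: Mapping[str, Any]) -> AssetKey:
    # point dbt sources at the keys produced by build_airbyte_assets
    # (prefix + table name); every other node just keys off its name
    if node_info["resource_type"] == "source":
        return AssetKey(["my_airbyte_prefix", node_info["name"]])  # placeholder prefix
    return AssetKey(node_info["name"])


dbt_assets = load_assets_from_dbt_project(
    project_dir="path/to/dbt_project",  # placeholder paths
    profiles_dir="path/to/profiles",
    node_info_to_asset_key=node_info_to_asset_key,
)
```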
b
strange. same error for `airbyte_volleyball_details` (the name of the stream) and `_airbyte_raw_airbyte_volleyball_details` (the final output table). Airbyte logs below, in case I'm just doing something silly -- quite likely (`op 'airbyte_sync_49b83' did not fire outputs {'airbyte_volleball_details'}`)
```
2022-09-14 22:26:54 destination > 2022-09-14 22:26:54 INFO i.a.i.d.r.InMemoryRecordBufferingStrategy(lambda$flushAll$1):86 - Flushing airbyte_volleyball_details: 34 records (1 MB)
2022-09-14 22:26:54 destination > 2022-09-14 22:26:54 INFO i.a.i.d.j.SqlOperations(onDestinationCloseOperations):137 - No onDestinationCloseOperations required for this destination.
2022-09-14 22:26:54 destination > 2022-09-14 22:26:54 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):162 - Finalizing tables in destination started for 1 streams
2022-09-14 22:26:54 destination > 2022-09-14 22:26:54 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):167 - Finalizing stream airbyte_volleyball_details. schema airbyte_volleyball_volleyball, tmp table _airbyte_tmp_fnm_airbyte_volleyball_details, final table _airbyte_raw_airbyte_volleyball_details
2022-09-14 22:26:54 destination > 2022-09-14 22:26:54 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):180 - Executing finalization of tables.
2022-09-14 22:26:54 destination > 2022-09-14 22:26:54 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):182 - Finalizing tables in destination completed.
2022-09-14 22:26:54 destination > 2022-09-14 22:26:54 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):185 - Cleaning tmp tables in destination started for 1 streams
2022-09-14 22:26:54 destination > 2022-09-14 22:26:54 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):189 - Cleaning tmp table in destination started for stream airbyte_volleyball_details. schema airbyte_volleyball_volleyball, tmp table name: _airbyte_tmp_fnm_airbyte_volleyball_details
2022-09-14 22:26:54 destination > 2022-09-14 22:26:54 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):194 - Cleaning tmp tables in destination completed.
2022-09-14 22:26:54 destination > 2022-09-14 22:26:54 INFO i.a.i.b.IntegrationRunner(runInternal):171 - Completed integration: io.airbyte.integrations.base.ssh.SshWrappedDestination
2022-09-14 22:26:54 destination > 2022-09-14 22:26:54 INFO i.a.i.d.p.PostgresDestination(main):92 - completed destination: class io.airbyte.integrations.destination.postgres.PostgresDestination
```
o
"airbyte_volleball_details" -> "airbyte_volleyball_details" (I think that was originally my typo, sorry blob smile sweat )
b
goodness gracious. Turns out I just can't type, and it works fine when using the Stream Name. Super helpful, thank you!
o
glad it's working!
🌈 1
❤️ 1