https://dagster.io/ logo
#ask-community
Title
# ask-community
l

Lior Lev-Tov

02/12/2023, 12:00 PM
Hey, I'm trying to create a pipeline - Airbyte -> DBT orchestrated by Dagster of course. For the following code -
Copy code
from dagster_airbyte import build_airbyte_assets
from dagster_dbt import load_assets_from_dbt_project
from ..utils.constants import AIRBYTE_CONNECTION_ID, DBT_PROJECT_DIR

airbyte_assets = build_airbyte_assets(
    connection_id=AIRBYTE_CONNECTION_ID,
    destination_tables=["_airbyte_raw_contacts"],
    asset_key_prefix=["stage"],
)

dbt_assets = load_assets_from_dbt_project(

    project_dir=DBT_PROJECT_DIR,

    io_manager_key="db_io_manager",
    key_prefix=["practice"],
    source_key_prefix=["stage"],
)
i get the following error for the Airbyte step: dagster._core.errors.DagsterStepOutputNotFoundError: Core compute for op "airbyte_sync_c8615" did not return an output for non-optional output "_airbyte_raw_contacts" more context - this table is generated by Airbyte and is being updated by the step, but it seems like Airbyte unable to signal Dagster that the output is there. what i'm missing?
d

Dusty Shapiro

02/13/2023, 2:14 PM
I would post this in the #dagster-airbyte channel, as I am curious as well
o

owen

02/13/2023, 11:49 PM
hi @Lior Lev-Tov! I'm not the biggest expert here, but it's definitely possible that the internal name that airbyte is creating for that sync'd table differs from its actual table name. In this case, it might consider the stream name to be just
"contacts"
rather than
_airbyte_raw_contacts
and thus is looking for that rather than what you've specified. I'd definitely give
destination_tables=["contacts"]
a try, but if that doesn't work I'm happy to dig a bit deeper here
2 Views