Tamas Foldi  08/30/2022, 9:45 PM
I'd like to discuss the dagster-airbyte integration plus some feature ideas (what I can add/contribute). Is anyone here aware of the Airbyte integration? I'd explain in detail what hurts me: build_airbyte_assets does not support assets generated by the "Normalized tabular data" transformation:
op 'airbyte_sync_280a1' did not fire outputs {'src_workflows_customstatuses', 'src_tasks_customfields'}
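(For context, a minimal sketch of the setup being described; the connection id is a placeholder, and the table list is inferred from the error above. The two *_custom* tables are produced by Airbyte's normalization rather than by a stream, so the sync emits no materialization, and therefore no output, for them:)

from dagster_airbyte import build_airbyte_assets

airbyte_assets = build_airbyte_assets(
    connection_id="280a16a9-...",  # placeholder
    destination_tables=[
        "src_workflows",
        "src_tasks",
        "src_workflows_customstatuses",  # normalization output; never fires
        "src_tasks_customfields",        # normalization output; never fires
    ],
)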
jamie  08/30/2022, 9:56 PM

Tamas Foldi  08/30/2022, 10:06 PM
while these tables were updated by airbyte as part of the built-in transformation stage, I cannot tell dagster to assume those tables refreshed.

sandy  08/30/2022, 10:26 PM
08/30/2022, 10:26 PMwhile these tables were updated by airflow as part of the built-in transformation stage, I cannot tell dagster to assume those tables refreshed.do you mind clarifying what you mean by this a little further?
build_airbyte_assets
needs the table list to know what software-defined assets to create. if the table list is shifting around often, you might want to consider using airbyte_sync_op
instead?build_airbyte_assets
with is_required=False
on the `AssetOut`s. that would allow it to not fire every output every timeTamas Foldi
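(A rough sketch of the airbyte_sync_op route sandy mentions; the host, port, connection id, and op name here are assumptions for illustration:)

from dagster import job
from dagster_airbyte import airbyte_resource, airbyte_sync_op

airbyte_instance = airbyte_resource.configured(
    {"host": "airbyte-webapp-svc", "port": "80"}  # assumed in-cluster address
)
sync_op = airbyte_sync_op.configured(
    {"connection_id": "280a16a9-..."},  # placeholder connection id
    name="sync_clickup",  # hypothetical name
)

@job(resource_defs={"airbyte": airbyte_instance})
def airbyte_sync_job():
    # runs the whole sync without declaring per-table software-defined assets
    sync_op()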
Tamas Foldi  08/31/2022, 3:10 AM
that's exactly the catch with src_tasks_customfields: if I include tables like src_tasks_customfields in build_airbyte_assets, then I will get the "did not fire outputs" error above. if I leave them out of build_airbyte_assets, then my lineage will be broken, as some of the downstream tables are using them as an input.

sandy  08/31/2022, 2:46 PM
instead of using the build_airbyte_assets in the dagster-airbyte library, define your own:
from typing import List, Optional

from dagster_airbyte.utils import generate_materializations

from dagster import AssetKey, AssetOut, Output
from dagster import _check as check
from dagster._annotations import experimental
from dagster._core.definitions import AssetsDefinition, multi_asset


@experimental
def build_airbyte_assets(
    connection_id: str,
    destination_tables: List[str],
    asset_key_prefix: Optional[List[str]] = None,
) -> List[AssetsDefinition]:
    """Builds a set of assets representing the tables created by an Airbyte sync operation.

    Args:
        connection_id (str): The Airbyte Connection ID that this op will sync. You can retrieve this
            value from the "Connections" tab of a given connector in the Airbyte UI.
        destination_tables (List[str]): The names of the tables that you want to be represented
            in the Dagster asset graph for this sync. This will generally map to the name of the
            stream in Airbyte, unless a stream prefix has been specified in Airbyte.
        asset_key_prefix (Optional[List[str]]): A prefix for the asset keys inside this asset.
            If left blank, assets will have a key of `AssetKey([table_name])`.
    """
    asset_key_prefix = check.opt_list_param(asset_key_prefix, "asset_key_prefix", of_type=str)

    @multi_asset(
        name=f"airbyte_sync_{connection_id[:5]}",
        outs={
            # is_required=False belongs on the AssetOut, not the AssetKey
            table: AssetOut(key=AssetKey(asset_key_prefix + [table]), is_required=False)
            for table in destination_tables
        },
        required_resource_keys={"airbyte"},
        compute_kind="airbyte",
    )
    def _assets(context):
        ab_output = context.resources.airbyte.sync_and_poll(connection_id=connection_id)
        for materialization in generate_materializations(ab_output, asset_key_prefix):
            table_name = materialization.asset_key.path[-1]
            if table_name in destination_tables:
                yield Output(
                    value=None,
                    output_name=table_name,
                    metadata={
                        entry.label: entry.entry_data
                        for entry in materialization.metadata_entries
                    },
                )
            else:
                yield materialization

    return [_assets]
this is exactly the same as the one in the dagster-airbyte library, except it marks the outputs as non-required.
if this works, we can make this change in the dagster-airbyte library itself
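(For reference, one way to wire the custom function above into a repository; the host, port, connection id, and table names are placeholders, and with_resources is assumed from dagster 1.x:)

from dagster import repository, with_resources
from dagster_airbyte import airbyte_resource

airbyte_instance = airbyte_resource.configured(
    {"host": "airbyte-webapp-svc", "port": "80"}
)

@repository
def clickup_repo():  # hypothetical repository name
    return with_resources(
        build_airbyte_assets(
            connection_id="280a16a9-...",  # placeholder
            destination_tables=[
                "src_workflows",
                "src_tasks",
                "src_workflows_customstatuses",  # non-required now, so no error
                "src_tasks_customfields",        # non-required now, so no error
            ],
        ),
        {"airbyte": airbyte_instance},
    )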
Tamas Foldi  08/31/2022, 2:52 PM
another question: why do we have to specify the table list for build_airbyte_assets? shouldn't we support retrieving all streams by default (like in the case of load_assets_from_dbt_project, for instance)? I feel specifying destination_tables is unnecessary 90% of the time, as you sync the entire connection.

sandy  08/31/2022, 6:07 PM
how would dagster know that src_tasks_customfields is being updated? is there some way of detecting this with Airbyte's APIs?

> another question: why do we have to specify the table list for build_airbyte_assets?
it's because we need to construct the asset definitions ahead of when we trigger the syncs. otherwise, we wouldn't know what to display in the asset graph until someone launches a run. that said, we've explored an approach that would query the airbyte API at the time build_airbyte_assets is invoked. we're interested in adding that soon.

Tamas Foldi  08/31/2022, 6:10 PM
seems like yes:
❯ curl -X POST "http://airbyte-webapp-svc/api/v1/operations/list" --data '{"connectionId": "280a16a9-64c4-4d90-b2ed-0bd9882b89ff"}' -H "Content-Type: application/json" | jq
{
  "operations": [
    {
      "workspaceId": "c2b8be7c-aad2-45a3-b07e-8b3d1ab550fe",
      "operationId": "6758d834-80f8-4a92-9a1f-bdab3ea1d8bd",
      "name": "Normalization",
      "operatorConfiguration": {
        "operatorType": "normalization",
        "normalization": {
          "option": "basic"
        }
      }
    }
  ]
}
and the list of streams is available from api/v1/connections/get
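(A sketch of what that detection could look like from Python; the base URL follows the curl example above, and the response shapes follow the JSON shown there and Airbyte's connections/get payload:)

import requests

AIRBYTE_API = "http://airbyte-webapp-svc/api/v1"  # assumed in-cluster address

def normalization_enabled(connection_id: str) -> bool:
    # operations/list reports whether the connection runs basic normalization
    resp = requests.post(
        f"{AIRBYTE_API}/operations/list", json={"connectionId": connection_id}
    )
    resp.raise_for_status()
    return any(
        op["operatorConfiguration"]["operatorType"] == "normalization"
        for op in resp.json()["operations"]
    )

def stream_names(connection_id: str) -> list:
    # connections/get returns the configured catalog, i.e. the stream list
    resp = requests.post(
        f"{AIRBYTE_API}/connections/get", json={"connectionId": connection_id}
    )
    resp.raise_for_status()
    return [s["stream"]["name"] for s in resp.json()["syncCatalog"]["streams"]]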
sandy  09/02/2022, 4:23 AM

Tamas Foldi  09/02/2022, 3:15 PM

owen  09/07/2022, 9:46 PM
you could add a destination_table_to_unnested_tables mapping parameter (bad name, but that sort of idea), which maps the base table to a set of transformed, unnested tables. so for your case that would look like
{
    "src_tasks": {"src_tasks_customfields"},
    "src_workflows": {"src_workflows_customstatuses"},
}
then add those values as additional destination_tables. Finally, you could change the logic inside the asset to look like
def _assets(context):
    ab_output = context.resources.airbyte.sync_and_poll(connection_id=connection_id)
    for materialization in generate_materializations(ab_output, asset_key_prefix):
        table_name = materialization.asset_key.path[-1]
        if table_name in destination_tables:
            yield Output(
                value=None,
                output_name=table_name,
                metadata={
                    entry.label: entry.entry_data
                    for entry in materialization.metadata_entries
                },
            )
            # also fire outputs for the unnested tables derived from this base table
            for unnested_table_name in destination_table_to_unnested_tables.get(table_name, []):
                yield Output(value=None, output_name=unnested_table_name)
        else:
            yield materialization
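(A hypothetical call to this modified build_airbyte_assets; the parameter name and signature follow owen's suggestion above, with the unnested tables also listed in destination_tables so the extra outputs exist on the @multi_asset:)

airbyte_assets = build_airbyte_assets(
    connection_id="280a16a9-...",  # placeholder
    destination_tables=[
        "src_tasks",
        "src_workflows",
        "src_tasks_customfields",        # unnested; fired alongside src_tasks
        "src_workflows_customstatuses",  # unnested; fired alongside src_workflows
    ],
    destination_table_to_unnested_tables={
        "src_tasks": {"src_tasks_customfields"},
        "src_workflows": {"src_workflows_customstatuses"},
    },
)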
Tamas Foldi  09/08/2022, 6:10 AM

owen  09/08/2022, 4:00 PM

Tamas Foldi  09/09/2022, 3:12 AM

owen  09/09/2022, 11:17 PM

Tamas Foldi  09/11/2022, 6:48 PM

owen  09/12/2022, 11:32 PM
we're adding with_group in this week's release, so you'll be able to do with_group(build_airbyte_assets(...), "mygroup") 😄

Tamas Foldi  09/13/2022, 10:55 AM

Bit Unwise  09/14/2022, 10:24 PM

owen  09/14/2022, 10:31 PM
just to make sure I understand: you're calling build_airbyte_assets(connection_id=..., destination_tables=["volleyball_volleyball"], asset_key_prefix=["airbyte_volleyball_volleyball"]), is that accurate? If so, I think you'd want to swap that out for destination_tables=["airbyte_volleyball_details"]
Bit Unwise  09/14/2022, 10:33 PM
this is what I have in sources.yml:
version: 2
sources:
  # VOLLEYBALL
  - name: airbyte_volleyball_volleyball
    schema: airbyte_volleyball_volleyball
    tables:
      - name: volleyball_volleyball
        identifier: _airbyte_raw_airbyte_volleyball_details
I tried _airbyte_raw_airbyte_volleyball_details as well earlier (the output from the final table in airbyte), and thought I got the same error. rerunning now

owen  09/14/2022, 10:37 PM

Bit Unwise  09/14/2022, 10:41 PM
same error with both airbyte_volleyball_details (the name of the stream) and _airbyte_raw_airbyte_volleyball_details (the final output table). Airbyte logs below, in case I'm just doing something silly -- quite likely (op 'airbyte_sync_49b83' did not fire outputs {'airbyte_volleball_details'}):
2022-09-14 22:26:54 destination > 2022-09-14 22:26:54 INFO i.a.i.d.r.InMemoryRecordBufferingStrategy(lambda$flushAll$1):86 - Flushing airbyte_volleyball_details: 34 records (1 MB)
2022-09-14 22:26:54 destination > 2022-09-14 22:26:54 INFO i.a.i.d.j.SqlOperations(onDestinationCloseOperations):137 - No onDestinationCloseOperations required for this destination.
2022-09-14 22:26:54 destination > 2022-09-14 22:26:54 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):162 - Finalizing tables in destination started for 1 streams
2022-09-14 22:26:54 destination > 2022-09-14 22:26:54 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):167 - Finalizing stream airbyte_volleyball_details. schema airbyte_volleyball_volleyball, tmp table _airbyte_tmp_fnm_airbyte_volleyball_details, final table _airbyte_raw_airbyte_volleyball_details
2022-09-14 22:26:54 destination > 2022-09-14 22:26:54 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):180 - Executing finalization of tables.
2022-09-14 22:26:54 destination > 2022-09-14 22:26:54 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):182 - Finalizing tables in destination completed.
2022-09-14 22:26:54 destination > 2022-09-14 22:26:54 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):185 - Cleaning tmp tables in destination started for 1 streams
2022-09-14 22:26:54 destination > 2022-09-14 22:26:54 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):189 - Cleaning tmp table in destination started for stream airbyte_volleyball_details. schema airbyte_volleyball_volleyball, tmp table name: _airbyte_tmp_fnm_airbyte_volleyball_details
2022-09-14 22:26:54 destination > 2022-09-14 22:26:54 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):194 - Cleaning tmp tables in destination completed.
2022-09-14 22:26:54 destination > 2022-09-14 22:26:54 INFO i.a.i.b.IntegrationRunner(runInternal):171 - Completed integration: io.airbyte.integrations.base.ssh.SshWrappedDestination
2022-09-14 22:26:54 destination > 2022-09-14 22:26:54 INFO i.a.i.d.p.PostgresDestination(main):92 - completed destination: class io.airbyte.integrations.destination.postgres.PostgresDestination
owen  09/14/2022, 10:42 PM

Bit Unwise  09/14/2022, 10:45 PM
ah, I had a typo ('airbyte_volleball_details') compared to the Stream Name. Super helpful, thank you!

owen  09/14/2022, 10:45 PM