Jonathan Neo
09/01/2022, 4:48 PM

fahad
09/01/2022, 4:52 PM

Jonathan Neo
09/01/2022, 4:53 PM
from dagster_airbyte import build_airbyte_assets
from dagster_dbt import load_assets_from_dbt_project

# AIRBYTE_CONNECTION_ID and DBT_PROJECT_DIR come from the example's constants module
airbyte_assets = build_airbyte_assets(
    connection_id=AIRBYTE_CONNECTION_ID,
    destination_tables=["orders", "users"],
    asset_key_prefix=["postgres_replica"],
)

dbt_assets = load_assets_from_dbt_project(
    project_dir=DBT_PROJECT_DIR, io_manager_key="db_io_manager"
)
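As a rough illustration of how the asset_key_prefix above ties the two asset groups together, here is a stdlib-only sketch (not the Dagster API; the dbt model name is hypothetical): the Airbyte assets get keys like ["postgres_replica", "orders"], and any dbt model whose source resolves to the same key picks up that Airbyte asset as an upstream dependency.

```python
# Toy sketch of asset-key matching (plain Python, not Dagster's internals).

# Keys produced by the Airbyte connection: asset_key_prefix + table name.
airbyte_keys = {("postgres_replica", "orders"), ("postgres_replica", "users")}

# Keys a hypothetical dbt model declares as its sources.
dbt_model_deps = {"stg_orders": {("postgres_replica", "orders")}}

# A dependency edge exists wherever a dbt source key matches an Airbyte key.
edges = {model: deps & airbyte_keys for model, deps in dbt_model_deps.items()}
print(edges)  # {'stg_orders': {('postgres_replica', 'orders')}}
```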
source: https://github.com/dagster-io/dagster/blob/master/examples/assets_modern_data_stack/assets_modern_data_stack/assets/forecasting.py

Jonathan Neo
09/01/2022, 4:53 PM

fahad
09/01/2022, 5:03 PM

fahad
09/01/2022, 5:04 PM

Adam Bloom
09/01/2022, 5:07 PM

Jonathan Neo
09/01/2022, 5:17 PM

jamie
09/01/2022, 5:38 PM

jamie
09/01/2022, 5:39 PM

Dagster Bot
09/01/2022, 5:39 PM

Dagster Bot
09/01/2022, 5:39 PM

Jonathan Neo
09/01/2022, 5:43 PM
Using ops and @job:

from dagster import (
    ScheduleDefinition,
    repository,
    job,
)
from dagster_airbyte import airbyte_resource, airbyte_sync_op
from dagster_dbt import dbt_run_op, dbt_cli_resource
from .utils.constants import AIRBYTE_CONNECTION_ID, AIRBYTE_CONFIG, DBT_CONFIG

sync_dvd_rental = airbyte_sync_op.configured(
    {"connection_id": AIRBYTE_CONNECTION_ID}, name="sync_dvd_rental"
)

@job(resource_defs={
    "airbyte": airbyte_resource.configured(AIRBYTE_CONFIG),
    "dbt": dbt_cli_resource.configured(DBT_CONFIG),
})
def elt_job():
    dbt_run_op(sync_dvd_rental())

@repository
def elt():
    return [
        # update all assets once a day
        ScheduleDefinition(
            job=elt_job, cron_schedule="@daily"
        )
    ]
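To see why the sync op runs before the dbt op in that job, here is a toy plain-Python stand-in (not Dagster's executor): passing one op's output into another, as in dbt_run_op(sync_dvd_rental()), creates a data dependency, so the producer is executed first.

```python
# Toy sketch: data dependencies determine execution order.
execution_order = []

def sync_dvd_rental():
    # Stand-in for the configured Airbyte sync op.
    execution_order.append("airbyte_sync")
    return "sync_output"

def dbt_run_op(upstream):
    # Stand-in for the dbt run op; it consumes the sync's output.
    execution_order.append("dbt_run")
    return f"dbt ran after {upstream}"

# The call graph encodes the ordering: sync first, then dbt.
dbt_run_op(sync_dvd_rental())
print(execution_order)  # ['airbyte_sync', 'dbt_run']
```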
Adam Bloom
09/01/2022, 6:05 PM

Jonathan Neo
09/01/2022, 6:05 PM

sandy
09/06/2022, 8:53 PM

Jonathan Neo
09/07/2022, 1:45 AM

Adam Bloom
09/07/2022, 3:23 PM

owen
09/08/2022, 4:24 PM
AssetKey(["foo", "my_table"]). These upstream asset keys can be assets that are also managed by dagster. So for the case of airbyte, if you have an airbyte sync that produces that key, dagster will know that this airbyte sync should happen before dbt runs. That assets_modern_data_stack repo does that, giving this asset graph:

owen
09/08/2022, 4:26 PM

Jonathan Neo
09/13/2022, 1:20 PM
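owen's point above, that matching upstream asset keys is what makes the Airbyte sync run before dbt, can be sketched as a plain topological sort (stdlib only; the asset names here are illustrative, not from the repo or the Dagster API):

```python
from graphlib import TopologicalSorter

# Each asset maps to the set of upstream asset keys it depends on.
deps = {
    "airbyte_sync/orders": set(),                # produced by the Airbyte sync
    "dbt/stg_orders": {"airbyte_sync/orders"},   # dbt model sourcing that table
    "dbt/order_forecast": {"dbt/stg_orders"},    # downstream dbt model
}

# A topological sort puts every producer before its consumers.
order = list(TopologicalSorter(deps).static_order())
print(order)  # ['airbyte_sync/orders', 'dbt/stg_orders', 'dbt/order_forecast']
```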