#ask-community

Jonathan Neo

09/01/2022, 4:48 PM
Hi there, how do I create dependencies between two assets? I want Airbyte to run first, then dbt. I’m looking at this repo, but it doesn’t seem to define any dependencies between Airbyte and dbt.

Jonathan Neo

09/01/2022, 4:53 PM
The assets are defined as:
airbyte_assets = build_airbyte_assets(
    connection_id=AIRBYTE_CONNECTION_ID,
    destination_tables=["orders", "users"],
    asset_key_prefix=["postgres_replica"],
)


dbt_assets = load_assets_from_dbt_project(
    project_dir=DBT_PROJECT_DIR, io_manager_key="db_io_manager"
)
source: https://github.com/dagster-io/dagster/blob/master/examples/assets_modern_data_stack/assets_modern_data_stack/assets/forecasting.py
i don’t think that would work

fahad

09/01/2022, 5:03 PM
Hmm yeah I see the complexity now since the assets get defined internally
Even cross repo assets wouldn’t work I think

Adam Bloom

09/01/2022, 5:07 PM
I've got this set up with ops at the moment (dbt op downstream of airbyte op(s)), but I'm very curious how this translates over to assets. I haven't done that conversion yet

Jonathan Neo

09/01/2022, 5:17 PM
@Adam Bloom i’m keen to see how it works for ops - do you mind sending me a snippet of your code? (i’ve been stuck on this for quite some time, would appreciate any help!)

jamie

09/01/2022, 5:38 PM
hi all, I'm not the most familiar with our airbyte and dbt integrations, but from poking around the docs and source code i don't think this is something we support right now. it seems like a super reasonable feature, so i'll add a GH issue for it
🌈 1
@Dagster Bot issue allow dbt assets to depend on upstream assets

Dagster Bot

09/01/2022, 5:39 PM

Jonathan Neo

09/01/2022, 5:43 PM
for anyone else stuck on this, i’ve worked around it using ops and @job
from dagster import ScheduleDefinition, job, repository

from dagster_airbyte import airbyte_resource, airbyte_sync_op
from dagster_dbt import dbt_run_op, dbt_cli_resource
from .utils.constants import AIRBYTE_CONNECTION_ID, AIRBYTE_CONFIG, DBT_CONFIG

sync_dvd_rental = airbyte_sync_op.configured({"connection_id": AIRBYTE_CONNECTION_ID}, name="sync_dvd_rental")

@job(resource_defs={
        "airbyte": airbyte_resource.configured(AIRBYTE_CONFIG),
        "dbt": dbt_cli_resource.configured(DBT_CONFIG)
    })
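def elt_job():
    # Passing the Airbyte sync's output into dbt_run_op makes dbt run
    # only after the sync has completed.
    dbt_run_op(sync_dvd_rental())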

@repository
def elt():
    return [
        # update all assets once a day
        ScheduleDefinition(
            job=elt_job, cron_schedule="@daily"
        )
    ]

Adam Bloom

09/01/2022, 6:05 PM
@Jonathan Neo sorry, I was tied up in a few meetings, but looks like you figured it out! I've got mine set up as a job factory that dynamically generates the ops. We've got close to 25 of these jobs; some have one airbyte job dependency, some have multiple, and each configures the dbt op differently to pass in different model selections. Works great for now. Figuring out how to move it over to assets will be an adventure that I just haven't needed to tackle yet
🌈 1

Jonathan Neo

09/01/2022, 6:05 PM
awesome, thanks for providing more context Adam! 🙂

sandy

09/06/2022, 8:53 PM
@Jonathan Neo and @Adam Bloom - we're currently investigating some improvements to the dagster-airbyte integration. Would either of you be up for a chat about Airbyte?

Jonathan Neo

09/07/2022, 1:45 AM
@sandy I'm only playing with dagster-airbyte for personal use / fun at the moment on my local machine. Nothing productionized. I don't think I'll have much value to contribute. It sounds like Adam has something running in prod.

Adam Bloom

09/07/2022, 3:23 PM
@sandy yeah, you bet. happy to chat/brainstorm.

owen

09/08/2022, 4:24 PM
hi all! this behavior actually is currently supported w/ assets 🙂 Dagster translates the sources of a dbt project to upstream asset keys. How it actually does this conversion is customizable, but by default it'll just take a table named "my_table" from a source named "foo" and translate that to an asset key of AssetKey(["foo", "my_table"]). These upstream asset keys can be assets that are also managed by dagster. So for the case of airbyte, if you have an airbyte sync that produces that key, dagster will know that this airbyte sync should happen before dbt runs. That assets_modern_data_stack repo does that, giving this asset graph: (asset graph image not included)
but yeah the key here is that all of the dependency information for the dbt assets (including their upstream assets) is defined in the dbt project. So adding an upstream asset that's produced by dagster will involve adding a source to the dbt project itself (if you don't already have a source defined for it)
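To make the naming rule above concrete, here is a minimal plain-Python sketch of how the two sets of keys line up. It is an illustration only, not Dagster's implementation: the helper names `dbt_source_to_key` and `airbyte_keys` are hypothetical, the `dbt_sources` dict stands in for a dbt `sources.yml` that is assumed to declare a source named "postgres_replica" with tables "orders" and "users", and it assumes the Airbyte assets are keyed as `asset_key_prefix` followed by the table name.

```python
# Illustrative sketch only; these helpers are hypothetical and do not call Dagster.

def dbt_source_to_key(source_name, table_name):
    """Default rule described above: source "foo", table "my_table" -> ["foo", "my_table"]."""
    return [source_name, table_name]


def airbyte_keys(asset_key_prefix, destination_tables):
    """Assumed keying of the Airbyte assets: the prefix followed by each table name."""
    return [list(asset_key_prefix) + [table] for table in destination_tables]


# Stand-in for the sources declared in the dbt project (assumed content).
dbt_sources = {"postgres_replica": ["orders", "users"]}

# Mirrors the build_airbyte_assets arguments quoted earlier in the thread.
produced = airbyte_keys(["postgres_replica"], ["orders", "users"])

# Keys the dbt assets would look for upstream.
expected = [
    dbt_source_to_key(source, table)
    for source, tables in dbt_sources.items()
    for table in tables
]

# Any dbt source with no matching Airbyte key would not get a dependency edge.
unmatched = [key for key in expected if key not in produced]
print("dbt sources without an upstream Airbyte asset:", unmatched)
```

If the dbt source were named something other than the Airbyte `asset_key_prefix`, the keys would not match, which is one plausible reason a dependency might appear to be missing.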

Jonathan Neo

09/13/2022, 1:20 PM
Will take a look, thanks Owen!