load_assets_from_dbt_project : How to define Asse...
# integration-dbt
c
load_assets_from_dbt_project: how to define an asset dependency? Let me give some context:
We have ops that serve as Extract & Load steps. Those ops are used to define an asset. Let's call this asset "RAW_DATA".
I want my DBT assets (stage / model = transform steps) to come after these assets / steps, which means my DBT_ASSETS should depend on "RAW_DATA".
However, because the DBT assets are loaded by the load_assets_from_dbt_project function, I cannot access their definitions to declare the dependency. Does anyone have any idea how to define an asset dependency in that particular case? 🙏
r
Hi Charles, if you want these dependencies to be reflected in your dbt project, you should ensure that you’re using the appropriate sources in your dbt project. We map these to asset dependencies in your Dagster project. So, if you set RAW_DATA as a source in your dbt project and use the same asset key RAW_DATA to describe a software-defined asset in Dagster, the dependencies will automatically be linked.
c
Hi rex, thanks for your answer. I've been looking at the link you forwarded, but I'm not sure it fits my issue. I will try to rephrase. My question is the following: imagine I have a DBT asset that can be selected through this asset key:
AssetKey(["social-media", "profiles"])
I want to create a Python asset that needs to be executed before this asset. From the Dagster docs, I can see ways to define an asset that needs to be executed after a DBT asset by using the asset decorator parameter "ins" as well as the class AssetIn.
from dagster import AssetIn, asset


@asset(key_prefix=["one", "two", "three"])
def upstream_asset():
    return [1, 2, 3]


@asset(ins={"upstream_asset": AssetIn(key_prefix="one/two/three")})
def downstream_asset(upstream_asset):
    return upstream_asset + [4]
Is there a way to do something similar to this, but instead of defining the asset input, to define the asset output?
r
Here are my assumptions:
1. Your EL code defines a set of raw data tables. These are used by your dbt transformations to create models.
2. In dbt, there is a concept of Sources, which exactly maps to (1). In your dbt models, you can specify the sources (the raw data) for your models, which are transformed to create your models.
3. When you call load_assets_from_dbt_project, the sources from (2) are loaded as upstream dependencies for your models. The sources from (2) have an asset key that corresponds to its definition in the dbt project.
4. If you define SDAs (your EL code from (1)) that have the same asset key as the sources from (3), then the dependency will be encoded. Meaning, (1) will be upstream of (2).
Does that make sense? Essentially, we are inferring the “ins” using the dbt project structure.
c
From your assumptions: I agree with 1, 2 and 3, but I'm struggling to make assumption 4 work.
image.png
r
@owen is there a way to get the exact asset key to use here, i.e. the key of the source asset that’s generated by Dagster and corresponds to the source in dbt? It’s not enough to have the same key prefix - the entire asset key needs to line up. But I agree (4) is definitely a little tricky to get right at the moment.
o
Ah I see -- it seems like you want to have a single operation (quintly_ingest) which maps to multiple different asset keys (i.e. ["QUINTLY", "instagram"], ["QUINTLY", "something_else"]). For this, you'll need to use the @multi_asset decorator (docs here: https://docs.dagster.io/concepts/assets/multi-assets#multi-assets). Essentially, you have to be explicit about the asset keys this python operation updates; there's no concept of "this will update all asset keys with the QUINTLY prefix". So this would look something like
from dagster import AssetOut, Output, multi_asset


@multi_asset(
    outs={
        "instagram": AssetOut(key_prefix="QUINTLY"),
        # ... same for other tables updated by this
    }
)
def my_multi_asset(context):
    # do thing...

    # let dagster know what things were updated
    for output_name in context.selected_output_names:
        yield Output(None, output_name)
c
"Ah I see -- it seems like you want to have a single operation (quintly_ingest) which maps to multiple different asset keys (i.e. ["QUINTLY", "instagram"], ["QUINTLY", "something_else"])."
Exactly! I've been trying to implement multi assets, but it seems that defining a multi asset with the same key doesn't update it and instead generates an error:
Error loading repository location dagster_project:dagster._core.errors.DagsterInvalidDefinitionError: Duplicate asset key: AssetKey(['QUINTLY', 'instagram'])
o
hm do you mind sharing your multi_asset definition / how you're constructing your repository? I suspect that this is an easy fix -- loading assets from your dbt project should not generate an asset definition for your dbt sources (such as instagram), it just creates a reference from the downstream assets to that source. So adding a new definition shouldn't cause any problems there. My best guess is that somehow multiple copies of your multi_asset are making their way into your repository/Definitions, but it's possible something else is going on
c
Hi Owen, for some reason, when restarting the project today, I could not reproduce the error. You made the point very clear: 1 - "override" the DBT source assets by defining other assets with the same "name" & "prefix". 2 - From there, it is possible to define upstream dependencies as desired (seems to work on my side). Thank you very much for your help 🙏
o
great! glad it's working now 🙂