Charles Couble
03/09/2023, 5:26 PM
We have ops that serve as Extract & Load steps. Those ops are used to define an asset. Let's call this asset "RAW_DATA".
I want my dbt assets (stage / model = transform steps) to come after these assets / steps, which means my DBT_ASSETS should depend on "RAW_DATA". However, since the dbt assets are loaded by the load_assets_from_dbt_project function, I cannot access the function definition. Does anyone have any idea how to define the asset dependency in that particular case? 🙏
rex
03/10/2023, 2:29 AM
If you declare RAW_DATA as a source in your dbt project, and you use the same asset key RAW_DATA to describe a software-defined asset in Dagster, the dependencies will automatically be linked.
rex
03/10/2023, 2:29 AM
Charles Couble
03/13/2023, 2:37 PM
AssetKey(["social-media", "profiles"])
I want to create a Python asset that needs to be executed before this asset.
From the Dagster docs, I can see ways to define an asset that needs to be executed after a dbt asset by using the asset decorator parameter "ins" as well as the class AssetIn.
from dagster import AssetIn, asset

@asset(key_prefix=["one", "two", "three"])
def upstream_asset():
    return [1, 2, 3]

@asset(ins={"upstream_asset": AssetIn(key_prefix="one/two/three")})
def downstream_asset(upstream_asset):
    return upstream_asset + [4]
Is there a way to do something similar to this, but to define the asset output instead of the asset input?
rex
03/13/2023, 2:45 PM
3. When you use load_assets_from_dbt_project, the sources from (2) are loaded as upstream dependencies for your models. The sources from (2) have an asset key that corresponds to their definition in the dbt project.
4. If you define SDAs (your EL code from (1)) that have the same asset key as the sources from (3), then the dependency will be encoded. Meaning, (1) will be upstream of (2).
Does that make sense? Essentially, we are inferring the “ins” using the dbt project structure.
Charles Couble
03/13/2023, 4:21 PM
Charles Couble
03/13/2023, 4:21 PM
rex
03/13/2023, 5:28 PM
owen
03/13/2023, 6:18 PM
Ah I see -- it seems like you want to have a single operation (quintly_ingest) which maps to multiple different asset keys (i.e. ["QUINTLY", "instagram"], ["QUINTLY", "something_else"]).
For this, you'll need to use the @multi_asset decorator (docs here: https://docs.dagster.io/concepts/assets/multi-assets#multi-assets). Essentially, you have to be explicit about the asset keys this Python operation updates; there's no concept of "this will update all asset keys with the QUINTLY prefix".
So this would look something like:
from dagster import AssetOut, Output, multi_asset

@multi_asset(
    outs={
        "instagram": AssetOut(key_prefix="QUINTLY"),
        # ... same for other tables updated by this
    }
)
def my_multi_asset(context):
    # do thing...
    # let dagster know what things were updated
    for output_name in context.selected_output_names:
        yield Output(None, output_name)
Charles Couble
03/13/2023, 8:14 PM
> Ah I see -- it seems like you want to have a single operation (quintly_ingest) which maps to multiple different asset keys (i.e. ["QUINTLY", "instagram"], ["QUINTLY", "something_else"]).
Exactly! I've been trying to implement multi-assets, but it seems like defining a multi-asset with the same key doesn't update it; it generates an error :)
Error loading repository location dagster_project:dagster._core.errors.DagsterInvalidDefinitionError: Duplicate asset key: AssetKey(['QUINTLY', 'instagram'])
owen
03/14/2023, 9:04 PM
instagram), it just creates a reference from the downstream assets to that source. So adding a new definition shouldn't cause any problems there. My best guess is that somehow multiple copies of your multi_asset are making their way into your repository/Definitions, but it's possible something else is going on.
Charles Couble
03/15/2023, 9:36 AM
owen
03/15/2023, 4:14 PM