Lorenzo
02/03/2023, 3:55 PM
jamie
02/03/2023, 4:06 PM
define_asset_job to create a job that will materialize a set of assets, and then running the job. If you do that, you should see the asset materialization in the asset group section.
Lorenzo
02/03/2023, 4:11 PM
from dagster import op

@op(
    required_resource_keys={"dbt"},
    description="An operation that starts the materialization of TMP models",
)
def dbt_tmp_models_op(context):
    # Run every dbt model tagged "tmp" through the dbt CLI resource
    cli_resource = context.resources.dbt
    cli_resource.run(select=["tag:tmp"])
I am using ops because I need to attach some Python operations that do not materialize anything, before and after the materialization of the dbt asset groups. I tried many ways of doing this with assets, but I could not figure out how to make them work like this.
jamie
02/03/2023, 4:14 PM
dbt run and it won’t hook into the Dagster machinery to tell us that certain assets have been materialized. Tagging @owen since he might have some advice too.
Lorenzo
02/03/2023, 4:39 PM
owen
02/03/2023, 9:12 PM
load_assets_from...(
    ...,
    node_info_to_asset_key = lambda node_info: ["dbt"] + node_info["unique_id"].split(".")
)
Lorenzo
02/06/2023, 10:36 AM
dbt_assets = load_assets_from_dbt_project(project_dir=DBT_PROJECT_DIR, node_info_to_asset_key = lambda node_info: AssetKey((node_info["unique_id"].split("."))))
#
dbt_assets = load_assets_from_dbt_project(project_dir=DBT_PROJECT_DIR, node_info_to_asset_key = lambda node_info: AssetKey(str(["dbt"] + [node_info["unique_id"].split(".")])))
#
dbt_assets = load_assets_from_dbt_project(project_dir=DBT_PROJECT_DIR, node_info_to_asset_key = lambda node_info: AssetKey(str("dbt" + str(node_info["unique_id"])).replace(".","/").replace("dbt","dbt/")))
The last version in particular creates an asset key of this type: dbt/model/odi_replatform_um/TMP_UM_CM_S_CAMPAIGN_P_AU_NEW_RECORDS, which seemed plausible to me. Unfortunately, it still doesn't update materializations in the asset-group view.
Lorenzo
02/06/2023, 4:19 PM
owen
02/06/2023, 5:47 PM
1.x). Here's the implementation that the ops use: https://sourcegraph.com/github.com/dagster-io/dagster/-/blob/python_modules/libraries/dagster-dbt/dagster_dbt/utils.py?L20:5&subtree=true. The ops will also by default add a dbt prefix to the asset keys, beyond that function. If you don't like having that prefix there, you can remove it when configuring your op (asset_key_prefix).
The issue with your implementation is that the AssetKey object considers AssetKey("some/key") different from AssetKey(["some", "key"]), so you'll want
dbt_assets = load_assets_from_dbt_project(project_dir=DBT_PROJECT_DIR, node_info_to_asset_key = lambda node_info: AssetKey(["dbt"] + (node_info["unique_id"].split("."))))
Your second option listed above looks very similar to this, but it wraps that in a str() call, so you're passing in the string s = "['some', 'key']" instead of the list s = ['some', 'key'].
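The difference owen describes can be seen without Dagster at all. The following is a minimal plain-Python sketch that only compares the values each lambda would hand to AssetKey; it does not call any Dagster API. The unique_id is an assumption, inferred from the asset key quoted earlier in the thread, and the helper names are hypothetical.

```python
# Plain-Python sketch: compares only the values passed to AssetKey, no Dagster import.
# Assumption: this unique_id is inferred from the asset key quoted in the thread.
unique_id = "model.odi_replatform_um.TMP_UM_CM_S_CAMPAIGN_P_AU_NEW_RECORDS"


def suggested_path(uid):
    """owen's suggestion: a list with one element per key component."""
    return ["dbt"] + uid.split(".")


def second_attempt_value(uid):
    """Second attempt: str() turns the (nested) list into one long string."""
    return str(["dbt"] + [uid.split(".")])


def third_attempt_value(uid):
    """Third attempt: one slash-joined string, not a list of components."""
    return str("dbt" + str(uid)).replace(".", "/").replace("dbt", "dbt/")


good = suggested_path(unique_id)
second = second_attempt_value(unique_id)
third = third_attempt_value(unique_id)

# Show the type of each value and what it looks like
print(type(good).__name__, good)
print(type(second).__name__, second)
print(type(third).__name__, third)
```

Only the first value is a list of separate components. The other two are single strings, so, going by owen's explanation, each would end up as a one-component key that cannot match the multi-component key the ops report, even though the third one looks the same once it is displayed with slashes.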