Hi all! :rainbow-daggy: I have a dagster + dbt pro...
# ask-community
l
Hi all! 🌈 I have a dagster + dbt project where I am using an @op to run a group of assets (i.e. dbt models). After the run, in the "asset groups" section of Dagit, I can't see the "last materialization" of my assets, even though the models were run correctly by my op. Is there a way to automatically update the materialization "state" of my assets in the asset groups window of Dagit after I run them with an op? I tried using AssetMaterialization, but it's not the right choice: if one of my models returns an error, the whole op stops and the AssetMaterialization step is never executed. I am thinking of something like "refresh manifest.json", for example. That way the materializations of my nodes would be updated easily, and I could fix the problem in dbt, go to my asset group in Dagit, and relaunch only the stale and missing assets.
j
Hey @Lorenzo can you share how you’re using an op to run these assets? In general we recommend using `define_asset_job` to create a job that will materialize a set of assets and then running the job. If you do that you should see the asset materialization in the asset group section.
l
Hi @jamie , you helped me yesterday to set up this op. 😂 The code is this:
from dagster import op

@op(
    required_resource_keys={"dbt"},
    description="An operation that starts the materialization of TMP models",
)
def dbt_tmp_models_op(context):
    # Run the dbt models tagged "tmp" through the dbt CLI resource.
    cli_resource = context.resources.dbt
    cli_resource.run(select=["tag:tmp"])
I am using ops because I need to attach some Python operations that do not materialize anything before and after the materialization of the dbt asset groups. I tried many ways of doing this with assets, but I could not figure out how to make them work like this.
j
ah i see! sorry for forgetting, lots of folks to help out this week! So using the dbt resource to execute a dbt run command will just run a bare bones `dbt run` and it won’t hook into the dagster machinery to tell us that certain assets have been materialized. tagging @owen since he might have some advice too
l
Just for context: this is what I originally wanted to do. Doing this only with assets seems impossible, so I switched to an op-based approach.
• The Python steps do not materialize any data, and dbt doesn't need data from these steps; they are generic (e.g. an HTTP request).
• There is a dependency between the Python steps and the dbt asset group (arrows), because the materialization of all the models in my group should start right after the first Python step.
• If one of the steps breaks, whether it's one of the Python steps or just one model inside the asset group, I want to restart the computation from that point onwards.
o
hi @Lorenzo! the assets in the graph are not updating because the asset keys that are emitted by the op are different from the ones created for the assets. One way to fix this would be by making the asset key generated for the dbt assets align with the one generated from the op. I think you could do this with something like:
load_assets_from...(
    ...,
    node_info_to_asset_key = lambda node_info: ["dbt"] + node_info["unique_id"].split(".")
)
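To make the suggestion above more concrete, here is a minimal plain-Python sketch (no Dagster import needed) of the list of path components that the lambda builds. The `node_info` dict is hypothetical and contains only the `unique_id` field; its value is reconstructed from the key quoted later in this thread, and the helper name `key_path` is invented for illustration.

```python
# Hypothetical node_info: real dbt node info has many more fields,
# but the suggested lambda only reads "unique_id".
node_info = {
    "unique_id": "model.odi_replatform_um.TMP_UM_CM_S_CAMPAIGN_P_AU_NEW_RECORDS"
}


def key_path(node_info):
    """Return the path components the suggested lambda would produce."""
    # Split "model.<project>.<model_name>" on dots and prepend "dbt".
    return ["dbt"] + node_info["unique_id"].split(".")


for component in key_path(node_info):
    print(component)
```

The result is a flat list of four strings, one per path component, which is the shape the asset key needs.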
l
Thank you @owen. Could you please share a little more about how this statement works and how an asset key is created from an op? In particular I'd like to know the difference between the asset key of a normal dbt asset and the asset key produced by a dbt op.
I changed your code a little because it wasn't working. Now I can see a change in the asset keys of my project, but the materializations launched by the op still aren't reflected in the asset DAGs. I am trying to reverse-engineer the creation of the asset key. 👀 These are the different versions of your statement that I tried:
dbt_assets = load_assets_from_dbt_project(project_dir=DBT_PROJECT_DIR, node_info_to_asset_key = lambda node_info: AssetKey((node_info["unique_id"].split("."))))
#
dbt_assets = load_assets_from_dbt_project(project_dir=DBT_PROJECT_DIR, node_info_to_asset_key = lambda node_info: AssetKey(str(["dbt"] + [node_info["unique_id"].split(".")])))
#
dbt_assets = load_assets_from_dbt_project(project_dir=DBT_PROJECT_DIR, node_info_to_asset_key = lambda node_info: AssetKey(str("dbt" + str(node_info["unique_id"])).replace(".","/").replace("dbt","dbt/")))
The last version in particular creates an asset key of this type: `dbt/model/odi_replatform_um/TMP_UM_CM_S_CAMPAIGN_P_AU_NEW_RECORDS`, which seemed plausible to me. Unfortunately it still doesn't update materializations in the asset-group view.
Tagging @owen because I've edited the original message.
o
This confusion comes from the fact that ops have a different default way of transforming dbt node info into dagster asset keys (basically because the op implementation is legacy and we don't want to change it until the dagster-dbt library goes to `1.x`). Here's the implementation that the ops use: https://sourcegraph.com/github.com/dagster-io/dagster/-/blob/python_modules/libraries/dagster-dbt/dagster_dbt/utils.py?L20:5&subtree=true. The ops will also by default add a `dbt` prefix to the asset keys, beyond that function. If you don't like having that prefix there, you can remove it when configuring your op (`asset_key_prefix`). The issue with your implementation is that the AssetKey object considers `AssetKey("some/key")` different from `AssetKey(["some", "key"])`, so you'll want:
dbt_assets = load_assets_from_dbt_project(project_dir=DBT_PROJECT_DIR, node_info_to_asset_key = lambda node_info: AssetKey(["dbt"] + (node_info["unique_id"].split("."))))
Your second option listed above looks very similar to this, but it wraps that in a `str()` call, so you're passing in the string `s = "['some', 'key']"` instead of the list `s = ['some', 'key']`.
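The string-versus-list difference can be checked in plain Python, without Dagster, by evaluating the argument that each attempted lambda hands to `AssetKey`. This is only a sketch: the `unique_id` value is reconstructed from the key quoted earlier in the thread, and the variable names are invented for illustration.

```python
unique_id = "model.odi_replatform_um.TMP_UM_CM_S_CAMPAIGN_P_AU_NEW_RECORDS"

# The working version: a flat list of four path components.
wanted = ["dbt"] + unique_id.split(".")

# First attempt: a flat list, but without the "dbt" prefix the ops add.
attempt_1 = unique_id.split(".")

# Second attempt: str() turns the whole list into ONE string
# (and the extra brackets also nest the split list inside it).
attempt_2 = str(["dbt"] + [unique_id.split(".")])

# Third attempt: ONE slash-separated string, not a list of components.
attempt_3 = str("dbt" + str(unique_id)).replace(".", "/").replace("dbt", "dbt/")

for name, value in [
    ("wanted", wanted),
    ("attempt_1", attempt_1),
    ("attempt_2", attempt_2),
    ("attempt_3", attempt_3),
]:
    # Show the Python type and whether it equals the wanted list.
    print(name, type(value).__name__, value == wanted)
```

Only `wanted` is a list with the `dbt` prefix. The second and third attempts are single strings, so per the explanation above they end up as different keys even though the third one looks right when displayed with slashes.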