# ask-community
Lorenzo:
Good morning everyone! :catjam: Is it possible to assign one unique asset key to a dbt asset group in Dagster? I have some asset groups, defined automatically by the folder structure of my dbt project, and I import all the dbt models as assets via:
```python
dbt_assets = load_assets_from_dbt_project(project_dir=DBT_PROJECT_DIR)
```
What I would like to do is start a generic Python asset via a schedule and then, after this asset is materialized, run an asset group of dbt models. After the materialization of this asset group, I am going to start another dbt materialization using a sensor. Lastly, using another sensor, I am going to materialize a final Python step. To make this work I need to specify asset keys that describe a whole asset group. Using dbt tags is not working for me.
jamie:
hi @Lorenzo, a couple of follow-up questions. Based on your setup, you have:
• a standard asset - let's call it `asset_a`
• a bunch of dbt assets with groups - let's call them `dbt_group_1` and `dbt_group_2`
• a schedule that materializes `asset_a` then `dbt_group_1`
• a sensor that then materializes `dbt_group_2`
• a sensor that then materializes a final asset, let's say `asset_b`
Is this correct? Is the issue that the groups named `dbt_group_1` and `dbt_group_2` aren't being correctly applied to the dbt assets? That specifying a group to materialize in the sensor/schedule isn't working as expected? Something else?
Lorenzo:
Hi @jamie! I've updated your recap with a few additional details, and I am attaching a picture of my DAG:
• a standard asset - let's call it `asset_a`
• a bunch of dbt assets with groups - let's call them `dbt_group_1` and `dbt_group_2`
• a schedule that materializes `asset_a`
• a sensor that checks for the materialization of `asset_a` and starts the `dbt_group_1` materialization
• a sensor that checks for the materialization of `dbt_group_1` and then materializes `dbt_group_2`
• a sensor that checks for the materialization of `dbt_group_2` and then materializes a final asset, let's say `asset_b`
The issue is that I am not able to connect my assets as I would like. For example, I can't find a way to connect `asset_a` with its dependency, which should be `dbt_group_1`. Same issue with the final asset, `asset_b`. I would like a linear workflow where all my assets are connected, so that I can see exactly where something "goes wrong", fix the issue, and then restart my pipeline from the point of failure.
jamie:
ah ok, so to translate the picture to our small example - you want `asset_a` to have `dbt_group_1` as downstream dependencies, and you want `asset_b` to have upstream dependencies on `dbt_group_2`. If that is the case, I think the issue is a name-matching one - the name of `asset_a` needs to match the name of the data source for your dbt models in `sources.yml`. Connecting the upstream dependencies for `asset_b` might be a bit more complicated. One option is to write out all of the dependencies as `AssetIn`s to `asset_b`, but that could be a huge list based on your graph. I'm not sure if we provide any functionality to say an asset depends on all assets from an asset group; I'll need to confirm that. In the meantime, our dbt tutorial goes over what I just said in more detail (and with examples), so that could be useful to see how to set up some of the dependencies.
Lorenzo:
Thank you @jamie. May I ask another question directly to you? 🥲 I am trying to start a dbt run using the dbt run op, but I cannot figure out what the "context" is. I also looked in the docs and searched Slack for similar questions, but I can't find an example of a working `dbt_run_op`. This is what I made:
```python
@op
def dbt_op():
    cli_resource = dbt_cli_resource(context)  # fails: context is not defined here
    cli_resource.run(models=["my_model"])

@job
def dbt_job():
    dbt_op()
```
And then I import my job in the repository, obviously.
jamie:
you can just put the context as the first parameter to the op and dagster will autopopulate it!
```python
@op
def dbt_op(context):  # add the context here!
    cli_resource = dbt_cli_resource(context)
    cli_resource.run(models=["my_model"])

@job
def dbt_job():
    dbt_op()
```