# integration-dbt
p
Hello, is using Ops to materialize dbt assets recommended? Our use case is materializing 150+ dbt assets and then running tests afterwards across 100+ partitions. Currently running this with just a single dbt build (we need dbt seed, dbt run, dbt tests) and if one thing fails, all the models will need to be rerun (because of dagster’s single step). If I use ops to materialize these models, can I use the ops to materialize by asset groups? That way I can group those 150+ dbt assets into fewer ops and just have those ops rerun on failure, instead of the whole thing. Thanks!
t
Hi! If I'm reading this correctly, it's not that you want a 1:1 op:model mapping, right, but something like 150 models across 10 ops? If that's the case, you can leverage `dbt_assets` without using ops directly. When you load your project with `dbt_assets`, you can split it into multiple `dbt_assets` definitions, each one with a dbt selector choosing a subset of the project. Syntax might be off, but you can do:
```
@dbt_assets(manifest=dbt_manifest_path)
def upstream_models(context: OpExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build", "-s", "+model"], context=context).stream()

@dbt_assets(manifest=dbt_manifest_path)
def downstream_models(context: OpExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build", "-s", "model+"], context=context).stream()

@dbt_assets(manifest=dbt_manifest_path)
def finance_models(context: OpExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["run", "-s", "+group:finance+"], context=context).stream()
```
Let me know if this doesn't help; there are a few other options available.
p
Hi Tim, that is correct! I’ve considered this method, but moving them into jobs would mean using `define_asset_job` for each of those and having one job per declaration? So based on the above I’ll have:
```
upstream_models_job
downstream_models_job
finance_models_job
```
Ideally I want the observability of op-based jobs, where I can click on the job and see which “ops” failed. That’s hard to do when grouping them into asset jobs, because I’ll have to click on an asset job, see the failure, then move on to the next asset job.
q
You can customize your `@dbt_assets` to return the list of failed assets in the event logs. Then if you need to rerun from failure or something, you can materialize only those assets.
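One hedged way to implement this (a sketch, not code from the thread): after a dbt command runs, dbt writes a `run_results.json` artifact whose `results` entries each carry a `unique_id` and a `status`; filtering on failing statuses gives you the nodes to retry. Inside a `@dbt_assets` body you could fetch that artifact from the invocation via `DbtCliInvocation.get_artifact("run_results.json")`; the helper below only parses the artifact dict, and the sample data is illustrative, not real output.

```python
def failed_dbt_nodes(run_results: dict) -> list[str]:
    """Return unique_ids of dbt nodes whose status indicates failure.

    `run_results` follows the shape of dbt's run_results.json artifact:
    {"results": [{"unique_id": ..., "status": ...}, ...]}.
    """
    failing = {"error", "fail"}  # "fail" is used by tests, "error" by models
    return [
        r["unique_id"]
        for r in run_results.get("results", [])
        if r.get("status") in failing
    ]

# Illustrative artifact fragment (hypothetical project/node names)
sample = {
    "results": [
        {"unique_id": "model.proj.ods_a", "status": "success"},
        {"unique_id": "model.proj.ods_b", "status": "error"},
        {"unique_id": "test.proj.not_null_ods_a", "status": "fail"},
    ]
}
print(failed_dbt_nodes(sample))  # -> ['model.proj.ods_b', 'test.proj.not_null_ods_a']
```

You could log this list in the event stream (e.g. via `context.log`) and feed it into a follow-up materialization of just those assets.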
p
Hi Qwame, would I be able to set that as a retry hook? Looking at the docs now and it seems that ops have it as a retry function. Do you have an example/snippet of your implementation?
o
Hi @Peter Lim! Some adjustments to the original suggestion -- assuming you have some predefined, non-overlapping, dbt groups, I would first define three separate dbt_assets:
```
@dbt_assets(manifest=..., select="<group 1 select>")
def group_1_dbt_assets(
    context: OpExecutionContext,
    dbt: DbtCliResource,
):
    yield from dbt.cli(["run"], context=context).stream()

# same thing for groups 2 and 3
```
From there, you can define a single job to run all of your dbt assets, doing something like:
```
just_dbt_job = define_asset_job(
    "just_dbt",
    selection=AssetSelection.assets(
        group_1_dbt_assets,
        group_2_dbt_assets,
        group_3_dbt_assets,
    ),
)
```
Because the assets within each of these groups have dependencies on each other, Dagster will be able to hook up the underlying ops to execute in the proper order.
p
will this be able to rerun the group it fails? One of the main reasons for slicing these is to have the automatic retry just run a smaller group rather than the entire job (since dbt run uses 1 step in the dagster run)
o
yep -- you'll be able to execute the underlying op graph (in this case, three ops) from the point of failure.
e.g. if something goes wrong in group_2, you can just re-execute group_2 and group_3, without re-executing group_1
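To make that retry-from-failure automatic rather than manual, one option (my assumption, not something stated in the thread) is Dagster's run retries feature: enable `run_retries` in the instance's `dagster.yaml`, and a failed run is re-launched from the failed op onward (the default `FROM_FAILURE` strategy), so a group_2 failure would not re-run group_1:

```yaml
# dagster.yaml (instance config) -- enables automatic run retries
run_retries:
  enabled: true
  max_retries: 2  # instance-wide default
```

You can also scope this per job with the `dagster/max_retries` run tag (e.g. `tags={"dagster/max_retries": "2"}` on `define_asset_job`).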
p
ooh that is cool! I’ll test this afternoon. Thank you!!
^ if it works then it’s exactly what i’m looking for
yes it worked, thank you!
o
great!
p
Hi @owen, another question - are we able to pull the assets as dbt test? i.e. after ods_group_1 we have ods_group_1_test as a downstream step that continues the DAG
```
@dbt_assets(manifest=..., select="<group 1 select>")
def group_1_dbt_assets(
    context: OpExecutionContext,
    dbt: DbtCliResource,
):
    yield from dbt.cli(["run"], context=context).stream()

# same thing for groups 2 and 3
```
^ replace “run” with “test” and add an asset prefix to change the name the model comes in as? I’m trying to do this but I get a naming collision with the dbt run models
o
so in this case, do you explicitly want the dbt tests to be represented as assets? That isn't currently supported, but if you just want to execute the run command and then the test command, you could do something like:
```
@dbt_assets(manifest=..., select="<group 1 select>")
def group_1_dbt_assets(
    context: OpExecutionContext,
    dbt: DbtCliResource,
):
    yield from dbt.cli(["run"], context=context).stream()
    yield from dbt.cli(
        ["test", "-s", "<some selection of tests you want to run>"], context=context
    ).stream()
```
p
Thanks! This works as well. I saw the thread about showing dbt test warnings in the UI (which will hopefully make them easily observable). We were planning this internally; setting test severity to “warn” might not warn us enough, since the job still succeeds, but having tests fail outright will prevent the downstream jobs from running.