Peter Lim
08/14/2023, 7:26 PMTim Castillo
08/14/2023, 8:28 PMdbt_assets
without using Ops-directly.
When you load your project with dbt_assets
, you can split it into multiple dbt_assets
definitions, each one with a dbt selector choosing a subset of the project.
Syntax might be off but you can do:
@dbt_assets(manifest=dbt_manifest_path)
def upstream_models(context: OpExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["build", "-s", "+model"], context=context).stream()
@dbt_assets(manifest=dbt_manifest_path)
def downstream_models(context: OpExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["build", "-s", "model+"], context=context).stream()
@dbt_assets(manifest=dbt_manifest_path)
def finance_models(context: OpExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["run", "-s", "+group:finance+"], context=context).stream()
Let me know if this doesn't help. because there are a few other options available.Peter Lim
08/14/2023, 8:35 PMupstream_models_job
downstream_models_job
finance_models_job
Peter Lim
08/14/2023, 8:36 PMQwame
08/14/2023, 8:51 PM@dbt_assets
to return the list of failed assets in the event logs. Then if you need to rerun from failure or something, you can materialize those assets onlyPeter Lim
08/14/2023, 9:11 PMowen
08/15/2023, 4:44 PM@dbt_assets(manifest=..., select="<group 1 select>")
def group_1_dbt_assets(
context: OpExecutionContext,
dbt: DbtCliResource,
):
yield from dbt.cli(["run"], context=context)
# same thing for groups 2 and 3
From there, you can define a single job to run all of your dbt assets, doing something like:
just_dbt_job = define_asset_job(
"just_dbt",
selection=AssetSelection.assets(
group_1_dbt_assets,
group_2_dbt_assets,
group_3_dbt_assets,
),
)
Because the assets within each of these groups have dependencies between each other, Dagster will be able to hook up the underlying ops to execute in the proper orderPeter Lim
08/15/2023, 4:47 PMowen
08/15/2023, 4:49 PMowen
08/15/2023, 4:49 PMPeter Lim
08/15/2023, 4:50 PMPeter Lim
08/15/2023, 4:50 PMPeter Lim
08/15/2023, 7:51 PMowen
08/15/2023, 8:50 PMPeter Lim
08/16/2023, 6:28 PM```@dbt_assets(manifest=..., select="<group 1 select>")
def group_1_dbt_assets(
context: OpExecutionContext,
dbt: DbtCliResource,
):
yield from dbt.cli(["run"], context=context)
# same thing for groups 2 and 3```^ replace “run” with “test” and add an asset prefix to change the name the model comes in? I’m trying to do this but have naming collision with the dbt run models
owen
08/16/2023, 6:40 PM@dbt_assets(manifest=..., select="<group 1 select>")
def group_1_dbt_assets(
context: OpExecutionContext,
dbt: DbtCliResource,
):
yield from dbt.cli(["run"], context=context)
yield from dbt.cli(["test"], select="<some selection of tests you want to run>")
Peter Lim
08/16/2023, 7:02 PM