Rob Sicurelli
08/22/2023, 5:52 PM
dagster@1.4.7 and dagster-dbt@0.20.7
All my dbt assets are partitioned daily, but they have different start dates depending on when they were added to the repo. Previously, I looped over a config that paired each dbt model name with a start date and loaded them one by one, e.g.:
```python
my_assets = [
    load_assets_from_dbt_manifest(
        manifest=my_manifest,
        select=asset_name,
        partitions_def=DailyPartitionsDefinition(start_date=asset_start_date),
        ...,
    )[0]
    for asset_name, asset_start_date in my_config
]
```
But I see `partitions_def` is now deprecated in this method, and `@dbt_assets` is needed in order to provide a partitions definition. What is the latest and greatest way to load these assets into Dagster? Are we moving away from Dagster partitions? I could also convert this to a schedule, as long as there's a good way to feed the scheduled date into DBT (like the old `partition_key_to_vars_fn`).

rex
08/22/2023, 5:57 PM
Use `@dbt_assets`. This is because you can define your `partition_key_to_vars_fn` logic in the body of your decorated function. So when migrating, you could do something like this:
```python
import json


@dbt_assets(manifest=..., partitions_def=...)
def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
    dbt_vars = partition_key_to_vars_fn(context.partition_key)
    yield from dbt.cli(["run", "--vars", json.dumps(dbt_vars)], context=context).stream()
```
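(The snippet above assumes you already have a `partition_key_to_vars_fn`. For readers who don't: it was just a function from a partition key to a dict of dbt vars. A minimal sketch, where the `run_date` var name is illustrative rather than from this thread:)

```python
def partition_key_to_vars_fn(partition_key: str) -> dict:
    # A daily partition key is a date string like "2023-08-22".
    # Surface it to dbt as a var so models can reference it via
    # {{ var("run_date") }}; the var name "run_date" is an
    # illustrative choice, not part of any API.
    return {"run_date": partition_key}
```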
Our docs are here: https://docs.dagster.io/integrations/dbt/reference#building-incremental-models-using-partitions

Rob Sicurelli
08/22/2023, 6:02 PM

rex
08/22/2023, 6:03 PM
You can programmatically generate your `@dbt_assets`-decorated functions, similar to how you are programmatically generating them in your snippet:
https://dagster.slack.com/archives/C04CW71AGBW/p1692019873147479?thread_ts=1692019552.965309&cid=C04CW71AGBW

Rob Sicurelli
08/22/2023, 6:53 PM
And `--vars` to pass in the scheduled date to DBT?

rex
08/22/2023, 7:05 PM
That's what `partition_key_to_vars_fn` is doing in the implementation of `load_assets_from_dbt_manifest`.
Rob Sicurelli
08/23/2023, 3:05 PM
It sounds like I should use a `ScheduleDefinition` with a `run_config_fn` to pass in `context.scheduled_execution_time`. Is this correct? I don't see a `config_schema` kwarg on `@dbt_assets`, so I'm not sure if there's a better way to access the scheduled datetime within `@dbt_assets`.
rex
08/23/2023, 3:08 PM
```python
from pathlib import Path

from dagster import Config, OpExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets


class MyDbtConfig(Config):
    full_refresh: bool


@dbt_assets(manifest=Path("target", "manifest.json"))
def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource, config: MyDbtConfig):
    dbt_build_args = ["build"]
    if config.full_refresh:
        dbt_build_args += ["--full-refresh"]

    yield from dbt.cli(dbt_build_args, context=context).stream()
```
Rob Sicurelli
08/23/2023, 3:10 PM
`run_config_fn`?

rex
08/23/2023, 3:11 PM

Rob Sicurelli
08/23/2023, 3:15 PM
`config_schema` to keep the same config interface as ops and assets?

rex
08/23/2023, 3:16 PM

Rob Sicurelli
08/24/2023, 5:37 PM
```python
@dbt_assets(manifest=manifest_path)
def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["run"], context=context).stream()
```
• It looks like all dbt models will launch from a single process via `dbt run --select <all models>`. Is there a scale at which this will choke? I just want to be aware of any potential issues in the future, as our model repo is growing very quickly.
• Each dbt model run is a step in the job execution. Do retries work as expected here? I had a failure after this cutover where it looks like even the successful steps reran. Retries are configured via:
```python
build_schedule_from_partitioned_job(
    name="daily_dbt_assets",
    job=define_asset_job(
        name="daily_dbt_assets",
        selection=build_dbt_asset_selection(
            [dbt_models],
            dbt_select="tag:daily",
        ),
        tags={
            "dagster/max_retries": 3,
            "dagster/retry_strategy": "FROM_FAILURE",
        },
    ),
),
```