# integration-dbt
r
hey folks. I just upgraded to `dagster@1.4.7` and `dagster-dbt@0.20.7`. all my DBT assets are partitioned daily, but they have different start dates depending on when they were added to the repo. previously I was looping over a config that paired each dbt model name with a start date, and loaded them one by one, e.g.,
my_assets = [
    load_assets_from_dbt_manifest(
        manifest=my_manifest,
        select=asset_name, 
        partitions_def=DailyPartitionsDefinition(start_date=asset_start_date),
        ...,
    )[0]
    for asset_name, asset_start_date in my_config
]
but I see `partitions_def` is now deprecated in this method, and `@dbt_assets` is needed in order to provide a partitions definition. what is the latest and greatest way to load these assets into Dagster? are we moving away from Dagster partitions? I could also convert this to a schedule, as long as there's a good way to feed the scheduled date into DBT (like the old `partition_key_to_vars_fn`)
r
Yeah, if you take a look at the migration guide for 1.4, `partition_key_to_vars_fn` is deemed superfluous in `@dbt_assets`. This is because you can call your `partition_key_to_vars_fn` in the body of your decorated function. So when migrating, you could do something like this:
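@dbt_assets(manifest=..., partitions_def=...)
def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
    # translate the partition being materialized into dbt vars
    dbt_vars = partition_key_to_vars_fn(context.partition_key)

    # .stream() yields the Dagster events produced by the dbt invocation
    yield from dbt.cli(["run", "--vars", json.dumps(dbt_vars)], context=context).stream()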
Our docs are here: https://docs.dagster.io/integrations/dbt/reference#building-incremental-models-using-partitions
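The snippet above calls `partition_key_to_vars_fn` without defining it. A minimal sketch of what such a helper might look like, assuming daily partition keys formatted as `YYYY-MM-DD`. The variable names `min_date` and `max_date` are hypothetical and not from this thread; use whatever names your models read via `var()`.

```python
import json
from datetime import date, timedelta


def partition_key_to_vars_fn(partition_key: str) -> dict:
    """Map a daily partition key such as "2023-08-14" to dbt vars.

    The var names (min_date, max_date) are illustrative assumptions.
    """
    day = date.fromisoformat(partition_key)
    return {
        "min_date": day.isoformat(),
        # exclusive upper bound: the day after the partition
        "max_date": (day + timedelta(days=1)).isoformat(),
    }


def dbt_run_args(partition_key: str) -> list:
    """Build the argument list that would be handed to dbt.cli(...)."""
    return ["run", "--vars", json.dumps(partition_key_to_vars_fn(partition_key))]
```

Inside the decorated function, `dbt_run_args(context.partition_key)` would then take the place of the hand-built list.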
r
@rex how does that code snippet allow me to pass in different partition start dates for my assets? that looks like one definition for all assets in the manifest
r
You can define different `@dbt_assets` decorated functions, similar to how you were programmatically generating them in your snippet: https://dagster.slack.com/archives/C04CW71AGBW/p1692019873147479?thread_ts=1692019552.965309&cid=C04CW71AGBW
👍 1
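One way to keep the number of `@dbt_assets` functions small is to group models that share a start date, so each distinct start date gets one decorated function instead of one per model. A rough sketch of the grouping step only, in plain Python. The model names and dates are made up, and it assumes a space-separated dbt selection string is treated as a union of the listed models.

```python
from collections import defaultdict


def group_selects_by_start_date(config):
    """Turn (model_name, start_date) pairs into one dbt select string per start date."""
    grouped = defaultdict(list)
    for model_name, start_date in config:
        grouped[start_date].append(model_name)
    # space-separated selectors are assumed to mean "union" in dbt selection syntax
    return {start: " ".join(sorted(names)) for start, names in sorted(grouped.items())}


# hypothetical config in the same shape as my_config from the original question
my_config = [
    ("orders", "2023-01-01"),
    ("customers", "2023-01-01"),
    ("refunds", "2023-06-01"),
]
selects = group_selects_by_start_date(my_config)
```

Each entry of `selects` could then be passed as `select=` together with `partitions_def=DailyPartitionsDefinition(start_date=...)` to its own `@dbt_assets` function. Note that each decorated function would need a distinct name.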
r
@rex thanks! I'll give this a go. also will look into just making this a daily schedule since that seems more straightforward. in that case is it also required to explicitly spell out `--vars` to pass in the scheduled date to DBT?
r
yeah, that’s required since that’s what `partition_key_to_vars_fn` is doing in the implementation of `load_assets_from_dbt_manifest`
r
@rex I started moving this over to a schedule, following the docs you linked. I need to pipe through the scheduled date to the DBT CLI, so my plan was to use the `ScheduleDefinition` `run_config_fn` to pass in `context.scheduled_execution_time`. is this correct? I don't see a `config_schema` kwarg on `@dbt_assets` so I'm not sure if there's a better way to access the scheduled datetime within `@dbt_assets`
r
You can use pythonic configuration. Check the API docs: https://docs.dagster.io/_apidocs/libraries/dagster-dbt#dagster_dbt.dbt_assets
from pathlib import Path

from dagster import Config, OpExecutionContext
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets


class MyDbtConfig(Config):
    full_refresh: bool


@dbt_assets(manifest=Path("target", "manifest.json"))
def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource, config: MyDbtConfig):
    dbt_build_args = ["build"]
    if config.full_refresh:
        dbt_build_args += ["--full-refresh"]

    yield from dbt.cli(dbt_build_args, context=context).stream()
r
ah ok I'll try that. would I still pass in the value of the config via `ScheduleDefinition`'s `run_config_fn`?
r
yup
👍 1
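For reference, a sketch of the dictionary a `run_config_fn` might return to feed the scheduled date into a Pythonic config class. This assumes the config class has a string field named `run_date` (a hypothetical name, not from this thread) and that the config is keyed under `ops` by the decorated function's name, here `my_dbt_assets`.

```python
from datetime import datetime


def scheduled_run_config(scheduled_execution_time: datetime, op_name: str = "my_dbt_assets") -> dict:
    """Build run config carrying the scheduled date as a YYYY-MM-DD string.

    `run_date` is an assumed field on the Config class; `op_name` is assumed
    to match the name of the @dbt_assets-decorated function.
    """
    return {
        "ops": {
            op_name: {
                "config": {"run_date": scheduled_execution_time.strftime("%Y-%m-%d")},
            }
        }
    }
```

The schedule's `run_config_fn` could then return `scheduled_run_config(context.scheduled_execution_time)`, and the asset body would read `config.run_date` when building its `--vars` argument.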
r
cool thanks rex. are there plans to support the standard `config_schema` to keep the same config interface as ops and assets?
r
no plans, since this Pythonic configuration is the new interface going forward as of 1.3: https://docs.dagster.io/concepts/configuration/config-schema
👍 1
r
@rex a couple of questions about this approach
@dbt_assets(manifest=manifest_path)
def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["run"], context=context).stream()
• it looks like all DBT models will launch from a single process via `dbt run --select <all models>`. is there a scale at which this will choke? just want to be aware of any potential issues in the future as our model repo is growing very quickly
• each dbt model run is a step in the job execution. do retries work as expected here? I had a failure after this cutover where it looks like even the successful steps reran. retries are configured via
build_schedule_from_partitioned_job(
    name="daily_dbt_assets",
    job=define_asset_job(
        name="daily_dbt_assets",
        selection=build_dbt_asset_selection(
            [dbt_models],
            dbt_select="tag:daily",
        ),
        tags={
            "dagster/max_retries": 3,
            "dagster/retry_strategy": "FROM_FAILURE",
        },
    ),
),