Alexander Gregory
03/13/2024, 11:58 AM
Scout
03/13/2024, 11:58 AM
To use the `--full-refresh` option periodically, you can define a job that specifies this option in the run configuration. The documentation provides an example of how to use `--full-refresh` with the `@dbt_assets` decorator, which allows you to pass custom configuration to the underlying operation that backs your dbt assets. Here's an example based on the documentation:

```python
from pathlib import Path

from dagster import (
    AssetExecutionContext,
    AssetSelection,
    Config,
    RunConfig,
    define_asset_job,
)
from dagster_dbt import DbtCliResource, dbt_assets


class MyDbtConfig(Config):
    full_refresh: bool


@dbt_assets(manifest=Path("target", "manifest.json"))
def my_dbt_assets(
    context: AssetExecutionContext, dbt: DbtCliResource, config: MyDbtConfig
):
    dbt_build_args = ["build"]
    # Append the flag only when the run config asks for it
    if config.full_refresh:
        dbt_build_args += ["--full-refresh"]

    yield from dbt.cli(dbt_build_args, context=context).stream()


# Define a job that will run with --full-refresh
dbt_incremental_full_refresh_job = define_asset_job(
    "dbt_incremental_full_refresh_job",
    selection=AssetSelection.assets(my_dbt_assets),
    config=RunConfig({"my_dbt_assets": MyDbtConfig(full_refresh=True)}),
)
```
In this example, the `my_dbt_assets` function is decorated with `@dbt_assets` and takes a `config` parameter of type `MyDbtConfig`. This configuration class has a `full_refresh` attribute that determines whether dbt runs with the `--full-refresh` option. When defining the job `dbt_incremental_full_refresh_job`, you can use `AssetSelection` to select the assets to include in the job and provide a `RunConfig` that sets `full_refresh` to `True`. When the job runs, it executes the dbt command with the `--full-refresh` option, fully refreshing all incremental models.

To run this job periodically, you can schedule it using Dagster's scheduling system. Here's an example of how to schedule the job to run every day at midnight:

```python
from dagster import ScheduleDefinition

# The job already sets full_refresh=True in its run config,
# so the schedule only needs the job and a cron expression.
daily_dbt_full_refresh_schedule = ScheduleDefinition(
    name="daily_dbt_full_refresh_schedule",
    job=dbt_incremental_full_refresh_job,
    cron_schedule="0 0 * * *",  # every day at midnight
)
```
This schedule triggers `dbt_incremental_full_refresh_job` every day at midnight, so dbt runs with the `--full-refresh` option.
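If you want to check the flag logic without launching a Dagster run, the argument-building step from `my_dbt_assets` can be pulled into a small pure function. This is only a sketch: `build_dbt_args` is a hypothetical helper name, not part of Dagster or dagster-dbt, and it assumes you would pass its result to `dbt.cli(...)` yourself.

```python
from typing import List


def build_dbt_args(full_refresh: bool) -> List[str]:
    """Return the dbt CLI arguments, adding --full-refresh only when requested.

    Hypothetical helper for illustration; it mirrors the branching
    inside my_dbt_assets but has no Dagster dependency.
    """
    args = ["build"]
    if full_refresh:
        args.append("--full-refresh")
    return args


print(build_dbt_args(full_refresh=False))  # ['build']
print(build_dbt_args(full_refresh=True))   # ['build', '--full-refresh']
```

Inside the asset function you could then call `dbt.cli(build_dbt_args(config.full_refresh), context=context).stream()`, which keeps the branching easy to unit test.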