Eric Goddard
10/28/2022, 8:23 PM
resources:
  dbt:
    config:
      vars:
        refresh_periods:
          - 2022Q2
The dbt command that gets run does not have the variables. Setup details are in 🧵
import os

from dagster import define_asset_job, repository, with_resources
from dagster_dbt import dbt_cli_resource, load_assets_from_dbt_project

DBT_PROJECT_DIR = os.environ.get("DBT_PROJECT_DIR")
DBT_PROFILES_DIR = os.environ.get("DBT_PROFILES_DIR")

# dbt CLI resource with its config fixed up front via .configured()
dbt_resource = dbt_cli_resource.configured(
    {"profiles_dir": DBT_PROFILES_DIR, "project_dir": DBT_PROJECT_DIR, "target": "prod"}
)

dbt_assets = with_resources(
    load_assets_from_dbt_project(project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, use_build_command=True),
    {"dbt": dbt_resource},
)

@repository
def repo():
    definitions = [
        dbt_assets,
    ]
    return definitions
And a job:
update_asset_job = define_asset_job(
    "update_asset_job",
    selection=["+some_dbt_model"],
    description="Builds dbt models",
)
Shouldn't I be able to pass a var to the job via Dagit with a configuration like this:
resources:
  dbt:
    config:
      vars:
        refresh_periods:
          - 2022Q2
When the job runs, the dbt command that gets logged is:
Executing command: dbt --no-use-color --log-format json build --project-dir /opt/dagster/dagster_home/ --profiles-dir /opt/dagster/dagster_home/ --target prod --select dbt_project.subdir.some_early_model dbt_project.subdir.some_dbt_model
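For comparison, a command that had picked up the vars would be expected to carry dbt's `--vars` flag, which takes a YAML/JSON string describing a dict. The following is only a minimal sketch of how such a command line could be assembled; `build_dbt_command` is a hypothetical helper for illustration, not part of dagster-dbt, and it leaves out the other flags shown in the log line above.

```python
import json
import shlex


def build_dbt_command(select, dbt_vars=None):
    """Hypothetical helper: assemble a `dbt build` command as a list of args."""
    args = ["dbt", "build", "--target", "prod", "--select", *select]
    if dbt_vars:
        # dbt's --vars flag accepts a YAML/JSON string describing a dict.
        args += ["--vars", json.dumps(dbt_vars)]
    return args


cmd = build_dbt_command(["some_dbt_model"], {"refresh_periods": ["2022Q2"]})
# shlex.join quotes the JSON payload so the line can be pasted into a shell.
print(shlex.join(cmd))
```

Without the second argument the helper emits no `--vars` flag at all, which matches the shape of the command that was actually logged.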
define_asset_job with dbt assets?
Adam Bloom
10/31/2022, 6:42 PM
Eric Goddard
10/31/2022, 6:43 PM
define_asset_job as above
Adam Bloom
10/31/2022, 6:45 PM
load_assets_from_dbt_project has two arguments that you can use for this: partitions_def and partition_key_to_vars_fn. It's a bit clunky for some use cases, but it does work. I set up static partitions. You'll just need any sensors/schedules/etc. to trigger runs with the partition key so that the vars get passed on.
Eric Goddard
11/01/2022, 12:23 PM
from datetime import date

from dagster import StaticPartitionsDefinition, with_resources
from dagster_dbt import dbt_cli_resource, load_assets_from_dbt_project

dbt_resource = dbt_cli_resource.configured(
    {"profiles_dir": DBT_PROFILES_DIR,
     "project_dir": DBT_PROJECT_DIR,
     "target": "prod"}
)

# One partition key per quarter, from 2020Q1 through Q4 of the current year
REFRESH_QUARTERS = [
    f"{year}Q{quarter}"
    for year in range(2020, date.today().year + 1)
    for quarter in [1, 2, 3, 4]
]
quarterly_partitions_def = StaticPartitionsDefinition(partition_keys=REFRESH_QUARTERS)

def partition_key_to_dbt_vars(partition_key):
    # Map the partition key of the run to the dbt vars for that run
    return {"refresh_periods": [partition_key]}

dbt_assets = with_resources(
    load_assets_from_dbt_project(
        project_dir=DBT_PROJECT_DIR,
        profiles_dir=DBT_PROFILES_DIR,
        select="+some_dbt_model +some_other_dbt_model",
        use_build_command=True,
        partitions_def=quarterly_partitions_def,
        partition_key_to_vars_fn=partition_key_to_dbt_vars,
    ),
    {"dbt": dbt_resource},
)
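The plain-Python parts of the snippet above can be sanity-checked without starting Dagster. This is only a sketch: `refresh_quarters` is a hypothetical wrapper around the same list comprehension, with an optional `today` argument added here as an assumption so the result does not depend on when it is run.

```python
from datetime import date


def refresh_quarters(today=None):
    """Hypothetical wrapper: quarter keys from 2020Q1 through Q4 of today's year."""
    today = today or date.today()
    return [
        f"{year}Q{quarter}"
        for year in range(2020, today.year + 1)
        for quarter in [1, 2, 3, 4]
    ]


def partition_key_to_dbt_vars(partition_key):
    # Same mapping as in the snippet above: one partition key -> one refresh period.
    return {"refresh_periods": [partition_key]}


keys = refresh_quarters(date(2022, 11, 1))
print(len(keys), keys[0], keys[-1])
print(partition_key_to_dbt_vars("2022Q2"))
```

Note that the list always runs to Q4 of the current year, so it can include quarters that have not started yet; whether that is desirable depends on how the dbt models treat an empty refresh period.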