# integration-dbt
p
Hello all. Happy Tuesday! We’re new to Dagster, and have been working with dbt for a few weeks. First, we simply loaded our assets. Then we realised we needed them as partitions, so we have:
my_partitions_def = DailyPartitionsDefinition(start_date="2022-02-02")

def partition_key_to_dbt_vars(partition_key):
  return {"p_load_datetime": partition_key}

# Load dbt models as assets
dbt_uk_assets = load_assets_from_dbt_project(
  project_dir = DBT_PROJECT_PATH,
  profiles_dir = DBT_PROFILES,
  use_build_command = True,
  select = 'tag:uk',
  exclude = 'tag:wip',
  partitions_def = my_partitions_def,
  partition_key_to_vars_fn = partition_key_to_dbt_vars,
  key_prefix = "uk"
)

dbt_uk_companies_assets = load_assets_from_dbt_project(
  project_dir = DBT_PROJECT_PATH,
  profiles_dir = DBT_PROFILES,
  use_build_command = True,
  partitions_def = my_partitions_def,
  partition_key_to_vars_fn = partition_key_to_dbt_vars,
  select = 'uk_d_companies',
  key_prefix = "uk_companies"
)
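As a side note, the mapping from a daily partition key to the dbt variable can be sketched in plain Python. This is only an illustration under assumptions: it does not call Dagster or dbt, `daily_partition_keys` and `dbt_vars_argument` are hypothetical helpers, and it assumes daily keys are formatted as `YYYY-MM-DD` and that the vars reach dbt as a JSON string.

```python
import json
from datetime import date, timedelta

START_DATE = date(2022, 2, 2)  # mirrors start_date in the partitions definition above


def daily_partition_keys(start, end_exclusive):
    """Hypothetical helper: one ISO-formatted key per day in [start, end_exclusive)."""
    days = (end_exclusive - start).days
    return [(start + timedelta(days=i)).isoformat() for i in range(days)]


def partition_key_to_dbt_vars(partition_key):
    # Same mapping as in the thread: the key becomes the p_load_datetime var.
    return {"p_load_datetime": partition_key}


def dbt_vars_argument(partition_key):
    """Hypothetical helper: serialize the vars dict as a JSON string for `--vars`."""
    return json.dumps(partition_key_to_dbt_vars(partition_key))


keys = daily_partition_keys(START_DATE, date(2022, 2, 5))
for key in keys:
    print(key, dbt_vars_argument(key))
```

Each partition run would then see a different `p_load_datetime` value in the dbt models.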
Both groups materialize daily. However, `uk_d_companies` should materialize after `uk` (they have a dbt dependency), yet they run at the same time. So I’ve been researching how to fix this, and it seems there are multiple ways to do it. I focused on adding a freshness policy to my dbt configuration in `uk_d_companies.sql`:
{{
  config(
    materialized = 'incremental',
    transient = false,
    incremental_strategy = 'delete+insert',
    on_schema_change = 'sync_all_columns',
    unique_key = ['COMPANY_SK', 'DBT_VALID_FROM'],
    dagster_freshness_policy = { "maximum_lag_minutes": 15 }
  )
}}
+ a reconciliation sensor in Dagster:
uk_companies_sensor = build_asset_reconciliation_sensor(
  AssetSelection.key_prefixes("uk_companies"),
  name = "uk_companies_sensor"
)
OR + an auto-materialize policy in Dagster when loading the `uk_companies` asset:
dbt_uk_companies_assets = load_assets_from_dbt_project(
  project_dir = DBT_PROJECT_PATH,
  profiles_dir = DBT_PROFILES,
  use_build_command = True,
  partitions_def = my_partitions_def,
  partition_key_to_vars_fn = partition_key_to_dbt_vars,
  select = 'uk_d_companies',
  key_prefix = "uk_companies", 
  node_info_to_auto_materialize_policy_fn = lambda _: AutoMaterializePolicy.lazy()
)
But neither strategy seems to do what I need. They materialize `uk_companies` when I first activate them, but not 15 minutes after the daily partition (`uk`) materialization finishes. Is this the way to go and I’m missing something? Or should I take another route?
b
When you say they have a dbt dependency, do they also have a dagster dependency?
p
Yes, we can see the dependency in the Global Asset Lineage.
b
You could create a dbt job that materialises those two assets, and a schedule that runs that job daily.
But I don't understand why they materialise at the same time, in this case.
p
Maybe a dumb question, but will they run in order?
b
I guess I'm confused. You said:
"Both groups materialize daily though, `uk_d_companies` should materialize after `uk` (they have a dbt dependency) and it runs at the same time."
What makes them materialise at all, right now? (Before the policies and sensors were added)
p
This schedule for `uk`:
uk_models = build_schedule_from_partitioned_job(
    name = "uk_models",
    job = uk,
    hour_of_day = 2,
    minute_of_hour = 0,
)
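For context, a schedule built from a daily-partitioned job normally targets the most recent partition that is already complete when the schedule ticks. The sketch below illustrates that arithmetic in plain Python. It is an assumption-laden illustration, not Dagster's implementation: `partition_key_for_tick` is a hypothetical helper, and it assumes midnight-aligned daily partitions with no end offset.

```python
from datetime import datetime, timedelta


def partition_key_for_tick(tick: datetime) -> str:
    """Hypothetical helper: key of the latest daily partition complete at `tick`.

    Assumes partitions cover whole calendar days, so the day containing the
    tick is still open and the previous day is the newest complete one.
    """
    return (tick.date() - timedelta(days=1)).isoformat()


# A tick at 02:00 on 3 February targets the partition for 2 February.
tick = datetime(2022, 2, 3, 2, 0)
print(partition_key_for_tick(tick))
```

Under these assumptions, two schedules that tick at the same hour target the same partition key at the same moment, which is consistent with the simultaneous runs described in this thread.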
b
Got it. And what's the `uk` job?
p
yep
b
If the job is to materialise both of those assets, it should materialise them in order of their dependencies.
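The idea of running assets in dependency order within one job can be sketched with a topological sort. This is a minimal illustration using Python's standard library, not Dagster's actual executor. The asset names `uk_model_a` and `uk_model_b` are made up for the example; only `uk_d_companies` comes from the thread.

```python
from graphlib import TopologicalSorter

# Map each asset to the set of upstream assets it depends on.
# uk_model_a and uk_model_b are hypothetical stand-ins for the tag:uk models.
deps = {
    "uk_model_a": set(),
    "uk_model_b": {"uk_model_a"},
    "uk_d_companies": {"uk_model_b"},
}


def execution_order(dependencies):
    """Return the assets in an order where every upstream comes first."""
    return list(TopologicalSorter(dependencies).static_order())


order = execution_order(deps)
print(order)
```

When both groups are part of the same run, the downstream asset can only start after its upstream has finished. Two separate runs launched at the same time have no such ordering between them.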
p
makes sense, I’ll give it a try. Thanks a lot 🙂
b
Good luck!
p
Quick update. Sadly, the multi-asset job didn’t do the trick: it materialized both pipelines at the same time. However, because both are daily partitions, I loaded them together, and now they materialize in sequence. Here is the code:
ASSETS
dbt_uk_assets = load_assets_from_dbt_project(
    project_dir = DBT_PROJECT_PATH,
    profiles_dir = DBT_PROFILES,
    use_build_command = True,
    select = 'tag:uk uk_d_companies',
    exclude = 'tag:wip',
    partitions_def = daily_partition_def,
    partition_key_to_vars_fn = partition_key_to_dbt_vars,
    key_prefix = "uk_daily"
)
JOBS
uk_daily = define_asset_job( 
    "uk_daily", 
    selection = AssetSelection.key_prefixes( "uk_daily" ) 
)
SCHEDULES
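# Bound to a separate variable so it does not shadow the uk_daily job
# if both definitions live in the same module.
uk_daily_schedule = build_schedule_from_partitioned_job(
    name = "uk_daily",
    job = uk_daily,
    hour_of_day = 4,
    minute_of_hour = 0,
)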
Let’s see how it goes with the weekly and monthly partitions… 🫣
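The combined `select = 'tag:uk uk_d_companies'` string relies on dbt treating space-separated selectors as a union, with `exclude` then removing matches. A rough sketch of that set logic follows. It is a toy model only: the `Model` class, `matches`, `resolve`, and the model names other than `uk_d_companies` are all hypothetical, and it handles just `tag:` and plain-name selectors, not dbt's full selection syntax.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Model:
    name: str
    tags: frozenset = field(default_factory=frozenset)


def matches(model, selector):
    """Toy matcher: supports only `tag:<name>` and exact model names."""
    if selector.startswith("tag:"):
        return selector[len("tag:"):] in model.tags
    return model.name == selector


def resolve(models, select, exclude=""):
    """Union of all space-separated `select` terms, minus any `exclude` matches."""
    selected = {m.name for m in models if any(matches(m, s) for s in select.split())}
    excluded = {m.name for m in models if any(matches(m, s) for s in exclude.split())}
    return sorted(selected - excluded)


models = [
    Model("uk_orders", frozenset({"uk"})),
    Model("uk_draft", frozenset({"uk", "wip"})),
    Model("uk_d_companies"),
    Model("fr_orders", frozenset({"fr"})),
]
result = resolve(models, "tag:uk uk_d_companies", "tag:wip")
print(result)
```

Here the untagged `uk_d_companies` model is pulled in by name alongside the `tag:uk` models, while anything tagged `wip` is dropped.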
b
Yes, you got it - one job for both assets.