Patricia Ayuso
06/13/2023, 10:16 AM
from dagster import DailyPartitionsDefinition
from dagster_dbt import load_assets_from_dbt_project

# DBT_PROJECT_PATH and DBT_PROFILES are assumed to be defined elsewhere in the project.
my_partitions_def = DailyPartitionsDefinition(start_date="2022-02-02")


def partition_key_to_dbt_vars(partition_key):
    # Pass the daily partition key to dbt as the var p_load_datetime.
    return {"p_load_datetime": partition_key}


# Load dbt models as assets
dbt_uk_assets = load_assets_from_dbt_project(
    project_dir=DBT_PROJECT_PATH,
    profiles_dir=DBT_PROFILES,
    use_build_command=True,
    select="tag:uk",
    exclude="tag:wip",
    partitions_def=my_partitions_def,
    partition_key_to_vars_fn=partition_key_to_dbt_vars,
    key_prefix="uk",
)
dbt_uk_companies_assets = load_assets_from_dbt_project(
    project_dir=DBT_PROJECT_PATH,
    profiles_dir=DBT_PROFILES,
    use_build_command=True,
    partitions_def=my_partitions_def,
    partition_key_to_vars_fn=partition_key_to_dbt_vars,
    select="uk_d_companies",
    key_prefix="uk_companies",
)
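For reference, `partition_key_to_vars_fn` turns each partition key into dbt vars. The sketch below approximates the `dbt build` arguments one partition run would produce, assuming the vars dict ends up JSON-encoded on dbt's `--vars` flag (dbt does accept vars that way on the command line). `build_dbt_args` is a hypothetical helper for illustration, not part of dagster-dbt.

```python
import json


def partition_key_to_dbt_vars(partition_key):
    # Same mapping as in the snippet above.
    return {"p_load_datetime": partition_key}


def build_dbt_args(partition_key, select, exclude=None):
    # Hypothetical helper: approximates the CLI call for a single partition.
    # Assumption: vars are passed as a JSON string via --vars.
    args = ["dbt", "build", "--select", select]
    if exclude:
        args += ["--exclude", exclude]
    args += ["--vars", json.dumps(partition_key_to_dbt_vars(partition_key))]
    return args


print(build_dbt_args("2023-06-12", "tag:uk", exclude="tag:wip"))
```

Inside a model, the value would then be read with `{{ var('p_load_datetime') }}`.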
Both groups materialize daily, but uk_d_companies should materialize after uk (they have a dbt dependency), and right now it runs at the same time.
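One plausible reason the two groups never line up (an assumption, not confirmed in this thread, so worth checking against your dagster-dbt version): if `key_prefix` is applied to a model's upstream dependencies as well as to its own key, the second load sees its upstream `uk` models under the `uk_companies` prefix, and Dagster ends up with no edge between the two groups. A toy model with tuples as asset keys; `prefixed_graph` and the model name `uk_stg_companies` are hypothetical.

```python
def prefixed_graph(key_prefix, deps):
    # deps maps each loaded dbt model to the names of its upstream models.
    # Assumption: key_prefix is applied to the model's key AND to its upstream keys.
    return {
        (key_prefix, model): {(key_prefix, upstream) for upstream in upstreams}
        for model, upstreams in deps.items()
    }


# Hypothetical lineage: uk_d_companies depends on a model tagged "uk".
uk_graph = prefixed_graph("uk", {"uk_stg_companies": set()})
companies_graph = prefixed_graph(
    "uk_companies", {"uk_d_companies": {"uk_stg_companies"}}
)

upstream = companies_graph[("uk_companies", "uk_d_companies")]
print(upstream)                  # {('uk_companies', 'uk_stg_companies')}
print(upstream & set(uk_graph))  # set(): no key in common, so no edge between groups
```

Under that assumption, loading both selections in a single call under one prefix, as in the final version later in this thread, puts both models in the same graph.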
So I’ve been researching this, and there seem to be multiple ways to do it; I focussed on:
A freshness policy in my dbt model configuration
uk_d_companies.sql
{{
config(
materialized = 'incremental',
transient = false,
incremental_strategy = 'delete+insert',
on_schema_change = 'sync_all_columns',
unique_key = ['COMPANY_SK', 'DBT_VALID_FROM'],
dagster_freshness_policy = { "maximum_lag_minutes": 15 }
)
}}
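Roughly, `maximum_lag_minutes: 15` asks that the model's data include every upstream update that is more than 15 minutes old. Below is a simplified, standalone version of that rule. `is_overdue` is hypothetical and ignores the optional cron schedule and partitioning, so treat it as an intuition aid rather than Dagster's actual evaluation.

```python
from datetime import datetime, timedelta


def is_overdue(now, upstream_updates, last_materialized, max_lag_minutes=15):
    # Overdue if some upstream update is older than the allowed lag
    # but happened after this asset was last materialized.
    cutoff = now - timedelta(minutes=max_lag_minutes)
    return any(
        last_materialized is None or last_materialized < update
        for update in upstream_updates
        if update <= cutoff
    )


uk_finished = datetime(2023, 6, 13, 2, 30)   # hypothetical end of the uk run
last_companies = datetime(2023, 6, 12, 2, 30)  # hypothetical previous run
print(is_overdue(datetime(2023, 6, 13, 2, 40), [uk_finished], last_companies))  # False
print(is_overdue(datetime(2023, 6, 13, 2, 50), [uk_finished], last_companies))  # True
```

If Dagster never observes an upstream update on the keys this asset depends on, `upstream_updates` stays empty and the check never fires.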
+ an asset reconciliation sensor in Dagster
from dagster import AssetSelection, build_asset_reconciliation_sensor

uk_companies_sensor = build_asset_reconciliation_sensor(
    AssetSelection.key_prefixes("uk_companies"),
    name="uk_companies_sensor",
)
OR
+ an auto-materialize policy in Dagster when loading the uk_companies assets
from dagster import AutoMaterializePolicy

dbt_uk_companies_assets = load_assets_from_dbt_project(
    project_dir=DBT_PROJECT_PATH,
    profiles_dir=DBT_PROFILES,
    use_build_command=True,
    partitions_def=my_partitions_def,
    partition_key_to_vars_fn=partition_key_to_dbt_vars,
    select="uk_d_companies",
    key_prefix="uk_companies",
    # Apply the same lazy policy to every dbt node in this selection.
    node_info_to_auto_materialize_policy_fn=lambda _: AutoMaterializePolicy.lazy(),
)
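The two built-in policies differ in what triggers a run: per the Dagster docs, `AutoMaterializePolicy.eager()` materializes an asset whenever its parents are updated, while `lazy()` only does so when needed to satisfy a freshness policy on the asset or something downstream of it. The toy function below captures just that difference; `should_materialize` is hypothetical, and the real evaluation considers more conditions.

```python
def should_materialize(policy, parent_updated, freshness_at_risk):
    # Toy model of the two built-in policies; not Dagster's implementation.
    if policy == "eager":
        return parent_updated
    if policy == "lazy":
        # A parent update alone is not enough: a freshness policy must require it.
        return parent_updated and freshness_at_risk
    raise ValueError(f"unknown policy: {policy}")


print(should_materialize("eager", parent_updated=True, freshness_at_risk=False))  # True
print(should_materialize("lazy", parent_updated=True, freshness_at_risk=False))   # False
```

So with `lazy()`, the `dagster_freshness_policy` in the dbt config is what actually has to drive the runs.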
But neither strategy seems to do what I need: they materialize uk_companies when I first activate them, but not 15 minutes after the daily partition (uk) materialization finishes.
Is this the way to go and I’m missing something? Or should I take another route?
Brendan Jackson
06/13/2023, 1:12 PM
Patricia Ayuso
06/13/2023, 1:15 PM
Brendan Jackson
06/13/2023, 1:54 PM
Brendan Jackson
06/13/2023, 1:55 PM
Patricia Ayuso
06/13/2023, 1:55 PM
Brendan Jackson
06/13/2023, 1:57 PM
> Both groups materialize daily though,
What makes them materialise at all, right now? (Before the policies and sensors were added)
> uk_d_companies should materialize after uk (they have a dbt dependency) and it runs at the same time.
Patricia Ayuso
06/13/2023, 2:00 PM
uk
from dagster import build_schedule_from_partitioned_job

uk_models = build_schedule_from_partitioned_job(
    name="uk_models",
    job=uk,
    hour_of_day=2,
    minute_of_hour=0,
)
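For a daily-partitioned job, a schedule built with `hour_of_day=2, minute_of_hour=0` fires once a day at 02:00 and, with default settings, requests the most recent complete partition, i.e. the previous day. A stdlib sketch of that mapping, assuming no `end_offset` and the `%Y-%m-%d` key format that `DailyPartitionsDefinition` uses by default; `tick_to_partition_key` is hypothetical.

```python
from datetime import datetime, timedelta


def tick_to_partition_key(tick):
    # The partition for day D only ends at midnight, so a 02:00 tick on D+1
    # targets D: the latest partition that has fully elapsed.
    return (tick.date() - timedelta(days=1)).strftime("%Y-%m-%d")


print(tick_to_partition_key(datetime(2023, 6, 13, 2, 0)))  # 2023-06-12
```

Since both groups were built from jobs scheduled this way, each run targets the same day's partition, but nothing in the schedules themselves orders one job after the other.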
Brendan Jackson
06/13/2023, 2:00 PM
uk job?
Patricia Ayuso
06/13/2023, 2:00 PM
Brendan Jackson
06/13/2023, 2:00 PM
Patricia Ayuso
06/13/2023, 2:01 PM
Brendan Jackson
06/13/2023, 2:01 PM
Patricia Ayuso
06/13/2023, 4:27 PM
dbt_uk_assets = load_assets_from_dbt_project(
    project_dir=DBT_PROJECT_PATH,
    profiles_dir=DBT_PROFILES,
    use_build_command=True,
    select="tag:uk uk_d_companies",
    exclude="tag:wip",
    partitions_def=daily_partition_def,
    partition_key_to_vars_fn=partition_key_to_dbt_vars,
    key_prefix="uk_daily",
)
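In dbt's selection syntax, space-separated selectors form a union, so `"tag:uk uk_d_companies"` picks the tagged models plus `uk_d_companies`, and a single `dbt build` then runs them in dependency order. Below is a standalone illustration using the standard library's `graphlib` (Python 3.9+). The model names other than `uk_d_companies` are hypothetical.

```python
from graphlib import TopologicalSorter

# Hypothetical lineage: each model maps to the models it depends on.
lineage = {
    "uk_stg_companies": set(),
    "uk_stg_officers": set(),
    "uk_d_companies": {"uk_stg_companies", "uk_stg_officers"},
}
tag_uk = {"uk_stg_companies", "uk_stg_officers"}

# Space-separated selectors are unioned.
selected = tag_uk | {"uk_d_companies"}

# Upstream models always come before the models that depend on them.
order = [m for m in TopologicalSorter(lineage).static_order() if m in selected]
print(order)
```

dbt may run independent models in parallel threads, but a model still waits for its upstream models to finish.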
JOBS
from dagster import AssetSelection, define_asset_job

uk_daily = define_asset_job(
    "uk_daily",
    selection=AssetSelection.key_prefixes("uk_daily"),
)
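`AssetSelection.key_prefixes("uk_daily")` matches every asset whose key starts with `uk_daily`, which, with a single load call, now covers both the tagged models and `uk_d_companies`. A plain-Python approximation of that matching, with keys as tuples; `select_by_prefix` and the model name `uk_stg_companies` are hypothetical.

```python
def select_by_prefix(keys, prefix):
    # A key matches when its leading path components equal the prefix tuple.
    n = len(prefix)
    return {key for key in keys if key[:n] == prefix}


keys = {
    ("uk_daily", "uk_stg_companies"),
    ("uk_daily", "uk_d_companies"),
    ("uk", "uk_stg_companies"),  # a different prefix: not selected
}
print(sorted(select_by_prefix(keys, ("uk_daily",))))
# [('uk_daily', 'uk_d_companies'), ('uk_daily', 'uk_stg_companies')]
```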
SCHEDULES
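from dagster import build_schedule_from_partitioned_job

# A distinct variable name keeps the schedule from overwriting the uk_daily job object.
uk_daily_schedule = build_schedule_from_partitioned_job(
    name="uk_daily",
    job=uk_daily,
    hour_of_day=4,
    minute_of_hour=0,
)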
Let’s see how it goes with the weekly and monthly partitions… 🫣
Brendan Jackson
06/13/2023, 4:29 PM