Muhammad Jarir Kanji
09/04/2023, 3:23 PM
@asset(
    io_manager_key="io_manager",
    partitions_def=TimeWindowPartitionsDefinition(
        start=date.today().isoformat(),
        cron_schedule="0/5 * * * *",
        fmt="%Y-%m-%d %H:%M:%S",
    ),
    auto_materialize_policy=AutoMaterializePolicy.lazy(),
)
def upstream(context: AssetExecutionContext) -> str:
    return datetime.now().isoformat()


@asset(
    auto_materialize_policy=AutoMaterializePolicy.lazy(),
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=2),
    ins={"upstream": AssetIn(partition_mapping=AllPartitionMapping())},
)
def downstream(context: AssetExecutionContext, upstream: str):
    return upstream
`upstream` is a time-partitioned asset, with new partitions every 5 minutes. `downstream` has `maximum_lag_minutes=2`, but regardless of how much time passes, `downstream` being overdue never causes one of `upstream`'s partitions to materialize.
What am I missing?
Muhammad Jarir Kanji
09/06/2023, 7:17 PM
owen
09/07/2023, 9:02 PM
Muhammad Jarir Kanji
09/12/2023, 11:42 AM
this is due to an interaction between partitioned assets and freshness-based scheduling
Additionally, to provide a bit more detail about my specific problem:
• I have a DBT project that I want to orchestrate with Dagster.
• The DBT assets depend on a number of time-partitioned assets (as well as non-partitioned assets). However, they have different partition definitions, so I can't bundle them together into one job, and I'd rather not create 20 different jobs (1 per asset) either.
  ◦ As a side note, I'm using a factory to create the source assets, much like the example here.
• I want the whole system to trigger at 6am.
What's the best way to set this up? My initial plan was to make all the source assets (that DBT depends on) lazy so that the DBT job's freshness policy triggers them. However, I was running into the issue above. If I instead switch the partitioned assets to materialize eagerly, how do I ensure that DBT waits for all of the source assets to finish updating for the day, instead of triggering for every single source asset?
owen
09/12/2023, 10:49 PM
`ScheduleDefinition(cron_schedule="...", job=define_asset_job("...", selection="assetX"))` objects for each of your sources.
As for the "how do you ensure that dbt waits for all of the source assets to finish" part, you can take advantage of the new `AutoMaterializeRule` system and create a policy that will skip materializing until all upstream assets have been updated:
from dagster import AutoMaterializePolicy, AutoMaterializeRule

# with_rules() is an instance method, so start from a base policy such as eager()
my_policy = AutoMaterializePolicy.eager().with_rules(
    AutoMaterializeRule.skip_on_not_all_parents_updated(),
)
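To make the effect of that rule concrete, here is a toy model of the decision it describes, written in plain Python. It is not Dagster's implementation (the real rule reasons about asset partitions and evaluation ticks); the function `should_skip` and the asset names are hypothetical and only illustrate "skip until every parent has been updated since the child last ran".

```python
from datetime import datetime
from typing import Mapping, Optional


def should_skip(
    child_last_materialized: Optional[datetime],
    parents_last_updated: Mapping[str, Optional[datetime]],
) -> bool:
    """Return True while at least one parent has not been updated
    since the child was last materialized."""
    for updated_at in parents_last_updated.values():
        if updated_at is None:
            return True  # this parent has never been materialized
        if child_last_materialized is not None and updated_at <= child_last_materialized:
            return True  # this parent has no update newer than the child
    return False


# Yesterday's dbt run, and two sources of which only one has refreshed today.
last_dbt_run = datetime(2023, 9, 11, 7, 0)
parents = {
    "source_a": datetime(2023, 9, 12, 6, 5),
    "source_b": datetime(2023, 9, 11, 6, 5),
}
skip_while_waiting = should_skip(last_dbt_run, parents)

# Once the second source finishes, nothing blocks the dbt assets any more.
parents["source_b"] = datetime(2023, 9, 12, 6, 10)
skip_after_all_updated = should_skip(last_dbt_run, parents)
```

In this model the dbt assets are skipped after the first source finishes and only become eligible once the last one has, which is the "wait for all sources" behaviour asked about above.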
To apply this policy to your dbt assets, you'll want to use the `DagsterDbtTranslator` API: https://docs.dagster.io/integrations/dbt/reference#customizing-auto-materialize-policies
owen
09/12/2023, 10:51 PM