Could someone please help me understand why the fo...
# ask-community
m
Could someone please help me understand why the following toy auto-materialization example never triggers any materializations:
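from datetime import date, datetime

from dagster import (
    AllPartitionMapping,
    AssetExecutionContext,
    AssetIn,
    AutoMaterializePolicy,
    FreshnessPolicy,
    TimeWindowPartitionsDefinition,
    asset,
)
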
@asset(
    io_manager_key="io_manager",
    partitions_def=TimeWindowPartitionsDefinition(
        start=date.today().isoformat(),
        cron_schedule="0/5 * * * *",
        fmt="%Y-%m-%d %H:%M:%S",
    ),
    auto_materialize_policy=AutoMaterializePolicy.lazy(),
)
def upstream(context: AssetExecutionContext) -> str:
    return datetime.now().isoformat()


@asset(
    auto_materialize_policy=AutoMaterializePolicy.lazy(),
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=2),
    ins={"upstream": AssetIn(partition_mapping=AllPartitionMapping())},
)
def downstream(context: AssetExecutionContext, upstream: str):
    return upstream
`upstream` is a time-partitioned asset, with new partitions every 5 minutes. `downstream` has a `maximum_lag_minutes=2`, but regardless of how much time passes, `downstream` being overdue never causes one of `upstream`'s partitions to materialize. What am I missing?
Pinging the channel to bump this question.
o
hi @Muhammad Jarir Kanji -- in this case, I would recommend using an AutoMaterializePolicy.eager() for your time-partitioned root asset. this is due to an interaction between partitioned assets and freshness-based scheduling, but in the end, filling in new partitions as they pop into existence is likely the behavior that makes the most sense in this scenario. another thing to note is that all time partitions of the upstream will need to be filled in before the downstream can be materialized.
m
@owen Thank you for the answer! Could you explain this bit in a bit more detail:
this is due to an interaction between partitioned assets and freshness-based scheduling
Additionally, to provide a bit more detail about my specific problem:
• I have a DBT project that I want to orchestrate with Dagster.
• The DBT assets depend on a number of time-partitioned assets (as well as non-partitioned assets). However, they have different partition definitions, so I can't bundle them together into one job, and I'd rather not create 20 different jobs (1 per asset).
  ◦ As a side note, I'm using a factory to create the source assets, much like the example here.
• I want the whole system to trigger at 6am.
What's the best way to set this up? My initial plan was to make all the source assets (that DBT depends on) lazy so that the DBT job's freshness policy triggers them. However, I was running into the issue above. If I instead switch the partitioned assets to materialize eagerly, how do I ensure that DBT waits for all of the source assets to first finish updating for the day, instead of triggering for every single source asset?
o
ah I see -- considering that you're already using a factory pattern for your source assets, one option would be to use a factory to generate separate `ScheduleDefinition(cron_schedule="...", job=define_asset_job("...", selection="assetX"))` objects for each of your sources.
As for the "how do you ensure that dbt waits for all of the source assets to finish" part, you can take advantage of the new `AutoMaterializeRule` system and create a policy that will skip materializing until all upstream assets have been updated:
from dagster import AutoMaterializePolicy, AutoMaterializeRule

# with_rules is an instance method, so start from a base policy such as eager()
my_policy = AutoMaterializePolicy.eager().with_rules(
    AutoMaterializeRule.skip_on_not_all_parents_updated(),
)
To apply this to your dbt assets, you'll want to use the DagsterDbtTranslator API: https://docs.dagster.io/integrations/dbt/reference#customizing-auto-materialize-policies
Ah, and to explain that in a bit more detail: while time-partitioned assets can be evaluated for how fresh they are, they do not end up getting materialized by lazy policies. This is definitely a very confusing part of the system, and we're currently evaluating options to make this entire system more ergonomic. This would include things like making it possible to run root assets on a schedule using AMPs rather than ScheduleDefinitions.