# ask-community
r
Hi, we are currently dealing with one thing. We have 2 assets (`daily` and `hourly`): one should be materialized once a day, while the other should be executed every hour. Another asset (`derived_asset`) depends on both of them and should also run every hour. But when `derived_asset` is started, `daily_asset` also refreshes, which is undesirable. Is it possible to force `daily_asset` not to start automatically when `derived_asset` starts? Thanks a lot!
```python
from dagster import AutoMaterializePolicy, FreshnessPolicy, asset


@asset(
    auto_materialize_policy=AutoMaterializePolicy.lazy(),
    freshness_policy=FreshnessPolicy(
        maximum_lag_minutes=5, cron_schedule="*/10 * * * *"
    ),
    # non_argument_deps={source_asset.key}
)
def daily_asset():
    return "Daily asset"


@asset(
    auto_materialize_policy=AutoMaterializePolicy.lazy(),
    freshness_policy=FreshnessPolicy(
        maximum_lag_minutes=1, cron_schedule="*/2 * * * *"
    ),
)
def hourly_asset():
    return "Hourly asset"


@asset(
    auto_materialize_policy=AutoMaterializePolicy.lazy(),
    freshness_policy=FreshnessPolicy(
        maximum_lag_minutes=1,
        cron_schedule="*/2 * * * *",
    ),
)
def derived_asset(daily_asset, hourly_asset):
    return "Derived asset: " + daily_asset + " + " + hourly_asset
```
o
Hi @Radek Tomšej! The reason this is happening is that `derived_asset` states that it needs to be completely up to date with respect to its upstream data every hour. To accomplish this, it needs data that is more up to date than the freshness policy of `daily_asset` would provide by default (i.e. it needs data from the last hour, but `daily_asset` would only have data from the last day). Essentially, if `daily_asset` didn't execute that often, then `derived_asset` would consistently be marked as late. Another way to formulate this would be to remove the freshness policy from `derived_asset`, then update its AutoMaterializePolicy to `eager`. This way, `derived_asset` will just update whenever either of its parents updates, and it won't impose any constraints on when `daily_asset` needs to be updated.
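To make the reasoning above concrete, here is a minimal pure-Python sketch of how a downstream freshness constraint tightens what an upstream asset has to satisfy. It is a simplified model for illustration only, not Dagster's actual scheduling logic; the function name `effective_max_lag_minutes` and the constants are hypothetical.

```python
# Simplified model (an assumption, not Dagster internals): an upstream asset
# must be at least as fresh as the strictest freshness demand placed on it,
# whether that demand comes from its own policy or from a downstream asset.

MINUTES_PER_HOUR = 60
MINUTES_PER_DAY = 24 * MINUTES_PER_HOUR


def effective_max_lag_minutes(own_lag, downstream_lags):
    """Return the tightest lag (in minutes) the upstream asset must meet.

    own_lag: the asset's own maximum lag, or None if it has no policy.
    downstream_lags: lags demanded by downstream assets that have a policy.
    """
    candidates = [lag for lag in [own_lag, *downstream_lags] if lag is not None]
    return min(candidates) if candidates else None


# Original setup: derived_asset demands hourly freshness, so daily_asset is
# effectively pulled onto an hourly cadence as well.
with_downstream_policy = effective_max_lag_minutes(
    MINUTES_PER_DAY, [MINUTES_PER_HOUR]
)

# Suggested setup: derived_asset has no freshness policy (eager instead), so
# daily_asset is only bound by its own daily policy.
without_downstream_policy = effective_max_lag_minutes(MINUTES_PER_DAY, [])

print(with_downstream_policy)     # 60
print(without_downstream_policy)  # 1440
```

In this model, dropping the freshness policy from `derived_asset` removes the only hourly demand on `daily_asset`, which matches the behavior described in the reply.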
r
Thanks a lot @owen, that makes sense and I thought of that too. But what seems strange is that if I use assets without partitions, everything works as it should, but if I add partitioning, `derived_asset` never materializes.
```python
from dagster import (
    AutoMaterializePolicy,
    DailyPartitionsDefinition,
    FreshnessPolicy,
    HourlyPartitionsDefinition,
    asset,
)


@asset(
    auto_materialize_policy=AutoMaterializePolicy.lazy(),
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=5, cron_schedule="0 0 * * *"),
    partitions_def=DailyPartitionsDefinition(start_date="2023-04-21")
    # non_argument_deps={source_asset.key}
)
def daily_asset():
    return "Daily asset"


@asset(
    auto_materialize_policy=AutoMaterializePolicy.lazy(),
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=1, cron_schedule="0 * * * *"),
    partitions_def=HourlyPartitionsDefinition(start_date="2023-04-22-00:00"),
)
def hourly_asset():
    return "Hourly asset"


@asset(
    auto_materialize_policy=AutoMaterializePolicy.eager(),
)
def derived_asset(daily_asset, hourly_asset):
    return "Derived asset: " + daily_asset + " + " + hourly_asset
```
o
Ah, this is happening because a partition of an asset will only be materialized once all of its parent partitions have been materialized, i.e. "day N" of `derived_asset` can only be materialized once "day N" of `daily_asset` AND the last 24 hours of `hourly_asset` have been materialized.
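The rule described above can be sketched in plain Python. This is an illustrative model only, not Dagster code: it assumes `derived_asset` is daily-partitioned (as the explanation implies), that "day N" maps to the 24 hourly partitions of that calendar day, and that partition keys use the `YYYY-MM-DD` and `YYYY-MM-DD-HH:MM` formats seen in the `start_date` values above. The helper names `required_parent_partitions` and `can_materialize` are hypothetical.

```python
from datetime import datetime, timedelta

DAILY_FMT = "%Y-%m-%d"
HOURLY_FMT = "%Y-%m-%d-%H:%M"


def required_parent_partitions(day_key):
    """Parent partitions that one daily partition of derived_asset waits on."""
    day_start = datetime.strptime(day_key, DAILY_FMT)
    hourly_keys = [
        (day_start + timedelta(hours=h)).strftime(HOURLY_FMT) for h in range(24)
    ]
    return {"daily_asset": [day_key], "hourly_asset": hourly_keys}


def can_materialize(day_key, materialized):
    """True only if every required parent partition has been materialized.

    materialized: dict mapping asset name to a set of materialized keys.
    """
    required = required_parent_partitions(day_key)
    return all(
        set(keys) <= materialized.get(name, set())
        for name, keys in required.items()
    )


required = required_parent_partitions("2023-04-22")

# Only 23 of the 24 hourly partitions are done: the daily partition must wait.
partial = {
    "daily_asset": {"2023-04-22"},
    "hourly_asset": set(required["hourly_asset"][:-1]),
}
print(can_materialize("2023-04-22", partial))  # False

# Once the last hourly partition lands, the daily partition becomes eligible.
complete = {
    "daily_asset": {"2023-04-22"},
    "hourly_asset": set(required["hourly_asset"]),
}
print(can_materialize("2023-04-22", complete))  # True
```

Under these assumptions, a single missing hourly partition is enough to block the whole day.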
r
aaah ok, it is probably related to this one - https://github.com/dagster-io/dagster/issues/13848