Radek Tomšej
04/20/2023, 2:25 PM
I have two assets (daily_asset and hourly_asset): one should be materialized once a day, while the other should be executed every hour. A third asset (derived_asset) depends on both and should also run every hour. But when derived_asset is started, daily_asset also refreshes, which is undesirable. Is it possible to prevent daily_asset from starting automatically when derived_asset starts? Thanks a lot!
from dagster import AutoMaterializePolicy, FreshnessPolicy, asset

# lazy: materialize only when needed to satisfy freshness policies
@asset(
    auto_materialize_policy=AutoMaterializePolicy.lazy(),
    # "*/10 * * * *" fires every 10 minutes
    freshness_policy=FreshnessPolicy(
        maximum_lag_minutes=5, cron_schedule="*/10 * * * *"
    ),
    # non_argument_deps={source_asset.key}
)
def daily_asset():
    return "Daily asset"

@asset(
    auto_materialize_policy=AutoMaterializePolicy.lazy(),
    # "*/2 * * * *" fires every 2 minutes
    freshness_policy=FreshnessPolicy(
        maximum_lag_minutes=1, cron_schedule="*/2 * * * *"
    ),
)
def hourly_asset():
    return "Hourly asset"

# depends on both assets above through its function arguments
@asset(
    auto_materialize_policy=AutoMaterializePolicy.lazy(),
    freshness_policy=FreshnessPolicy(
        maximum_lag_minutes=1,
        cron_schedule="*/2 * * * *",
    ),
)
def derived_asset(daily_asset, hourly_asset):
    return "Derived asset: " + daily_asset + " + " + hourly_asset
owen
04/21/2023, 9:37 PM
derived_asset's freshness policy states that it needs to be completely up to date with respect to its upstream data every hour. To accomplish this, it needs data that is more up to date than daily_asset's freshness policy would provide by default (i.e. it needs data from the last hour, but daily_asset would only have data from the last day).
Essentially, if daily_asset didn't execute that often, derived_asset would consistently be marked as late.
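To make the argument concrete, here is a minimal arithmetic sketch in plain Python (no Dagster APIs). It assumes, as a simplification, that derived_asset's data counts as only as recent as its oldest input; lag_minutes, the timestamps, and MAX_LAG_MINUTES are hypothetical.

```python
from datetime import datetime


def lag_minutes(now, parent_data_times):
    """Simplified model (assumption): the derived asset's data is only as
    recent as its stalest parent, so lag is measured from the oldest
    parent data time."""
    oldest = min(parent_data_times.values())
    return (now - oldest).total_seconds() / 60


# Hypothetical snapshot at 15:00: daily_asset last ran at midnight,
# hourly_asset ran at the top of the hour.
now = datetime(2023, 4, 21, 15, 0)
parents = {
    "daily_asset": datetime(2023, 4, 21, 0, 0),
    "hourly_asset": datetime(2023, 4, 21, 15, 0),
}
MAX_LAG_MINUTES = 60  # "up to date every hour"

lag = lag_minutes(now, parents)
print(lag, lag <= MAX_LAG_MINUTES)  # 900.0 False
```

In this model, refreshing hourly_asset alone never brings the lag under 60 minutes; only a fresh daily_asset run does, so enforcing that bound means requesting daily_asset about every hour, which matches the extra runs described in the question.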
Another way to formulate this would be to remove this freshness policy from derived_asset, then change its AutoMaterializePolicy to eager. This way, derived_asset will just update whenever either of its parents updates, and it won't impose any constraints on when daily_asset needs to be updated.
Radek Tomšej
04/22/2023, 5:03 AM
derived_asset never materializes.
from dagster import (
    AutoMaterializePolicy,
    DailyPartitionsDefinition,
    FreshnessPolicy,
    HourlyPartitionsDefinition,
    asset,
)

@asset(
    auto_materialize_policy=AutoMaterializePolicy.lazy(),
    # "0 0 * * *" fires once a day at midnight
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=5, cron_schedule="0 0 * * *"),
    partitions_def=DailyPartitionsDefinition(start_date="2023-04-21"),
    # non_argument_deps={source_asset.key}
)
def daily_asset():
    return "Daily asset"

@asset(
    auto_materialize_policy=AutoMaterializePolicy.lazy(),
    # "0 * * * *" fires at the top of every hour
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=1, cron_schedule="0 * * * *"),
    partitions_def=HourlyPartitionsDefinition(start_date="2023-04-22-00:00"),
)
def hourly_asset():
    return "Hourly asset"

# eager, with no freshness policy and no partitions_def (unpartitioned)
@asset(
    auto_materialize_policy=AutoMaterializePolicy.eager(),
)
def derived_asset(daily_asset, hourly_asset):
    return "Derived asset: " + daily_asset + " + " + hourly_asset
owen
04/24/2023, 5:28 PM
derived_asset can only be materialized once "day N" of daily_asset AND the last 24 hours of hourly_asset have been materialized.
Radek Tomšej
04/24/2023, 5:45 PM