geoHeil (01/10/2023, 4:42 PM)
Is there an @asset(freshness=600seconds)-style tag for an asset which is defined inside dbt?

Benoit Perigaud (01/10/2023, 4:46 PM)
You can set dagster_freshness_policy in the model's config, e.g.

{{
  config(
    dagster_freshness_policy={"maximum_lag_minutes": 60}
  )
}}

geoHeil (01/10/2023, 4:48 PM)
So maximum_lag_minutes can be configured. Is it also possible to pass a cron expression? I.e., tell Dagster that the dbt asset foo should be up-to-date by 09:00 in the morning, i.e. trigger the reconciliation of all the data dependencies sufficiently far in advance that the asset is on time even when the various steps to materialize the dependencies are not instant.

Benoit Perigaud (01/10/2023, 4:52 PM)
maximum_lag_minutes will always be required, but you can also define a cron_schedule, i.e. "for this table, I want the data at 9am every day to be fresh by x minutes".

geoHeil (01/10/2023, 4:56 PM)

Benoit Perigaud (01/10/2023, 5:00 PM)

Sean Lopp (01/10/2023, 6:03 PM)
{{
  config(
    dagster_freshness_policy={"cron_schedule": "0 9 * * *", "maximum_lag_minutes": 0}
  )
}}

I think this does what you want @geoHeil
Can you open a GitHub issue for the request to support dbt variable references?

geoHeil (01/10/2023, 8:36 PM)

Sean Lopp (01/10/2023, 9:54 PM)

owen (01/10/2023, 9:59 PM)
…the 9:00 limit. In cases like yours (a once-a-day asset, without other freshness policies in the mix), it will generally kick off the execution as soon as possible -- so as soon as the data for the previous day is made available, your asset will start to execute. …9*60. The cron schedule specifies "when do you want the data by" and the maximum lag minutes indicates "at that time, how old can the data be".
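The two-part rule owen describes reduces to plain date arithmetic. A minimal sketch (the helper name is made up for illustration, not part of Dagster's API):

```python
from datetime import datetime, timedelta

def data_time_cutoff(deadline: datetime, maximum_lag_minutes: int) -> datetime:
    # At the cron deadline, the asset's data may be at most
    # `maximum_lag_minutes` old, so everything up to this cutoff
    # time must already be processed.
    return deadline - timedelta(minutes=maximum_lag_minutes)

nine_am = datetime(2023, 1, 11, 9, 0)     # the "0 9 * * *" deadline
print(data_time_cutoff(nine_am, 9 * 60))  # 2023-01-11 00:00:00 (midnight)
print(data_time_cutoff(nine_am, 0))       # 2023-01-11 09:00:00
```

With maximum_lag_minutes=9*60, the cutoff is midnight, which is why the sensor can start a run right after the previous day's data exists.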
Jonathan Neo (01/11/2023, 12:56 PM)

geoHeil (01/11/2023, 1:49 PM)

owen (01/11/2023, 2:55 PM)
FreshnessPolicy(maximum_lag_minutes=9*60, cron_schedule="0 9 * * *") would start just after midnight (i.e. a run of that asset and all of its upstream assets would be kicked off at that time). It would not be possible to start that run earlier than midnight, as then the root data that was pulled in at the start of the run would not have all of the records for the previous day yet. So in essence, it's giving the asset the best possible chance of having "all of yesterday's data" by 9AM.
If you have some upstream source data that lands at different times (i.e. not at exactly midnight), you can model that by removing those upstream assets from being targeted by the reconciliation sensor (so the sensor won't automatically kick off runs for those assets) and kicking them off with your own custom logic (such as a custom sensor). In this case, the reconciliation sensor will kick off the asset with a freshness policy (and all of the targeted upstream assets) as soon as all the relevant upstream data is made available (which, again, is the earliest possible time to kick off this computation).
Things can get more complicated when more freshness policies with different deadlines are in the mix, but for simple cases like this, it seems like this would be the desired behavior.

geoHeil (01/11/2023, 4:12 PM)

owen (01/11/2023, 4:16 PM)

geoHeil (01/11/2023, 4:17 PM)

owen (01/11/2023, 4:19 PM)

geoHeil (01/11/2023, 4:21 PM)

owen (01/11/2023, 4:25 PM)
If you use build_asset_reconciliation_sensor(asset_selection=AssetSelection.all() - AssetSelection.keys("asset", "keys", "to", "block", "on")), then you'll have the behavior you're looking for (as long as you have some separate sensor(s) to manage those keys).
But for the fully-integrated approach with observable source assets, I'd guess around a month or so?

geoHeil (01/11/2023, 4:26 PM)

Jonathan Neo (01/12/2023, 2:36 PM)
Regarding FreshnessPolicy(maximum_lag_minutes=9*60, cron_schedule="0 9 * * *"): is that the same as doing @schedule(cron_schedule="0 0 * * *") and having it complete at 9:00am because I know that the asset takes 9 hours to run?

Sean Lopp (01/12/2023, 3:43 PM)
FreshnessPolicy(maximum_lag_minutes=9*60, cron_schedule="0 9 * * *") means we want all of the data from before midnight processed by 9am. The reconciliation sensor does this as eagerly as possible, so it starts a run around midnight.
FreshnessPolicy(maximum_lag_minutes=30, cron_schedule="0 9 * * *") means we want all of the data from before 8:30am ready by 9am. The reconciliation sensor still does this as eagerly as possible, so it starts a run around 8:30.
In the case of multiple assets it starts to get more complicated. That is where the reconciliation sensor does a lot of its magic, and I think it's well described here: https://dagster.io/blog/declarative-scheduling

You can use node_info_to_group_fn to translate dbt tags into asset groups and then use AssetSelection.groups. Another option would be to call load_assets_from_dbt_* using the selection argument and then pass the resulting object to AssetSelection.assets.
owen (01/13/2023, 5:46 PM)
There is also DbtManifestAssetSelection, so if you have a pre-compiled manifest.json file, then you can use dbt selection syntax directly: https://docs.dagster.io/_apidocs/libraries/dagster-dbt#dagster_dbt.DbtManifestAssetSelection

Jonathan Neo (01/14/2023, 4:22 AM)
So FreshnessPolicy(maximum_lag_minutes=9*60, cron_schedule="0 9 * * *"), as you say.
If the asset takes:
• 1 hour to run, then the freshness policy will be fulfilled at 9am.
• 9 hours to run, then the freshness policy will be fulfilled at 9am, because the asset would have been refreshed by exactly 9am.
• 10 hours to run, then the freshness policy will fail when checked at 9am, because the asset is still materializing.
That's where my comment about "historical data to calculate the median time it takes to run the asset to determine when the asset should be triggered" would be useful. If I want my asset to be fresh by 9 AM, then I can define that declaratively and let Dagster calculate what time it needs to trigger the asset refresh. If the asset takes 10 hours to run, then it needs to trigger it at 11pm the day before.
This feature would be useful for pipelines whose execution time is slowly increasing.
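Jonathan's suggestion can be sketched with a median over historical runtimes to back-compute the trigger time (illustrative only; Dagster does not expose such a helper):

```python
from datetime import datetime, timedelta
from statistics import median

def required_trigger_time(deadline: datetime, past_runtime_minutes: list) -> datetime:
    # Trigger early enough that a median-duration run finishes by the deadline.
    return deadline - timedelta(minutes=median(past_runtime_minutes))

nine_am = datetime(2023, 1, 12, 9, 0)
# If recent runs took about 10 hours, trigger at 11pm the day before.
print(required_trigger_time(nine_am, [590, 600, 610]))  # 2023-01-11 23:00:00
```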
Where it stands today, the developer has to manually update maximum_lag_minutes to a higher number when the asset's execution time increases.

Sean Lopp (01/17/2023, 8:46 PM)

Jonathan Neo (01/18/2023, 1:09 AM)