# dagster-feedback
s
I still have quite a hard time grokking how freshness policies work, and how they map to "when will the job be run". Take the policy of `FreshnessPolicy(maximum_lag_minutes=60)` -- does this amount to different things depending on the dependency structure?

• If no upstream dependencies, it is basically an hourly schedule
• If one upstream dependency (but just one layer), it is dependent on the upstream asset being fresher, and will be updated within one hour
• If two or more upstream dependencies (but just one layer), it is dependent on all upstream assets being fresher, and will be updated within one hour
• If two or more layers of dependencies, it is dependent on all upstream assets being fresher, and will be updated within one hour

Do I understand that right? I'm not sure which docs page is best for understanding this, but I have learned the most from the class docstring. Feels like a table of some sort in the docs would be useful.
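For reference, this is roughly how I'm thinking about attaching that policy in each of those cases -- just a minimal sketch with placeholder asset names, assuming the `freshness_policy` argument on `@asset`:

```python
from dagster import FreshnessPolicy, asset

hourly_policy = FreshnessPolicy(maximum_lag_minutes=60)

# No upstream dependencies: my reading is this behaves like an hourly schedule.
@asset(freshness_policy=hourly_policy)
def source_table():
    ...

# One upstream dependency (one layer): should never be more than 60 minutes
# staler than source_table.
@asset(freshness_policy=hourly_policy)
def staging_table(source_table):
    ...

# Multiple upstream dependencies: should never be more than 60 minutes staler
# than *any* of its parents.
@asset(freshness_policy=hourly_policy)
def reporting_table(source_table, staging_table):
    ...
```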
💡 1
➕ 4
dagster docs feedback 2
r
hi @Stephen Bailey I'm deep into figuring out the exact same thing.
I assumed that setting a 60 min FreshnessPolicy would refresh the asset hourly, but it's much more complicated than that
m
Hi, all. As I understand it, setting a 60 min FreshnessPolicy on an asset is equivalent to saying: "At any time, you must manage to refresh all the dependencies you need, with a tolerance threshold of 60 minutes." This does not necessarily mean that all dependencies will refresh at the same time. Dagster will make sure this rule, and potentially others, are always true. This allows it to avoid materializing assets unnecessarily.
d
As I understand it, the rule actually won't always hold, because materializing upstream assets can take an arbitrary amount of time, right?
So the materialization will be kicked off within 60 minutes, but since it can take a while to run, the asset will still be outdated for some time
n
At a first read, the FreshnessPolicy sounds like an upper bound: Dagster guarantees that the asset will be up to date with its dependencies after at most X minutes. But implementing such a thing seems quite complex: you'd have to keep average execution statistics for the dependencies and add a tolerance to make sure the policy is respected on average. And it may be impossible to respect, for example: if you ask for 1 h freshness, but the average execution of the dependencies is 2 h. So I don't think this is it (but it would be interesting nonetheless, with a notification system for failures to meet expectations).

Hinted by the point that is made about avoiding unnecessary materialization, should the lag instead be interpreted as a lower bound before which a materialization will not happen? The asset will wait for X minutes before checking whether its dependencies were updated; only if they were will it start its materialization.
d
"a lower bound before which a materialization will not happen" seems like a clear explanation to me
n
I feel like it contradicts some of the description and naming, though.
n
@Nicolas Parot Alvarez it would be a very smart solution to look at materialization statistics, and I recall someone from Elementl mentioning it is in their plans; however, the current implementation kicks off materialization when there is 10% of the time left before the asset goes stale. See https://github.com/dagster-io/dagster/blob/abe4aaec391a9082c14984044ff8772c1a45f343/python_modules/dagster/dagster/_core/definitions/freshness_based_auto_materialize.py#L63 Lower bound = 0.9 * maximum_lag_minutes when no cron schedule is defined. @sandy would it make sense to make this a configurable asset parameter? Instead of complex logic based on statistics, users could just specify the expected materialization time based on their own judgment.
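To put rough numbers on that (just a paraphrase of the linked heuristic, not the actual Dagster code):

```python
# With no cron_schedule set, the heuristic above requests a materialization
# once only ~10% of the allowed lag remains, i.e. roughly
# 0.9 * maximum_lag_minutes after the upstream data the asset needs appeared.
maximum_lag_minutes = 60
kick_off_after_minutes = 0.9 * maximum_lag_minutes
print(kick_off_after_minutes)  # 54.0 -> leaves ~6 minutes of headroom to run
```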
n
Thank you for pointing to the implementation @Nikolaj Galak. So it seems it's currently an in-between: it is factually a lower bound, but it pretends to be an upper bound by triggering 10% of the lag earlier and betting that most materializations won't take more than 10% of the lag. I guess this is a first short and simple implementation to get users familiar with the idea, one that leaves room for smarter stuff in the future.
šŸ‘ 2
s
Would it make sense to have a `freshness_dependency_depth` parameter, where you want the scheduler to only consider the immediate parents (or parents of parents) when calculating freshness? Being able to specify `depth=1` (refresh after parent) or `depth=0` (refresh on schedule) would mitigate some of the concerns about not knowing how long upstream things would take, and also simplify the reasoning for why something has or hasn't kicked off.
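A purely hypothetical sketch of what that could look like (`freshness_dependency_depth` is not a real FreshnessPolicy argument today, so this won't run as-is):

```python
from dagster import FreshnessPolicy, asset

# Hypothetical API: depth=1 would mean "only consider my immediate parents
# when deciding whether I'm fresh", rather than walking the whole graph.
@asset(
    freshness_policy=FreshnessPolicy(
        maximum_lag_minutes=60,
        freshness_dependency_depth=1,  # not a real parameter -- illustration only
    )
)
def reporting_table(staging_table):
    ...
```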
r
@Stephen Bailey is your understanding that today it considers all upstream assets when determining an asset's lag? And is it considering all of them equally?
s
Yes -- for example, I have some assets that sit on top of the end of my dbt graph, and they are not materializing, because some source assets (5 layers earlier) are not emitting Materialize events, so I hit this "Skip Conditions" error. Dagster doesn't auto-materialize these upstream assets (they are controlled on a schedule), so despite its parent emitting events, the asset itself does not run
r
ok, and why does it wait for them exactly? Is it because these upstream assets were never materialized, because their materialization is too old, or another reason?
I'm unclear what issue I'm running into. This asset has a 1-day freshness policy and it hasn't built in 2 days. I would expect at least to see the condition "Required to meet this asset's freshness policy" met. 🤷
s
in my case, it's because an upstream asset (5 layers up) hasn't had materialization events in a long time (apparently because someone renamed it in dagster, and it's in a different code location 🙃)
🤣 1
o
hey everyone! appreciate all the great context / discussion on this thread. What we're trying to figure out internally is: a) what behavior people expect out of freshness-based scheduling, and b) how to make it easy to express this in terms of a set of policies.

Currently, the system is quite hard to comprehend, as small issues upstream can grind execution to a halt far downstream, as @Stephen Bailey notes (and I would agree that this is likely what's going on with @Remi Gabillet's graph), and it can be hard (nearly impossible) to translate a given freshness policy into any precise expected execution time. The "lower bound pretending to be an upper bound" comment from @Nicolas Parot Alvarez is a great way of summing up the current behavior, and indeed if there were easy access to asset runtime statistics, this would immediately be factored into the algorithm (however, this would require some storage changes to keep track of these things explicitly). We don't have any immediate planned work in this area, but it is something that would be useful independent of any automated scheduling and in general would be a nice feature (e.g. you can imagine the ability to see historical execution time trends).

The idea of having a "depth" parameter is interesting, as it would prevent some categories of issues (in which small issues far upstream cause execution far downstream to halt). However, I think the main issue would come if a policy of the form "this asset should never be more than 60 minutes older than any of its parents" were to be applied to many assets at once. In the extreme, imagine a fully-linear asset graph of the form `A -> B -> ... -> Z`. An update to `A` would take ~26 hours to propagate all the way down to the bottom, as each asset in the chain would feel comfortable waiting ~60 minutes to propagate the change from its specific parent (see the back-of-the-envelope sketch after this message).

My suspicion is that there's some other way of describing the desired behavior in a lot of these cases which would vastly simplify the mental model. If people have specific ways of stating what they'd like (independent from any current implementation / freshness policies), I'd love to talk through what that might be. Some possible starting points:

• "This asset should execute at around 8AM every day, after its parents have been updated"
• "If this asset hasn't been materialized in the last 60 minutes, materialize it as soon as any of its parents have been updated"
• "Every 60 minutes, this asset should be materialized, as long as one of its parents has been updated since the last time it ran"
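A tiny back-of-the-envelope sketch of that propagation delay, assuming each hop waits roughly its full 60-minute lag:

```python
# Worst-case propagation through a linear chain A -> B -> ... -> Z, where each
# of the 25 downstream assets is comfortable waiting up to 60 minutes before
# picking up its parent's update.
num_assets = 26                      # A through Z
hops = num_assets - 1                # 25 edges in the chain
lag_minutes_per_hop = 60
total_hours = hops * lag_minutes_per_hop / 60
print(total_hours)                   # 25.0 -> roughly a day for A's update to reach Z
```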
D 2
s
To me, the expected behavior is `maximum_lag_minutes=0` -- i.e. I want as little lag between any asset materializations as possible. So in the `A->Z` case, I would set 0 lag between assets, and then everything would refresh as soon as its parent refreshed, which is what an event-driven system would do (i.e. a standard `asset_sensor` today). For the case where there are no parents, a more active way of expressing `maximum_lag_minutes` would be to call it `minimum_time_between_materializations`, or `*/60` in cron terminology. In this case, I personally think just using cron_schedules is how this should be done -- `FreshnessPolicy(maximum_lag_minutes=0, cron_schedule="5 * * * *")`.
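Concretely, something like this is what I have in mind -- a rough sketch with placeholder asset names (whether `maximum_lag_minutes=0` is accepted as-is today is another question):

```python
from dagster import FreshnessPolicy, asset

# Root asset with no parents: refresh on a schedule, shortly after the hour.
@asset(
    freshness_policy=FreshnessPolicy(
        maximum_lag_minutes=0, cron_schedule="5 * * * *"
    )
)
def raw_events():
    ...

# Downstream asset: zero tolerated lag, so it would be picked up as soon as
# its parent refreshes -- effectively event-driven, like an asset_sensor.
@asset(freshness_policy=FreshnessPolicy(maximum_lag_minutes=0))
def hourly_summary(raw_events):
    ...
```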
🌈 1
d
@owen just fyi, storing & displaying historical asset & job execution time would be extremely useful
šŸ‘ 1
n
Why not be more open about the fact that it is a lag in minutes before materialization, meant to prevent unnecessary or excessive materialization? It sounds less like cool magic, but it seems more understandable.

`FreshnessPolicy(minimum_lag_minutes=0, cron_schedule="5 * * * *")`

minimum_lag_minutes (float) – If the asset's materialization is triggered by its parent(s), and respects its optional CRON schedule, it waits `minimum_lag_minutes` before starting its own materialization. For example, if the current asset's parent(s) are updated every 1 min, but you only need the current asset to be updated every 1 h, because anything more frequent would be wasted computation, then you can set `minimum_lag_minutes = 60` to enforce a lag of 1 h before the current asset starts materializing.

I think this prevents wondering about how it works. A downside is that it may go against the initial intentions of smarter features. On our side, to prevent unnecessary materialization, we've currently been playing with our sensor definitions: we've set reasonable frequencies, and in the sensor evaluation we check that certain jobs are not already running before triggering new jobs. Example: https://gist.github.com/NicolasPA/854392e22dc1410977cc7ddb8b8605a4
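For reference, the "skip if the job is already running" check in a sensor looks roughly like this -- a minimal sketch, not the gist's actual code, with a hypothetical `refresh_job` import:

```python
from dagster import (
    DagsterRunStatus,
    RunRequest,
    RunsFilter,
    SkipReason,
    sensor,
)

from my_project.jobs import refresh_job  # hypothetical job import


@sensor(job=refresh_job, minimum_interval_seconds=60 * 10)
def refresh_job_sensor(context):
    # Skip this tick if a run of the job is already queued or in progress.
    in_flight = context.instance.get_runs(
        filters=RunsFilter(
            job_name=refresh_job.name,
            statuses=[DagsterRunStatus.QUEUED, DagsterRunStatus.STARTED],
        ),
        limit=1,
    )
    if in_flight:
        return SkipReason(f"{refresh_job.name} is already running")
    return RunRequest(run_key=None)
```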
👀 1