# ask-community
g
How can I alert on asset freshness and send a slack message if an asset is NOT fresh?
o
hi @geoHeil! There's actually an out-of-the-box sensor for this behavior being added to `dagster-slack` in today's release. Essentially:
```python
import os

from dagster_slack import make_slack_on_freshness_policy_status_change_sensor

# alerts in the given channel whenever a selected asset's freshness status changes
slack_freshness_sensor = make_slack_on_freshness_policy_status_change_sensor(
    asset_selection=...,  # assets to alert on
    channel="#foo",
    slack_token=os.getenv("SLACK_API_TOKEN"),
)
```
g
What is the difference between `freshness_policy_sensor` (https://docs.dagster.io/master/concepts/partitions-schedules-sensors/asset-sensors#freshness-policy-sensors) and `make_slack_on_freshness_policy_status_changed_sensor`? Is the Slack one only a special sub-kind of the first one?
I cannot find `make_slack_on_freshness_policy_status_changed_sensor` in code or in any Dagster documentation. Can you provide more documentation on how it should be used? In particular: 1) do the assets require a freshness policy? 2) when I have a sensor which hourly checks whether a new file has arrived (which usually only arrives daily), how can I bridge/combine the notion of freshness (schedule) with the dynamic sensor (a file has arrived)? In particular, I want to use the freshness sensor as a workaround to detect issues (an operation takes longer than expected to run) and wonder how this could work.
o
@geoHeil That sensor is a `freshness_policy_sensor`, but saves you from having to write the Slack-specific code yourself (with some extra options thrown in on top). I accidentally typo'd the name (`changed` should have been `change`); edited above. It's in the `dagster-slack` library version `0.17.6`, and you can find the API docs here: https://docs.dagster.io/_apidocs/libraries/dagster-slack#dagster_slack.make_slack_on_freshness_policy_status_change_sensor
For 1), this sensor will only fire alerts for assets with a freshness policy defined. For 2), this ties into the asset versioning system, which is high on our list of priorities to support here. In the (hopefully near) future, the upstream version of the root asset will be taken into account and will only change when a new file arrives, meaning a `FreshnessPolicy(maximum_lag_minutes=60)` would only determine there to be an issue if the asset hadn't consumed that file within 60 minutes of it being available. However, this is not currently how it works.
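For illustration, here's a minimal sketch of point 1): only assets that carry a `freshness_policy`, like the downstream asset below, will produce alerts (the asset names are hypothetical):
```python
from dagster import FreshnessPolicy, asset


@asset
def raw_file():
    # hypothetical upstream asset, e.g. loaded by the event-based sensor
    ...


# only assets with a freshness_policy defined are checked by freshness sensors
@asset(freshness_policy=FreshnessPolicy(maximum_lag_minutes=60))
def derived_table(raw_file):
    ...
```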
g
so does this mean that for now, if the upstream asset is created based on an event-based sensor, it will not work?
For starters, I would be happy if I got alerted each day at 09:00 in case the asset was not materialized successfully (i.e., perhaps the event-based sensor did not trigger, or it did but the job got stuck / took longer to execute and materialize the asset than the deadline). Would such a scenario work?
o
@geoHeil yep, that can work! The freshness calculation is independent of how any upstream assets are materialized. It sounds like `FreshnessPolicy(maximum_lag_minutes=???, cron_schedule="0 9 * * *")` would basically work for you. This says "by 9AM, there should be a materialization of this asset which incorporates all upstream data from at least `???` minutes ago". The main friction with the current implementation (vs. the ideal version-based implementation) is selecting a good value for `???`. Right now, this will depend on when you expect the file to come in. If you set `maximum_lag_minutes` too low (let's say `60` minutes), then if the file comes in before that time (let's say 7AM) and everything runs as expected, the current logic will interpret your downstream asset as "has all of the upstream data up until 7AM". When 9AM rolls around, the freshness policy will demand that the asset has all the data from up until 60 minutes before 9AM (so 8AM), meaning it will fail its freshness check, even though everything went as expected.
so you just want to pick a value that you're confident will be before the new file shows up (sounds like maybe midnight would be a reasonable cutoff time, so you could do `maximum_lag_minutes=9*60`)
in the future, version-based world, the available data will be taken into account, so you could set `maximum_lag_minutes=0`, meaning "by 9AM, all available upstream data should be incorporated into this asset"
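To make that concrete, a sketch of the suggested policy, assuming the file reliably lands after midnight (the asset name is hypothetical):
```python
from dagster import FreshnessPolicy, asset


# "by 9AM, this asset should incorporate all upstream data from midnight onwards":
# 9 * 60 minutes of allowed lag, evaluated against the 9AM cron tick
@asset(
    freshness_policy=FreshnessPolicy(
        maximum_lag_minutes=9 * 60,
        cron_schedule="0 9 * * *",
    )
)
def daily_report(raw_file):
    ...
```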
g
sounds great.
though it would be even better if more complex, i.e. stateful, SLAs were also available, e.g. computing the standard deviation of runtimes and alerting on outliers
o
that's an interesting idea -- I think the tools are there to build that sort of sensor out of the default sensor machinery (there are some internal methods to calculate how late any given asset materialization was with respect to its freshness policy, so you could query for the last N materializations, get their lateness values, and come up with some threshold based off of that), but yeah, it'd be pretty challenging to do that on your own at the moment
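For what it's worth, here's a rough sketch of that idea built from public APIs only -- instead of the internal lateness helpers mentioned above, it approximates an SLA from the gaps between the last N materializations (the asset key, history length, and 2-sigma threshold are all assumptions):
```python
import statistics
import time

from dagster import (
    AssetKey,
    DagsterEventType,
    EventRecordsFilter,
    SkipReason,
    sensor,
)


@sensor(name="runtime_sla_sensor", minimum_interval_seconds=300)
def runtime_sla_sensor(context):
    # fetch the most recent materializations of a (hypothetical) asset
    records = context.instance.get_event_records(
        EventRecordsFilter(
            event_type=DagsterEventType.ASSET_MATERIALIZATION,
            asset_key=AssetKey("daily_report"),
        ),
        limit=10,
        ascending=False,
    )
    if len(records) < 3:
        return SkipReason("not enough history to estimate an SLA")

    timestamps = [r.event_log_entry.timestamp for r in records]
    gaps = [newer - older for newer, older in zip(timestamps, timestamps[1:])]
    mean, stdev = statistics.mean(gaps), statistics.stdev(gaps)

    # alert (e.g. via a Slack client of your choice) if the time since the last
    # materialization exceeds the historical mean by two standard deviations
    if time.time() - timestamps[0] > mean + 2 * stdev:
        context.log.warning("asset is later than its historical pattern suggests")
    return SkipReason("SLA check complete")
```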
there's a separate track of work we're looking into around tracking the execution time of assets, which this might fit into more naturally
🌈 1
g
indeed - I think so