Hello - I'm trying to create a FreshnessPoicy sens...
# ask-community
a
Hello - I'm trying to create a FreshnessPoicy sensor that will run every 15 mins, look in the run history for all assets, and return helpful info for the failing run. I'm touching things like the
dagster.run_storage
and querying the asset using
get_latest_materialization_by_asset_key
but I can't seem to find the best way to try to output the Exception stack trace that I can see in Dagit. Any suggestions?
s
Hey Adam - something like this might be helpful:
Copy code
from dagster import (
    DagsterInstance,
    EventRecordsFilter,
    DagsterEventType,
    AssetKey,
    SerializableErrorInfo,
)


def get_run_failure_reason_for_asset(
    instance: DagsterInstance, asset_key: AssetKey
) -> SerializableErrorInfo:
    # find the latest run that targeted the given asset
    records = instance.get_event_records(
        EventRecordsFilter(
            event_type=DagsterEventType.ASSET_MATERIALIZATION_PLANNED, asset_key=asset_key
        ),
        limit=1,
    )
    latest_run_id = records[0].run_id

    # see why it failed
    run_records = instance.get_records_for_run(
        run_id=latest_run_id, of_type=DagsterEventType.RUN_FAILURE
    )
    return run_records[0].event_specific_data.error
a
Hi Sandy - Thanks so much for that! I had to modify a bit as most of the valuable error info to us was in the RUN_FAILURE events and many of the late assets we had did not have an ASSET_MATERIALIZATION_PLANNED even for some reason, but it got me pretty close to where I needed to be. Can you point me to some documentation on a couple of things? 1. I'd like to better understand the events surrounding assets and how they correspond to materialization runs. For instance, should assets which are "late", never previously materialized, and failed their first run still have a PLANNED event? If so, should I expect it to have an error object with the failure reason? In our repository's current state, most of the assets that were considered Late did not have an error object on the PLANNED event, but I wondered if perhaps that was because this has not been fully deployed yet. 2. I noticed you used only pulled 1 event record and explicitly pulled first record from the run_records. For a bit more context, our assets will all have a 3 hour freshness policy and will be managed by a reconciliation sensor. We are planning to have this sensor run every 15 mins, but want it to report all assets which have become late over the last 15 AND those that continue to be late. It should then generate an email with the exception info from the last failing materialization. Are those selections still going to be valid for our case OR should we look at some kind of RunsFilter using date/time? 3. Lastly, I was re-reading the info on the Run Failure Sensor. That may be a more out-of-the-box solution for this, but I'm not sure it works on assets. Can you speak to if/how I could use that? Thanks again for all your help here! I'm really enjoying learning and using Dagster and have been super impressed by this Slack community and how helpful everyone has been. Appreciate any info you can add here.
s
For instance, should assets which are "late", never previously materialized, and failed their first run still have a PLANNED event?
Yes
If so, should I expect it to have an error object with the failure reason?
The PLANNED event should never have an error on it. The code snippet I sent fetches the error from the corresponding run. Assets can be late even if there are no errors, if they don't complete in time.
Lastly, I was re-reading the info on the Run Failure Sensor. That may be a more out-of-the-box solution for this, but I'm not sure it works on assets.
I was going to suggest that. It works for asset runs. The only downside is that it only fires if there's a failure, not if your assets are late for some other reason, which may or may not be what you're looking for.
❤️ 1