Adam Ward
01/09/2023, 7:11 PM
dagster.run_storage
and querying the asset using get_latest_materialization_by_asset_key
but I can't find a good way to output the exception stack trace that I can see in Dagit.
Any suggestions?
sandy
01/10/2023, 5:04 PM
from dagster import (
    DagsterInstance,
    EventRecordsFilter,
    DagsterEventType,
    AssetKey,
    SerializableErrorInfo,
)
def get_run_failure_reason_for_asset(
    instance: DagsterInstance, asset_key: AssetKey
) -> SerializableErrorInfo:
    # find the latest run that targeted the given asset
    records = instance.get_event_records(
        EventRecordsFilter(
            event_type=DagsterEventType.ASSET_MATERIALIZATION_PLANNED, asset_key=asset_key
        ),
        limit=1,
    )
    latest_run_id = records[0].event_log_entry.run_id
    # see why it failed; get_records_for_run returns a connection object
    # whose .records attribute holds the matching event records
    run_records = instance.get_records_for_run(
        run_id=latest_run_id, of_type=DagsterEventType.RUN_FAILURE
    ).records
    # the failure details live on the Dagster event inside the log entry
    return run_records[0].event_log_entry.dagster_event.event_specific_data.error
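To get from the returned error object to a readable stack trace, something like the following sketch might help. It assumes the error exposes `message`, `stack` (a list of pre-formatted traceback lines), and `cause`, which is how `SerializableErrorInfo` is laid out; the `ErrorInfo` dataclass and `format_error` helper here are hypothetical stand-ins so the example runs without a Dagster instance.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class ErrorInfo:
    """Hypothetical stand-in mirroring the fields of SerializableErrorInfo."""

    message: str
    stack: List[str] = field(default_factory=list)
    cls_name: Optional[str] = None
    cause: Optional["ErrorInfo"] = None


def format_error(error) -> str:
    """Render an error and its chain of causes as one traceback-style string."""
    parts = []
    current = error
    while current is not None:
        # Each stack entry is already a formatted traceback line.
        parts.append(
            current.message.rstrip() + "\n\nStack Trace:\n" + "".join(current.stack)
        )
        current = current.cause
    return "\nThe above exception was caused by the following exception:\n".join(parts)


# Illustrative data only: a step failure wrapping the user-code exception.
root = ErrorInfo(
    "ValueError: bad input\n",
    ['  File "assets.py", line 3, in my_asset\n'],
    "ValueError",
)
failure = ErrorInfo(
    "DagsterExecutionStepExecutionError: step failed\n",
    [],
    "DagsterExecutionStepExecutionError",
    cause=root,
)
print(format_error(failure))
```

With a real `SerializableErrorInfo`, calling its `to_string()` method should give a similar rendering without a custom helper.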
Adam Ward
01/11/2023, 1:56 PM
sandy
01/11/2023, 3:58 PM
> For instance, should assets which are "late", never previously materialized, and failed their first run still have a PLANNED event?
Yes
> If so, should I expect it to have an error object with the failure reason?
The PLANNED event should never have an error on it. The code snippet I sent fetches the error from the corresponding run. Assets can be late even if there are no errors, if they don't complete in time.
> Lastly, I was re-reading the info on the Run Failure Sensor. That may be a more out-of-the-box solution for this, but I'm not sure it works on assets.
I was going to suggest that. It works for asset runs. The only downside is that it only fires if there's a failure, not if your assets are late for some other reason, which may or may not be what you're looking for.