Is it possible to respond to `AssetObservation` ev...
# ask-ai
c
Is it possible to respond to `AssetObservation` events with a sensor?
s
Yes, it is possible to respond to `AssetObservation` events with a sensor. The documentation provides an example of how to write a sensor that listens for observations of a particular asset and triggers runs based on them. Here's a snippet from the documentation that illustrates how to get the latest observation of a given asset using a sensor:
```python
from dagster import sensor, DagsterEventType, EventRecordsFilter, AssetKey


@sensor
def asset1_a_num_rows(context):
    instance = context.instance
    # Records come back newest first by default, so limit=1 yields the latest one.
    records = instance.get_event_records(
        event_records_filter=EventRecordsFilter(
            event_type=DagsterEventType.ASSET_OBSERVATION,
            asset_key=AssetKey(["asset1"]),
        ),
        limit=1,
    )
    if not records:
        return  # No observations recorded yet
    latest_observation = records[0].asset_observation
    # ... additional logic based on the observation
```
This sensor starts by querying the event log for the latest `AssetObservation` event for the asset with key `["asset1"]`. You can then use the information from the latest observation to decide whether to trigger a run or perform other actions.
Additionally, if you want to "tail" the event log for new observation events, you can use `context.cursor` to keep track of the last processed event and only process new events:
```python
from dagster import sensor, DagsterEventType, EventRecordsFilter, AssetKey


@sensor
def asset1_a_num_rows(context):
    instance = context.instance
    # context.cursor is None until the sensor has set it, so guard the int() call.
    after_cursor = int(context.cursor) if context.cursor else None
    records = instance.get_event_records(
        event_records_filter=EventRecordsFilter(
            event_type=DagsterEventType.ASSET_OBSERVATION,
            asset_key=AssetKey(["asset1"]),
            after_cursor=after_cursor,
        ),
        ascending=True,  # Oldest first, so records[-1] is the newest event
    )
    if len(records) == 0:
        return  # No new observations

    # ... additional logic based on new observations

    context.set_cursor(str(records[-1].storage_id))
```
In this example, the sensor checks for new `AssetObservation` events that have occurred since the last processed event, identified by the cursor. If new observations are found, you can perform actions based on them and update the cursor to the latest event's storage ID.
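The cursor bookkeeping itself can be tried out without a Dagster instance. The following is only a minimal sketch in plain Python: `FakeRecord`, `parse_cursor`, and `advance_cursor` are hypothetical names made up for illustration and are not part of Dagster. It assumes that the cursor is a stringified storage ID and that no cursor exists before the first evaluation.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class FakeRecord:
    """Hypothetical stand-in for an event log record; only storage_id is modeled."""
    storage_id: int


def parse_cursor(cursor: Optional[str]) -> Optional[int]:
    """Return the stored storage ID, or None if no cursor has been set yet."""
    return int(cursor) if cursor else None


def advance_cursor(
    records: List[FakeRecord], cursor: Optional[str]
) -> Tuple[List[FakeRecord], Optional[str]]:
    """Return the records newer than the cursor (oldest first) and the next cursor."""
    after = parse_cursor(cursor)
    new_records = sorted(
        (r for r in records if after is None or r.storage_id > after),
        key=lambda r: r.storage_id,
    )
    if not new_records:
        return [], cursor  # Nothing new: leave the cursor unchanged
    # The last record after sorting has the highest storage ID seen so far.
    return new_records, str(new_records[-1].storage_id)
```

Sorting by `storage_id` before taking the last element keeps the cursor correct regardless of the order in which the records were fetched.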