Chris Stefano
03/13/2024, 2:45 PM
`AssetObservation` events with a sensor?

Scout
03/13/2024, 2:45 PM
`AssetObservation`
events with a sensor. The documentation provides an example of how to write a sensor that listens for observations of a particular asset and triggers runs based on them. Here's a snippet from the documentation that illustrates how to get the latest observation of a given asset using a sensor:

```python
from dagster import sensor, DagsterEventType, EventRecordsFilter, AssetKey


@sensor
def asset1_a_num_rows(context):
    instance = context.instance
    # Fetch only the most recent observation event for asset1.
    records = instance.get_event_records(
        event_records_filter=EventRecordsFilter(
            event_type=DagsterEventType.ASSET_OBSERVATION,
            asset_key=AssetKey(["asset1"]),
        ),
        limit=1,
    )
    if not records:
        return  # No observations recorded yet
    latest_observation = records[0].asset_observation
    # ... additional logic based on the observation
```
This sensor starts by querying the event log for the latest `AssetObservation` event for the asset with key `["asset1"]`. You can then use the information from the latest observation to decide whether to trigger a run or perform other actions.

Additionally, if you want to "tail" the event log for new observation events, you can use `context.cursor`
to keep track of the last processed event and only process new events:

```python
from dagster import sensor, DagsterEventType, EventRecordsFilter, AssetKey


@sensor
def asset1_a_num_rows(context):
    instance = context.instance
    records = instance.get_event_records(
        event_records_filter=EventRecordsFilter(
            event_type=DagsterEventType.ASSET_OBSERVATION,
            asset_key=AssetKey(["asset1"]),
            # context.cursor is None on the first tick, so guard the int() call
            after_cursor=int(context.cursor) if context.cursor else None,
        ),
        # Oldest first, so records[-1] is the newest event in the batch
        ascending=True,
    )
    if len(records) == 0:
        return  # No new observations
    # ... additional logic based on new observations
    context.update_cursor(str(records[-1].storage_id))
```
In this example, the sensor checks for new `AssetObservation` events that have occurred since the last processed event, identified by the cursor. If new observations are found, you can perform actions based on them and update the cursor to the latest event's storage ID.
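
As a rough sketch of the cursor bookkeeping described above, the two helpers below isolate the parts that are easy to get wrong. The names `parse_cursor` and `next_cursor` are hypothetical and not part of Dagster. The sketch assumes the cursor is stored as a string, is unset (`None`) on the first evaluation, and that storage IDs are increasing integers. It is plain Python, so it can be checked without a Dagster instance:

```python
from typing import Optional, Sequence


def parse_cursor(cursor: Optional[str]) -> Optional[int]:
    """Convert the stored string cursor to a storage ID.

    Returns None when no cursor has been saved yet (first evaluation).
    """
    return int(cursor) if cursor else None


def next_cursor(storage_ids: Sequence[int], current: Optional[str]) -> Optional[str]:
    """Pick the cursor to save after processing a batch of records.

    Uses the largest storage ID so the result does not depend on whether
    the batch was returned oldest-first or newest-first. An empty batch
    leaves the cursor unchanged.
    """
    if not storage_ids:
        return current
    return str(max(storage_ids))
```

Inside a sensor, you could pass `parse_cursor(context.cursor)` as `after_cursor` and, once the batch has been handled, save the value returned by `next_cursor(...)` as the new cursor.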