Tara Larrue
02/14/2024, 1:10 PM
Scout
02/14/2024, 1:10 PM
`context` object provided to the asset function to interact with the Dagster instance and query the event log for observation events related to the asset. The documentation provides an example of how to access the latest observation record for an asset:

```python
from dagster import asset, AssetKey, DagsterEventType, EventRecordsFilter


@asset
def downstream_asset(context, upstream_asset):
    latest_observation_record = context.instance.get_event_records(
        EventRecordsFilter(
            event_type=DagsterEventType.ASSET_OBSERVATION,
            asset_key=AssetKey(["upstream_asset"]),
        ),
        ascending=False,
        limit=1,
    )[0]
    metadata = latest_observation_record.event_log_entry.dagster_event.event_specific_data.asset_observation.metadata
    # Use the metadata as needed within your asset function
```
In this example, `context.instance.get_event_records` is used to fetch the latest observation event for the asset with the key `["upstream_asset"]`. The `EventRecordsFilter` is configured to filter for `ASSET_OBSERVATION` events, and `limit=1` ensures that only the most recent observation is fetched. The metadata associated with this observation can then be accessed and used within the asset function as needed.

Please note that this example assumes that you are working within an asset function and that you have the necessary context available to perform the query. If you are outside of an asset function, you would need to obtain a reference to the `DagsterInstance` to perform similar queries.
Tara Larrue
02/14/2024, 1:14 PM
Scout
02/14/2024, 1:14 PM
`context.instance` to query the event log for the asset's observation events. However, the specific method to retrieve the timestamp may vary depending on the available APIs in that version. The documentation provides an example of how to find the latest observation for an asset using a sensor, which can be adapted to retrieve the timestamp within an asset function:

```python
from dagster import sensor, DagsterEventType, EventRecordsFilter, AssetKey


@sensor
def asset_observation_sensor(context):
    instance = context.instance
    records = instance.get_event_records(
        event_records_filter=EventRecordsFilter(
            event_type=DagsterEventType.ASSET_OBSERVATION,
            asset_key=AssetKey(["your_asset_name"]),
        ),
        limit=1,
    )
    if records:
        latest_observation = records[0]
        timestamp = latest_observation.timestamp
        # Use the timestamp as needed
```
In this example, `latest_observation.timestamp` would give you the exact time of the latest observation record. You can adapt this code to work within an asset function by using the `context` provided to the asset function to access the `DagsterInstance` and perform the query.

Please ensure that the APIs used in this example are available in Dagster version 1.6.3, as there may have been changes in the API in subsequent versions. If the method `get_event_records` or the properties used are not available in version 1.6.3, you may need to refer to the version-specific documentation or upgrade to a newer version of Dagster to use these features.
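
Once you have the timestamp, you will probably want it in a readable form. The sketch below assumes the value is a Unix epoch float in seconds; please verify that against the Dagster version you are running. The helper name `observation_time_utc` and the sample value are hypothetical and only stand in for `latest_observation.timestamp`.

```python
from datetime import datetime, timezone


def observation_time_utc(timestamp: float) -> datetime:
    """Convert a Unix epoch timestamp in seconds to a timezone-aware UTC datetime."""
    # Assumption: the event timestamp is expressed in seconds since the Unix epoch.
    return datetime.fromtimestamp(timestamp, tz=timezone.utc)


# Hypothetical sample value standing in for latest_observation.timestamp
example_timestamp = 1707934200.0
observed_at = observation_time_utc(example_timestamp)
print(observed_at.isoformat())
```

Using a timezone-aware datetime keeps the result unambiguous when the code location and the person reading the logs are in different time zones.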