Justin Bricker
02/21/2024, 3:48 PM
Scout
02/21/2024, 3:48 PM
@observable_source_asset is used to define source assets with observation functions that return a DataVersion. When the observation function is run, an AssetObservation is generated for the source asset and tagged with the returned data version. However, the documentation does not explicitly mention using @asset_check in conjunction with @observable_source_asset.
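As a rough illustration of what such an observation function might compute, the sketch below derives a version string from a file's contents using only the standard library. It assumes the source data is a local file; the helper name compute_data_version is hypothetical and not part of Dagster. In Dagster, the returned string would be wrapped in a DataVersion inside the function decorated with @observable_source_asset.

```python
import hashlib
from pathlib import Path


def compute_data_version(path: Path) -> str:
    """Return a content hash that changes whenever the file's bytes change.

    Hypothetical helper: the name and the choice of SHA-256 are assumptions.
    """
    digest = hashlib.sha256()
    with path.open("rb") as f:
        # Read in chunks so large files are not loaded into memory at once.
        for chunk in iter(lambda: f.read(65536), b""):
            digest.update(chunk)
    return digest.hexdigest()
```

Hashing the content (rather than, say, reading a modification time) means the version only changes when the data itself changes, which is usually what an observation is meant to detect.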
Asset checks are typically used to verify certain properties or expectations of an asset. They are defined with the @asset_check decorator and associated with assets to ensure data quality.
Since the documentation does not provide a clear example or statement about using @asset_check on an @observable_source_asset, it is not certain whether this is supported or recommended. If you need to perform checks on source assets, you might consider defining separate asset checks that target the assets materialized from the source data, rather than directly on the @observable_source_asset.
Justin Bricker
02/21/2024, 4:54 PM
Scout
02/21/2024, 4:54 PM
context.instance.get_event_records or context.instance.get_latest_materialization_event to retrieve historical materialization events, which may include metadata from previous asset checks. Here's a general approach to accessing previous asset check metadata values within an asset check:
1. Use context.instance to access the Dagster instance.
2. Query the event log for the asset's historical materialization events using get_event_records with an EventRecordsFilter that specifies the asset key and the event type ASSET_MATERIALIZATION.
3. Access the metadata from the retrieved materialization events.
Below is an example of how you might implement this in an asset check function:

    from dagster import asset_check, AssetCheckContext, AssetCheckResult
    from dagster.core.events.log import EventRecordsFilter
    from dagster.core.events import DagsterEventType

    @asset_check(asset=my_asset)
    def my_asset_check(context: AssetCheckContext):
        # Retrieve historical materialization events for the asset
        records = context.instance.get_event_records(
            EventRecordsFilter(
                event_type=DagsterEventType.ASSET_MATERIALIZATION,
                asset_key=context.asset_key,
            ),
            limit=10,  # Adjust the limit as needed
        )

        # Extract metadata values from the historical events
        historical_metadata_values = [
            record.event_log_entry.dagster_event.asset_materialization.metadata
            for record in records
        ]

        # Perform your check using the historical metadata values
        # ...

        # Return an AssetCheckResult based on your check
        return AssetCheckResult(
            success=True,
            metadata={"my_check_metadata": "value"},  # Include any relevant metadata for the check
        )

In this example, my_asset would be replaced with the actual asset you are checking, and you would include logic to perform your specific check using the historical metadata values. Please note that the exact implementation details may vary depending on the version of Dagster you are using and the structure of your assets and checks. Always refer to the latest Dagster documentation for the most up-to-date guidance on implementing asset checks and accessing historical metadata.
Justin Bricker
02/21/2024, 5:51 PM
historical_metadata_values = [
    record.event_log_entry.dagster_event.asset_materialization.metadata
    for record in records
]
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "<string>", line 2, in <listcomp>
AttributeError: 'function' object has no attribute 'metadata'
Scout
02/21/2024, 5:51 PM
record.event_log_entry.dagster_event.asset_materialization is not directly accessible as it was described. Instead, you should access the DagsterEvent object's event_specific_data attribute, which contains the AssetMaterialization data when the event type is an asset materialization. Here's how you can correctly access the metadata from the historical materialization events:

    from dagster import asset_check, AssetCheckContext, AssetCheckResult
    from dagster.core.events.log import EventRecordsFilter
    from dagster.core.events import DagsterEventType

    @asset_check(asset=my_asset)
    def my_asset_check(context: AssetCheckContext):
        # Retrieve historical materialization events for the asset
        records = context.instance.get_event_records(
            EventRecordsFilter(
                event_type=DagsterEventType.ASSET_MATERIALIZATION,
                asset_key=context.asset_key,
            ),
            limit=10,  # Adjust the limit as needed
        )

        # Extract metadata values from the historical events
        historical_metadata_values = [
            record.dagster_event.event_specific_data.materialization.metadata_entries
            for record in records
            if record.dagster_event and
            record.dagster_event.event_specific_data and
            record.dagster_event.event_type == DagsterEventType.ASSET_MATERIALIZATION
        ]

        # Perform your check using the historical metadata values
        # ...

        # Return an AssetCheckResult based on your check
        return AssetCheckResult(
            success=True,
            metadata={"my_check_metadata": "value"},  # Include any relevant metadata for the check
        )

In this corrected example, we are checking that the record.dagster_event and record.dagster_event.event_specific_data exist and that the event type is ASSET_MATERIALIZATION before attempting to access the materialization.metadata_entries. This should prevent the AttributeError you encountered. Please adjust the logic within the list comprehension to match the structure of your event records and the metadata you are trying to access. The metadata_entries attribute will contain the metadata associated with each materialization event, and you may need to further process this to extract the specific metadata values you are interested in.
Justin Bricker
02/21/2024, 5:52 PM
historical_metadata_values = [
    record.dagster_event.event_specific_data.materialization.metadata_entries
    for record in records
    if record.dagster_event and
    record.dagster_event.event_specific_data and
    record.dagster_event.event_type == DagsterEventType.ASSET_MATERIALIZATION
]
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "<string>", line 4, in <listcomp>
AttributeError: 'EventLogRecord' object has no attribute 'dagster_event'
Scout
02/21/2024, 5:53 PM
EventLogRecord object does not have a dagster_event attribute directly accessible in the way it was referenced. Instead, you should use the get_dagster_event method to retrieve the DagsterEvent object from an EventLogEntry. Here's how you can correctly access the metadata from the historical materialization events:

    from dagster import asset_check, AssetCheckContext, AssetCheckResult
    from dagster.core.events.log import EventRecordsFilter
    from dagster.core.events import DagsterEventType

    @asset_check(asset=my_asset)
    def my_asset_check(context: AssetCheckContext):
        # Retrieve historical materialization events for the asset
        records = context.instance.get_event_records(
            EventRecordsFilter(
                event_type=DagsterEventType.ASSET_MATERIALIZATION,
                asset_key=context.asset_key,
            ),
            limit=10,  # Adjust the limit as needed
        )

        # Extract metadata values from the historical events
        historical_metadata_values = []
        for record in records:
            dagster_event = record.event_log_entry.get_dagster_event()
            if dagster_event and dagster_event.event_specific_data:
                materialization = dagster_event.event_specific_data.materialization
                metadata_entries = materialization.metadata_entries
                historical_metadata_values.append(metadata_entries)

        # Perform your check using the historical metadata values
        # ...

        # Return an AssetCheckResult based on your check
        return AssetCheckResult(
            success=True,
            metadata={"my_check_metadata": "value"},  # Include any relevant metadata for the check
        )

In this corrected example, we use the get_dagster_event method to access the DagsterEvent from the EventLogEntry. Then, we check if the event_specific_data is present and extract the metadata_entries from the materialization. Please adjust the logic to match the structure of your event records and the metadata you are trying to access. The metadata_entries attribute will contain the metadata associated with each materialization event, and you may need to further process this to extract the specific metadata values you are interested in.
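Once the metadata has been collected, the check itself is plain Python. The following is a minimal sketch, assuming each historical entry has already been reduced to a dict of plain values, that the list is ordered newest first, and that the metric of interest is a numeric row_count. The function name, the key, and the 50% tolerance are all hypothetical and not part of Dagster. It compares the newest value against the mean of the earlier ones; the boolean it returns could then feed the AssetCheckResult returned by the check.

```python
from statistics import mean
from typing import Any, Mapping, Sequence


def latest_within_tolerance(
    history: Sequence[Mapping[str, Any]],
    key: str = "row_count",    # hypothetical metadata key
    tolerance: float = 0.5,    # hypothetical allowed relative deviation
) -> bool:
    """Check the newest value of `key` against the mean of the older ones.

    `history` is assumed to be ordered newest first. Entries that lack
    `key` are skipped.
    """
    values = [float(entry[key]) for entry in history if key in entry]
    if len(values) < 2:
        # Not enough history to compare against; treat as passing.
        return True
    latest, previous = values[0], values[1:]
    baseline = mean(previous)
    if baseline == 0:
        # Avoid dividing by zero: only an unchanged zero value passes.
        return latest == 0
    return abs(latest - baseline) / abs(baseline) <= tolerance
```

For example, latest_within_tolerance([{"row_count": 10}, {"row_count": 100}]) is False, since the newest value is 90% below the earlier one.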