https://dagster.io/ logo
#ask-ai
Title
# ask-ai
j

Justin Bricker

02/21/2024, 3:48 PM
@Scout can you have asset_checks on an @observed_source_asset?
s

Scout

02/21/2024, 3:48 PM
Based on the documentation,
@observable_source_asset
is used to define source assets with observation functions that return a
DataVersion
. When the observation function is run, an
AssetObservation
will be generated for the source asset and tagged with the returned data version. However, the documentation does not explicitly mention the use of
@asset_check
in conjunction with
@observable_source_asset
. Asset checks are typically used to define checks that can be executed to verify certain properties or expectations of an asset. These checks can be defined using the
@asset_check
decorator and can be associated with assets to ensure data quality. Since the documentation does not provide a clear example or statement about using
@asset_check
on an
@observable_source_asset
, it is not certain whether this is supported or recommended. If you need to perform checks on source assets, you might consider defining separate asset checks that target the assets materialized from the source data, rather than directly on the
@observable_source_asset
. Asset Observations | Dagster button auto-observe source assets button DBT source asset checks button Introducing Asset Checks | Dagster Blog button Thank you for your feedback!
j

Justin Bricker

02/21/2024, 4:54 PM
@Scout How can I get previous asset check metadata values from within an asset check?
s

Scout

02/21/2024, 4:54 PM
To access previous asset check metadata values from within an asset check, you can use the context object provided to the asset check function to query the Dagster instance for historical events related to the asset. Specifically, you can use methods like
context.instance.get_event_records
or
context.instance.get_latest_materialization_event
to retrieve historical materialization events, which may include metadata from previous asset checks. Here's a general approach to accessing previous asset check metadata values within an asset check: 1. Use the
context.instance
to access the Dagster instance. 2. Query the event log for the asset's historical materialization events using
get_event_records
with an
EventRecordsFilter
that specifies the asset key and the event type
ASSET_MATERIALIZATION
. 3. Access the metadata from the retrieved materialization events. Below is an example of how you might implement this in an asset check function:
Copy code
python from dagster import asset_check, AssetCheckContext, AssetCheckResult from dagster.core.events.log import EventRecordsFilter from dagster.core.events import DagsterEventType  @asset_check(asset=my_asset) def my_asset_check(context: AssetCheckContext):     # Retrieve historical materialization events for the asset     records = context.instance.get_event_records(         EventRecordsFilter(             event_type=DagsterEventType.ASSET_MATERIALIZATION,             asset_key=context.asset_key,         ),         limit=10,  # Adjust the limit as needed     )      # Extract metadata values from the historical events     historical_metadata_values = [         record.event_log_entry.dagster_event.asset_materialization.metadata         for record in records     ]      # Perform your check using the historical metadata values     # ...      # Return an AssetCheckResult based on your check     return AssetCheckResult(         success=True,         metadata={"my_check_metadata": "value"},  # Include any relevant metadata for the check     )
In this example,
my_asset
would be replaced with the actual asset you are checking, and you would include logic to perform your specific check using the historical metadata values. Please note that the exact implementation details may vary depending on the version of Dagster you are using and the structure of your assets and checks. Always refer to the latest Dagster documentation for the most up-to-date guidance on implementing asset checks and accessing historical metadata.
j

Justin Bricker

02/21/2024, 5:51 PM
@Scout
Copy code
historical_metadata_values = [
        record.event_log_entry.dagster_event.asset_materialization.metadata
        for record in records
    ]
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "<string>", line 2, in <listcomp>
AttributeError: 'function' object has no attribute 'metadata'
s

Scout

02/21/2024, 5:51 PM
It appears there was a misunderstanding in the example provided. The
record.event_log_entry.dagster_event.asset_materialization
is not directly accessible as it was described. Instead, you should access the
DagsterEvent
object's
event_specific_data
attribute, which contains the
AssetMaterialization
data when the event type is an asset materialization. Here's how you can correctly access the metadata from the historical materialization events:
Copy code
python from dagster import asset_check, AssetCheckContext, AssetCheckResult from dagster.core.events.log import EventRecordsFilter from dagster.core.events import DagsterEventType  @asset_check(asset=my_asset) def my_asset_check(context: AssetCheckContext):     # Retrieve historical materialization events for the asset     records = context.instance.get_event_records(         EventRecordsFilter(             event_type=DagsterEventType.ASSET_MATERIALIZATION,             asset_key=context.asset_key,         ),         limit=10,  # Adjust the limit as needed     )      # Extract metadata values from the historical events     historical_metadata_values = [         record.dagster_event.event_specific_data.materialization.metadata_entries         for record in records         if record.dagster_event and            record.dagster_event.event_specific_data and            record.dagster_event.event_type == DagsterEventType.ASSET_MATERIALIZATION     ]      # Perform your check using the historical metadata values     # ...      # Return an AssetCheckResult based on your check     return AssetCheckResult(         success=True,         metadata={"my_check_metadata": "value"},  # Include any relevant metadata for the check     )
In this corrected example, we are checking that the
record.dagster_event
and
record.dagster_event.event_specific_data
exist and that the event type is
ASSET_MATERIALIZATION
before attempting to access the
materialization.metadata_entries
. This should prevent the
AttributeError
you encountered. Please adjust the logic within the list comprehension to match the structure of your event records and the metadata you are trying to access. The
metadata_entries
attribute will contain the metadata associated with each materialization event, and you may need to further process this to extract the specific metadata values you are interested in.
j

Justin Bricker

02/21/2024, 5:52 PM
@Scout
Copy code
historical_metadata_values = [
        record.dagster_event.event_specific_data.materialization.metadata_entries
        for record in records
        if record.dagster_event and
           record.dagster_event.event_specific_data and
           record.dagster_event.event_type == DagsterEventType.ASSET_MATERIALIZATION
    ]
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "<string>", line 4, in <listcomp>
AttributeError: 'EventLogRecord' object has no attribute 'dagster_event'
s

Scout

02/21/2024, 5:53 PM
I apologize for the confusion. The error indicates that the
EventLogRecord
object does not have a
dagster_event
attribute directly accessible in the way it was referenced. Instead, you should use the
get_dagster_event
method to retrieve the
DagsterEvent
object from an
EventLogEntry
. Here's how you can correctly access the metadata from the historical materialization events:
Copy code
python from dagster import asset_check, AssetCheckContext, AssetCheckResult from dagster.core.events.log import EventRecordsFilter from dagster.core.events import DagsterEventType  @asset_check(asset=my_asset) def my_asset_check(context: AssetCheckContext):     # Retrieve historical materialization events for the asset     records = context.instance.get_event_records(         EventRecordsFilter(             event_type=DagsterEventType.ASSET_MATERIALIZATION,             asset_key=context.asset_key,         ),         limit=10,  # Adjust the limit as needed     )      # Extract metadata values from the historical events     historical_metadata_values = []     for record in records:         dagster_event = record.event_log_entry.get_dagster_event()         if dagster_event and dagster_event.event_specific_data:             materialization = dagster_event.event_specific_data.materialization             metadata_entries = materialization.metadata_entries             historical_metadata_values.append(metadata_entries)      # Perform your check using the historical metadata values     # ...      # Return an AssetCheckResult based on your check     return AssetCheckResult(         success=True,         metadata={"my_check_metadata": "value"},  # Include any relevant metadata for the check     )
In this corrected example, we use the
get_dagster_event
method to access the
DagsterEvent
from the
EventLogEntry
. Then, we check if the
event_specific_data
is present and extract the
metadata_entries
from the
materialization
. Please adjust the logic to match the structure of your event records and the metadata you are trying to access. The
metadata_entries
attribute will contain the metadata associated with each materialization event, and you may need to further process this to extract the specific metadata values you are interested in.