https://dagster.io/ logo
#ask-ai
Title
# ask-ai
a

Alex Berndt

02/27/2024, 6:35 PM
Can I access the metadata of an assetmaterialization event from within the asset's context?
s

Scout

02/27/2024, 6:35 PM
Yes, you can access the metadata of an asset materialization event from within the asset's context. The documentation provides an example of how to do this:
Copy code
python from dagster import asset, AssetKey, Output  @asset def asset1():     return Output(value=..., metadata={"num_rows": 5})  @asset  # or op def asset_1_num_rows(context):     instance = context.instance     materialization = instance.get_latest_materialization_event(AssetKey(["asset1"])).asset_materialization     return materialization.metadata["num_rows"]
In this example, the
asset_1_num_rows
function retrieves the metadata from the latest materialization event of the
asset1
by using the
context.instance.get_latest_materialization_event
method and accessing the
asset_materialization
attribute. The metadata is then accessed by key, in this case,
"num_rows"
. This approach allows you to access historical materialization metadata within the context of an asset or operation.
a

Alex Berndt

02/27/2024, 9:24 PM
@Scout I need the latest materialization for a specific partition. Please provide me with a code snippet on how to do this.
s

Scout

02/27/2024, 9:24 PM
To retrieve the latest materialization event for a specific partition of an asset, you can use the
get_latest_materialization_event
method of the
DagsterInstance
class, specifying the
AssetKey
and the partition key. Here's an example code snippet that demonstrates how to do this:
Copy code
python from dagster import asset, AssetKey, DagsterInstance  @asset def my_asset():     # Your asset logic here     pass  def get_latest_materialization_for_partition(asset_key, partition_key):     instance = DagsterInstance.get()  # Get the current Dagster instance     event_records = instance.get_event_records(         EventRecordsFilter(             event_type=DagsterEventType.ASSET_MATERIALIZATION,             asset_key=asset_key,             asset_partitions=[partition_key],         ),         limit=1,     )     if event_records:         return event_records[0].event_log_entry.dagster_event.asset_materialization     else:         return None  # Example usage: asset_key = AssetKey(["my_asset"]) partition_key = "2023-01-01"  # Replace with your specific partition key latest_materialization = get_latest_materialization_for_partition(asset_key, partition_key) if latest_materialization:     print(f"Latest materialization metadata: {latest_materialization.metadata}") else:     print("No materialization found for the specified partition.")
In this example,
get_latest_materialization_for_partition
is a function that takes an
AssetKey
and a
partition_key
as arguments. It retrieves the latest materialization event for the specified partition of the asset. If a materialization event is found, it returns the
asset_materialization
object, from which you can access the metadata. If no materialization event is found for the specified partition, it returns
None
. Please note that you need to replace
"my_asset"
with the actual name of your asset and
"2023-01-01"
with the partition key you are interested in.
a

Alex Berndt

02/27/2024, 9:36 PM
@Scout This portion
event_records[0].event_log_entry.dagster_event.asset_materialization
does not seem correct.
asset_materialization
is a function.
s

Scout

02/27/2024, 9:36 PM
You are correct; my apologies for the confusion. The
asset_materialization
is indeed not a function that can be called in that context. Instead, you should access the
AssetMaterialization
event directly from the
event_log_entry
object, which is part of the
EventLogRecord
returned by
get_event_records
. Here's the corrected code snippet:
Copy code
python from dagster import DagsterInstance, EventRecordsFilter, DagsterEventType, AssetKey  def get_latest_materialization_for_partition(asset_key, partition_key):     instance = DagsterInstance.get()  # Get the current Dagster instance     event_records = instance.get_event_records(         EventRecordsFilter(             event_type=DagsterEventType.ASSET_MATERIALIZATION,             asset_key=asset_key,             asset_partitions=[partition_key],         ),         limit=1,     )     if event_records:         return event_records[0].event_log_entry  # This is the EventLogEntry object     else:         return None  # Example usage: asset_key = AssetKey(["my_asset"]) partition_key = "2023-01-01"  # Replace with your specific partition key latest_materialization_entry = get_latest_materialization_for_partition(asset_key, partition_key) if latest_materialization_entry:     print(f"Latest materialization metadata: {latest_materialization_entry.dagster_event.event_specific_data.materialization.metadata_entries}") else:     print("No materialization found for the specified partition.")
In this corrected snippet,
get_latest_materialization_for_partition
will return the
EventLogEntry
object if a materialization event is found. You can then access the
materialization
object and its
metadata_entries
to get the metadata for the latest materialization of the specified partition. If no materialization event is found for the specified partition, it returns
None
.
2 Views