Alex Berndt
02/27/2024, 6:35 PMScout
02/27/2024, 6:35 PMpython from dagster import asset, AssetKey, Output @asset def asset1(): return Output(value=..., metadata={"num_rows": 5}) @asset # or op def asset_1_num_rows(context): instance = context.instance materialization = instance.get_latest_materialization_event(AssetKey(["asset1"])).asset_materialization return materialization.metadata["num_rows"]
In this example, the asset_1_num_rows
function retrieves the metadata from the latest materialization event of the asset1
by using the context.instance.get_latest_materialization_event
method and accessing the asset_materialization
attribute. The metadata is then accessed by key, in this case, "num_rows"
. This approach allows you to access historical materialization metadata within the context of an asset or operation.Alex Berndt
02/27/2024, 9:24 PMScout
02/27/2024, 9:24 PMget_latest_materialization_event
method of the DagsterInstance
class, specifying the AssetKey
and the partition key. Here's an example code snippet that demonstrates how to do this: python from dagster import asset, AssetKey, DagsterInstance @asset def my_asset(): # Your asset logic here pass def get_latest_materialization_for_partition(asset_key, partition_key): instance = DagsterInstance.get() # Get the current Dagster instance event_records = instance.get_event_records( EventRecordsFilter( event_type=DagsterEventType.ASSET_MATERIALIZATION, asset_key=asset_key, asset_partitions=[partition_key], ), limit=1, ) if event_records: return event_records[0].event_log_entry.dagster_event.asset_materialization else: return None # Example usage: asset_key = AssetKey(["my_asset"]) partition_key = "2023-01-01" # Replace with your specific partition key latest_materialization = get_latest_materialization_for_partition(asset_key, partition_key) if latest_materialization: print(f"Latest materialization metadata: {latest_materialization.metadata}") else: print("No materialization found for the specified partition.")
In this example, get_latest_materialization_for_partition
is a function that takes an AssetKey
and a partition_key
as arguments. It retrieves the latest materialization event for the specified partition of the asset. If a materialization event is found, it returns the asset_materialization
object, from which you can access the metadata. If no materialization event is found for the specified partition, it returns None
. Please note that you need to replace "my_asset"
with the actual name of your asset and "2023-01-01"
with the partition key you are interested in.Alex Berndt
02/27/2024, 9:36 PMevent_records[0].event_log_entry.dagster_event.asset_materialization
does not seem correct. asset_materialization
is a function.Scout
02/27/2024, 9:36 PMasset_materialization
is indeed not a function that can be called in that context. Instead, you should access the AssetMaterialization
event directly from the event_log_entry
object, which is part of the EventLogRecord
returned by get_event_records
. Here's the corrected code snippet: python from dagster import DagsterInstance, EventRecordsFilter, DagsterEventType, AssetKey def get_latest_materialization_for_partition(asset_key, partition_key): instance = DagsterInstance.get() # Get the current Dagster instance event_records = instance.get_event_records( EventRecordsFilter( event_type=DagsterEventType.ASSET_MATERIALIZATION, asset_key=asset_key, asset_partitions=[partition_key], ), limit=1, ) if event_records: return event_records[0].event_log_entry # This is the EventLogEntry object else: return None # Example usage: asset_key = AssetKey(["my_asset"]) partition_key = "2023-01-01" # Replace with your specific partition key latest_materialization_entry = get_latest_materialization_for_partition(asset_key, partition_key) if latest_materialization_entry: print(f"Latest materialization metadata: {latest_materialization_entry.dagster_event.event_specific_data.materialization.metadata_entries}") else: print("No materialization found for the specified partition.")
In this corrected snippet, get_latest_materialization_for_partition
will return the EventLogEntry
object if a materialization event is found. You can then access the materialization
object and its metadata_entries
to get the metadata for the latest materialization of the specified partition. If no materialization event is found for the specified partition, it returns None
.