# ask-ai

Justin Bricker

02/20/2024, 10:27 PM
@Scout From within the materialization function of an asset, how can I get metadata from the latest successful materialization of an upstream asset that it depends on?

Scout

02/20/2024, 10:27 PM
To access metadata from the latest successful materialization of an upstream asset within the materialization function of a dependent asset, you can use the `context` object provided to the asset function. The `context` object has an `instance` attribute that allows you to query the Dagster event log for materialization events. Here's an example from the documentation that demonstrates how to access metadata from a historical materialization of one asset from inside another asset:

```python
from dagster import asset, AssetKey, Output

@asset
def asset1():
    # This asset returns some value and records metadata
    return Output(value=..., metadata={"num_rows": 5})

@asset  # or op
def asset_1_num_rows(context):
    instance = context.instance
    # Get the latest materialization event for the upstream asset
    materialization = instance.get_latest_materialization_event(
        AssetKey(["asset1"])
    ).asset_materialization
    # Access the metadata from the latest materialization
    return materialization.metadata["num_rows"]
```

In this example, `asset_1_num_rows` is a dependent asset that retrieves the metadata from the latest materialization of `asset1`. The `get_latest_materialization_event` method fetches the most recent materialization event for the specified `AssetKey`, and the metadata is then read from that materialization event. Please note that this approach assumes the assets are defined within the same Dagster instance and that the event log is accessible from the context in which the asset function is running.
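One detail the snippet above glosses over: `get_latest_materialization_event` returns `None` when the upstream asset has never materialized, so dereferencing its result unconditionally can raise an `AttributeError`. A minimal, self-contained sketch of the guard, using hypothetical stand-in classes in place of Dagster's real event objects:

```python
# Hypothetical stand-ins mimicking the shape of Dagster's materialization
# event objects, so the guard logic can be shown in isolation.
class FakeMaterialization:
    def __init__(self, metadata):
        self.metadata = metadata

class FakeEvent:
    def __init__(self, materialization):
        self.asset_materialization = materialization

def latest_metadata_value(event, key, default=None):
    """Safely pull one metadata value from a latest-materialization event.

    get_latest_materialization_event returns None when the upstream asset
    has never materialized, so guard before dereferencing the result.
    """
    if event is None:
        return default
    return event.asset_materialization.metadata.get(key, default)

event = FakeEvent(FakeMaterialization({"num_rows": 5}))
print(latest_metadata_value(event, "num_rows"))    # 5
print(latest_metadata_value(None, "num_rows", 0))  # 0 (never materialized)
```

The same `if event is None` guard applies verbatim when the event comes from `context.instance`.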

Justin Bricker

02/20/2024, 10:53 PM
@Scout Is it expected that asset keys are directly translatable into information that tells us where the asset exists? It looks like most of the examples in the dagster repo use `asset_key.path` to determine where assets are saved when defining the `load_input` function of I/O managers.

Scout

02/20/2024, 10:53 PM
Yes, it is expected that asset keys can be used to determine where an asset exists, especially when defining custom I/O managers. The asset key typically corresponds to the logical location of an asset, and it is often used within I/O managers to construct the path to the asset's data. The `asset_key.path` attribute is a list of strings that represents the hierarchical structure of the asset key. When defining the `load_input` and `handle_output` methods of a custom I/O manager, you can use `asset_key.path` to construct file paths or object keys that correspond to the asset's storage location. Here's an example from the documentation that shows how an I/O manager might use `asset_key.path` to determine file paths:

```python
from dagster import AssetKey, ConfigurableIOManager, InputContext, OutputContext

class MyIOManager(ConfigurableIOManager):
    root_path: str

    def _get_path(self, asset_key: AssetKey) -> str:
        # Join the key components under the configured root directory
        return self.root_path + "/" + "/".join(asset_key.path)

    def handle_output(self, context: OutputContext, obj):
        # Write the object to the path determined by the asset key
        # (write_csv stands in for whatever serialization helper you use)
        write_csv(self._get_path(context.asset_key), obj)

    def load_input(self, context: InputContext):
        # Read the object from the path determined by the asset key
        return read_csv(self._get_path(context.asset_key))
```

In this example, the `_get_path` method constructs a file path from the I/O manager's `root_path` and the `asset_key.path`. This path is then used to read or write data in the `load_input` and `handle_output` methods, respectively. Using `asset_key.path` to determine storage locations is a common pattern in Dagster, as it provides a consistent and logical way to map asset keys to their physical storage locations.
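The key-to-path mapping itself is plain string manipulation, so it can be sketched (and unit-tested) without Dagster at all. A hypothetical helper mirroring `_get_path` above, using `posixpath.join` so the separator between the root and the first key component is never dropped:

```python
import posixpath

def asset_key_to_path(root_path: str, key_path: list, ext: str = "csv") -> str:
    """Map an asset key's path components to a storage location.

    key_path is the list of strings from asset_key.path, e.g.
    ["warehouse", "users"] -> "<root>/warehouse/users.csv".
    """
    return posixpath.join(root_path, *key_path) + "." + ext

print(asset_key_to_path("/data", ["warehouse", "users"]))
# /data/warehouse/users.csv
```

The names here are illustrative; the point is that a deterministic function of `asset_key.path` gives every asset a stable, collision-free location.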

Justin Bricker

02/21/2024, 9:53 AM
@Scout Can an `asset_check` be used to fail a materialization and prevent the rest of the materialization from finishing?

Scout

02/21/2024, 9:54 AM
Yes, an `asset_check` can be used to fail a run when an asset's data does not meet expectations. An `asset_check` is a function you define to perform validations on an asset's data. If the check fails, it can raise an exception or return an `AssetCheckResult` with `success=False`, which marks the check as failed. Note that a check executes after the asset it targets has materialized, so it cannot undo that materialization; to stop downstream assets from materializing when the check fails, pass `blocking=True` to the `asset_check` decorator. Here's an example that shows how an `asset_check` can be used this way:

```python
from dagster import asset, AssetCheckResult, asset_check

@asset
def my_asset():
    # ... compute the asset's data ...
    return data

@asset_check(asset=my_asset, blocking=True)
def my_asset_check(my_asset):
    # Perform some validation on the data
    if not is_data_valid(my_asset):
        # success=False fails the check; because the check is blocking,
        # downstream assets in the run will not materialize
        return AssetCheckResult(success=False, description="Data validation failed.")
    # If the check passes, return an AssetCheckResult with success=True
    return AssetCheckResult(success=True)
```

In this example, if `is_data_valid(my_asset)` returns `False`, the `my_asset_check` function returns an `AssetCheckResult` with `success=False`, which fails the check and, because the check is blocking, prevents downstream assets from materializing. If the check passes, it returns an `AssetCheckResult` with `success=True`, allowing the run to proceed. It's important to note that the behavior of asset checks and their impact on a run can be configured based on the severity level of the check and the desired workflow. The documentation provides guidance on how to use asset checks effectively to enforce data contracts and ensure data quality within your Dagster pipelines.