Justin Bricker
02/20/2024, 10:27 PM
Scout
02/20/2024, 10:27 PM
`context` object provided to the asset function. The `context` object has an `instance` attribute that allows you to query the Dagster event log for materialization events. Here's an example from the documentation that demonstrates how to access metadata from a historical materialization of one asset from inside another asset:
```python
from dagster import asset, AssetKey, Output

@asset
def asset1():
    # This asset returns some value and records metadata
    return Output(value=..., metadata={"num_rows": 5})

@asset  # or op
def asset_1_num_rows(context):
    instance = context.instance
    # Get the latest materialization event for the upstream asset
    materialization = instance.get_latest_materialization_event(
        AssetKey(["asset1"])
    ).asset_materialization
    # Access the metadata from the latest materialization
    return materialization.metadata["num_rows"]
```
In this example, `asset_1_num_rows` is a dependent asset that retrieves the metadata from the latest materialization of `asset1`. The `get_latest_materialization_event` method is used to fetch the most recent materialization event for the specified `AssetKey`, and then the metadata is accessed from that materialization event. Please note that this approach assumes that the assets are defined within the same Dagster instance and that the event log is accessible from the context in which the asset function is running.
Justin Bricker
02/20/2024, 10:53 PM
`dagster` repo are using the `asset_key.path` to determine where assets are saved when defining the `load_input` function of iomanagers.
Scout
02/20/2024, 10:53 PM
`asset_key.path` attribute is a list of strings that represents the hierarchical structure of the asset key. When defining the `load_input` and `handle_output` methods of a custom I/O manager, you can use the `asset_key.path` to construct file paths or object keys that correspond to the asset's storage location. Here's an example from the documentation that shows how an I/O manager might use the `asset_key.path` to determine file paths:
```python
from dagster import AssetKey, ConfigurableIOManager, OutputContext, InputContext

class MyIOManager(ConfigurableIOManager):
    root_path: str

    def _get_path(self, asset_key: AssetKey) -> str:
        return self.root_path + "/".join(asset_key.path)

    def handle_output(self, context: OutputContext, obj):
        # Write the object to the path determined by the asset key
        write_csv(self._get_path(context.asset_key), obj)

    def load_input(self, context: InputContext):
        # Read the object from the path determined by the asset key
        return read_csv(self._get_path(context.asset_key))
```
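The key-to-path mapping done by `_get_path` can be tried outside Dagster with plain strings. The sketch below is only an illustration: `path_for_asset_key` is a hypothetical helper, the list `["sales", "orders"]` stands in for an `asset_key.path`, and `/data` is a made-up root. It also shows that plain concatenation only produces the intended path when the root already ends with a separator.

```python
import posixpath
from typing import Sequence


def path_for_asset_key(root_path: str, key_path: Sequence[str]) -> str:
    """Join a storage root with the segments of an asset key path.

    `key_path` stands in for `asset_key.path`, which is a list of strings.
    """
    # posixpath.join puts exactly one "/" between components, so the result
    # is the same whether or not root_path ends with a slash.
    return posixpath.join(root_path, *key_path)


# Plain concatenation, as in `root_path + "/".join(path)`, relies on the
# root already ending in a separator.
naive = "/data" + "/".join(["sales", "orders"])
joined = path_for_asset_key("/data", ["sales", "orders"])

print(naive)   # /datasales/orders
print(joined)  # /data/sales/orders
```

With the concatenation form, a `root_path` such as `/data/` (trailing slash included) gives the same result as the joined form.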
In this example, the `_get_path` method constructs a file path using the `root_path` of the I/O manager and the `asset_key.path`. This path is then used to read or write data in the `load_input` and `handle_output` methods, respectively. The use of `asset_key.path` for determining storage locations is a common pattern in Dagster, as it provides a consistent and logical way to map asset keys to their physical storage locations.
Justin Bricker
02/21/2024, 9:53 AM
`asset_check` be used to fail a materialization and prevent the rest of the materialization from finishing
Scout
02/21/2024, 9:54 AM
`asset_check` can be used to fail a materialization and prevent the rest of the materialization from finishing. An `asset_check` is a function that you can define to perform validations or checks on an asset's data. If the check fails, it can raise an exception or return an `AssetCheckResult` with `passed=False`, which will cause the materialization to fail. Here's an example from the documentation that shows how an `asset_check` can be used to fail a materialization:
```python
from dagster import asset, AssetCheckResult, asset_check

@asset
def my_asset():
    # ... compute the asset's data ...
    return data

@asset_check(asset=my_asset)
def my_asset_check(context, data):
    # Perform some validation on the data
    if not is_data_valid(data):
        # Return an AssetCheckResult with passed=False to fail the materialization
        return AssetCheckResult(passed=False, description="Data validation failed.")
    # If the check passes, return an AssetCheckResult with passed=True
    return AssetCheckResult(passed=True)
```
In this example, if `is_data_valid(data)` returns `False`, the `my_asset_check` function will return an `AssetCheckResult` with `passed=False`, which will cause the materialization of `my_asset` to fail. If the check passes, it returns an `AssetCheckResult` with `passed=True`, allowing the materialization to proceed. Note that the behavior of asset checks and their impact on materialization can be configured, for example through the severity level of the check. The documentation provides guidance on how to use asset checks effectively to enforce data contracts and ensure data quality within your Dagster pipelines.
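The example above calls `is_data_valid` without defining it. Here is a minimal sketch of what such a helper might look like, assuming the asset's data is a list of dict-like rows and that an `id` field is required. Both of those are assumptions made for illustration and do not come from the thread or from Dagster itself.

```python
from typing import Any, Mapping, Sequence


def is_data_valid(
    rows: Sequence[Mapping[str, Any]],
    required: Sequence[str] = ("id",),  # hypothetical required fields
) -> bool:
    """Return True when there is at least one row and every required
    field is present and not None in every row."""
    if not rows:
        return False
    return all(row.get(col) is not None for row in rows for col in required)


good = [{"id": 1}, {"id": 2}]
bad = [{"id": 1}, {"id": None}]

print(is_data_valid(good))  # True
print(is_data_valid(bad))   # False
```

Keeping the validation in a plain function like this makes it possible to unit test the rule on its own, separately from the check that wraps it.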