Brian Stroh
02/25/2024, 4:20 AM

Scout
02/25/2024, 4:20 AM

1. **Define Metadata in the Asset Definition**: You can attach definition metadata to the asset using the `metadata` parameter within the `@asset` decorator.
2. **Enrich Metadata During Execution**: During the execution of the asset, you can add or enrich metadata by returning a `MaterializeResult` object from your asset function. `MaterializeResult` allows you to include metadata that describes the contents produced by a particular run.
3. **Use `MaterializeResult`**: To add metadata during execution, change the return type annotation from `None` to `MaterializeResult` and return a `MaterializeResult` object with the desired metadata.

Here is an example from the documentation that demonstrates how to add a row count and a preview of the asset as metadata during execution:

```python
import json

import pandas as pd
import requests
from dagster import AssetExecutionContext, MaterializeResult, MetadataValue, asset


@asset
def topstories(context: AssetExecutionContext) -> MaterializeResult:
    with open("data/topstory_ids.json", "r") as f:
        topstory_ids = json.load(f)

    results = []
    for item_id in topstory_ids:
        item = requests.get(
            f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json"
        ).json()
        results.append(item)
        if len(results) % 20 == 0:
            context.log.info(f"Got {len(results)} items so far.")

    df = pd.DataFrame(results)
    df.to_csv("data/topstories.csv")

    return MaterializeResult(
        metadata={
            # Metadata can be any key-value pair
            "num_records": len(df),
            # `MetadataValue` has useful static methods to build metadata
            "preview": MetadataValue.md(df.head().to_markdown()),
        }
    )
```
In this example, the `topstories` asset function returns a `MaterializeResult` whose metadata includes the number of records and a markdown preview of the data. This metadata is added during the execution of the asset and is not part of the initial asset definition. Remember that metadata added during execution is specific to that materialization of the asset and can vary from run to run, depending on the data processed or any other dynamic context.

Brian Stroh
02/28/2024, 5:27 AM

Scout
02/28/2024, 5:27 AM

This error indicates that the configuration you are providing for an op (`my_op`) within your Dagster job is invalid. Specifically, the error is saying that an unexpected configuration entry `last_run_time` was provided, and it does not match the expected schema. Here's how you can resolve this error:

1. **Review the Configuration Schema**: Check the configuration schema for `my_op` to understand what configuration it expects. The error message suggests that `last_run_time` should be nested within a `config` block and can be either a string or an environment variable reference.
2. **Correct the Configuration**: Update the configuration you are passing to the sensor so it matches the expected schema. If `last_run_time` is intended to be part of the configuration for `my_op`, it should be nested correctly. Here's an example of what the corrected configuration might look like:

```yaml
ops:
  my_op:
    config:
      last_run_time: "2021-01-01T00:00:00" # Example of a string value
```

Or, if you are using an environment variable:

```yaml
ops:
  my_op:
    config:
      last_run_time:
        env: "MY_ENV_VARIABLE" # Reference to an environment variable
```
3. **Test the Configuration**: After updating the configuration, run the sensor or job with it and check whether the error persists.
4. **Review Sensor Code**: If the `last_run_time` configuration is generated or manipulated within the sensor code, review that code to ensure it constructs the configuration dictionary according to the schema expected by `my_op`.
5. **Validate Configuration**: Optionally, use Dagster's configuration validation utilities to programmatically check that the configuration is valid before running the job. This can help catch errors early.

If you continue to face issues, sharing more context or the specific code that is causing the error would help in giving a more precise solution.
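Step 4 can be sketched as below: a plain helper that builds the correctly nested run-config dictionary inside the sensor (the op name `my_op` and key `last_run_time` are taken from the error above; `build_run_config` is a hypothetical helper, not a Dagster API):

```python
from datetime import datetime, timezone


def build_run_config(last_run_time: str) -> dict:
    # Nest the value under ops -> my_op -> config, matching the schema
    # Dagster expects. Passing {"last_run_time": ...} at the top level,
    # or directly under the op name, is what triggers the
    # "unexpected config entry" error.
    return {
        "ops": {
            "my_op": {
                "config": {
                    "last_run_time": last_run_time,
                },
            },
        },
    }


stamp = datetime(2021, 1, 1, tzinfo=timezone.utc).isoformat()
run_config = build_run_config(stamp)
print(run_config["ops"]["my_op"]["config"]["last_run_time"])
# -> 2021-01-01T00:00:00+00:00
```

The resulting dictionary can then be passed as the `run_config` of the sensor's `RunRequest`, mirroring the YAML shape shown above.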