Brian Stroh
03/04/2024, 7:13 PM
Scout
03/04/2024, 7:13 PM
`@asset` decorator. This asset will be the target of the asset job.
2. Define the Asset Job: Use `define_asset_job` to create a job that will run computations on your asset. This job will be triggered by the sensor.
3. Define the Sensor: Create an asset sensor using the `@asset_sensor` decorator. In the sensor function, you will create a `RunRequest` that includes the cursor value in the `run_config` that will be passed to the asset job.
Here's an example of how you might set this up, based on the documentation:
```python
from dagster import (
    AssetKey,
    EventLogEntry,
    RunRequest,
    SensorEvaluationContext,
    asset,
    asset_sensor,
    define_asset_job,
)


# Define your asset. It declares a "param" config field so the sensor can pass a value in.
@asset(config_schema={"param": str})
def my_asset(context):
    # Asset computation logic here
    context.log.info(f"Launched with cursor value {context.op_config['param']!r}")


# Define your asset job
my_asset_job = define_asset_job(name="my_asset_job", selection="my_asset")


# Define your asset sensor. "upstream_asset" is a placeholder for the asset you want to
# watch; watching "my_asset" itself would make every run of the job trigger the sensor again.
@asset_sensor(asset_key=AssetKey("upstream_asset"), job=my_asset_job)
def my_asset_sensor(context: SensorEvaluationContext, asset_event: EventLogEntry):
    # @asset_sensor only calls this function when the watched asset has a new
    # materialization. context.cursor may be None on the first evaluation.
    cursor_value = context.cursor or ""
    # Create a RunRequest with the cursor value in the run_config
    return RunRequest(
        run_key=context.cursor,  # Use the sensor's cursor as the run key
        run_config={
            "ops": {
                "my_asset": {
                    "config": {
                        "param": cursor_value  # Pass the cursor value to the asset's config
                    }
                }
            }
        },
    )
```
In this example, `@asset_sensor` checks for new materializations of the watched asset and only calls `my_asset_sensor` when there is one; the function then returns a `RunRequest`. The `RunRequest` includes the cursor value in the `run_config` under the `ops` dictionary, keyed by the asset's name, which is where config for an asset in an asset job goes. This way, when the asset job is triggered by the sensor, `my_asset` receives the cursor value as its `param` config. Note that `@asset_sensor` manages the cursor itself: after each evaluation it advances the cursor past the materialization it just handled, so the value you read from `context.cursor` reflects the previous evaluation.
Please note that the actual logic for when to trigger a run and how to handle the cursor will depend on your specific use case. The cursor is typically used to keep track of the sensor's state between evaluations, ensuring that each run processes only data that is new since the last run.
Brian Stroh
03/04/2024, 7:54 PM
Scout
03/04/2024, 7:54 PM
`multi_asset_sensor` or a custom sensor, you may need to manually manage the cursor using `context.update_cursor(...)` within the sensor function. If you forget to update the cursor, or if the update is not done correctly, the cursor will not change.
2. Sensor Evaluation Errors: If there are errors during the sensor evaluation that prevent the sensor function from completing successfully, the cursor may not be updated. Check for any exceptions or errors in your sensor code.
3. SensorResult Issues: As mentioned in a GitHub issue, a `MultiAssetSensor` definition that returns a `SensorResult` object does not update the sensor cursor unless the cursor is explicitly included in that result, even though the evaluation context has advanced it. When run requests are returned directly, the advanced cursor is persisted as expected; if you're using `SensorResult`, make sure you pass the cursor explicitly through its `cursor` argument.
4. Cursor Overwriting: A GitHub issue reported that the asset sensor silently overwrites a user-provided cursor, leading to confusing results. If you are setting the cursor in an `@asset_sensor`-decorated function, it may be overwritten, because the decorator sets the cursor itself after each evaluation.
5. Sensor Configuration: Ensure that your sensor is configured correctly in your repository. If there is a misconfiguration, the sensor may not be running as expected, which could affect cursor updates.
6. Dagster Version: There might be a bug in the version of Dagster you are using that affects cursor behavior. Check the Dagster release notes and GitHub issues for any known bugs related to sensors and cursors.
7. Dagster Instance Issues: If there are issues with the Dagster instance, such as database connectivity problems or permission issues, it might affect the sensor's ability to update its cursor.
To troubleshoot the issue, you can:
- Check the sensor logs for any errors or warnings that might indicate why the cursor is not updating.
- Ensure that you are using the latest version of Dagster or a version where any known bugs related to your issue have been resolved.
- Review your sensor code to make sure the cursor is being updated correctly within the sensor evaluation function.
- If you are using `SensorResult`, ensure that the cursor is managed as expected and not being inadvertently overwritten.
- Check the Dagster instance and its connectivity to the underlying metadata database to ensure there are no systemic issues affecting sensor execution.
If you continue to experience issues, you may want to consult the Dagster community or support channels with specific details about your sensor code and the behavior you're seeing.
Brian Stroh
03/04/2024, 9:31 PM
Scout
03/04/2024, 9:31 PM
`AssetMaterialization` event to record metadata about the plot, such as its file path, in the Dagster asset catalog. Inside an asset, the simplest way to do this is to attach the metadata to the asset's `Output`; Dagster then records it on the materialization event it creates for the asset. Here's an example of how you might implement this in code:
```python
import matplotlib.pyplot as plt
from dagster import MetadataValue, Output, asset


# Function to save the plot to a file
def save_plot(data, filename):
    fig, ax = plt.subplots()
    ax.plot(data)
    fig.savefig(filename)
    plt.close(fig)


# Asset that generates and saves a plot
@asset
def my_plot_asset():
    data = [1, 2, 3, 4]  # Replace with your actual data
    filename = "my_plot.png"  # Replace with your desired file path
    save_plot(data, filename)
    # Return the file path as the asset's output; the metadata is recorded on
    # the asset's materialization event and shown in the asset catalog
    return Output(filename, metadata={"path": MetadataValue.path(filename)})
```
In this example, `my_plot_asset` generates a plot, saves it to a file, and returns the file path as its output, with the path also attached as metadata. The I/O manager stores the returned path (not the image itself), so downstream assets or processes that need the plot can read the path and open the file; if you want Dagster to manage the file itself, you could write a custom I/O manager that handles the image. Remember that the actual implementation details may vary depending on your specific use case, the plotting library you're using, and how you want to manage the plot files within your Dagster repository.
Brian Stroh
03/07/2024, 4:59 PM
Scout
03/07/2024, 4:59 PM
`manifest.json` file, which contains metadata about all the models in your dbt project. The `manifest.json` file includes details such as the schema and name for each model.
Here's a general approach to using a dbt CLI resource to get the schema from a specific data model:
1. Set Up the dbt CLI Resource: Define a dbt CLI resource in your Dagster repository. This resource will be configured with the necessary dbt project directory and profiles directory.
2. Invoke dbt Commands: Use the dbt CLI resource to run dbt commands that generate or use the `manifest.json` file.
3. Parse the Manifest File: After running the dbt command, parse the `manifest.json` file to extract the schema for the specific model you're interested in.
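For step 3, here is a minimal, dependency-free sketch of the lookup. It assumes the standard `manifest.json` layout, in which `nodes` maps each node's unique ID (for models, `model.<project_name>.<model_name>`) to a dictionary with `resource_type`, `name`, `database`, `schema`, and `alias` fields; the function name `find_model_relation` is hypothetical.

```python
import json
from pathlib import Path


def find_model_relation(manifest_path, model_name):
    """Return (database, schema, alias) for the dbt model called model_name."""
    manifest = json.loads(Path(manifest_path).read_text())
    for node in manifest["nodes"].values():
        # Match on the model's name so the dbt project name isn't needed;
        # tests, seeds, and snapshots are skipped via resource_type
        if node.get("resource_type") == "model" and node.get("name") == model_name:
            # alias is the table/view name in the warehouse; fall back to the model name
            return node.get("database"), node["schema"], node.get("alias") or node["name"]
    raise KeyError(f"Model {model_name!r} not found in {manifest_path}")
```

Indexing the result with `[1]`, as in `find_model_relation("target/manifest.json", "my_model")[1]`, gives just the schema; the database and alias are returned too because you usually need all three to build a fully qualified table name.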
Here's an example of how you might set up and use the dbt CLI resource in Dagster:
```python
from dagster import Config, Definitions, job, op
from dagster_dbt import DbtCliResource


class ModelSchemaConfig(Config):
    model_name: str  # name of the dbt model whose schema you want


# Op that uses the dbt CLI resource to get the schema of a specific model
@op
def get_dbt_model_schema(config: ModelSchemaConfig, dbt: DbtCliResource) -> str:
    # Install packages, then parse the project; `dbt parse` writes target/manifest.json
    dbt.cli(["deps"]).wait()
    manifest = dbt.cli(["parse"]).wait().get_artifact("manifest.json")
    # Model nodes are keyed "model.<project_name>.<model_name>"; matching on the
    # node's name avoids having to know the project name
    for node in manifest["nodes"].values():
        if node["resource_type"] == "model" and node["name"] == config.model_name:
            return node["schema"]
    raise ValueError(f"Model {config.model_name!r} not found in manifest.json")


# Example usage in a job
@job
def my_dbt_job():
    get_dbt_model_schema()


defs = Definitions(
    jobs=[my_dbt_job],
    resources={
        # Replace with your dbt project and profiles directories
        "dbt": DbtCliResource(project_dir="path/to/dbt_project", profiles_dir="path/to/profiles"),
    },
)
```
In this example, `DbtCliResource` is configured with your dbt project and profiles directories. The `get_dbt_model_schema` op uses it to run `dbt deps` and `dbt parse`, loads the resulting `manifest.json` artifact, and returns the schema of the model named in its config; supply that name in the run config under `ops: get_dbt_model_schema: config: model_name`. This sketch uses `DbtCliResource`, the resource recommended in recent `dagster-dbt` releases, rather than the older `dbt_cli_resource`.
Brian Stroh
03/07/2024, 9:41 PM
Scout
03/07/2024, 9:41 PM
`MetadataValue` due to its potentially large size and complex structure. Instead, you can include metadata that provides a reference to the DataFrame, such as a file path or a URL where the DataFrame has been saved. To make a Pandas DataFrame available for download from Dagster Cloud, you would typically write the DataFrame to a file in a supported format (like CSV or Parquet) and store that file in a location accessible to users, such as a cloud storage bucket. You can then attach the path or URL, as a `MetadataValue`, to the asset's materialization event. Here's an example of how you might do this:
```python
import pandas as pd
from dagster import MetadataValue, Output, asset


# Function to save the DataFrame to a file and return the file path
def save_dataframe(df, filename):
    df.to_csv(filename, index=False)
    return filename


# Asset that generates a DataFrame and saves it to a file
@asset
def my_asset():
    # Generate or load your DataFrame
    df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
    # Save the DataFrame to a CSV file
    file_path = save_dataframe(df, "my_dataframe.csv")
    # Return the DataFrame as the asset's output, with the file path attached as
    # metadata; Dagster records it on the asset's materialization event
    return Output(
        df,
        metadata={
            "file_path": MetadataValue.path(file_path),
            # If the file is stored in a location with a URL, you can use MetadataValue.url:
            # "file_url": MetadataValue.url("https://example.com/path/to/my_dataframe.csv"),
        },
    )
```
In this example, the `my_asset` function generates a DataFrame, saves it to a CSV file, and returns the DataFrame with the file path attached as metadata. Users can then download the DataFrame from the provided file path or URL. Keep in mind that a local path such as `my_dataframe.csv` only exists on the machine that ran the asset. If you're using Dagster Cloud, you'll need to write the file to a storage location your users can reach (such as AWS S3, Google Cloud Storage, or Azure Blob Storage), make sure appropriate permissions are set up to allow downloads, and link to it with `MetadataValue.url`. Remember to always check the latest Dagster documentation for the most current best practices and APIs for working with assets and metadata.
Brian Stroh
03/07/2024, 10:33 PM
Scout
03/07/2024, 10:33 PM
`AssetMaterialization` event. However, once a failure occurs, the execution of the asset is halted, and any subsequent code, including the emission of metadata, will not be executed. Therefore, you need to attach the metadata before raising the failure.
Here's an example of how you might attach metadata to an asset's output before intentionally failing:
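```python
from dagster import (
    AssetExecutionContext,
    AssetMaterialization,
    Failure,
    MetadataValue,
    Output,
    asset,
)


@asset
def my_asset(context: AssetExecutionContext):
    # Some logic that may lead to failure
    try:
        # Your asset's computation logic here
        result = None  # Replace with your computation
    except Exception as e:
        # Log the materialization event with metadata before failing.
        # AssetMaterialization requires an asset key; use the current asset's key.
        yield AssetMaterialization(
            asset_key=context.asset_key,
            description="An error occurred before failing.",
            metadata={
                "error_message": MetadataValue.text(str(e)),
                # Include any other relevant metadata here
            },
        )
        # Raise a Failure to halt execution and mark the asset as failed
        raise Failure(description="Intentional failure after logging metadata.") from e
    # If no failure occurs, continue with normal execution. Because this function
    # contains `yield`, it is a generator, so the output must be yielded too.
    yield Output(result)
```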
In this example, if an exception is caught, the asset logs an `AssetMaterialization` event with metadata about the error before raising a `Failure` exception to halt execution. This allows you to capture and record metadata about the failure before the asset's execution is stopped.
It's important to note that once a `Failure` is raised, the asset will not produce any further output or metadata. Therefore, any metadata you wish to attach must be emitted before the failure.
As always, please refer to the latest Dagster documentation for the most up-to-date guidance on working with assets, metadata, and handling failures.
Brian Stroh
03/07/2024, 10:41 PM
```python
from dagster import asset, AssetMaterialization, MetadataValue, Failure


@asset
def my_asset():
    # Some logic that may lead to failure
    try:
        # Your asset's computation logic here
        pass
    except Exception as e:
        # Log the materialization event with metadata before failing
        yield AssetMaterialization(
            description="An error occurred before failing.",
            metadata={
                "error_message": MetadataValue.text(str(e)),
                # Include any other relevant metadata here
            }
        )
        # Raise a Failure event to halt execution and mark the asset as failed
        raise Failure(description="Intentional failure after logging metadata.")
    # If no failure occurs, continue with normal execution
    # ...
```
Scout
03/07/2024, 10:41 PM
`@asset` decorator, the function is expected to produce an output that corresponds to the asset being computed. However, in the provided code snippet, the function `my_asset` uses `yield` to emit an `AssetMaterialization` event, and it may raise a `Failure` exception. Because the body contains `yield`, Python treats `my_asset` as a generator function, even on runs where that `yield` line never executes. Dagster consumes whatever the generator yields as events, so the asset's value must also be yielded, as `yield Output(value)`; a `return` statement inside a generator does not produce an output. As written, the snippet never yields an `Output`, so even when no exception occurs, Dagster will report a missing output (`DagsterStepOutputNotFoundError`). If an exception occurs and a `Failure` is raised, the function halts execution and does not yield further values.
Therefore, `my_asset` has no return value in the usual sense: calling it returns a generator, and Dagster iterates over what it yields. There is no explicit return type specified in the function signature, but if you were to type-annotate it, you might use `Generator[AssetMaterialization, None, None]` for the snippet as written (or `Iterator[Union[AssetMaterialization, Output]]` once it also yields an `Output`). However, since Dagster handles these decorators and types internally, you typically do not need to provide such annotations in your Dagster assets.
Brian Stroh
03/07/2024, 11:02 PM
Scout
03/07/2024, 11:02 PM
`@asset` decorator, the asset key is automatically derived from the asset's function name or can be explicitly provided. If you want to pass the current asset's `AssetKey` to the `AssetMaterialization`, you can reference the asset's name directly, construct the `AssetKey` if you have set a custom one, or read it from the execution context with `context.asset_key`. Here's an example of how you might pass the current asset's `AssetKey` to the `AssetMaterialization`:
```python
from dagster import AssetExecutionContext, AssetKey, AssetMaterialization, Output, asset


@asset
def my_asset(context: AssetExecutionContext):
    # Your asset's computation logic here
    result = 42  # placeholder; replace with your own computation

    # Create an AssetKey for the current asset
    # If you're using the default asset key (derived from the function name), you can do this:
    asset_key = AssetKey("my_asset")
    # If you have a custom asset key, construct it accordingly:
    # asset_key = AssetKey(["my_custom_namespace", "my_asset"])
    # Or read the key of the asset being computed from the context:
    # asset_key = context.asset_key

    # Emit an AssetMaterialization event with the asset key
    yield AssetMaterialization(
        asset_key=asset_key,
        description="Description of the materialization event",
        # Include any other relevant metadata here
    )
    # This function is a generator (it uses `yield`), so the result must be
    # yielded as an Output; a `return result` here would be ignored
    yield Output(result)
```
In this example, `my_asset` is the name of the asset, and the `AssetKey` is constructed using that name. If you have a custom asset key structure, you would construct the `AssetKey` with the appropriate namespace and name. Note that `asset_key` is a required argument of `AssetMaterialization`: Dagster does not fill it in from the asset the event is yielded in. If all you need is to attach metadata to the current asset's materialization, it's simpler to skip the separate event and pass `metadata=` to the `Output` (for example, `Output(result, metadata={...})`); Dagster records that metadata on the materialization event it creates for the asset. Remember to refer to the latest Dagster documentation for the most current best practices and APIs for working with assets and materializations.
Brian Stroh
03/07/2024, 11:06 PM
`yield AssetMaterialization` isn't picking up the current asset's `asset_key`
Scout
03/07/2024, 11:06 PM
`@asset` decorator and want to emit an `AssetMaterialization` event, you use `yield` to emit the event within the asset function. `AssetMaterialization` does not pick up the key of the asset it is yielded from: `asset_key` is a required argument, so you need to specify it explicitly when you create the `AssetMaterialization` object. `context.asset_key` gives you the key of the asset currently being computed. Here's an example of how to do this:
```python
from dagster import AssetExecutionContext, AssetMaterialization, Output, asset


@asset
def my_asset(context: AssetExecutionContext):
    # Your asset's computation logic here
    result = 42  # placeholder; replace with your own computation

    # Emit an AssetMaterialization event with an explicit asset key
    yield AssetMaterialization(
        asset_key=context.asset_key,  # or AssetKey("my_asset") / your custom asset key
        description="Description of the materialization event",
        # Include any other relevant metadata here
    )
    # Generator assets must yield their result as an Output
    yield Output(result)
```
In this example, the `AssetMaterialization` event is emitted with an explicit `AssetKey` taken from the context. If you prefer to spell it out, use `AssetKey("my_asset")`, or, for a custom asset key structure, construct the `AssetKey` with the appropriate namespace and name. If you are still experiencing issues with the asset key, check the following:
1. Ensure you are using a recent version of Dagster, as there may have been bug fixes or changes to the behavior of asset materialization.
2. Review the Dagster documentation for any updates or changes to the recommended practices for emitting `AssetMaterialization` events.
3. If the problem persists, consider reaching out to the Dagster community or support channels for further assistance.
Please note that the behavior of asset materialization and the handling of asset keys can evolve, so always refer to the latest Dagster documentation for the most accurate and up-to-date guidance.
Brian Stroh
03/07/2024, 11:19 PM
dagster._core.errors.DagsterStepOutputNotFoundError: Core compute for op "match_da_reporting_product_vw_column_names" did not return an output for non-optional output "result"
```python
@asset
def match_da_reporting_product_vw_column_names(
    da_reporting_product_vw_sds: pd.DataFrame,
    da_reporting_product_vw_swiftload: pd.DataFrame,
):
    source_columns = set(da_reporting_product_vw_sds.columns)
    destination_columns = set(da_reporting_product_vw_swiftload.columns)
    if source_columns != destination_columns:
        # get only the columns which do not match
        columns_only_in_source = source_columns.difference(destination_columns)
        columns_only_in_destination = destination_columns.difference(source_columns)
        yield AssetMaterialization(
            asset_key="match_da_reporting_product_vw_column_names",
            description="Column names do not match.",
            metadata={
                "columns_only_in_source": MetadataValue.text(
                    str(columns_only_in_source)
                ),
                "columns_only_in_destination": MetadataValue.text(
                    str(columns_only_in_destination)
                ),
            },
        )
        raise Failure(description="Column names do not match.")
    return Output(
        value=True,
        metadata={
            "source_column_names": MetadataValue.text(str(source_columns)),
            "destination_column_names": MetadataValue.text(str(destination_columns)),
        },
    )
```
Scout
03/07/2024, 11:19 PM
`match_da_reporting_product_vw_column_names` is expected to produce an output named `"result"`, but it did not. In Dagster, an asset has a single non-optional output named `"result"` by default (unless it is marked with `output_required=False`), and this error is raised when the asset function finishes without producing it.
Looking at your code, the cause is the combination of `yield` and `return`:
1. Because the function body contains `yield AssetMaterialization(...)`, Python treats the whole function as a generator, even on runs where the columns match and that line never executes. Dagster collects the asset's output from what the generator yields.
2. Inside a generator, `return Output(...)` does not emit the output; the returned value is discarded when the generator finishes. So when the columns match, the function ends without yielding an `Output`, which is exactly the error you're seeing. Using `Output` in an asset is fine; it just needs to be yielded here.
Here's a revised version of your asset function that ensures an output is yielded whenever the columns match:
```python
import pandas as pd
from dagster import AssetMaterialization, Failure, MetadataValue, Output, asset


@asset
def match_da_reporting_product_vw_column_names(
    da_reporting_product_vw_sds: pd.DataFrame,
    da_reporting_product_vw_swiftload: pd.DataFrame,
):
    source_columns = set(da_reporting_product_vw_sds.columns)
    destination_columns = set(da_reporting_product_vw_swiftload.columns)
    if source_columns != destination_columns:
        # get only the columns which do not match
        columns_only_in_source = source_columns.difference(destination_columns)
        columns_only_in_destination = destination_columns.difference(source_columns)
        yield AssetMaterialization(
            asset_key="match_da_reporting_product_vw_column_names",
            description="Column names do not match.",
            metadata={
                "columns_only_in_source": MetadataValue.text(str(columns_only_in_source)),
                "columns_only_in_destination": MetadataValue.text(
                    str(columns_only_in_destination)
                ),
            },
        )
        # Raising ends the generator, so no output is expected on this path
        raise Failure(description="Column names do not match.")
    # This function is a generator (it contains `yield`), so the output must be
    # yielded; `return Output(...)` would be silently discarded
    yield Output(
        value=True,
        metadata={
            "source_column_names": MetadataValue.text(str(source_columns)),
            "destination_column_names": MetadataValue.text(str(destination_columns)),
        },
    )
```
In this revised version, the key change is `return Output(...)` becoming `yield Output(...)`. When the columns match, the asset now yields its output, with the column names attached as metadata. When they don't match, the asset emits the `AssetMaterialization` describing the differences and then raises `Failure`, so the step fails with your message.
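One refinement worth considering: `str()` of a Python set lists its items in an order that can change from one process to the next (string hashing is randomized per process), so `MetadataValue.text(str(columns_only_in_source))` may look different between runs even when the columns are identical. A small sketch of a helper that returns sorted lists instead; the name `column_differences` is hypothetical:

```python
def column_differences(source_columns, destination_columns):
    """Return the column names found on only one side, as sorted lists."""
    source = set(source_columns)
    destination = set(destination_columns)
    return {
        "columns_only_in_source": sorted(source - destination),
        "columns_only_in_destination": sorted(destination - source),
    }


print(column_differences(["id", "name", "price"], ["id", "name", "cost"]))
# {'columns_only_in_source': ['price'], 'columns_only_in_destination': ['cost']}
```

You could pass the DataFrames' `.columns` straight in and wrap the returned dictionary in `MetadataValue.json(...)` in place of the two `MetadataValue.text` entries.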
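Please note that `AssetMaterialization` requires an explicit `asset_key`; Dagster does not fill it in from the surrounding asset, so keep the `asset_key` argument from your original version. If you'd rather not record a materialization for a run that fails, you can drop the `AssetMaterialization` and pass the same `metadata` dictionary to `Failure` instead. With no `yield` left in the function, it is no longer a generator, and `return Output(...)` works as you originally wrote it.
Brian Stroh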
03/08/2024, 11:02 PM
`deps` in an asset decorator to programmatically get values from upstream assets?
Scout
03/08/2024, 11:02 PM
`deps` parameter in the `@asset` decorator specifies dependencies on upstream assets, but it does not pass their values to your function. `deps` declares a dependency without loading any data: Dagster draws the edge in the asset graph and, when both assets are in the same run, materializes the upstream asset first. To programmatically get an upstream asset's value, declare it as a parameter of the downstream asset function instead. Dagster matches the parameter name to the upstream asset's name, loads the value through the I/O manager, and passes it in. If you want a parameter name that differs from the asset name, map it with `ins={"param_name": AssetIn(key=...)}`. Here's an example of how you might define these dependencies:
```python
from dagster import AssetIn, asset


@asset
def upstream_asset():
    # Compute and return the value for the upstream asset
    return "upstream_value"


# The parameter name matches the upstream asset, so its value is passed in
@asset
def downstream_asset(upstream_asset):
    # Use the value from the upstream asset in the downstream asset's computation
    return f"downstream_value based on {upstream_asset}"


# A differently named parameter, mapped to the upstream asset with `ins`
@asset(ins={"upstream_asset_dep": AssetIn(key="upstream_asset")})
def other_downstream_asset(upstream_asset_dep):
    return f"downstream_value based on {upstream_asset_dep}"


# `deps` only orders execution: this asset runs after upstream_asset but receives no value
@asset(deps=[upstream_asset])
def ordering_only_asset():
    return "computed without loading upstream_asset"
```
In this example, `downstream_asset` and `other_downstream_asset` both depend on `upstream_asset` and receive the value it returned: the first through its parameter name, the second through the `ins` mapping. `ordering_only_asset` lists `upstream_asset` in `deps`, so it still depends on it and runs after it, but nothing is passed in; `deps` is useful when the downstream asset reads the upstream data itself, for example from a database table. Note that `deps` takes a list of upstream assets or asset keys, not a dictionary. Remember to refer to the latest Dagster documentation for the most current best practices and APIs for working with assets and dependencies.
Brian Stroh
03/09/2024, 12:30 AM
Scout
03/09/2024, 12:30 AM
`workspace.yaml`) is used to define code locations. If you have multiple code locations within the same Python package, you can specify each one as a separate `python_module` entry under `load_from`. Each code location corresponds to a Python module that contains your Dagster definitions, such as assets, jobs, resources, schedules, and sensors. Here's an example of a `workspace.yaml` file that defines multiple code locations sourcing from different modules within the same Python package:
```yaml
load_from:
  - python_module: my_package.module_one
  - python_module: my_package.module_two
  - python_module: my_package.subpackage.module_three
```
In this example, `my_package` is the name of your Python package, `module_one` and `module_two` are modules within that package, and `module_three` lives in `subpackage`, a subpackage of `my_package`. Each module listed here is loaded as a separate code location. Make sure that your Python package is installed in the Python environment that loads these code locations, and that the listed modules are importable from that environment.
Note that `workspace.yaml` is the open-source format. In Dagster Cloud, code locations are declared in `dagster_cloud.yaml` instead: a top-level `locations` list in which each entry has a `location_name` and a `code_source` with a `module_name` (for example, `module_name: my_package.module_one`). Please refer to the latest Dagster documentation for the most accurate and up-to-date guidance on configuring code locations in Dagster Cloud.
Brian Stroh
03/11/2024, 6:18 PM
Scout
03/11/2024, 6:18 PM
`config_schema` on the asset with a default value specified. This allows you to set a default configuration that will be used if no runtime config is provided.
Here's an example of how to define an asset with optional config and a default value:
```python
from dagster import Field, asset


@asset(config_schema={"param": Field(str, is_required=False, default_value="default_value")})
def my_asset_with_default_config(context):
    # Access the config value, which will be the default value if not overridden
    param_value = context.op_config.get("param")
    # Your asset logic here using the param_value
    return f"Parameter value is: {param_value}"
```
In this example, the asset `my_asset_with_default_config` has an optional config parameter `param` with a default value of `"default_value"`. The `Field` constructor is used to define the config schema for `param`, marking it as not required (`is_required=False`) and specifying the default value (`default_value="default_value"`).
When this asset is materialized, if no config is provided for `param`, the default value will be used. If you want to override the default value, you can provide a runtime config like so:
```yaml
ops:
  my_asset_with_default_config:
    config:
      param: "overridden_value"
```
This runtime config would set the `param` value to `"overridden_value"` instead of the default value.
Remember to refer to the latest Dagster documentation for the most current best practices and APIs for working with asset configuration.
Brian Stroh
03/11/2024, 6:35 PM
Scout
03/11/2024, 6:35 PM
`MetadataValue.text` does not support formatting the text as a code block directly. `MetadataValue.text` is intended for plain text metadata entries. However, if you want to display text as a code block in the Dagster UI, you can use `MetadataValue.md` to create a markdown-formatted string that renders as a code block. Here's an example of how you might use markdown to format a string as a code block in an asset's metadata:
```python
from dagster import MetadataValue, Output, asset


@asset
def my_asset():
    code_snippet = "def example_function():\n    return 'Hello, World!'"
    markdown_code_block = f"```\n{code_snippet}\n```"  # Format as a markdown code block
    # Your asset logic here
    # Attach the markdown to the asset's materialization through the Output's metadata
    return Output(
        "Asset result",
        metadata={"code_snippet": MetadataValue.md(markdown_code_block)},
    )
```
In this example, the `code_snippet` string is formatted as a markdown code block using triple backticks (```) before and after the code snippet. When you use `MetadataValue.md` with this markdown-formatted string, it will render as a code block in the Dagster UI. Please refer to the latest Dagster documentation for any updates or changes to the metadata system that might allow for new ways to format text as code blocks or other rich text formats.
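If the text you're displaying can itself contain triple backticks (for example, a Markdown file or a docstring with examples), a fixed three-backtick fence would be closed early. In CommonMark, a closing fence must be at least as long as the opening one, so a fence longer than any backtick run inside the snippet is safe. A small sketch; the helper name `to_markdown_code_block` is hypothetical:

```python
import re


def to_markdown_code_block(code: str, language: str = "") -> str:
    """Wrap code in a fenced Markdown code block that its own backticks cannot close."""
    # Find the longest run of consecutive backticks inside the snippet
    longest_run = max((len(run) for run in re.findall(r"`+", code)), default=0)
    # Use at least three backticks, and more than any run inside the snippet
    fence = "`" * max(3, longest_run + 1)
    return f"{fence}{language}\n{code}\n{fence}"


snippet = "def example_function():\n    return 'Hello, World!'"
print(to_markdown_code_block(snippet, "python"))
```

You would then pass the returned string to `MetadataValue.md(...)` as in the asset example; the optional language tag is placed right after the opening fence, where Markdown renderers look for it.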