Todd de Quincey
03/09/2024, 12:58 PM
Scout
03/09/2024, 12:58 PM
Todd de Quincey
03/09/2024, 12:59 PM
Scout
03/09/2024, 1:00 PM
Todd de Quincey
03/09/2024, 1:01 PM
Scout
03/09/2024, 1:01 PM
--output flag.
3. Custom logic for alerts: Implement custom logic that sends alerts based on the parsed test results. For example, you could send a Slack message for warnings and raise an exception for errors.
4. Integrate with Dagster: Use Dagster's sensors or hooks to integrate this custom logic with your asset materialization process.
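As a rough sketch of step 3, the alerting decision can be kept separate from both dbt and Slack so that it is easy to test. The names `route_test_results`, `DbtTestFailure`, and the `notify` parameter are hypothetical and introduced only for illustration; the sketch assumes the test results have already been reduced to a mapping of test name to status string.

```python
from typing import Callable, Mapping


class DbtTestFailure(Exception):
    """Raised when at least one dbt test failed (hypothetical name)."""


def route_test_results(
    statuses: Mapping[str, str],
    notify: Callable[[str], None],
) -> None:
    """Send an alert for warnings and raise for failures.

    `statuses` maps a test name to its status string. `notify` is any
    callable that delivers a message, for example a Slack webhook sender.
    """
    warnings = sorted(name for name, status in statuses.items() if status == "warn")
    failures = sorted(
        name for name, status in statuses.items() if status in ("fail", "error")
    )

    # Warnings only produce a notification; execution continues.
    if warnings:
        notify(f"dbt test warnings: {', '.join(warnings)}")
    # Failures stop execution so the caller (e.g. an asset) fails too.
    if failures:
        raise DbtTestFailure(f"dbt test failures: {', '.join(failures)}")


# Usage: collect messages in a list instead of calling Slack.
sent = []
route_test_results(
    {"not_null_orders_id": "warn", "unique_orders_id": "pass"},
    sent.append,
)
print(sent)
```

Passing the notifier in as an argument means the same function can be exercised in a unit test with `list.append` and wired to a real Slack sender in production.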
Here's a simplified example of how you might write a Python function to parse dbt test results and send Slack alerts:
python
import json
import subprocess
from pathlib import Path

import requests
from dagster import asset

# Assumption: dbt is run from the project directory and uses the default target/ path
RUN_RESULTS_PATH = Path('target/run_results.json')


def run_dbt_tests_and_alert():
    # Run dbt tests; dbt exits non-zero when a test fails, so the exit code is not checked here
    subprocess.run(['dbt', 'test'], capture_output=True)
    # Parse the JSON results that dbt writes to run_results.json
    test_results = json.loads(RUN_RESULTS_PATH.read_text())
    # Check for warnings and errors
    warnings = []
    errors = []
    for test in test_results['results']:
        if test['status'] == 'warn':
            warnings.append(test['unique_id'])
        elif test['status'] in ('fail', 'error'):
            errors.append(test['unique_id'])
    # Send Slack alerts for warnings
    if warnings:
        message = f"dbt test warnings: {', '.join(warnings)}"
        send_slack_alert(message)
    # Raise an exception for errors to fail the asset materialization
    if errors:
        raise Exception(f"dbt test failures: {', '.join(errors)}")


def send_slack_alert(message):
    webhook_url = 'YOUR_SLACK_WEBHOOK_URL'
    slack_data = {'text': message}
    response = requests.post(
        webhook_url, json=slack_data,
        headers={'Content-Type': 'application/json'}
    )
    if response.status_code != 200:
        raise ValueError(f"Request to Slack returned an error {response.status_code}, the response is:\n{response.text}")


# Example usage within a Dagster asset or op
@asset
def my_asset():
    run_dbt_tests_and_alert()
    # Proceed with asset materialization if no errors
    # ...
Please note that this is a high-level example and does not include error handling or the specifics of integrating with
Scout
03/09/2024, 1:02 PM
Todd de Quincey
03/09/2024, 1:04 PM
Scout
03/09/2024, 1:04 PM
The @dbt_assets decorator in Dagster is used to define a collection of assets that correspond to dbt models, seeds, and snapshots. However, the @dbt_assets decorator itself does not provide direct functionality for parsing dbt test results and conditionally sending alerts based on those results.
To implement custom logic for parsing dbt test results and sending Slack alerts within the context of @dbt_assets, you would need to manually handle the dbt test command execution and result parsing within the decorated function. Here's a conceptual example of how you might do this:
python
from pathlib import Path

import requests
from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets

# Assumption: placeholder path to your project's compiled dbt manifest
DBT_MANIFEST_PATH = Path('path/to/dbt_project/target/manifest.json')


@dbt_assets(manifest=DBT_MANIFEST_PATH)
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    # Run dbt tests; raise_on_error=False so that failures can be handled below
    dbt_cli_invocation = dbt.cli(["test"], context=context, raise_on_error=False).wait()
    # Load the parsed run_results.json artifact written by dbt
    test_results = dbt_cli_invocation.get_artifact("run_results.json")
    # Check for warnings and errors
    warnings = []
    errors = []
    for test in test_results['results']:
        if test['status'] == 'warn':
            warnings.append(test['unique_id'])
        elif test['status'] in ('fail', 'error'):
            errors.append(test['unique_id'])
    # Send Slack alerts for warnings
    if warnings:
        message = f"dbt test warnings: {', '.join(warnings)}"
        send_slack_alert(message)
    # Raise an exception for errors to fail the asset materialization
    if errors:
        raise Exception(f"dbt test failures: {', '.join(errors)}")
    # If no errors, continue to yield asset events from dbt run
    yield from dbt.cli(["run"], context=context).stream()


def send_slack_alert(message):
    webhook_url = 'YOUR_SLACK_WEBHOOK_URL'
    slack_data = {'text': message}
    response = requests.post(
        webhook_url, json=slack_data,
        headers={'Content-Type': 'application/json'}
    )
    if response.status_code != 200:
        raise ValueError(f"Request to Slack returned an error {response.status_code}, the response is:\n{response.text}")
In this example, the my_dbt_assets function is decorated with @dbt_assets, which means it is responsible for running dbt commands and yielding the resulting asset events. The function first runs the dbt test command and captures the output in JSON format. It then parses the test results to identify warnings and errors. If there are any warnings, it sends a Slack alert using the send_slack_alert function. If there are any errors, it raises an exception to fail the asset materialization. If there are no errors, it proceeds to run the dbt run command and yields the resulting asset events.
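To make the parsing step more concrete, here is a minimal sketch that groups test identifiers by status. It assumes the parsed results follow the shape of dbt's run_results.json artifact (a top-level `results` list whose entries carry `unique_id` and `status`). The function name `group_tests_by_status` and the sample data are hypothetical and hand-written for illustration, not real dbt output.

```python
from collections import defaultdict
from typing import Dict, List


def group_tests_by_status(run_results: dict) -> Dict[str, List[str]]:
    """Group test identifiers by their status string."""
    grouped: Dict[str, List[str]] = defaultdict(list)
    for result in run_results.get("results", []):
        grouped[result["status"]].append(result["unique_id"])
    # Return a plain dict so that missing statuses are not created on lookup.
    return dict(grouped)


# Hand-written sample in the assumed shape (not real dbt output).
sample = {
    "results": [
        {"unique_id": "test.my_project.not_null_orders_id", "status": "pass"},
        {"unique_id": "test.my_project.unique_orders_id", "status": "warn"},
        {"unique_id": "test.my_project.accepted_values_status", "status": "fail"},
    ]
}

grouped = group_tests_by_status(sample)
print(grouped.get("warn", []))
print(grouped.get("fail", []))
```

Grouping by status once, rather than keeping separate lists per status, leaves room to treat further statuses (for example skipped tests) differently without touching the parsing loop.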
Please note th
Scout
03/09/2024, 1:05 PM