Paul Herzog
03/05/2024, 10:53 PM
Scout
03/05/2024, 10:53 PM
```python
from datetime import datetime

from dagster import Definitions, RunRequest, SensorEvaluationContext, SkipReason, job, op, sensor
from dagster_gcp import BigQueryResource


# Placeholder dbt job. It must be defined BEFORE the @sensor decorator that
# references it. Replace the op with whatever actually triggers your dbt Cloud
# job (for example, an op that calls the dbt Cloud API).
@op
def run_dbt_placeholder(context):
    context.log.info("Replace this op with one that triggers your dbt Cloud job.")


@job
def my_dbt_job():
    run_dbt_placeholder()


# Sensor that checks a BigQuery table for fresh data, evaluated at most once per hour
@sensor(job=my_dbt_job, minimum_interval_seconds=3600)
def my_bigquery_sensor(context: SensorEvaluationContext, bigquery: BigQueryResource):
    # Replace the project, dataset, table, and column names with your own;
    # my_column should be a TIMESTAMP or DATETIME column
    query = "SELECT MAX(my_column) FROM `my-gcp-project.my_dataset.my_table`"
    with bigquery.get_client() as client:
        rows = list(client.query(query).result())
    latest_timestamp = rows[0][0] if rows else None
    if latest_timestamp is None:
        return SkipReason("The table is empty or my_column contains only NULLs.")

    # The cursor is stored as a string, so parse it back before comparing
    last_recorded = datetime.fromisoformat(context.cursor) if context.cursor else None
    if last_recorded is None or latest_timestamp > last_recorded:
        # Store the new high-water mark, then request a run; the run_key stops
        # Dagster from launching a second run for the same timestamp
        context.update_cursor(latest_timestamp.isoformat())
        return RunRequest(run_key=latest_timestamp.isoformat())

    # Skip if no fresh data is found
    return SkipReason(f"No fresh data since {last_recorded.isoformat()}.")


# Register the job, sensor, and BigQuery resource so the sensor receives `bigquery`
defs = Definitions(
    jobs=[my_dbt_job],
    sensors=[my_bigquery_sensor],
    resources={"bigquery": BigQueryResource(project="my-gcp-project")},
)
```
Please note that the example above assumes you have a Dagster resource configured for BigQuery, which the sensor uses to run the freshness query before comparing the result with the timestamp stored in the sensor's cursor.
`my_dbt_job` is a placeholder for your actual dbt job definition, which you need to define yourself, before the `@sensor` decorator that references it. Since the goal is to run a dbt Cloud job, that job would typically trigger it through the dbt Cloud API rather than the dbt CLI (which runs dbt locally), or through another method that is appropriate for your setup.
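The cursor comparison is the easiest part of the sensor to get wrong: Dagster stores the cursor as a string, while the BigQuery client returns a `datetime` for a `TIMESTAMP` column, so comparing the two directly raises a `TypeError`. Below is a minimal sketch that pulls that decision out into a plain-Python function so it can be unit-tested without Dagster or BigQuery. `check_freshness` is a hypothetical helper, not a Dagster API, and it assumes the cursor always holds an ISO 8601 timestamp written by the same function.

```python
from datetime import datetime, timezone
from typing import Optional, Tuple


def check_freshness(
    latest: Optional[datetime], cursor: Optional[str]
) -> Tuple[bool, Optional[str]]:
    """Decide whether new data has arrived since the last sensor evaluation.

    `latest` is the MAX(timestamp) value read from the table (None if the table
    is empty); `cursor` is the string the sensor stored last time (None on the
    first evaluation). Returns (should_run, new_cursor).
    """
    if latest is None:
        # Nothing to compare yet; keep the existing cursor
        return False, cursor
    if cursor is not None and latest <= datetime.fromisoformat(cursor):
        # No rows newer than the last recorded timestamp
        return False, cursor
    # Store the timestamp as ISO 8601 so it round-trips through fromisoformat
    return True, latest.isoformat()
```

A sensor would call it with the `MAX(...)` value and `context.cursor`, then call `context.update_cursor(new_cursor)` and return a `RunRequest` whenever `should_run` is true.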
To trigger a dbt Cloud job specifically
Scout
03/05/2024, 10:54 PM
`my-gcp-project`, `my_dataset`, `my_table`, `my_column`, and `my_dbt_job` with your actual project, dataset, table, column, and job names. Lastly, please ensure that you have the necessary permissions and credentials set up for both BigQuery and dbt Cloud to allow the sensor to query BigQuery and trigger the dbt Cloud job.
Paul Herzog
03/05/2024, 11:12 PM
Scout
03/05/2024, 11:12 PM
`requests` library to make the API call. Please note that this example is for illustrative purposes, and you will need to adapt it to your specific requirements and environment.
```python
import requests
from dagster import asset

# dbt Cloud API configuration (replace the placeholders with your own values).
# The host may differ if your account uses a regional or single-tenant access URL.
DBT_CLOUD_API_URL = "https://cloud.getdbt.com/api/v2/accounts/{account_id}/jobs/{job_id}/run/"
DBT_CLOUD_ACCOUNT_ID = "your-dbt-cloud-account-id"
DBT_CLOUD_JOB_ID = "your-dbt-cloud-job-id"
DBT_CLOUD_API_TOKEN = "your-dbt-cloud-api-token"


# Asset that triggers a dbt Cloud job run each time it is materialized
@asset
def trigger_dbt_cloud_job(context):
    # Construct the API URL with the account and job IDs
    url = DBT_CLOUD_API_URL.format(account_id=DBT_CLOUD_ACCOUNT_ID, job_id=DBT_CLOUD_JOB_ID)
    # Set up the headers with the API token for authorization
    headers = {
        "Authorization": f"Token {DBT_CLOUD_API_TOKEN}",
        "Content-Type": "application/json",
    }
    # The trigger endpoint expects a "cause" describing why the run was started
    response = requests.post(
        url,
        headers=headers,
        json={"cause": "Triggered by Dagster"},
        timeout=30,
    )
    if response.ok:
        # Job triggered successfully; log it and return the API response
        context.log.info("dbt Cloud job triggered successfully.")
        return response.json()
    # Surface the API's error message in both the logs and the raised exception
    context.log.error(f"Failed to trigger dbt Cloud job: {response.text}")
    raise Exception(f"Failed to trigger dbt Cloud job: {response.text}")

# Note: replace the placeholders `your-dbt-cloud-account-id`, `your-dbt-cloud-job-id`,
# and `your-dbt-cloud-api-token` with your actual dbt Cloud account ID, job ID, and API token.
```
In this example, the `trigger_dbt_cloud_job` asset makes an HTTP POST request to the dbt Cloud API to trigger a job run. It uses the `requests` library to send the request with the necessary authorization header.
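To check exactly what the asset sends without calling dbt Cloud, here is a sketch that builds the same request with the standard library's `urllib.request` instead of `requests`. `build_trigger_request` is a hypothetical helper; the JSON body with a `cause` field reflects my understanding that the v2 "trigger job run" endpoint requires a short reason string, so confirm it against the dbt Cloud API reference.

```python
import json
import urllib.request

# Same endpoint template as the asset; the host may differ for regional accounts
DBT_CLOUD_API_URL = "https://cloud.getdbt.com/api/v2/accounts/{account_id}/jobs/{job_id}/run/"


def build_trigger_request(
    account_id: str, job_id: str, token: str, cause: str
) -> urllib.request.Request:
    """Build (but do not send) the POST request that triggers a dbt Cloud job run."""
    url = DBT_CLOUD_API_URL.format(account_id=account_id, job_id=job_id)
    body = json.dumps({"cause": cause}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={
            "Authorization": f"Token {token}",
            "Content-Type": "application/json",
        },
    )
```

Sending it would then be `urllib.request.urlopen(req, timeout=30)`, ideally with the token read from an environment variable (for example a hypothetical `DBT_CLOUD_API_TOKEN`) rather than hard-coded in the source.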
Please ensure that you replace the placeholders with your actual dbt Cloud account ID, job ID, and API token. You will also need to handle the response from the API call appropriately, depending on whether the job was triggered successfully or if there was an error.
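One way to handle a successful response is to pull out the new run's ID and track its status. The sketch below assumes the trigger response nests the run under a `data` object with an `id` field, and that run objects report dbt Cloud's numeric status codes (1 queued, 2 starting, 3 running, 10 success, 20 error, 30 cancelled). Both are my reading of the v2 API and worth verifying; the helper names are hypothetical, and nothing here makes a network call.

```python
from typing import Any, Dict

# Numeric run status codes as assumed from the dbt Cloud v2 API (verify against the docs)
RUN_STATUS = {1: "Queued", 2: "Starting", 3: "Running", 10: "Success", 20: "Error", 30: "Cancelled"}
TERMINAL_STATUSES = {10, 20, 30}


def run_id_from_trigger_response(payload: Dict[str, Any]) -> int:
    """Extract the new run's ID from the JSON returned by the trigger endpoint."""
    try:
        return payload["data"]["id"]
    except (KeyError, TypeError) as exc:
        raise ValueError(f"Unexpected dbt Cloud response shape: {payload!r}") from exc


def describe_run(run: Dict[str, Any]) -> str:
    """Turn a run object's numeric status into a label, flagging finished runs."""
    code = run.get("status")
    label = RUN_STATUS.get(code, f"Unknown ({code})")
    return f"{label} (finished)" if code in TERMINAL_STATUSES else label
```

In the asset, `run_id_from_trigger_response(response.json())` could be logged right after a successful trigger; a follow-up step could then fetch the run (for example via a GET on `.../accounts/{account_id}/runs/{run_id}/`) and pass the returned `data` object to `describe_run` until it reports a finished state.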
Keep in mind that this asset should be part of a larger Dagster job or pipeline that includes any necessary dependencies and is scheduled or triggered as needed. Additionally, you should mana
Scout
03/05/2024, 11:12 PM