Brian Stroh
02/15/2024, 9:39 PM

Scout
02/15/2024, 9:39 PM

… `RunRequest` if a change is detected that should trigger a pipeline run.
Here's a conceptual example based on the principles described in the documentation:
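```python
import os
import pandas as pd
from dagster import RunRequest, SensorEvaluationContext, job, op, sensor

@op
def process_dataframe():
    # Placeholder op: replace with the work your job should perform.
    pass

@job
def my_pipeline():
    process_dataframe()

# This is a placeholder function that you would replace with your actual logic
# for loading and checking your dataframe.
def check_dataframe_for_changes(df_path):
    # Load the dataframe
    df = pd.read_csv(df_path)
    # Perform your check for changes against `df` here (e.g. compare to a stored hash)
    changes_detected = False  # placeholder result
    # Return True if changes are detected, False otherwise
    return changes_detected

@sensor(job=my_pipeline)
def my_dataframe_sensor(context: SensorEvaluationContext):
    df_path = "path/to/your/dataframe.csv"
    if check_dataframe_for_changes(df_path):
        # A fixed run_key would launch at most one run ever; key on the file's mtime instead.
        yield RunRequest(run_key=str(os.path.getmtime(df_path)))
```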
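In this example, `my_pipeline` would be the job you want to trigger (current Dagster releases call pipelines "jobs"). The `check_dataframe_for_changes` function is where you would implement the logic to check for changes in the DataFrame. If changes are detected, the sensor yields a `RunRequest`, which tells Dagster to start a run of the specified job. Dagster launches at most one run per unique `run_key` for a given sensor, so the key should change each time the data does; a fixed string would trigger only the first run.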
Keep in mind that this is a conceptual example, and you would need to implement the actual logic for detecting changes in the DataFrame according to your specific requirements. Additionally, you would need to manage the state between sensor evaluations to determine what constitutes a "change" since the last check. This might involve storing a snapshot of the DataFrame's state, a hash of the DataFrame, or the last modified timestamp, depending on your use case.