Ian Macleod
02/07/2024, 10:02 AM
Scout
02/07/2024, 10:02 AM
@sensor decorator. This sensor will be responsible for checking the status of the jobs and aggregating the results.
2. Fetch Job Results: Inside the sensor function, you would use the Dagster instance to fetch the results of the jobs you're interested in. This might involve querying the run storage to get the status and outputs of the jobs.
3. Aggregate Results: Once you have the job results, you would write the logic to aggregate these results as needed.
4. Write to Database: After aggregating the results, you would then write the aggregated data to a database. This step would involve using a database client or ORM within your sensor function to insert or update records in your database.
5. Return SkipReason: Since you do not want to trigger any downstream jobs, you would return a SkipReason from your sensor function to indicate that no further action is needed.
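The aggregation step (step 3) can be sketched in plain Python. This is a minimal illustration, not Dagster's API: RunRecord is a hypothetical stand-in for the run objects a sensor would fetch, and the summary shape (total, by_status, success_rate) is an assumption. Real Dagster runs expose their status as a DagsterRunStatus enum, so you would likely compare against run.status.value or the enum members instead of plain strings.

```python
from collections import Counter
from dataclasses import dataclass


@dataclass(frozen=True)
class RunRecord:
    """Hypothetical stand-in for a Dagster run; only the fields used below."""
    run_id: str
    job_name: str
    status: str  # e.g. "SUCCESS" or "FAILURE"


def aggregate_job_results(runs):
    """Count runs per status and compute the share of successful runs."""
    counts = Counter(run.status for run in runs)
    total = sum(counts.values())
    # Guard against division by zero when no runs were found
    success_rate = counts.get("SUCCESS", 0) / total if total else 0.0
    return {"total": total, "by_status": dict(counts), "success_rate": success_rate}


# Example input: four made-up runs of the monitored job
runs = [
    RunRecord("a1", "my_target_job", "SUCCESS"),
    RunRecord("b2", "my_target_job", "FAILURE"),
    RunRecord("c3", "my_target_job", "SUCCESS"),
    RunRecord("d4", "my_target_job", "SUCCESS"),
]
summary = aggregate_job_results(runs)
print(summary)
```

Keeping the aggregation in a plain function like this makes it easy to unit test without a running Dagster instance.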
Here is a conceptual example based on the documentation:
```python
from dagster import RunsFilter, SkipReason, sensor
# Import your database client or ORM here

# my_aggregation_job, aggregate_job_results and write_results_to_database are
# placeholders that you would define elsewhere.
@sensor(job=my_aggregation_job)
def my_aggregation_sensor(context):
    # Use the Dagster instance attached to the sensor context
    instance = context.instance
    # Replace 'my_target_job' with the name of the job you want to monitor
    runs = instance.get_runs(filters=RunsFilter(job_name='my_target_job'))
    # Logic to aggregate results from runs
    aggregated_results = aggregate_job_results(runs)
    # Logic to write aggregated results to a database
    write_results_to_database(aggregated_results)
    # Since we don't want to trigger any downstream jobs, return a SkipReason
    return SkipReason("Aggregated results written to database, no downstream job triggered.")
```
In this example, my_aggregation_job is a placeholder for the job that the sensor is part of (if applicable), aggregate_job_results is a function you would define to perform the aggregation logic, and write_results_to_database is a function you would define to handle the database write operation.
Please note that this is a high-level outline and does not include the actual implementation details for aggregating results or interacting with a database, as those would depend on your specific use case and database technology.
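As one possible shape for the database write, here is a minimal sketch using SQLite from the Python standard library. Everything in it is an assumption for illustration: the table name job_run_counts, its columns, the extra conn and job_name parameters, and the choice to return an error message rather than raise. Because a sensor is evaluated repeatedly, the sketch replaces existing rows instead of appending, so repeated ticks do not duplicate data.

```python
import sqlite3


def write_results_to_database(conn, job_name, status_counts):
    """Store one row per (job_name, status); return an error message or None."""
    rows = [(job_name, status, count) for status, count in status_counts.items()]
    try:
        with conn:  # commits on success, rolls back if an exception is raised
            conn.execute(
                "CREATE TABLE IF NOT EXISTS job_run_counts ("
                "job_name TEXT NOT NULL, status TEXT NOT NULL, "
                "run_count INTEGER NOT NULL, PRIMARY KEY (job_name, status))"
            )
            # INSERT OR REPLACE overwrites the row with the same primary key
            conn.executemany(
                "INSERT OR REPLACE INTO job_run_counts "
                "(job_name, status, run_count) VALUES (?, ?, ?)",
                rows,
            )
    except sqlite3.Error as exc:
        return f"database write failed: {exc}"
    return None


# Two consecutive "ticks" with updated counts for the same job
conn = sqlite3.connect(":memory:")
first_error = write_results_to_database(conn, "my_target_job", {"SUCCESS": 3, "FAILURE": 1})
second_error = write_results_to_database(conn, "my_target_job", {"SUCCESS": 4, "FAILURE": 1})
stored = conn.execute(
    "SELECT status, run_count FROM job_run_counts ORDER BY status"
).fetchall()
print(stored)
```

Inside a sensor, the returned message could be passed to SkipReason so that a failed write shows up in the sensor's tick history instead of being silently dropped.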
Remember to handle any exceptions or errors that might occur during the execution of your sensor, and ensu
Scout
02/07/2024, 10:03 AM