Andreas
03/07/2024, 6:47 AM
Scout
03/07/2024, 6:47 AM
You define a sensor with the `@sensor`
decorator provided by Dagster. The decorated function is an evaluation function that takes a `SensorEvaluationContext` and can yield `RunRequest` objects to create job runs, or `SkipReason` objects to indicate why no runs were created. Here's a basic example of how to define a sensor:
```python
from dagster import RunRequest, SensorEvaluationContext, SkipReason, sensor


@sensor(job=my_job)
def my_sensor(context: SensorEvaluationContext):
    # Your custom logic to check for external events goes here.
    # For example, you might check for a new file in a directory.
    if new_data_available():
        # If the condition is met, create a RunRequest to trigger a job run.
        # Op config for a job goes under "ops" ("solids" was the legacy key).
        run_config = {"ops": {"my_op": {"config": {"param": "value"}}}}
        yield RunRequest(run_key="unique_run_key", run_config=run_config)
    else:
        # If the condition is not met, you can yield a SkipReason.
        yield SkipReason("No new data to process")
```
In this example, `my_job` is the job that you want to trigger. The `new_data_available` function represents a custom check for external events, such as the arrival of new data. If the condition is met, a `RunRequest` is yielded with a `run_key` and the necessary `run_config` for the job. Dagster launches at most one run per `run_key` for a given sensor, so the key should be unique to each event (for example, derived from a file name or timestamp) rather than a fixed string. If the condition is not met, a `SkipReason` is yielded to indicate why no run was created.

Remember to replace `my_job`, `new_data_available`, and the `run_config` with your actual job, condition-checking logic, and configuration, respectively. This is a simplified example to illustrate the concept; in practice, your sensor's logic can be as simple or as complex as your requirements demand.