Can you write a sensor that sends an email via AWS...
# ask-ai
h
Can you write a sensor that sends an email via AWS SES on run failures?
s
We do not have a built-in sensor specifically for sending emails via AWS SES on run failures. However, you can create a custom run failure sensor using the
run_failure_sensor
decorator and integrate it with AWS SES to send emails when a run fails. Here's a general approach based on the documentation: 1. Define a run failure sensor using the
@run_failure_sensor
decorator. 2. Inside the sensor function, use AWS SES to send an email. You would need to use the AWS SDK for Python (Boto3) to interact with AWS SES. 3. Extract the necessary information from the
RunFailureSensorContext
to include in your email, such as the job name, run ID, and error message. Please note that the documentation does not provide a specific code example for integrating with AWS SES. You would need to refer to AWS SES documentation for details on sending emails using Boto3 and ensure that you handle AWS credentials and permissions appropriately. For sending emails on run failures using SMTP, the documentation provides an example with
make_email_on_run_failure_sensor
, which creates a job failure sensor that sends emails via the SMTP protocol. However, this example uses SMTP settings for services like Gmail and would need to be adapted for AWS SES. If you decide to create a custom sensor for AWS SES, you would need to handle the AWS SES integration within your sensor function. Remember to manage your AWS credentials securely and follow best practices for error handling and logging within your sensor function.
h
@Scout I already have an SES client, can you just write the sensor using a stub
s
Certainly! Below is a stubbed example of how you might define a run failure sensor that sends an email via AWS SES when a run fails. This example assumes that you have a function
send_email_via_ses
that takes care of the actual email sending using your SES client.
Copy code
python from dagster import run_failure_sensor, RunFailureSensorContext  # This is a stub for the actual SES email sending function. # You should replace this with your actual implementation using boto3 or another SES client. def send_email_via_ses(subject, message, recipient):     # Your SES email sending logic goes here.     pass  @run_failure_sensor def ses_email_on_run_failure(context: RunFailureSensorContext):     run = context.dagster_run     failure_info = context.failure_event.message     subject = f"Run Failure: {run.run_id}"     message = f"Run {run.run_id} for job {run.job_name} failed with error: {failure_info}"     recipient = "<mailto:recipient@example.com|recipient@example.com>"  # Replace with your recipient email address      send_email_via_ses(subject, message, recipient)
In this stub: -
send_email_via_ses
is a placeholder for your SES email sending function. You should implement this function using the AWS SES client (
boto3
or any other SES client you're using). -
ses_email_on_run_failure
is the sensor function decorated with
@run_failure_sensor
. It constructs a subject and message for the email and then calls
send_email_via_ses
to send the email. -
context.dagster_run
provides access to the run that failed, and
context.failure_event.message
provides the failure message. - Replace
"<mailto:recipient@example.com|recipient@example.com>"
with the actual recipient's email address. Please ensure that you handle any exceptions that may occur during the email sending process and that you have the necessary permissions and configurations set up in AWS SES to send emails.
h
@Scout Can sensors depend on resources? And will the run failure sensor be triggered if I terminate a job via the UI? Does this run failure sensor apply to all jobs?
s
Sensors in Dagster, including run failure sensors, do not directly depend on resources in the same way that solids or ops do. Resources are typically used to provide context or configuration for pipeline execution, and sensors are meant to observe external state and trigger runs based on that state. However, you can access the instance in the sensor context to fetch information about runs, which can indirectly allow you to use some of the instance's capabilities. Regarding the triggering of run failure sensors upon job termination, the run failure sensor is designed to trigger when a run fails, which includes runs that fail due to an error during execution. If a job is terminated manually through the UI, it may not necessarily trigger a run failure sensor, as manual termination might be considered a different type of run status. The behavior can depend on how Dagster internally records the status of manually terminated runs. The run failure sensor, by default, applies to all jobs within the repository where it is defined. However, you can specify a list of jobs to monitor by using the
monitored_jobs
parameter in the sensor decorator. If you want the sensor to apply to specific jobs, you would configure it like this:
Copy code
python from dagster import run_failure_sensor, RunFailureSensorContext, JobDefinition  @run_failure_sensor(monitored_jobs=[JobDefinition(name="my_monitored_job")]) def ses_email_on_run_failure(context: RunFailureSensorContext):     # Sensor implementation...
In this example,
my_monitored_job
would be the name of the job you want to monitor for failures. If you do not specify
monitored_jobs
, the sensor will default to monitoring all jobs in the current repository. To summarize: - Sensors do not directly depend on resources. - Run failure sensors are triggered when a run fails, but manual termination through the UI may not be considered a failure in this context. - By default, a run failure sensor applies to all jobs unless you specify a subset of jobs to monitor using the
monitored_jobs
parameter.