Jérôme MONTIGNY
02/14/2024, 2:58 PMScout
02/14/2024, 2:58 PM@dagster.run_status_sensor
decorator to create a sensor that reacts to the success status of a job run: python from dagster import run_status_sensor, RunStatusSensorContext, DagsterRunStatus @run_status_sensor(run_status=DagsterRunStatus.SUCCESS) def my_slack_on_run_success(context: RunStatusSensorContext): slack_client = WebClient(token=os.environ["SLACK_DAGSTER_ETL_BOT_TOKEN"]) slack_client.chat_postMessage( channel="#alert-channel", text=f'Job "{context.dagster_run.job_name}" succeeded.', )
In this example, the my_slack_on_run_success
function is decorated with @run_status_sensor
and configured to trigger when a job run succeeds. Inside the function, it uses a Slack client to send a message to a specified channel, indicating the success of the job. Please note that this example assumes you have the Slack client (WebClient
) and the necessary Slack token configured in your environment. The context.dagster_run.job_name
provides the name of the job that succeeded, which is included in the Slack message.Jérôme MONTIGNY
02/14/2024, 3:00 PMScout
02/14/2024, 3:00 PMJérôme MONTIGNY
02/14/2024, 3:02 PMScout
02/14/2024, 3:02 PM@dagster.run_status_sensor
decorator to react only to the success of a specific job, you can check the job_name
attribute of the context.dagster_run
object within the sensor function. Here's an example from the documentation: python from dagster import run_status_sensor, DagsterRunStatus, RunRequest, SkipReason # Assuming you have a job defined somewhere with the name 'my_specific_job' from my_jobs import my_specific_job @run_status_sensor( run_status=DagsterRunStatus.SUCCESS, request_job=my_specific_job, ) def report_status_sensor(context): # this condition prevents the sensor from triggering my_specific_job again after it succeeds if context.dagster_run.job_name != my_specific_job.name: run_config = { "ops": { "status_report": {"config": {"job_name": context.dagster_run.job_name}} } } return RunRequest(run_key=None, run_config=run_config) else: return SkipReason("Don't report status of my_specific_job")
In this example, the report_status_sensor
function is decorated with @run_status_sensor
and is configured to trigger when the job my_specific_job
succeeds. The function checks if the job that triggered the sensor is not my_specific_job
itself to prevent an infinite loop of triggering the same job upon its own success. If the condition is met, it creates a RunRequest
with the appropriate run_config
. If the job that succeeded is my_specific_job
, it returns a SkipReason
to avoid re-triggering. Make sure to replace my_specific_job
with the actual job you want to monitor, and adjust the run_config
as needed for your specific use case.Jérôme MONTIGNY
02/14/2024, 3:05 PMScout
02/14/2024, 3:05 PM