#ask-ai

Jérôme MONTIGNY

02/14/2024, 2:58 PM
show me an example using @dagster.run_status_sensor

Scout

02/14/2024, 2:58 PM
Certainly! Here's an example from the documentation of using the `@dagster.run_status_sensor` decorator to create a sensor that reacts to the success status of a job run:
```python
import os

from dagster import run_status_sensor, RunStatusSensorContext, DagsterRunStatus
from slack_sdk import WebClient


@run_status_sensor(run_status=DagsterRunStatus.SUCCESS)
def my_slack_on_run_success(context: RunStatusSensorContext):
    # The Slack bot token is read from the environment
    slack_client = WebClient(token=os.environ["SLACK_DAGSTER_ETL_BOT_TOKEN"])

    # Post a message naming the job whose run just succeeded
    slack_client.chat_postMessage(
        channel="#alert-channel",
        text=f'Job "{context.dagster_run.job_name}" succeeded.',
    )
```
In this example, the `my_slack_on_run_success` function is decorated with `@run_status_sensor` and configured to trigger when a job run succeeds. Because no specific jobs are listed, it reacts to any successful job run in the same code location. Inside the function, it uses a Slack client to send a message to a specified channel, indicating the success of the job. Please note that this example assumes you have the Slack client (`WebClient`, from the `slack_sdk` package) installed and the necessary Slack token set in your environment. `context.dagster_run.job_name` provides the name of the job that succeeded, which is included in the Slack message.

Jérôme MONTIGNY

02/14/2024, 3:00 PM
@Scout, how do I do it for a specific job?

Scout

02/14/2024, 3:00 PM
There was an error processing your request. Please try again.

Jérôme MONTIGNY

02/14/2024, 3:02 PM
@Scout How to use @dagster.run_status_sensor only on a specific job's success

Scout

02/14/2024, 3:02 PM
To use the `@dagster.run_status_sensor` decorator to react only to the success of a specific job, you can check the `job_name` attribute of the `context.dagster_run` object within the sensor function. Here's an example from the documentation:
```python
from dagster import run_status_sensor, DagsterRunStatus, RunRequest, SkipReason

# Assuming you have a job defined somewhere with the name 'my_specific_job'
from my_jobs import my_specific_job


@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    request_job=my_specific_job,
)
def report_status_sensor(context):
    # this condition prevents the sensor from triggering my_specific_job again after it succeeds
    if context.dagster_run.job_name != my_specific_job.name:
        run_config = {
            "ops": {
                "status_report": {"config": {"job_name": context.dagster_run.job_name}}
            }
        }
        return RunRequest(run_key=None, run_config=run_config)
    else:
        return SkipReason("Don't report status of my_specific_job")
```
In this example, the `report_status_sensor` function is decorated with `@run_status_sensor` and fires whenever a job run succeeds. Note that `request_job=my_specific_job` names the job the sensor launches, not the job it monitors. The function checks that the job that succeeded is not `my_specific_job` itself, which prevents an infinite loop of the job re-triggering on its own success. If the condition is met, it returns a `RunRequest` with the appropriate `run_config`. If the job that succeeded is `my_specific_job`, it returns a `SkipReason` instead. To react only to the success of one job, compare `context.dagster_run.job_name` to that job's name with `==`, or pass `monitored_jobs=[...]` to the decorator, and make sure `request_job` points at a different job. Adjust the `run_config` as needed for your specific use case.

Jérôme MONTIGNY

02/14/2024, 3:05 PM
@Scout so, in your example, it triggers the sensor as long as the job run is not "my_specific_job", right?

Scout

02/14/2024, 3:05 PM
There was an error processing your request. Please try again.