junsik KIM
08/03/2022, 10:52 AMrepository.py
from dagster import repository
from ts_dagster.jobs.say_hello import say_hello_job
from ts_dagster.schedules.my_hourly_schedule import my_hourly_schedule
from ts_dagster.sensors.slack_on_failure_sensor import make_slack_on_failure_sensor
@repository
def ts_dagster():
jobs = [say_hello_job]
schedules = [my_hourly_schedule]
sensors = [make_slack_on_failure_sensor]
return jobs + schedules + sensors
#sensors
slack_on_failure_sensor.py
from dagster_slack.sensors import make_slack_on_run_failure_sensor
from dagster import RunFailureSensorContext,SensorDefinition
def my_message_fn(context: RunFailureSensorContext) -> str:
return (
f"Job {context.pipeline_run.pipeline_name} failed!"
f"Error: {context.failure_event.message}"
)
def make_slack_on_failure_sensor() -> SensorDefinition:
return make_slack_on_run_failure_sensor(
channel="dagster_test",
slack_token="xoxb-3768110452066",
text_fn= my_message_fn,
dagit_base_url="<http://localhost:3000>",
)
# OP
hello.py
from dagster import op
@op
def fails():
return Exception("failure!")
# JOB
say_hello.py
from dagster import job
from ts_dagster.ops.hello import fails
@job
def say_hello_job():
fails()
owen
08/03/2022, 4:33 PMjunsik KIM
08/04/2022, 12:01 AMowen
08/04/2022, 12:02 AM@repository
def ts_dagster():
jobs = [say_hello_job]
schedules = [my_hourly_schedule]
sensors = [make_slack_on_failure_sensor()] # this line changed
return jobs + schedules + sensors
junsik KIM
08/04/2022, 12:08 AMdagster-daemon msg error
owen
08/04/2022, 12:19 AM~/.dagster
is a reasonable spot, but you might have other preferences). I believe your daemon should still function without setting that up (those are just warnings), but I could be wrong on that.junsik KIM
08/04/2022, 12:33 AMdaniel
08/04/2022, 1:38 AMjunsik KIM
08/04/2022, 2:04 AM