Aaron Bailey
04/22/2022, 3:35 PMjamie
04/22/2022, 3:41 PMAaron Bailey
04/22/2022, 3:42 PMdagster.PipelineFailureSensorContext
where there doesnt appear to be corresponding JobFailureSensorContext. I am trying to figure it out on my own - but having little success. Updated documentation for job failure / msteams notifications sure would be helpful. Thanks!jamie
04/26/2022, 1:55 PMAaron Bailey
04/26/2022, 2:32 PMteams_on_pipeline_failure = make_teams_on_pipeline_failure_sensor(
hook_url=os.getenv("TEAMS_WEBHOOK_URL")
)
@repository
def my_repo():
return [my_pipeline + teams_on_pipeline_failure]
def my_message_fn(context: PipelineFailureSensorContext) -> str:
return "Pipeline {pipeline_name} failed! Error: {error}".format(
pipeline_name=context.pipeline_run.pipeline_name,
error=context.failure_event.message,
)
teams_on_pipeline_failure = make_teams_on_pipeline_failure_sensor(
hook_url=os.getenv("TEAMS_WEBHOOK_URL"),
message_fn=my_message_fn,
dagit_base_url="<http://localhost:3000>",
)
jamie
04/26/2022, 2:41 PMAaron Bailey
04/26/2022, 2:45 PMjamie
04/26/2022, 2:49 PMAaron Bailey
04/26/2022, 2:49 PMjamie
04/26/2022, 2:50 PMAaron Bailey
04/26/2022, 2:51 PMjamie
04/26/2022, 2:51 PMAaron Bailey
04/26/2022, 2:53 PMjamie
04/26/2022, 3:01 PMdef my_message_fn(context: HookContext) -> str:
return "Op {op_name} success!".format(
op_name=context.op
)
@op()
def start_logging_def(context):
<http://logging.info|logging.info>('Seg Dim start time {}'.format(datetime.now()))
@job (description='This process does something cool.')
def seg_dim():
start_logging_def.with_hooks(hook_defs={teams_on_success(message_fn=my_message_fn, dagit_base_url="localhost:3000")})
I'm with you though, the docs are very misleading. we're doing an audit of our extranal library integrations (especially ones that are community contributions) in the next couple months. i'll add msteams to the list.
Another resource i found helpful was looking at the tests https://github.com/dagster-io/dagster/blob/master/python_modules/libraries/dagster-msteams/dagster_msteams_tests/test_hooks.py you can see how the original author is using the hooksAaron Bailey
04/26/2022, 5:28 PMjamie
04/26/2022, 5:37 PM@job (resource_defs={"msteams": msteams_resource.configured(hook_url="some/url")}, hooks={msteams_message_on_success})
def jobdef():
op1(op2(op3())),
Aaron Bailey
04/26/2022, 5:40 PM