Hey all - Im really new to this and Im a little co...
# ask-community
a
Hey all - Im really new to this and Im a little confused. Trying to implement hooks (specifically msteams) and as I go through the documentation - everything for the hook is referring to @solid & @pipeline. Is this just a matter of the documentation not keeping up to speed of innovation? I believe we're supposed to use ops and jobs now... so just trying to work though it. I appologize for the newbie question and thanks in advance.
j
Hi aaron! probably just some out of date docs. Can you link the pages you're looking at so we can update them? solid and pipeline are deprecated in favor of op and job respectively. You should be able to use op everywhere you see solid and job everywhere you see pipeline
Hey Jamie - sorry to be a pain - but the documenation refers to class
dagster.PipelineFailureSensorContext
where there doesnt appear to be corresponding JobFailureSensorContext. I am trying to figure it out on my own - but having little success. Updated documentation for job failure / msteams notifications sure would be helpful. Thanks!
j
no worries! do you have a code snippet you can share that shows how you're trying to access the sensor context?
a
So I am trying to simply create a notification to Teams on failure on any failure within the job - indicating the op name that failed. I think this aligns with your documentation around '`dagster_msteams.make_teams_on_pipeline_failure_sensor'`
Copy code
teams_on_pipeline_failure = make_teams_on_pipeline_failure_sensor(
    hook_url=os.getenv("TEAMS_WEBHOOK_URL")
)

@repository
def my_repo():
    return [my_pipeline + teams_on_pipeline_failure]
def my_message_fn(context: PipelineFailureSensorContext) -> str:
    return "Pipeline {pipeline_name} failed! Error: {error}".format(
        pipeline_name=context.pipeline_run.pipeline_name,
        error=context.failure_event.message,
    )

teams_on_pipeline_failure = make_teams_on_pipeline_failure_sensor(
    hook_url=os.getenv("TEAMS_WEBHOOK_URL"),
    message_fn=my_message_fn,
    dagit_base_url="<http://localhost:3000>",
)
j
ok - are you using this code snippet directly in your code? are you getting an error message? full disclosure, the msteams integration looks like it was a community contribution from a while ago. this is the first time i'm looking at it in depth, so any information about what you're trying, the errors you're getting, behavior you're seeing, etc. will be really helpful
a
Got it - i thought it looked older - even in the source code cant see how it would really work in the new ops/jobs world. But - I would even be willing to get the notification working in the individual op (not even set at the job level)... I created this code to do so... _@teams_on_success(dagit_base_url="http://localhost:3000")_ _def my_message_fn(context: HookContext) -> str:_ _return "Op {op_name} success!".format(_ _op_name=context.op_ ) @op() _def start_logging_def(context):_ logging.info('Seg Dim start time {}'.format(datetime.now())) @job (description='This process does something cool.') def seg_dim(): start_logging_def.with_hooks(hook_defs={teams_on_success("#foo", my_message_fn)}) A couple issues here though - I dont see where to set the hook url. I'd think this would just fire off to never neverland. But the error i receive from it is _UserWarning: Error loading repository location te_seg_dim.pydagster.check.ParameterCheckError Param "context" is not a HookContext. Got <function my_message_fn at 0x7f6d77900310> which is type <class 'function'>._
j
the new ops/jobs paradigm should be backcompatible with solids/pipelines etc (so that you can use an op somewhere that asks for a solid) in most cases. although you may have found a case where that doesnt hold. looking at the code now...
a
And if it helps - I have been able to send to our teams environment with the '`dagster_msteams.msteams_resource` instructions. import os from xml.etree.ElementPath import ops from dagster import ModeDefinition, execute_pipeline, job, op, JobDefinition from dagster_msteams import Card, msteams_resource @op(required_resource_keys={"msteams"}) def teams_solid(context): card = Card() card.add_attachment(text_message="Hello There !!") context.resources.msteams.post_message(payload=card.payload) @job(resource_defs={"msteams": msteams_resource}) def teams_pipeline(): teams_solid() teams_pipeline, {"resources": {"msteams": {"config": {"hook_url": 'https://XXXXXX.COM/XXXXX'}}}}
j
ok that's promising!
a
I bet there is an easy way to get job or op failure/success context and inject it into the card - Im just not familiar enough to do so without some instructions. Thanks!
j
if it seems like the existing msteams hooks aren't working with your setup, you can always author your own hook that uses the msteams resource. although since the resource is working, it seems like we should be able to get the hook working too
a
I'll give it go.
j
i haven't tested this out, but based on the implementation of the hooks, i'm thinking this might work
Copy code
def my_message_fn(context: HookContext) -> str:
    return "Op {op_name} success!".format(
        op_name=context.op
    )

@op()
def start_logging_def(context):
    <http://logging.info|logging.info>('Seg Dim start time {}'.format(datetime.now()))

@job (description='This process does something cool.')
def seg_dim():
    start_logging_def.with_hooks(hook_defs={teams_on_success(message_fn=my_message_fn, dagit_base_url="localhost:3000")})
I'm with you though, the docs are very misleading. we're doing an audit of our extranal library integrations (especially ones that are community contributions) in the next couple months. i'll add msteams to the list. Another resource i found helpful was looking at the tests https://github.com/dagster-io/dagster/blob/master/python_modules/libraries/dagster-msteams/dagster_msteams_tests/test_hooks.py you can see how the original author is using the hooks
a
Hey Jamie! Your info on the op hooks for success/failure was the missing piece. In case it helps someone else - here is the scrubbed write up. I also noticed I still got prompted to put in a config inside of dagit - but I let it scaffold it and put in the correct URL there at it works fine. Guess I dont have the resources config in the right place. Oh well. It working. Thanks a ton for your help!! You all are SOO helpful and fast to respond! Thanks again. @success_hook(required_resource_keys={"msteams"}) def msteams_message_on_success(context: HookContext): message = f"Op {context.op.name} finished successfully" card = Card() card.add_attachment(text_message=message) context.resources.msteams.post_message(payload=card.payload) @op1() @op2() @op3() @job (resource_defs={"msteams": msteams_resource}, hooks={msteams_message_on_success}) def jobdef(): op1(op2(op3())), {"resources": {"msteams": {"config": {"hook_url": '<https:xxxxxxxx.com/xxxxxx'}}}}>
j
awesome! glad it worked. If you're looking for other ways to set the config on the msteams resource (so you dont have the enter the url every time) you can use the configured api to do something like this
Copy code
@job (resource_defs={"msteams": msteams_resource.configured(hook_url="some/url")}, hooks={msteams_message_on_success})
def jobdef():
    op1(op2(op3())),
a
Awesome! Thanks! And here is a pretty picture of my teams notifications... complete with the dagster logo. 🙂 Thanks again!
🎉 1