
Marcel M

11/18/2021, 9:03 AM
Failure hook not called? I have implemented a failure hook:
from dagster import HookContext, failure_hook

@failure_hook(name='MyFailureHook', required_resource_keys={'reporting'})
def report_message_on_failure(context: HookContext):
    message = '===== Op {} failed'.format(context.op.name)
    print(message)
    # RollbarService (below) exposes report_message, not send_message
    context.resources.reporting.report_message(message)
I have “installed” the hook in my job:
from dagster import config_from_files

job = ingest_openac.to_job(
    name='MyJobName',
    resource_defs={
        "openac": openac_service,
        "rel_wh": reldb_warehouse_service,
        "fs_wh": fs_warehouse_service,
        "df_io": df_table_io_manager,
        "reporting": rollbar_service,
    },
    config=config_from_files(["resources.yaml"]),
    hooks={report_message_on_failure},
)
The reporting resource is defined as:
import rollbar
from dagster import resource


class RollbarService:
    def __init__(self, token: str, run_id: str):
        self.token = token
        rollbar.init(token, environment=run_id)

    def report_message(self, *args, **kwargs):
        rollbar.report_message(*args, **kwargs)

    def report_exc_info(self, *args, **kwargs):
        rollbar.report_exc_info(*args, **kwargs)


@resource(config_schema={"token": str})
def rollbar_service(init_context):
    token = init_context.resource_config["token"]
    # print('====================', init_context.pipeline_run.pipeline_name)
    return RollbarService(token, init_context.pipeline_run.pipeline_name)
Of course, the token is defined in resources.yaml (I have verified that the resource works). To my (limited) knowledge, this should be sufficient for an error in any op to trigger the hook. However, if I raise an exception in an op, the job fails but the failure hook is not called. Am I missing something? TIA
PS: A similar success_hook does work:
from dagster import success_hook

@success_hook()
def report_message_on_success(context):
    message = '+++++ Op {} succeeded'.format(context.op.name)
    print(message)

sandy

11/18/2021, 4:23 PM
I would expect this to work. @yuhan - any idea why it's not working?

Marcel M

11/22/2021, 11:47 AM
@sandy @yuhan Some additional info: I can see the failure hook being called when running the job from Dagit, but not when running job.execute_in_process(). HTH.

yuhan

11/22/2021, 3:07 PM
Hi @Marcel M, sorry for the delayed response. If you run
job.execute_in_process(raise_on_error=False)
you should see the failure hook get called.
The cause is that the failure hook function is invoked in process. When a job is executed in process and an op fails, the error is raised and interrupts execution, so the failure hook invocation is never reached. In Dagit, or when you pass raise_on_error=False, the error is not allowed to interrupt execution, so you can see the hooks being called.
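To make the control flow in this explanation concrete, here is a toy, dependency-free model. It is not Dagster's executor code; `run_step`, `StepFailed`, and `failing_step` are hypothetical names chosen for illustration. It only shows why raising the op's error before the hook phase means the hook never runs.

```python
from typing import Callable, Optional


class StepFailed(Exception):
    """Raised to the caller when the toy executor runs with raise_on_error=True."""


def run_step(
    step: Callable[[], None],
    on_failure: Callable[[str], None],
    raise_on_error: bool,
) -> bool:
    """Run one step, then call on_failure if it failed. Returns True on success."""
    error: Optional[Exception] = None
    try:
        step()
    except Exception as exc:
        if raise_on_error:
            # Re-raising unwinds out of run_step right here, so the
            # hook call further down is never reached.
            raise StepFailed(str(exc)) from exc
        # Otherwise remember the failure and keep going.
        error = exc

    # "Hook" phase: only reached if nothing was raised above.
    if error is not None:
        on_failure(str(error))
        return False
    return True


def failing_step() -> None:
    raise RuntimeError("boom")


calls = []
print(run_step(failing_step, calls.append, raise_on_error=False), calls)  # False ['boom']

calls.clear()
try:
    run_step(failing_step, calls.append, raise_on_error=True)
except StepFailed:
    print("raised; hook calls:", calls)  # raised; hook calls: []
```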

Marcel M

11/30/2021, 8:46 PM
@yuhan Thanks for the explanation. Maybe add this to the docs for the failure hook?

yuhan

11/30/2021, 9:17 PM
That’s a good idea. Will add it soon!