Marcel M
11/18/2021, 9:03 AM@failure_hook(name='MyFailureHook', required_resource_keys={'reporting'})
def report_message_on_failure(context: HookContext):
message = '===== Op {} failed'.format(context.op.name)
print(message)
context.resources.reporting.send_message(message)
I have “installed” the hook in my job:
job = ingest_openac.to_job(
name='MyJobName',
resource_defs={
"openac": openac_service,
"rel_wh": reldb_warehouse_service,
"fs_wh": fs_warehouse_service,
"df_io": df_table_io_manager,
'reporting': rollbar_service,
},
config=config_from_files(["resources.yaml"]),
hooks={report_message_on_failure},
)
The reporting resource is defined as:
import rollbar
class RollbarService():
def __init__(self, token: str, run_id: str):
self.token = token
rollbar.init(token, environment=run_id)
def report_message(self, *args, **kwargs):
rollbar.report_message(*args, **kwargs)
def report_exc_info(self, *args, **kwargs):
rollbar.report_exc_info(*args, **kwargs)
@resource(config_schema={"token": str})
def rollbar_service(init_context):
token = init_context.resource_config["token"]
# print('====================', init_context.pipeline_run.pipeline_name)
return RollbarService(token, init_context.pipeline_run.pipeline_name)
Of course the token is defined in resources.yaml (I have verified that the resource works).
To my (limited) knowledge this should be sufficient to have any error in any op be caught. However, if I raise an exception in an op, the job fails but the failure hook is not called.
Am I missing something? TIA
PS A similar success_hook does work.
@success_hook()
def report_message_on_success(context):
message = '+++++ Op {} succeeded'.format(context.op.name)
print(message)
sandy
11/18/2021, 4:23 PMMarcel M
11/22/2021, 11:47 AMyuhan
11/22/2021, 3:07 PMjob.execute_in_process(raise_on_error=False)
you should be seeing the failure hook get called.Marcel M
11/30/2021, 8:46 PMyuhan
11/30/2021, 9:17 PM