Linton Ye
02/25/2024, 2:50 AMScout
02/25/2024, 2:50 AM@job
decorator or the GraphDefinition.to_job
method. Hooks are functions that are triggered by certain events during the execution of a job, such as the success or failure of an op within the job. Here's how you can configure hooks on a job: 1. Define your hook using the @success_hook
or @failure_hook
decorators, depending on when you want the hook to be triggered. 2. Apply the hook to a job by passing it to the hooks
parameter of the @job
decorator or the to_job
method. Here is an example from the documentation that shows how to apply a hook to every op within a job: python from dagster import job, op @op def my_op(): # Your op logic here pass @job(hooks={my_hook}) def my_job(): my_op()
In this example, my_hook
is a hook that you have previously defined, and it will be applied to every op instance within the my_job
. When you run this job, you can provide configuration to any resources that the hook might require in the run config. Remember that hooks can also be applied to individual ops within a job if you want more granular control over when hooks are triggered. This can be done by using the with_hooks
method on an op instance: python from dagster import job, op @op def my_op(): # Your op logic here pass @job def my_job(): my_op.with_hooks({my_hook})()
In this case, my_hook
will only be triggered by the execution of my_op
. For more detailed information on how to define and use hooks, you can refer to the Dagster documentation on hooks and job configuration.Linton Ye
02/25/2024, 2:54 AMin_process_full_analysis_job = define_asset_job(
name="in_process_full_analysis",
config={
"ops": {"project_settings": {"config": {"project_id": "gq-pjfitz-5row-5ldR3"}}}
},
selection=AssetSelection.all(),
executor_def=in_process_executor,
hooks=[on_failure],
)
Scout
02/25/2024, 2:54 AMon_failure
to an asset job defined with define_asset_job
. However, to ensure that the on_failure
hook is correctly defined and used, you need to follow these steps: 1. Define the on_failure
hook using the @failure_hook
decorator. This hook will be triggered when an op within the job fails. Here's an example of how you might define an on_failure
hook: python from dagster import failure_hook, HookContext @failure_hook(required_resource_keys={"slack"}) def on_failure(context: HookContext): # You can access resources in the context like context.resources.slack message = f"Op {context.op.name} failed!" # Send a message to Slack or perform other failure handling context.resources.slack.send_message(message)
2. Attach the hook to your job definition. The hooks
parameter expects a set of hook definitions, so you need to pass a set containing the on_failure
hook: python from dagster import define_asset_job, AssetSelection, in_process_executor # Assuming on_failure is defined as shown above in_process_full_analysis_job = define_asset_job( name="in_process_full_analysis", config={ "ops": {"project_settings": {"config": {"project_id": "gq-pjfitz-5row-5ldR3"}}} }, selection=AssetSelection.all(), executor_def=in_process_executor, hooks={on_failure}, # Pass the hook as a set )
Make sure that the on_failure
hook is defined before you use it in the hooks
parameter, and ensure that it is passed as a set (enclosed in curly braces {}
) rather than a list. Additionally, if your hook requires resources (like a Slack client in the example), you need to define and provide those resources in your job definition. The required_resource_keys
parameter in the hook decorator specifies which resources the hook depends on. Please check the Dagster documentation for the most up-to-date and detailed instructions on defining and using hooks with jobs.