James O'Toole
02/07/2024, 9:30 PM

Scout
02/07/2024, 9:30 PM

Use the `@success_hook` decorator to define a hook that runs when an op succeeds. You can then attach this hook to your job definition with the `hooks` argument, which applies it to every op in the job. Here's an example from the documentation:

```python
from dagster import job, success_hook

# "my_conn" must be provided as a resource when the job runs.
@success_hook(required_resource_keys={"my_conn"})
def my_success_hook(context):
    context.resources.my_conn.send("foo")

@job(hooks={my_success_hook})
def my_job():
    pass
```
For failures, the `@failure_hook` decorator works the same way: it runs when an op fails. For failure of the job as a whole, Dagster provides run failure sensors (formerly called pipeline failure sensors), which execute arbitrary code when a run fails, regardless of what went wrong. This is particularly useful for production workflows, where it's critical to take automated action, such as sending an alert, when a job does not complete successfully. Here's an example of a run failure sensor:

```python
from dagster import RunFailureSensorContext, run_failure_sensor

@run_failure_sensor
def my_failure_sensor(context: RunFailureSensorContext):
    # Your code to send an alert or perform any other action
    pass
```
You can add this failure sensor to your `Definitions` (or, in older code, your repository) the same way you would add any other sensor. If you want different failure behavior for different jobs, use the `monitored_jobs` argument (named `job_selection` in older releases) to specify which jobs your sensor should monitor. Dagster also provides out-of-the-box sensors that send emails or post Slack messages when runs fail, such as `make_email_on_run_failure_sensor` and, in the `dagster-slack` package, `make_slack_on_run_failure_sensor`. You can use these pre-built sensors or define your own custom logic to integrate with other external services. Remember that hooks take effect through the jobs they are attached to, while sensors must be registered in your definitions so that Dagster recognizes them and can trigger them appropriately.

James O'Toole
02/07/2024, 9:35 PM

…`@job` decorator? Here's an excerpt from the `__init__.py` file:
```python
resources = {
    "job_completion_callback_url": callback_url_resource,
    "job_completion_callback_auth": callback_auth_resource,
}

all_assets = load_assets_from_modules([assets], group_name=GROUP_NAME)

materialize_all_assets_job = define_asset_job(
    name="job_name",
    selection=AssetSelection.all(),
    hooks={success_callback, failure_callback},
    # auto-terminate run after 20 minutes; we can toggle this up or down as needed
    tags={MAX_RUNTIME_SECONDS_TAG: 1200},
)

defs = Definitions(
    assets=all_assets, jobs=[materialize_all_assets_job], resources=resources
)
```
Scout
02/07/2024, 9:35 PM

Since you're using the `define_asset_job` function to create your job, you can still specify hooks directly in the `hooks` parameter of the `define_asset_job` call, as your excerpt does. `success_callback` and `failure_callback` should be hook definitions created with the `@success_hook` and `@failure_hook` decorators, respectively. Here's an example of how you might define these hooks:

```python
from dagster import failure_hook, success_hook

@success_hook(
    required_resource_keys={"job_completion_callback_url", "job_completion_callback_auth"}
)
def success_callback(context):
    # Logic to notify an external service on job success
    pass

@failure_hook(
    required_resource_keys={"job_completion_callback_url", "job_completion_callback_auth"}
)
def failure_callback(context):
    # Logic to notify an external service on job failure
    pass
```
Then, in your `__init__.py` file, you would include these hooks in the `hooks` set when defining the job:

```python
from dagster import AssetSelection, define_asset_job

# Assuming success_callback and failure_callback are defined as shown above
materialize_all_assets_job = define_asset_job(
    name="job_name",
    selection=AssetSelection.all(),
    hooks={success_callback, failure_callback},
    tags={MAX_RUNTIME_SECONDS_TAG: 1200},
)

# Continue with the rest of your job and resource definitions
```
Make sure that the `success_callback` and `failure_callback` hooks are imported or defined in the same context where you're defining the job so that they can be referenced correctly. By specifying the hooks in the `hooks` parameter, you're instructing Dagster to execute these hooks at the job level, which means they will be triggered based on the overall success or failure of the job, rather than individual ops within the job.

James O'Toole
02/07/2024, 9:39 PM

"…`hooks` parameter, you're instructing Dagster to execute these hooks at the job level, which means they will be triggered based on the overall success or failure of the job, rather than individual ops within the job."

Scout
02/07/2024, 9:39 PM

Both the `@job` decorator and the `GraphDefinition.to_job` function accept `hooks` as a parameter. When you apply a hook to a job, Dagster applies it to every op instance within that job, so the hook runs in the context of each op rather than once for the job. Here's an excerpt from the documentation that supports this:

> For example, you want to send a slack message to a channel when any op fails in a job. In this case, we will be applying a hook on a job, which will apply the hook on every op instance within in that job. The `@job` decorator accepts `hooks` as a parameter. Likewise, when creating a job from a graph, hooks are also accepted as a parameter in the `GraphDefinition.to_job` function.

So defining hooks at the job level is a shortcut for attaching them to every op: a `@success_hook` fires after each op that succeeds, and a `@failure_hook` fires after each op that fails. To react once to the overall success or failure of a run, use a run status sensor such as `@run_status_sensor` or `@run_failure_sensor` instead.

James O'Toole
02/07/2024, 9:41 PM

Scout
02/07/2024, 9:41 PM

With the `@job` decorator or the `GraphDefinition.to_job` function, a job-level success hook is triggered for every op that succeeds, not once when the entire job succeeds. Here's the relevant excerpt from the documentation:

> For example, you want to send a slack message to a channel when any op fails in a job. In this case, we will be applying a hook on a job, which will apply the hook on every op instance within in that job.

Because the hook is applied to every op instance, each successful op triggers it once, so a job with several ops (or an asset job with several assets) produces several callbacks. If you want a single callback after the whole job succeeds, use a run status sensor, for example `@run_status_sensor(run_status=DagsterRunStatus.SUCCESS)`, instead of a job-level hook. If you only want the hook on specific ops, apply it at the op level.