hi / is there any way to use my failure_hook with ...
# ask-community
w
hi / is there any way to use my failure_hook with assets?
@failure_hook(required_resource_keys={"discord_webhook"})
def discord_message_on_failure(context: HookContext):
    run_url = f"http://127.0.0.1:3000/instance/runs/{context.run_id}"
    error_reason = str(context.op_exception)[:500]
    message = f"""
    @here **failed** OP `{context.op.name}` in `{context.job_name}` job.\n{run_url}\nException: `{error_reason}...` """
    url = context.resources.discord_webhook["url"]
    post_message_to_discord(message, url)
error:
dagster._check.ParameterCheckError: Param "context" is not a HookContext. Got <dagster._core.definitions.assets.AssetsDefinition object at 0x000002BF7FCAAF50> which is type <class 'dagster._core.definitions.assets.AssetsDefinition'>.
o
hi @won! as of a recent update, you're able to set hooks when using define_asset_job(..., hooks={discord_message_on_failure}). How are you using that failure hook when you get that error?
w
by decorating the asset:
@discord_message_on_failure
@asset(key_prefix=["test_legs"])
def insects(context, legs) -> pd.DataFrame:
    result = legs[legs['num_legs'] > 4]
    context.log.info(2 / 0)
    context.log.info(result.shape)
    return result
I've removed the discord_message_on_failure decorator from the asset definition and added the hook to define_asset_job:
def schedule_for_acc(account_credentials: dict, cron_schedule: str):

    @schedule(
        cron_schedule=cron_schedule,
        job=dev_report_job,
        name=account_credentials.get("CLIENT_ID"),
        description=account_credentials.get("DESCRIPTION"),
        tags={
            "project": PROJECT,
        },
    )
    def _schedule(_context):
        run_config = {
            "ops": {
                "update_service_second": {
                    "config": {
                        "table_name": "table_test",
                    },
                },
            }
        }
        return run_config
    return _schedule



dev = create_repository_using_definitions_args(
    name="dev",
    schedules=[
        schedule_for_acc(orjson.loads(os.getenv("SIP_1")), "08 17 * * *"),
    ],
    jobs=[define_asset_job("daily_refresh", selection=AssetSelection.all(), hooks={discord_message_on_failure})],
    resources={"discord_webhook": discord_webhook},
)
error message:
dagster._core.errors.DagsterInvalidDefinitionError: resource with key 'discord_webhook' required by hook 'discord_message_on_failure' attached to job 'daily_refresh' was not provided. Please provide a <class 'dagster._core.definitions.resource_definition.ResourceDefinition'> to key 'discord_webhook', or change the required key to one of the following keys which points to an <class 'dagster._core.definitions.resource_definition.ResourceDefinition'>: ['io_manager']
b
getting a similar issue
TypeError: slack_message_on_success() missing 1 required positional argument: 'slack'
when running
import requests
from dagster import asset, HookContext, success_hook, define_asset_job, Definitions, ScheduleDefinition, AssetSelection
from dagster_slack import SlackResource
import os

SLACK_TOKEN = os.getenv("SLACK_TOKEN")

@asset
def hackernews_top_story_ids():
    top_story_ids = requests.get("https://hacker-news.firebaseio.com/v0/topstories.json").json()
    return top_story_ids[:10]

@success_hook()
def slack_message_on_success(context: HookContext, slack: SlackResource):
    message = f"{context.job_name} succeeded"
    slack.get_client().chat_postMessage(channel="#123-data", text=message)

test_job = define_asset_job('my_job', 
                 selection=AssetSelection.all(),
                 hooks={slack_message_on_success}
                 )

defs = Definitions(assets=[hackernews_top_story_ids], 
            resources={'slack': SlackResource(token=SLACK_TOKEN)},
            schedules=[ScheduleDefinition(job=test_job, cron_schedule="@monthly")]
            )
The resource itself is viewable in dagit, so it seems that it isn't getting passed to the hook.
Looks like an issue was opened here
o
Hi! Two separate things going on here:
• For @won, this was a bug involving hooks that had required resource keys in general, which will be fixed in next week's release.
• For @Brian Githinji, this involves an additional bug with passing resources through the arguments of success_hooks/failure_hooks, and I don't have an estimate for a fix at the moment. If you switch to passing required resource keys to the decorator and accessing them via context.resources, then you'll be back in won's camp (fix next week).