won
06/11/2023, 7:22 PM@failure_hook(required_resource_keys={"discord_webhook"})
def discord_message_on_failure(context: HookContext):
run_url = f"<http://127.0.0.1:3000/instance/runs/{context.run_id}>"
error_reason = str(context.op_exception)[:500]
message = f"""
@here **failed** OP `{context.op.name}` in `{context.job_name}` job.\n{run_url}\nException: `{error_reason}...` """
url = context.resources.discord_webhook["url"]
post_message_to_discord(message, url)
error:
dagster._check.ParameterCheckError: Param "context" is not a HookContext. Got <dagster._core.definitions.assets.AssetsDefinition object at 0x000002BF7FCAAF50> which is type <class 'dagster._core.definitions.assets.AssetsDefinition'>.
owen
06/12/2023, 5:52 PMdefine_asset_job(..., hooks={discord_message_on_failure})
. How are you using that failure hook when you get that error?won
06/13/2023, 7:50 AM@discord_message_on_failure
@asset(key_prefix=["test_legs"])
def insects(context, legs) -> pd.DataFrame:
result = legs[legs['num_legs'] > 4]
<http://context.log.info|context.log.info>(2 / 0)
<http://context.log.info|context.log.info>(result.shape)
return result
won
06/13/2023, 8:34 AMdiscord_message_on_failure
decorator from assets def and add hook to define_asset_job
def schedule_for_acc(account_credentials: dict, cron_schedule: str):
@schedule(
cron_schedule=cron_schedule,
job=dev_report_job,
name=account_credentials.get("CLIENT_ID"),
description=account_credentials.get("DESCRIPTION"),
tags={
"project": PROJECT,
},
)
def _schedule(_context):
run_config = {
"ops": {
"update_service_second": {
"config": {
"table_name": "table_test",
},
},
}
}
return run_config
return _schedule
dev = create_repository_using_definitions_args(
name="dev",
schedules=[
schedule_for_acc(orjson.loads(os.getenv("SIP_1")), "08 17 * * *"),
],
jobs=[define_asset_job("daily_refresh", selection=AssetSelection.all(), hooks={discord_message_on_failure})],
resources={"discord_webhook": discord_webhook},
)
error massag:
dagster._core.errors.DagsterInvalidDefinitionError: resource with key 'discord_webhook' required by hook 'discord_message_on_failure' attached to job 'daily_refresh' was not provided. Please provide a <class 'dagster._core.definitions.resource_definition.ResourceDefinition'> to key 'discord_webhook', or change the required key to one of the following keys which points to an <class 'dagster._core.definitions.resource_definition.ResourceDefinition'>: ['io_manager']
Brian Githinji
06/23/2023, 11:32 AMTypeError: slack_message_on_success() missing 1 required positional argument: 'slack'
when running
import requests
from dagster import asset, HookContext, success_hook, define_asset_job, Definitions, ScheduleDefinition, AssetSelection
from dagster_slack import SlackResource
import os
SLACK_TOKEN = os.getenv("SLACK_TOKEN")
@asset
def hackernews_top_story_ids():
top_story_ids = requests.get("<https://hacker-news.firebaseio.com/v0/topstories.json>").json()
return top_story_ids[:10]
@success_hook()
def slack_message_on_success(context: HookContext, slack: SlackResource):
message = f"{context.job_name} succeded"
slack.get_client().chat_postMessage(channel="#123-data", text=message)
test_job = define_asset_job('my_job',
selection=AssetSelection.all(),
hooks={slack_message_on_success}
)
defs = Definitions(assets=[hackernews_top_story_ids],
resources={'slack': SlackResource(token=SLACK_TOKEN)},
schedules=[ScheduleDefinition(job=test_job, cron_schedule="@monthly")]
)
Brian Githinji
06/23/2023, 11:37 AMBrian Githinji
06/23/2023, 11:55 AMowen
06/23/2023, 9:51 PM