won
04/22/2023, 7:08 AM
failure_hook
my new repo uses create_repository_using_definitions_args
# Repository "dev": one schedule, one job, and a PGWarehouse resource
# registered under the key "db_res".
dev = create_repository_using_definitions_args(
    name="dev",
    schedules=[
        # SIP_1 is read from the environment and parsed as JSON. The second
        # argument looks like a cron expression (17:08 daily) — confirm
        # against schedule_for_acc.
        schedule_for_acc(orjson.loads(os.getenv("SIP_1")), "08 17 * * *"),
    ],
    jobs=[dev_report_job],
    resources={"db_res": PGWarehouse(connection_uri=EnvVar("POSTGRES_CONNECTION_STRING"), database="test")},
)
and everything is OK until I decorate dev_report_job with discord_message_on_failure:
# The failure hook is applied as a decorator stacked on top of @job.
@discord_message_on_failure
@job
def dev_report_job():
    # update_service is invoked first; its output is wired into the "start"
    # input of update_service_second.
    update_service_second(start=update_service())
@failure_hook(required_resource_keys={"discord_webhook"})
def discord_message_on_failure(context: HookContext):
    """Failure hook stub that only prints a fixed marker line.

    The decorator declares the "discord_webhook" resource key as required,
    but this body does not read it (or ``context``) yet.
    """
    print('just print for test')
Traceback:
dagster._core.errors.DagsterInvalidDefinitionError: resource with key 'db_res' required by op 'update_service' was not provided. Please provide a <class 'dagster._core.definitions.resource_definition.ResourceDefinition'> to key 'db_res', or change the required key to one of the following keys which points to an <class 'dagster._core.definitions.resource_definition.ResourceDefinition'>: ['io_manager']
won
04/22/2023, 7:12 AM
from dagster import In, job, op, Nothing, get_dagster_logger, Config
from src.resources import PGWarehouse
from src.utils.hooks import discord_message_on_failure
# Module-level Dagster logger shared by the ops below.
log = get_dagster_logger()
# Config schema taken by update_service_second.
class OpConf(Config):
    # Interpolated into the SQL text that update_service_second executes.
    table_name: str
@op
def update_service(db_res: PGWarehouse):
    """Run ``select now()`` through ``db_res`` and log the first result row.

    Args:
        db_res: Warehouse resource; only its ``execute_query`` method is used.
    """
    r = db_res.execute_query("select now()")
    log.info(r.fetchone())
@op(ins={"start": In(Nothing)})
def update_service_second(config: OpConf, db_res: PGWarehouse):
    """Run a query that echoes ``config.table_name`` and log the first row.

    The ``start`` input is declared as ``Nothing``: it carries no data and is
    not a parameter of this function.

    Args:
        config: Op config supplying ``table_name``.
        db_res: Warehouse resource; only its ``execute_query`` method is used.
    """
    # NOTE(review): table_name is interpolated straight into the SQL text.
    # If run config can come from an untrusted source, switch to a
    # parameterized query — check what PGWarehouse.execute_query supports.
    r = db_res.execute_query(f"select 'foo', '{config.table_name}'")
    log.info(r.fetchone())
# The failure hook imported from src.utils.hooks is applied as a decorator
# stacked on top of @job.
@discord_message_on_failure
@job
def dev_report_job():
    # update_service is invoked first; its output is wired into the "start"
    # input of update_service_second.
    update_service_second(start=update_service())