https://dagster.io/ logo
#ask-community
Title
# ask-community
w

won

04/22/2023, 7:08 AM
Hi there! I'm trying the new Dagster 1.3.1 and ran into an error with
failure_hook
my new repo uses
create_repository_using_definitions_args
Copy code
# Build the "dev" repository from one schedule, one job and the "db_res"
# resource. The schedule argument is decoded from the SIP_1 environment
# variable; the warehouse connection URI is supplied through
# EnvVar("POSTGRES_CONNECTION_STRING").
# NOTE(review): only "db_res" is supplied here, while the failure hook shown
# in this thread declares required_resource_keys={"discord_webhook"} —
# confirm whether that resource must be added as well.
dev = create_repository_using_definitions_args(
    name="dev",
    schedules=[schedule_for_acc(orjson.loads(os.getenv("SIP_1")), "08 17 * * *")],
    jobs=[dev_report_job],
    resources={
        "db_res": PGWarehouse(
            connection_uri=EnvVar("POSTGRES_CONNECTION_STRING"),
            database="test",
        ),
    },
)
Everything works fine until I decorate
dev_report_job
with
discord_message_on_failure
Copy code
# Job wiring: update_service is invoked first and its result is passed as the
# ``start`` input of update_service_second. discord_message_on_failure is
# applied by stacking its decorator on top of @job.
# NOTE(review): @job(hooks={discord_message_on_failure}) is the other
# documented way to attach a hook; it may be worth trying if this stacked form
# triggers the missing 'db_res' error — unverified.
@discord_message_on_failure
@job
def dev_report_job():
    first_step = update_service()
    update_service_second(start=first_step)
Copy code
@failure_hook(required_resource_keys={"discord_webhook"})
def discord_message_on_failure(context: HookContext):
    # Placeholder hook body: it only prints a fixed line and never reads
    # `context` or the declared "discord_webhook" resource.
    placeholder_text = 'just print for test'
    print(placeholder_text)
traceback
Copy code
dagster._core.errors.DagsterInvalidDefinitionError: resource with key 'db_res' required by op 'update_service' was not provided. Please provide a <class 'dagster._core.definitions.resource_definition.ResourceDefinition'> to key 'db_res', or change the required key to one of the following keys which points to an <class 'dagster._core.definitions.resource_definition.ResourceDefinition'>: ['io_manager']
full job code
Copy code
from dagster import In, job, op, Nothing, get_dagster_logger, Config
from src.resources import PGWarehouse
from src.utils.hooks import discord_message_on_failure

# Module-level logger obtained from Dagster; the ops below log through it.
log = get_dagster_logger()


# Config schema for update_service_second: a single required string field.
class OpConf(Config):
    # Interpolated into the SQL text that update_service_second executes.
    table_name: str


@op
def update_service(db_res: PGWarehouse):
    """Run ``select now()`` through ``db_res`` and log the first result row.

    Args:
        db_res: Warehouse resource whose ``execute_query`` runs the statement.
    """
    r = db_res.execute_query("select now()")
    # The pasted snippet had Slack link markup (<http://log.info|log.info>)
    # in place of the call target, which is not valid Python.
    log.info(r.fetchone())


@op(ins={"start": In(Nothing)})
def update_service_second(config: OpConf, db_res: PGWarehouse):
    """Run a query that echoes ``config.table_name`` and log the first row.

    ``start`` is declared as a ``Nothing`` input and is not a function
    parameter, so no value from the upstream op reaches this body.

    Args:
        config: Op config providing ``table_name``.
        db_res: Warehouse resource whose ``execute_query`` runs the statement.
    """
    # NOTE(review): table_name is interpolated straight into the SQL text;
    # confirm it only ever comes from trusted run config.
    r = db_res.execute_query(f"select 'foo', '{config.table_name}'")
    # The pasted snippet had Slack link markup (<http://log.info|log.info>)
    # in place of the call target, which is not valid Python.
    log.info(r.fetchone())

# Job wiring: update_service is invoked first and its result is passed as the
# ``start`` input of update_service_second, which declares that input as
# In(Nothing). discord_message_on_failure is applied by stacking its
# decorator on top of @job.
# NOTE(review): @job(hooks={discord_message_on_failure}) is the other
# documented way to attach a hook; it may be worth trying if this stacked form
# triggers the missing 'db_res' error — unverified.
@discord_message_on_failure
@job
def dev_report_job():
    upstream = update_service()
    update_service_second(start=upstream)
3 Views