Matthew Heguy
03/22/2023, 5:39 PM
jamie
03/22/2023, 8:31 PM
Matthew Heguy
03/23/2023, 12:53 PM
from datetime import datetime
from dagster import (
InitResourceContext,
ScheduleDefinition,
ScheduleEvaluationContext,
job,
op,
resource,
validate_run_config,
)
@resource(config_schema={"environment": str})
def api_client(context: InitResourceContext) -> str:
    # Stand-in for a real client: the resource value is just a label built
    # from the "environment" entry of this resource's config.
    environment = context.resource_config["environment"]
    return f'Resource: {environment}'
@op(required_resource_keys={"api_client"})
def my_op(context) -> None:
    # Print the injected "api_client" resource value so the environment it
    # was configured with shows up on stdout.
    print(context.resources.api_client)
@job(resource_defs={"api_client": api_client})
def my_job() -> None:
    # Single-op job; the "api_client" resource key required by my_op is bound
    # to the api_client resource definition via resource_defs.
    my_op()
# Schedule for my_job on an every-minute cron expression.
# NOTE(review): run_config is deliberately the *return value* of
# validate_run_config rather than the raw dict -- this is the setup whose
# behaviour is being reproduced by the statements that follow.
my_schedule = ScheduleDefinition(
    cron_schedule="* * * * *",
    job=my_job,
    run_config=validate_run_config(
        my_job,
        {
            "resources": {
                "api_client": {"config": {"environment": "prod"}},
            },
        },
    ),
)
# 1) Executing the job directly with the plain run config succeeds.
prod_run_config = {
    "resources": {
        "api_client": {"config": {"environment": "prod"}},
    },
}
my_job.execute_in_process(run_config=prod_run_config)

# 2) Evaluating a schedule tick yields a run request whose run config is bad.
tick_context = ScheduleEvaluationContext(None, datetime(2000, 1, 1, 0, 0, 0))
result = my_schedule.evaluate_tick(context=tick_context)

# Show what the schedule actually produced, then feed it back through
# validation against the same job.
run_config = result.run_requests[0].run_config
print(run_config)
validate_run_config(my_job, run_config)  # Throws exception
jamie
03/23/2023, 1:47 PM
`validate_run_config` returns the full configuration for the entire run of the job. This will include configuration information for how to execute the run (i.e. multiprocess, single process, on Kubernetes, etc.), logging, and all kinds of other stuff. However, the only configuration needed for `my_job` is `{"resources": {"api_client": {"config": {"environment": "prod"}}}}`.
So when you call `validate_run_config` in your schedule definition, you’re telling Dagster to use the full configuration (including logging info, execution info, etc.) as the configuration for `my_job`.
Instead, you should do this:
# Hand the schedule the plain, job-level run config directly instead of the
# output of validate_run_config.
my_schedule = ScheduleDefinition(
    cron_schedule="* * * * *",
    job=my_job,
    run_config={
        "resources": {
            "api_client": {"config": {"environment": "prod"}},
        },
    },
)
Matthew Heguy
03/23/2023, 1:50 PM