David Bieber
12/04/2021, 9:39 PM
############
from dagster import job, op, schedule

@op(required_resource_keys={"warehouse"})
def get_message_status(context):
    # message_id comes from this op's run config
    message_id = context.op_config["message_id"]
    message_status = context.resources.warehouse.get_message(message_id)
    return message_status

@op
def add(x, y):
    return x + y

@op(required_resource_keys={"warehouse"})
def get_hdb_count(context):
    sym = context.op_config["sym"]
    count = context.resources.warehouse.get_hdb_sym_count(sym)
    return count

@op(required_resource_keys={"warehouse"})
def get_mem_count(context):
    sym = context.op_config["sym"]
    count = context.resources.warehouse.get_mem_sym_count(sym)
    return count
############
@job
def add_job():
    add(get_mem_count(), get_hdb_count())

@job
def message_status_job():
    get_message_status()
I run these jobs as scheduled tasks, so I need to supply two configurations,
one for add_job:
ops:
  get_hdb_count:
    config:
      sym: "asset"
  get_mem_count:
    config:
      sym: "asset"
and one for message_status_job:
ops:
  get_message_status:
    config:
      message_id: "tDefStatus"
in addition there is a common resource configuration
resources:
  warehouse:
    config:
      port: "8888"
      address: "localhost"
my plan was to have separate .yaml files for each job but combine them with a common resource .yaml file in the schedule ...
@schedule(cron_schedule="* * * * *", job=message_status_job)
def message_status_schedule(context):
    # common resource config plus the op config for this job
    resource_config = open_yaml_file("resource.yaml")
    ops_config = open_yaml_file("env_message_status.yaml")
    config = combine_configs(resource_config, ops_config)
    return config

@schedule(cron_schedule="* * * * *", job=add_job)
def add_schedule(context):
    resource_config = open_yaml_file("resource.yaml")
    ops_config = open_yaml_file("env_add.yaml")
    config = combine_configs(resource_config, ops_config)
    return config
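combine_configs is not shown above. A minimal sketch of what such a helper could look like, assuming each YAML file has already been parsed into a nested dict (for example by open_yaml_file) and that a later file should win when the same key appears twice. The dict literals below are stand-ins for the parsed files, not real file contents:

```python
from copy import deepcopy


def combine_configs(*configs):
    """Recursively merge run-config dicts; later values win on conflicts."""
    merged = {}
    for config in configs:
        for key, value in config.items():
            if isinstance(value, dict) and isinstance(merged.get(key), dict):
                # Both sides are dicts: merge their contents key by key.
                merged[key] = combine_configs(merged[key], value)
            else:
                # New key or non-dict value: copy so the inputs are not mutated.
                merged[key] = deepcopy(value)
    return merged


# Stand-ins for the parsed contents of resource.yaml and env_add.yaml.
resource_config = {
    "resources": {"warehouse": {"config": {"port": "8888", "address": "localhost"}}}
}
ops_config = {
    "ops": {
        "get_hdb_count": {"config": {"sym": "asset"}},
        "get_mem_count": {"config": {"sym": "asset"}},
    }
}

run_config = combine_configs(resource_config, ops_config)
```

Because the two files use different top-level keys (resources and ops), a plain dict union would also work here; the recursive merge only matters if both files ever contribute to the same section.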
It felt a bit cumbersome, and I was trying to work out whether there is a more Pythonic/Dagster concept I am missing that would be better suited.
Many thanks
David
max
12/04/2021, 10:58 PM
configured API?