David Bieber
11/28/2021, 11:58 PM
context.op_config
the execution time, which would be used by the db function. However, the scheduler does not appear to have access to all the parameters in the context, so the job fails.
Here is a bare-bones example, similar to the scheduler example given on GitHub:
from dagster import job, op, repository, resource, schedule

class DBWarehouse:
    def __init__(self, params):
        # db_conn is my own connection factory (not shown here)
        self.db_instance = db_conn(params)

    def read_db(self, time_period):
        return self.db_instance.read(time_period)

@resource
def local_warehouse_resource(context):
    # reads the "params" entry from resources.warehouse.config
    return DBWarehouse(context.resource_config["params"])

@op(required_resource_keys={"warehouse"})
def do_stuff(context):
    time_period = context.op_config["datetime"]
    context.resources.warehouse.read_db(time_period)

@job(resource_defs={"warehouse": local_warehouse_resource})
def main_job():
    do_stuff()

@schedule(
    cron_schedule="* * * * *",
    job=main_job,
)
def minute_schedule(context):
    datetime = str(context.scheduled_execution_time)
    return {"ops": {"do_stuff": {"config": {"datetime": datetime}}}}

@repository
def do_stuff_repo():
    return [main_job, minute_schedule]
And the corresponding YAML:
ops:
  do_stuff:
    config:
      params: "ops config stuff"
      datetime: "datetime"
resources:
  warehouse:
    config:
      params: "resource warehouse stuff"
This works when I run it as a single job from do_stuff_repo in dagit.
However, when I run it through the scheduler in dagit, it fails, since the run does not have full access to the context... only to the execution datetime:
ops:
  do_stuff:
    config:
      datetime: "datetime"
I would really appreciate any help on this.
Thanks
David
daniel
11/29/2021, 12:19 AM
David Bieber
11/29/2021, 12:50 AM
dagit -f kdb_scheduler.py
and then I paste in the configuration into the launch pad. I must be doing something wrong, but it is not clear from the docs where I am going wrong. Does this help?
dagster.core.errors.DagsterSubprocessError: During multiprocess execution errors occurred in child processes:
In process 12972: dagster.core.errors.DagsterResourceFunctionError: Error executing resource_fn on ResourceDefinition warehouse
Stack Trace:
File "c:\users\db70524\documents\python\dagster\venv36\lib\site-packages\dagster\core\executor\child_process_executor.py", line 65, in _execute_command_in_child_process
for step_event in command.execute():
File "c:\users\db70524\documents\python\dagster\venv36\lib\site-packages\dagster\core\executor\multiprocess.py", line 83, in execute
instance=instance,
File "c:\users\db70524\documents\python\dagster\venv36\lib\site-packages\dagster\core\execution\api.py", line 856, in __iter__
yield from self.execution_context_manager.prepare_context()
File "c:\users\db70524\documents\python\dagster\venv36\lib\site-packages\dagster\utils\__init__.py", line 447, in generate_setup_events
obj = next(self.generator)
File "c:\users\db70524\documents\python\dagster\venv36\lib\site-packages\dagster\core\execution\context_creation_pipeline.py", line 256, in execution_context_event_generator
yield from resources_manager.generate_setup_events()
File "c:\users\db70524\documents\python\dagster\venv36\lib\site-packages\dagster\utils\__init__.py", line 447, in generate_setup_events
obj = next(self.generator)
File "c:\users\db70524\documents\python\dagster\venv36\lib\site-packages\dagster\core\execution\resources_init.py", line 238, in resource_initialization_event_generator
pipeline_def_for_backwards_compat=pipeline_def_for_backwards_compat,
File "c:\users\db70524\documents\python\dagster\venv36\lib\site-packages\dagster\core\execution\resources_init.py", line 190, in _core_resource_initialization_event_generator
raise dagster_user_error
File "c:\users\db70524\documents\python\dagster\venv36\lib\site-packages\dagster\core\execution\resources_init.py", line 161, in _core_resource_initialization_event_generator
for event in manager.generate_setup_events():
File "c:\users\db70524\documents\python\dagster\venv36\lib\site-packages\dagster\utils\__init__.py", line 447, in generate_setup_events
obj = next(self.generator)
File "c:\users\db70524\documents\python\dagster\venv36\lib\site-packages\dagster\core\execution\resources_init.py", line 316, in single_resource_event_generator
raise dagster_user_error
File "c:\users\db70524\documents\python\dagster\venv36\lib\site-packages\dagster\core\execution\resources_init.py", line 310, in single_resource_event_generator
"Resource generator {name} must yield one item.".format(name=resource_name)
File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Anaconda3_64\lib\contextlib.py", line 99, in __exit__
self.gen.throw(type, value, traceback)
File "c:\users\db70524\documents\python\dagster\venv36\lib\site-packages\dagster\core\errors.py", line 194, in user_code_error_boundary
) from e
The above exception was the direct cause of the following exception:
TypeError: 'NoneType' object is not subscriptable
Stack Trace:
File "c:\users\db70524\documents\python\dagster\venv36\lib\site-packages\dagster\core\errors.py", line 185, in user_code_error_boundary
yield
File "c:\users\db70524\documents\python\dagster\venv36\lib\site-packages\dagster\core\execution\resources_init.py", line 293, in single_resource_event_generator
if is_context_provided(get_function_params(resource_def.resource_fn))
File "kdb_scheduler.py", line 187, in local_warehouse_resource
local_host=context.resource_config["local_host"],
File "c:\users\db70524\documents\python\dagster\venv36\lib\site-packages\dagster\core\execution\api.py", line 774, in pipeline_execution_iterator
for event in pipeline_context.executor.execute(pipeline_context, execution_plan):
File "c:\users\db70524\documents\python\dagster\venv36\lib\site-packages\dagster\core\executor\multiprocess.py", line 236, in execute
subprocess_error_infos=list(errs.values()),
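The telling frame in the trace is `context.resource_config["local_host"]` raising `TypeError: 'NoneType' object is not subscriptable`: in the scheduled run the warehouse resource received no config at all. Below is a minimal plain-Python sketch (no Dagster needed) of a pre-flight check that reports which required resources lack a `config` section in a run-config dict. The helper name `missing_resource_configs` is hypothetical, and the two sample dicts are transcriptions of the schedule's return value and the Launchpad YAML from the question, with an illustrative timestamp.

```python
from typing import Any, Dict, Iterable, List


def missing_resource_configs(
    run_config: Dict[str, Any], required: Iterable[str]
) -> List[str]:
    """Return the keys in `required` that have no resource config in `run_config`.

    Hypothetical helper: it only inspects the plain dict a schedule would
    return and does not call into Dagster.
    """
    resources = run_config.get("resources") or {}
    missing = []
    for key in required:
        entry = resources.get(key) or {}
        if entry.get("config") is None:
            missing.append(key)
    return missing


# What minute_schedule returns: op config only, no "resources" section.
schedule_config = {
    "ops": {"do_stuff": {"config": {"datetime": "2021-11-29 15:49:00"}}}
}

# Equivalent of the YAML pasted into the Launchpad.
launchpad_config = {
    "ops": {"do_stuff": {"config": {"params": "ops config stuff", "datetime": "datetime"}}},
    "resources": {"warehouse": {"config": {"params": "resource warehouse stuff"}}},
}

print(missing_resource_configs(schedule_config, ["warehouse"]))   # ['warehouse']
print(missing_resource_configs(launchpad_config, ["warehouse"]))  # []
```

Running a check like this inside the schedule function could surface the gap at tick time with a clearer message than the `NoneType` error from the resource.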
daniel
11/29/2021, 3:35 PM
dagster-daemon
process that you run separately from dagit (docs: https://docs.dagster.io/concepts/partitions-schedules-sensors/schedules#running-the-scheduler). Once you have a schedule defined, have turned that schedule on, and have the scheduler running, no other action should be required on your part for the schedule to start launching runs. So you're losing me a bit when you say "and then I paste in the configuration into the launch pad".
David Bieber
11/29/2021, 3:39 PM
1. run dagit -f kdb_scheduler.py
2. run dagster-daemon run
3. when I look in the dagit window, I can see the scheduler running in the Status -> Schedules window
4. to test main_job, I paste the YAML configuration into the Launchpad window... it works
5. to test the schedule, I switch it on in the dagit Status -> Schedules window... but it fails
daniel
11/29/2021, 3:43 PM
David Bieber
11/29/2021, 3:47 PM
daniel
11/29/2021, 3:49 PM
{"ops":{"do_stuff":{"config":{"datetime":datetime}}}}
you would add a "resources" key as well, plus any other config that's needed for the run to work.
David Bieber
11/29/2021, 3:50 PM
daniel
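11/29/2021, 3:52 PM
import yaml
from dagster import schedule

@schedule(
    cron_schedule="* * * * *",
    job=main_job,
)
def my_file_based_schedule(context):
    # my_file_path is a placeholder: the path to the same run-config YAML
    # that works in the Launchpad (ops and resources sections)
    with open(my_file_path, "r") as fd:
        return yaml.safe_load(fd.read())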
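Putting both suggestions together, a schedule can start from the full config that already works in the Launchpad (including its `resources` section) and overwrite only the per-tick `datetime`. Here is a minimal plain-Python sketch of that merge, assuming the base dict stands in for what `yaml.safe_load` would return from the file; `BASE_RUN_CONFIG`, `deep_merge` and `build_run_config` are hypothetical names. In the real schedule, the function body would return `build_run_config(context.scheduled_execution_time)`.

```python
import copy
from datetime import datetime
from typing import Any, Dict

# Hypothetical stand-in for the parsed Launchpad YAML from the question.
BASE_RUN_CONFIG: Dict[str, Any] = {
    "ops": {"do_stuff": {"config": {"params": "ops config stuff"}}},
    "resources": {"warehouse": {"config": {"params": "resource warehouse stuff"}}},
}


def deep_merge(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]:
    """Recursively merge `override` into a copy of `base`; `override` wins on conflicts."""
    merged = copy.deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged


def build_run_config(scheduled_time: datetime) -> Dict[str, Any]:
    """Full run config for one tick: the base config plus this tick's datetime."""
    tick_config = {"ops": {"do_stuff": {"config": {"datetime": str(scheduled_time)}}}}
    return deep_merge(BASE_RUN_CONFIG, tick_config)


config = build_run_config(datetime(2021, 11, 29, 15, 52))
print(config["ops"]["do_stuff"]["config"])
# {'params': 'ops config stuff', 'datetime': '2021-11-29 15:52:00'}
print(config["resources"]["warehouse"]["config"]["params"])
# resource warehouse stuff
```

Merging rather than hand-writing the schedule's dict keeps the resource config in one place, so the Launchpad YAML and the scheduled runs should not drift apart; the deep copy leaves the base config untouched between ticks.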
David Bieber
11/29/2021, 3:57 PM
daniel
11/29/2021, 3:58 PM