https://dagster.io/ logo
Title
d

David Bieber

11/28/2021, 11:58 PM
Hi, I am very new to dagster and I am trying to figure out how you access the context file when running scheduled jobs. What I would like to do is schedule a job every fixed time period and then query a db using the the execution time of the scheduler as one of the input. I had expected the scheduler to write to 
context.op_config
 the execution time, which would be used by the db function. However the scheduler does not appear to have access to all the parameters in the 
context
 so the job fails. Here is a bare bones example which is similar to the example given in github at scheduler example
class DBWarehouse
   def __init__(params)
      self.db_instance = db_conn(param)

   def read_db(time_period):
      self.db_instance.read(time_period)

@resourse
def local_warehouse_resource(context):
   returns DBWarehouse(context.resource_config.parma]

@op(required_resource_key="warehouse")
def do_stuff(context)
   time_period = context.op_config["datetime"]
   context.resources.warehouse.read_db[time_period]

@job(resource_def={"warehouse":local_warehouse_resource})
def main_job()
   do_suff()

@schedule(
   cron_schedule="* * * * *",
   job=main_job
)
def minute_schedule(context):
   datetime = str(context.scheduled_execution_time)
   return {"ops":{"do_stuff":{"config":{"datetime":datetime}}}}

@repository
def do_stuff_repo():
   return [main_job, minute_schedule]
and the corresponding yaml
ops:
  do_stuff:
    config: 
        params: "ops config stuff"
        datatime: "datetime"
resources:
  warehouse:
    config:
       parmas: "resource warehouse stuff"
this function works when I running it as a single job 
do_stuff_repo
in dagit however when I run it in dagit in the scheduler, it fails since it does have full access the the context... only to the execution datetime
ops:
  do_stuff:
    config: 
        datatime: "datetime"
I would really appreciate any help on this. Thanks David
d

daniel

11/29/2021, 12:19 AM
Hey David, welcome! Is it possible to polish the text of the error that you’re seeing? What you said here all sounds right so I’m wondering if the error might explain what’s going on
Re: ‘it fails because it doesn’t have access to the full context’ - the scheduler/schedule function doesn’t actually execute your job, it only determines the config and launches the run (just like you when manually launching it from dagit). So the part that has access to the scheduled time (the @schedule) should be pretty separate from the part that has access to the full context (the @job)
d

David Bieber

11/29/2021, 12:50 AM
Hi Daniel, So here is the error stack (sorry for the data dump but you can see where it is failing). It appears to be there is no resource definitions available doing the scheduled run (i.e. local_host etc). It could be a result of the way I am running dagit
dagit -f kdb_scheduler.py
and then I paste in the configuration into the launch pad. I must be doing something wrong but it is not clear from the docs where I am going wrong. Does this help?
dagster.core.errors.DagsterSubprocessError: During multiprocess execution errors occurred in child processes:
In process 12972: dagster.core.errors.DagsterResourceFunctionError: Error executing resource_fn on ResourceDefinition warehouse

Stack Trace:
File "c:\users\db70524\documents\python\dagster\venv36\lib\site-packages\dagster\core\executor\child_process_executor.py", line 65, in _execute_command_in_child_process
for step_event in command.execute():
File "c:\users\db70524\documents\python\dagster\venv36\lib\site-packages\dagster\core\executor\multiprocess.py", line 83, in execute
instance=instance,
File "c:\users\db70524\documents\python\dagster\venv36\lib\site-packages\dagster\core\execution\api.py", line 856, in _iter_
yield from self.execution_context_manager.prepare_context()
File "c:\users\db70524\documents\python\dagster\venv36\lib\site-packages\dagster\utils\_init_.py", line 447, in generate_setup_events
obj = next(self.generator)
File "c:\users\db70524\documents\python\dagster\venv36\lib\site-packages\dagster\core\execution\context_creation_pipeline.py", line 256, in execution_context_event_generator
yield from resources_manager.generate_setup_events()
File "c:\users\db70524\documents\python\dagster\venv36\lib\site-packages\dagster\utils\_init_.py", line 447, in generate_setup_events
obj = next(self.generator)
File "c:\users\db70524\documents\python\dagster\venv36\lib\site-packages\dagster\core\execution\resources_init.py", line 238, in resource_initialization_event_generator
pipeline_def_for_backwards_compat=pipeline_def_for_backwards_compat,
File "c:\users\db70524\documents\python\dagster\venv36\lib\site-packages\dagster\core\execution\resources_init.py", line 190, in _core_resource_initialization_event_generator
raise dagster_user_error
File "c:\users\db70524\documents\python\dagster\venv36\lib\site-packages\dagster\core\execution\resources_init.py", line 161, in _core_resource_initialization_event_generator
for event in manager.generate_setup_events():
File "c:\users\db70524\documents\python\dagster\venv36\lib\site-packages\dagster\utils\_init_.py", line 447, in generate_setup_events
obj = next(self.generator)
File "c:\users\db70524\documents\python\dagster\venv36\lib\site-packages\dagster\core\execution\resources_init.py", line 316, in single_resource_event_generator
raise dagster_user_error
File "c:\users\db70524\documents\python\dagster\venv36\lib\site-packages\dagster\core\execution\resources_init.py", line 310, in single_resource_event_generator
"Resource generator {name} must yield one item.".format(name=resource_name)
File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Anaconda3_64\lib\contextlib.py", line 99, in _exit_
self.gen.throw(type, value, traceback)
File "c:\users\db70524\documents\python\dagster\venv36\lib\site-packages\dagster\core\errors.py", line 194, in user_code_error_boundary
) from e

The above exception was the direct cause of the following exception:
TypeError: 'NoneType' object is not subscriptable

Stack Trace:
File "c:\users\db70524\documents\python\dagster\venv36\lib\site-packages\dagster\core\errors.py", line 185, in user_code_error_boundary
yield
File "c:\users\db70524\documents\python\dagster\venv36\lib\site-packages\dagster\core\execution\resources_init.py", line 293, in single_resource_event_generator
if is_context_provided(get_function_params(resource_def.resource_fn))
File "kdb_scheduler.py", line 187, in local_warehouse_resource
local_host=context.resource_config["local_host"],


 File "c:\users\db70524\documents\python\dagster\venv36\lib\site-packages\dagster\core\execution\api.py", line 774, in pipeline_execution_iterator
   for event in pipeline_context.executor.execute(pipeline_context, execution_plan):
 File "c:\users\db70524\documents\python\dagster\venv36\lib\site-packages\dagster\core\executor\multiprocess.py", line 236, in execute
   subprocess_error_infos=list(errs.values()),
Btw thanks Daniel for the welcome. I do think that dagster looks great and I am excited to see if I am able to make use of it.
Just a thought… but do you need to define a separate configuration when running the scheduler?
d

daniel

11/29/2021, 3:35 PM
I first want to make sure we are referring to the same thing when we say 'running the scheduler' - the way to do that is described here and involves a
dagster-daemon
process that you run separately from dagit: https://docs.dagster.io/concepts/partitions-schedules-sensors/schedules#running-the-scheduler . Once you have a schedule defined, have turned that schedule on, and have the scheduler running, there should be no other action required on your part in order for the schedule to start launching runs. So you're losing me a bit when you say "and then I paste in the configuration into the launch pad".
d

David Bieber

11/29/2021, 3:39 PM
So I do exactly as you describe I set the SET DAGSTER_HOME=directory of the kdb_scheduler.py 1. run
dagit -f kdb_scheduler.py
2. run
dagster-daemon run
3. when I look in the dagit window I can see the scheduler running in the status->schedules window 4. to test the main_job I paste in the yaml configuration into the lauchpad window... it works 5. to test the scheduler I switch on the scheduler in the dagit status->schedules window... but it fails
d

daniel

11/29/2021, 3:43 PM
Ah, so if you're seeing an issue about resource config missing - that resource config will need to be included in the config generated by the schedule function (other than providing a way to toggle the schedule on and off, dagit isn't involved in the schedule execution at all - the config needs to be self-contained within the schedule function).
d

David Bieber

11/29/2021, 3:47 PM
ahh - that would explain it
where and how should this be defined in the scheduler function
so I have a missing resource config and a missing op config (other than the datetime of execution)
d

daniel

11/29/2021, 3:49 PM
The dict returned from the schedule function should match directly to the config that you would enter into dagit to launch the run - so instead of returning
{"ops":{"do_stuff":{"config":{"datetime":datetime}}}}
you would add a "resources" key as well and any other additional config that's needed for the run to work
d

David Bieber

11/29/2021, 3:50 PM
also is this the correct way of using dagster for my user-case or should I be using it in a different way
ok - so it need to be embedded into the code
is there a way of using yaml config files
in that way I would have abstraction from the scheduler code and the configuration... or maybe I am just thinking about this incorrectly
d

daniel

11/29/2021, 3:52 PM
yeah, something like this should work:
@schedule(
   cron_schedule="* * * * *",
   job=main_job
)
def my_file_based_schedule(context):
    with open(
        my_file_path,
        "r",
    ) as fd:
         return yaml.safe_load(fd.read())
d

David Bieber

11/29/2021, 3:57 PM
brilliant - thanks Daniel, will have a go but I am certain it will work now. It wasn't obvious (for me) in the example or the documentation. Happy to do an edit on github and submit if that would be helpful.
d

daniel

11/29/2021, 3:58 PM
That would be great! definitely would love this to be clearer at the outset