Hi all, I'm trying to create a schedule that runs ...
# ask-community
j
Hi all, I'm trying to create a schedule that runs my pipeline at the start of each hour, and I want to be able to run it through dagit. I have a pipelines.py file where I define solids and their dependencies, and I'm deploying Dagster on Kubernetes. I'm trying to run a non-partition-based schedule: https://docs.dagster.io/concepts/partitions-schedules-sensors/schedules#non-partition-based-schedules I'm having some trouble initializing the schedule, and I'm not sure how to trigger it. In dagit, I see: "no schedulers found for this instance". This is the code I have written:
@schedule(cron_schedule="0 * * * *", pipeline_name="my_pipeline", execution_timezone="US/Eastern")
def my_schedule():
    return {}
Not sure what else I have to do here. Do I need to pass this schedule to my repository for it to be defined? Thanks a lot!
d
Hi Jay - "do I need to pass this schedule to my repository for it to be defined?" - this is exactly right. Include it along with the pipelines and you're good
j
Thanks for your quick reply Daniel. I passed it to my repository like this:
@repository
def my_pipeline_repository():
    return [get_pipelines(),my_schedule()]
I got this error: dagster.core.errors.DagsterInvalidInvocationError: Schedule decorated function has context argument, but no context argument was provided when invoking. I noticed that this could be related to the dagster version: https://dagster.slack.com/archives/C01U954MEER/p1626383309161700. I'm currently on 0.11.15, do I need to upgrade?
d
on that version the schedule function takes in a context argument. changing to
def my_schedule(_context):
    return {}
should fix it. Upgrading dagster would also fix it.
j
Hi Daniel, I tried the above approach and ran into the same error ("no context argument was provided when invoking"):
def my_schedule(_context):
    return {}
I am not sure what to pass for the context. Looking here: https://docs.dagster.io/concepts/partitions-schedules-sensors/schedules#relevant-apis, I'm not sure whether I should use build_schedule_context to create the context. Thanks for your help.
d
oh! Sorry. I think I misdiagnosed the error. This is what you want:
@repository
def my_pipeline_repository():
    return [get_pipelines(),my_schedule]
(no need to call my_schedule like it's a function - the @schedule decorator turns it into an object that you can pass around)
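To illustrate the point about the decorator, here is a minimal sketch in plain Python. It does not use Dagster and is not how Dagster implements @schedule; `FakeScheduleDefinition` and `fake_schedule` are hypothetical stand-ins, and the error they raise only imitates the one reported in this thread. It shows that after decoration the name `my_schedule` refers to an object, so the object itself (not the result of calling it) is what goes into the list.

```python
# Hypothetical stand-ins for illustration only; NOT Dagster's implementation.
class FakeScheduleDefinition:
    """Object that a schedule-style decorator could return in place of the function."""

    def __init__(self, fn, cron_schedule):
        self.name = fn.__name__
        self.cron_schedule = cron_schedule
        self._fn = fn

    def __call__(self, *args):
        # Calling the object directly invokes the wrapped function, which
        # expects a context; with no argument we imitate the thread's error.
        if not args:
            raise TypeError("no context argument was provided when invoking")
        return self._fn(*args)


def fake_schedule(cron_schedule):
    """Decorator factory: replaces the function with a definition object."""
    def wrap(fn):
        return FakeScheduleDefinition(fn, cron_schedule)
    return wrap


@fake_schedule(cron_schedule="0 * * * *")
def my_schedule(_context):
    return {}


# my_schedule is now an object, so it is passed around without parentheses.
definitions = [my_schedule]
```

Writing `my_schedule()` in this sketch would try to run the schedule body immediately with no context, which is the same kind of mistake as calling it inside the repository function.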
j
Hi Daniel, the reason I was passing:
@repository
def my_pipeline_repository():
    return [get_pipelines(),my_schedule()]
is that I was getting this error: dagster.core.errors.DagsterInvalidDefinitionError: Bad return value from repository construction function: all elements of list must be of type PipelineDefinition, PartitionSetDefinition, ScheduleDefinition, or SensorDefinition. Got value of type <class 'list'> at index 0. It is expecting each element to be a pipeline, schedule, etc. Just a reminder, I'm passing a context to my schedule:
def my_schedule(_context):
    return {}
I'm not sure how to pass the context, or whether it is passed in automatically. I keep receiving a dagster.core.errors.DagsterInvalidInvocationError: Schedule decorated function has context argument, but no context argument was provided when invoking. Thanks for your help.
d
Hi Jay, I think you want:
@repository
def my_pipeline_repository():
    return get_pipelines() + [my_schedule]
What you had before,
[get_pipelines(),my_schedule]
is a list with two elements: the first is a list of pipeline definitions and the second is a schedule definition. You want a single list of definitions with no nesting.
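The difference between the nested and flat forms can be checked in plain Python. This is only a sketch: `PipelineDef`, `ScheduleDef`, the second pipeline name, and `first_nested_index` are hypothetical placeholders rather than Dagster classes or helpers, and `get_pipelines()` is assumed to return a list, as the error message in this thread suggests.

```python
# Hypothetical placeholders standing in for real definitions.
class PipelineDef:
    def __init__(self, name):
        self.name = name


class ScheduleDef:
    def __init__(self, name):
        self.name = name


def get_pipelines():
    # Assumed to return a list of pipeline definitions.
    return [PipelineDef("my_pipeline"), PipelineDef("other_pipeline")]


my_schedule = ScheduleDef("my_schedule")

# Nested: two elements, and element 0 is itself a list.
nested = [get_pipelines(), my_schedule]

# Flat: list concatenation yields one list of three definitions.
flat = get_pipelines() + [my_schedule]


def first_nested_index(definitions):
    """Return the index of the first element that is a list, or None."""
    for index, item in enumerate(definitions):
        if isinstance(item, list):
            return index
    return None
```

Here `first_nested_index(nested)` points at index 0, matching the "Got value of type <class 'list'> at index 0" message, while `flat` contains no nested list. Unpacking with `[*get_pipelines(), my_schedule]` builds the same flat list.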
j
Hi Daniel, thanks for your help. I was able to get a schedule up and running.