Nathan Saccon

09/10/2021, 8:02 PM
Quick question: Can you yield multiple runs from a schedule like you can with a sensor?

daniel

09/10/2021, 8:04 PM
You can! The syntax is the same

Nathan Saccon

09/10/2021, 8:04 PM
Thank you!
Hey Daniel, just a quick follow-up. It seems like Dagster does not like the way I am trying to generate multiple runs from a schedule. Is there a different syntax I'm not aware of?
import copy

from dagster import (
    PresetDefinition,
    RunRequest,
    ScheduleEvaluationContext,
    SkipReason,
    schedule,
)

import src.presets as presets
from src.file_env.file_environment import FileEnvironment


def retry_sched_factory(pipeline_name: str, preset: PresetDefinition):
    @schedule(
        cron_schedule="*/1 * * * *",
        pipeline_name=pipeline_name,
        name=f"{preset.name}_sched",
        execution_timezone="US/Eastern",
    )
    def retry_schedule(context: ScheduleEvaluationContext):
        # Pull retry dir
        retry_dir = preset.run_config["resources"]["retry_data"]["config"]
        retry_dir = retry_dir["retry_data_dir"]

        # Get files to retry
        local_env = FileEnvironment("file_system")
        retry_fnames = local_env.list_area_object_mtimes(retry_dir)
        for fname in retry_fnames:
            # Update config
            run_config = copy.deepcopy(preset.run_config)
            _config = run_config["resources"]["read_failed_send"]["config"]
            _config["retry_data_fname"] = fname

            yield RunRequest(run_config=run_config)
        if not retry_fnames:
            yield SkipReason(f"No retry files found at: {retry_dir}")
        return

    return retry_schedule


send_retry_schedule = retry_sched_factory(
    "retry_pipeline", presets.retry_send_from_to
)
The error message is:
dagster.check.ParameterCheckError: Param "run_config" is not one of ['dict', 'frozendict']. Got <generator object retry_sched_factory.<locals>.retry_schedule at 0x7f7ded443d60> which is type <class 'generator'>.
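Editorial note: the error is consistent with how Python generator functions behave. Calling one does not run its body; it returns a generator object. The sketch below is a minimal illustration, assuming the decorator passes the function's return value to a dict type check (which is what the traceback suggests, not something taken from Dagster's source). `run_config_fn` and `check_is_dict` are hypothetical names.

```python
import types


def run_config_fn(context):
    # Because this body contains `yield`, calling the function does not run it;
    # the call returns a generator object instead.
    yield {"resources": {}}


def check_is_dict(value):
    # Hypothetical stand-in for a dict type check like the one in the traceback.
    if not isinstance(value, dict):
        raise TypeError(f"expected dict, got {type(value).__name__}")
    return value


result = run_config_fn(context=None)
assert isinstance(result, types.GeneratorType)

try:
    check_is_dict(result)
    check_failed = False
except TypeError:
    check_failed = True
```

The yielded items only become available once something iterates over the generator, which a plain "call it and use the return value" wrapper never does.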

daniel

09/13/2021, 1:23 PM
Hmm, which version of dagster is this?
ah wait, I might see the problem. One moment
Apologies, I misremembered exactly how this works. To use a generator as your schedule function, I believe you need to create a ScheduleDefinition directly (the @schedule decorator is a thin wrapper around that class) and pass in your generator function as the 'execution_fn' arg: https://docs.dagster.io/_apidocs/schedules-sensors#dagster.ScheduleDefinition
cc @prha, it seems like we should be able to make the original syntax Nathan posted here work by checking whether the function used with the ScheduleDefinition is a generator.
Here's a code example:
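import pendulum
from dagster import RunRequest, ScheduleDefinition


def define_multi_run_schedule():
    def gen_runs(context):
        # Use the day before the scheduled tick, or before now if no tick time is set.
        if not context.scheduled_execution_time:
            date = pendulum.now().subtract(days=1)
        else:
            date = pendulum.instance(context.scheduled_execution_time).subtract(days=1)

        # _solid_config is assumed to be defined elsewhere; it builds the run config for a date.
        yield RunRequest(run_key="A", run_config=_solid_config(date), tags={"label": "A"})
        yield RunRequest(run_key="B", run_config=_solid_config(date), tags={"label": "B"})

    return ScheduleDefinition(
        name="multi_run_schedule",
        cron_schedule="0 0 * * *",
        pipeline_name="the_pipeline",
        execution_timezone="UTC",
        # Passing the generator as execution_fn lets one tick request several runs.
        execution_fn=gen_runs,
    )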
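Editorial note: the per-file logic from the original retry schedule could be moved into a generator of the same shape. The sketch below keeps only the pure-Python part so it runs without Dagster; `gen_retry_run_configs` is a hypothetical helper, and using the file name as the run key is an assumption that mirrors the distinct `run_key` values in the example above. Inside an `execution_fn`, each pair would be wrapped as `RunRequest(run_key=..., run_config=...)`.

```python
import copy


def gen_retry_run_configs(base_run_config, retry_fnames):
    """Yield one (run_key, run_config) pair per retry file."""
    for fname in retry_fnames:
        # Deep-copy so each run gets its own config and the base stays untouched.
        run_config = copy.deepcopy(base_run_config)
        config = run_config["resources"]["read_failed_send"]["config"]
        config["retry_data_fname"] = fname
        # Assumption: the file name doubles as the run key.
        yield fname, run_config


base = {"resources": {"read_failed_send": {"config": {}}}}
pairs = list(gen_retry_run_configs(base, ["a.json", "b.json"]))
```

Deep-copying per file matters here: mutating the preset's config in place would make every yielded run point at the last file name.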

Nathan Saccon

09/13/2021, 1:55 PM
Daniel, I really appreciate you taking the time to clarify this! Thank you!
Quick update: I implemented it and it works perfectly! Thanks again, I really appreciate it

prha

09/13/2021, 3:33 PM
I can try to get a patch in this week to support generators with the @schedule decorator
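Editorial note: one way a wrapper could accept both plain and generator functions is to check the function with the standard library's `inspect.isgeneratorfunction`. This is only a sketch of the idea discussed in the thread, not Dagster's actual patch; `collect_results`, `returns_config`, and `yields_configs` are hypothetical names.

```python
import inspect


def collect_results(fn, context):
    """Return everything `fn` produces as a list, whether it returns or yields."""
    if inspect.isgeneratorfunction(fn):
        # Generator function: iterate to collect every yielded item.
        return list(fn(context))
    # Plain function: wrap the single return value.
    return [fn(context)]


def returns_config(context):
    return {"mode": "single"}


def yields_configs(context):
    yield {"mode": "first"}
    yield {"mode": "second"}


single = collect_results(returns_config, None)
multiple = collect_results(yields_configs, None)
```

Checking the function up front, rather than the value it returns, avoids calling user code twice.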