Nathan Saccon
09/10/2021, 8:02 PMdaniel
09/10/2021, 8:04 PMNathan Saccon
09/10/2021, 8:04 PMfrom dagster import (
schedule,
PresetDefinition,
RunRequest,
ScheduleEvaluationContext,
)
from dagster.core.definitions.run_request import SkipReason
from src.file_env.file_environment import FileEnvironment
import src.presets as presets
import copy
def retry_sched_factory(pipline_name: str, preset: PresetDefinition):
@schedule(
cron_schedule="*/1 * * * *",
pipeline_name=pipline_name,
name=f"{preset.name}_sched",
execution_timezone="US/Eastern",
)
def retry_schedule(context: ScheduleEvaluationContext):
# Pull retry dir
retry_dir = preset.run_config["resources"]["retry_data"]["config"]
retry_dir = retry_dir["retry_data_dir"]
# Get files to retry
local_env = FileEnvironment("file_system")
retry_fnames = local_env.list_area_object_mtimes(retry_dir)
for fname in retry_fnames:
# Update config
run_config = copy.deepcopy(preset.run_config)
_config = run_config["resources"]["read_failed_send"]["config"]
_config["retry_data_fname"] = fname
yield RunRequest(run_config=run_config)
if not retry_fnames:
yield SkipReason(f"No retry files found at: {retry_dir}")
return
return retry_schedule
send_retry_schedule = retry_sched_factory(
"retry_pipeline", presets.retry_send_from_to
)
dagster.check.ParameterCheckError: Param "run_config" is not one of ['dict', 'frozendict']. Got <generator object retry_sched_factory.<locals>.retry_schedule at 0x7f7ded443d60> which is type <class 'generator'>.
daniel
09/13/2021, 1:23 PMdef define_multi_run_schedule():
def gen_runs(context):
if not context.scheduled_execution_time:
date = pendulum.now().subtract(days=1)
else:
date = pendulum.instance(context.scheduled_execution_time).subtract(days=1)
yield RunRequest(run_key="A", run_config=_solid_config(date), tags={"label": "A"})
yield RunRequest(run_key="B", run_config=_solid_config(date), tags={"label": "B"})
return ScheduleDefinition(
name="multi_run_schedule",
cron_schedule="0 0 * * *",
pipeline_name="the_pipeline",
execution_timezone="UTC",
execution_fn=gen_runs,
)
Nathan Saccon
09/13/2021, 1:55 PMprha
09/13/2021, 3:33 PM