Drew Broadley
02/08/2024, 1:34 AM
I had trouble combining `cron_schedule` and working hours when setting our timezone (Pacific/Auckland) as `execution_timezone`. It just wouldn't work for partitioned jobs, and would fail saying the partition wasn't valid.
I ended up using this as a workaround — hope it helps someone:
from datetime import timedelta, datetime
from dagster import schedule, RunRequest
from helpers.dates import get_working_hours_cron
from pipelines.i_various_r_aggregated_o_sql_f_hourly.job_zendesk_aggregated import aggregated_assets_pipeline
@schedule(
    cron_schedule=get_working_hours_cron(min_offset=30),
    # execution_timezone is deliberately left unset: get_working_hours_cron
    # already converts the NZ working hours into UTC hours, so the cron
    # string must be evaluated in UTC.
    job=aggregated_assets_pipeline,
    name="aggregated_asset_schedule",
)
def aggregated_asset_schedule(context):
    """Request one partitioned run of ``aggregated_assets_pipeline`` per tick.

    The run key, the ``partition`` tag and the partition key are all the
    tick's scheduled execution date formatted as ``YYYY-MM-DD``; console
    logging is configured at INFO level.
    """
    # NOTE(review): without execution_timezone this is presumably the UTC
    # date, which during NZ hours before midday is the previous NZ calendar
    # day - confirm this matches how the job's partitions are defined.
    day_key = context.scheduled_execution_time.strftime('%Y-%m-%d')

    # NOTE(review): every tick on the same day reuses this run key; confirm
    # Dagster's run_key de-duplication scope for schedules is what's intended.
    console_logger = {"config": {"log_level": "INFO"}}
    return RunRequest(
        run_key=day_key,
        run_config={"loggers": {"console": console_logger}},
        tags={"partition": day_key},
        partition_key=day_key,
    )
And here's the definition of `get_working_hours_cron` and the helpers it uses:
import pytz
from datetime import datetime
def format_cron_hour_range(start_hour: int, end_hour: int) -> str:
    """Return a cron hour field listing every hour from start_hour to end_hour inclusive.

    The range wraps past midnight when ``end_hour`` is earlier than
    ``start_hour`` (e.g. ``19, 5`` -> ``"19,20,21,22,23,0,1,2,3,4,5"``).
    Equal bounds yield a single hour.
    """
    # Number of hour steps from start to end, going forward round the clock.
    span = (end_hour - start_hour) % 24
    return ','.join(str((start_hour + step) % 24) for step in range(span + 1))


def get_working_hours_cron(min_offset=0) -> str:
    """Build a cron expression, in UTC hours, that fires hourly during NZ working hours.

    The bounds come from the ``WORKING_HOUR_START`` and ``WORKING_HOUR_END``
    secrets. They are treated as Pacific/Auckland wall-clock hours on the
    current date (server-local, from ``datetime.now()``) and converted to UTC.
    Both the start and the end hour are included in the generated hour list,
    even when the UTC range wraps past midnight.

    Args:
        min_offset: Minute past the hour at which each run fires (0-59).

    Returns:
        A five-field cron string, e.g. ``"30 19,20,21,22,23,0,1,2,3,4,5 * * *"``.

    Note:
        The NZ->UTC offset (NZST vs NZDT) is taken from the current date, so a
        cron string computed once (for example as a decorator argument at
        import time) is off by an hour after the next daylight-saving change
        until it is rebuilt.
    """
    start_hour = int(get_secret('WORKING_HOUR_START'))
    end_hour = int(get_secret('WORKING_HOUR_END'))
    minute = int(min_offset)

    # Read the clock once so both bounds are built on the same calendar date
    # (two separate now() calls could straddle midnight).
    now = datetime.now()
    time_start = now.replace(hour=start_hour, minute=minute, second=0, microsecond=0)
    time_end = now.replace(hour=end_hour, minute=minute, second=0, microsecond=0)

    # Interpret the bounds as Auckland wall-clock time, then express them in UTC.
    datetime_start = get_utc_from_nz_tz(set_datetime_localize_to_nztz(time_start))
    datetime_end = get_utc_from_nz_tz(set_datetime_localize_to_nztz(time_end))

    hours = format_cron_hour_range(datetime_start.hour, datetime_end.hour)
    return f"{datetime_start.minute} {hours} * * *"
def set_datetime_localize_to_nztz(datetime: datetime):
    """Attach the Pacific/Auckland timezone to a naive datetime.

    This uses pytz ``localize``. The wall-clock fields are kept unchanged,
    so nothing is converted between zones. pytz's default ``is_dst=False``
    applies, so ambiguous or non-existent times around a DST change are
    resolved without raising. pytz raises ``ValueError`` if the argument is
    already timezone-aware.
    """
    # The parameter name shadows the datetime class inside this function; it
    # is kept as-is so keyword callers (datetime=...) keep working.
    return pytz.timezone('Pacific/Auckland').localize(datetime)
def get_utc_from_nz_tz(datetime_nztz: datetime):
    """Convert a datetime to UTC, treating naive input as Pacific/Auckland time.

    A naive input is first localized to Pacific/Auckland with
    ``is_dst=None``. In that case pytz raises ``AmbiguousTimeError`` or
    ``NonExistentTimeError`` for wall-clock times inside a DST transition.
    An aware input is converted from whatever timezone it already carries;
    it is not checked to be Pacific/Auckland.
    """
    if datetime_nztz.tzinfo is not None:
        return datetime_nztz.astimezone(pytz.utc)
    auckland = pytz.timezone('Pacific/Auckland')
    return auckland.localize(datetime_nztz, is_dst=None).astimezone(pytz.utc)