https://dagster.io/ logo
#community-showcase
Title
# community-showcase
d

Drew Broadley

02/08/2024, 1:34 AM
I'm not sure if anyone else is suffering from this, but we struggled with `cron_schedule` and working hours when setting our timezone (Pacific/Auckland) as `execution_timezone`. It just wouldn't work for partitioned jobs and would fail saying the partition wasn't valid. I ended up doing this as a workaround — hope it helps someone:
Copy code
from datetime import timedelta, datetime

from dagster import schedule, RunRequest

from helpers.dates import get_working_hours_cron
from pipelines.i_various_r_aggregated_o_sql_f_hourly.job_zendesk_aggregated import aggregated_assets_pipeline


@schedule(
    # The cron string is already in UTC (see get_working_hours_cron), so no
    # execution_timezone is passed here; setting it to Pacific/Auckland is
    # what produced the invalid-partition failures this workaround avoids.
    cron_schedule=get_working_hours_cron(min_offset=30),
    job=aggregated_assets_pipeline,
    name="aggregated_asset_schedule",
)
def aggregated_asset_schedule(context):
    """Request a run of ``aggregated_assets_pipeline`` for the tick's date.

    The tick's scheduled execution time is formatted as ``YYYY-MM-DD`` and
    used as the run key, the ``partition`` tag, and the partition key. The
    console logger is configured at INFO level.

    NOTE(review): because the run key is per day, dagster presumably skips
    every tick after the first on a given day — confirm that is intended for
    an hourly working-hours cron.
    NOTE(review): with no execution_timezone, scheduled_execution_time is
    presumably UTC, so this is the UTC date (which lags the NZ date during NZ
    mornings) — confirm against the job's partitions definition.
    """
    partition_date = context.scheduled_execution_time.strftime("%Y-%m-%d")

    console_logger = {"config": {"log_level": "INFO"}}

    return RunRequest(
        run_key=partition_date,
        run_config={"loggers": {"console": console_logger}},
        tags={"partition": partition_date},
        partition_key=partition_date,
    )
And here's the code for the definition of `get_working_hours_cron`:
Copy code
import pytz
from datetime import datetime

def get_working_hours_cron(min_offset=0) -> str:
    """Build a UTC cron expression that fires hourly across NZ working hours.

    The ``WORKING_HOUR_START`` and ``WORKING_HOUR_END`` secrets are read as
    Pacific/Auckland wall-clock hours on today's date. Both are converted to
    UTC, and every hour from start to end *inclusive* is listed. The list
    wraps past midnight UTC when needed; Auckland is ahead of UTC, so an NZ
    working day usually crosses it.

    Args:
        min_offset: Minute past the hour at which each run fires.

    Returns:
        A five-field cron string. For 08:00-17:00 NZDT with ``min_offset=30``
        it is ``"30 19,20,21,22,23,0,1,2,3,4 * * *"``.

    NOTE(review): the UTC offset comes from today's date at call time. When
    called at import (e.g. inside a ``@schedule`` decorator), the hours
    therefore drift by one across an NZ daylight-saving change until the code
    is reloaded.
    NOTE(review): ``get_secret`` is not imported in this snippet — confirm it
    is provided by the surrounding module.
    """
    # Read the clock once so both endpoints share the same date. Two separate
    # now() calls could straddle midnight and pick up different DST offsets.
    now = datetime.now()

    def _auckland_hour_to_utc(secret_name):
        # Turn the configured Auckland hour (plus min_offset) into a UTC datetime.
        local = now.replace(
            hour=int(get_secret(secret_name)),
            minute=int(min_offset),
            second=0,
            microsecond=0,
        )
        return get_utc_from_nz_tz(set_datetime_localize_to_nztz(local))

    datetime_start = _auckland_hour_to_utc('WORKING_HOUR_START')
    datetime_end = _auckland_hour_to_utc('WORKING_HOUR_END')

    # Count the hours from start to end moving forward on a 24h clock; the
    # modulo handles a window that crosses midnight UTC. The +1 keeps the end
    # hour in both the wrapping and the non-wrapping case. Previously, the
    # wrapping branch dropped the end hour.
    span = (datetime_end.hour - datetime_start.hour) % 24
    hours = ','.join(
        str((datetime_start.hour + step) % 24) for step in range(span + 1)
    )

    return f"{datetime_start.minute} {hours} * * *"

def set_datetime_localize_to_nztz(datetime: datetime):
    """Attach the Pacific/Auckland timezone to a naive datetime.

    The wall-clock fields are kept and interpreted as Auckland local time via
    pytz's ``localize``, using its default DST handling. pytz rejects input
    that already carries a tzinfo.
    """
    auckland = pytz.timezone('Pacific/Auckland')
    localized = auckland.localize(datetime)
    return localized

def get_utc_from_nz_tz(datetime_nztz: datetime):
    """Convert a datetime to UTC, treating naive input as Auckland local time.

    An aware datetime (in any timezone) is converted directly. A naive one is
    first localized to Pacific/Auckland with ``is_dst=None``, so pytz raises
    for a wall-clock time that is ambiguous or skipped by a DST transition
    rather than guessing.
    """
    if datetime_nztz.tzinfo is not None:
        return datetime_nztz.astimezone(pytz.utc)

    auckland = pytz.timezone('Pacific/Auckland')
    return auckland.localize(datetime_nztz, is_dst=None).astimezone(pytz.utc)
🎉 1