https://dagster.io/ logo
#ask-community
Title
# ask-community
t

Timo Klockow

02/14/2023, 9:05 AM
Hey, any idea why in production the
scheduled_execution_time
translate like this:
Copy code
2023-02-14 07:15:00 +0000 - dagster - INFO - data_platform_repo - context.scheduled_execution_time=DateTime(2023, 2, 14, 7, 15, 0, tzinfo=Timezone('UTC'))
2023-02-14 07:15:00 +0000 - dagster - INFO - data_platform_repo - 
            current_partition_key='2023-02-14'
            current_partition_tag={'dagster/partition': '2023-02-14'}
            previous_partition_key='2023-02-13'
            previous_partition_tag={'dagster/partition': '2023-02-13'}
But in tests when using
with instance_for_test() as instance:
in combination with
.evaluate_tick(build_schedule_context(dagster_instance, datetime(2023, 1, 3, 2, 0, tzinfo=Timezone('UTC'))))
for the same code I get:
Copy code
2023-02-14 09:52:29 +0100 - dagster - INFO - context.scheduled_execution_time=datetime.datetime(2023, 1, 3, 2, 0, tzinfo=Timezone('UTC'))
2023-02-14 09:52:29 +0100 - dagster - INFO - 
            current_partition_key='2023-01-02'
            current_partition_tag={'dagster/partition': '2023-01-02'}
            previous_partition_key='2023-01-01'
            previous_partition_tag={'dagster/partition': '2023-01-01'}
The
current_partition_key
is not the same as the
scheduled_execution_time
like in production Code:
Copy code
logger = DagsterLogger(context)
<http://logger.info|logger.info>(f'{context.scheduled_execution_time=}')
current_partition_key = partitions_def.get_partition_key_for_timestamp(
    partitions_def.get_current_timestamp(context.scheduled_execution_time)
)
current_partition_tag: Mapping[
    str, str
] = partitions_def.get_tags_for_partition_key(current_partition_key)

previous_partition_key = partitions_def.get_partition_key_for_timestamp(
    partitions_def.get_current_timestamp(
        context.scheduled_execution_time - timedelta(days=1)
    )
)
previous_partition_tag: Mapping[
    str, str
] = partitions_def.get_tags_for_partition_key(previous_partition_key)

<http://logger.info|logger.info>(
    f'''
    {current_partition_key=}
    {current_partition_tag=}
    {previous_partition_key=}
    {previous_partition_tag=}
    '''
)
j

Jakub Zgrzebnicki

02/14/2023, 9:15 AM
What is your partition definition?
t

Timo Klockow

02/14/2023, 9:22 AM
Copy code
@daily_partitioned_config(start_date=BASE_START_DATE, minute_offset=15, hour_offset=7)
def daily_partitioning(start: datetime, end: datetime) -> dict:
    ...
Copy code
def build_schedule() -> ScheduleDefinition:
    partitions_def: TimeWindowPartitionsDefinition = daily_partitioning.partitions_def
    cron_schedule = partitions_def.cron_schedule
    time_window_partitions_def: TimeWindowPartitionsDefinition = cast(
        TimeWindowPartitionsDefinition, partitions_def
    )

    @schedule(
        cron_schedule=cron_schedule,
        job=reports_job,
        default_status=DefaultScheduleStatus.RUNNING,
        execution_timezone=time_window_partitions_def.timezone,
        name='SCHEDULE',
    )
    def schedule_def(context: ScheduleEvaluationContext) -> RunRequest:
        logger = DagsterLogger(context)
        <http://logger.info|logger.info>(f'{context.scheduled_execution_time=}')
        current_partition_key = partitions_def.get_partition_key_for_timestamp(
            partitions_def.get_current_timestamp(context.scheduled_execution_time)
        )
        current_partition_tag: Mapping[
            str, str
        ] = partitions_def.get_tags_for_partition_key(current_partition_key)

        previous_partition_key = partitions_def.get_partition_key_for_timestamp(
            partitions_def.get_current_timestamp(
                context.scheduled_execution_time - timedelta(days=1)
            )
        )
        previous_partition_tag: Mapping[
            str, str
        ] = partitions_def.get_tags_for_partition_key(previous_partition_key)

        <http://logger.info|logger.info>(
            f'''
            {current_partition_key=}
            {current_partition_tag=}
            {previous_partition_key=}
            {previous_partition_tag=}
            '''
        )
        yield RunRequest(...)
    return schedule_def
j

Jakub Zgrzebnicki

02/14/2023, 9:25 AM
Copy code
minute_offset=15, hour_offset=7
is the problem, basically, everything before 7:15 will be previous day and everything after will be a new day partition
t

Timo Klockow

02/14/2023, 9:25 AM
let me try that
You’r absolutely correct
Any other hints 🙂 ?
j

Jakub Zgrzebnicki

02/14/2023, 9:31 AM
have you tried build_schedule_from_partitioned_job? maybe that's something that can simplify your build_schedule function https://docs.dagster.io/_apidocs/schedules-sensors#dagster.build_schedule_from_partitioned_job
t

Timo Klockow

02/14/2023, 9:32 AM
Well what I didn’t show here
Copy code
...
        run_request: RunRequest = RunRequest(
            run_key=None,
            job_name=job.name,
            tags={
                'dagster/asset_partition_range_start': current_partition_key,
                'dagster/asset_partition_range_end': current_partition_key,
            },
            run_config=run_config,
        )

        runs: list[RunRecord] = get_runs(
            context.instance, tags=previous_partition_tag
        )

        if runs and runs[0].dagster_run.is_failure:
            # Raise if the latest run for the previous partition is a failure.
            raise DagsterScheduleException(runs[0].dagster_run)
        # Otherwise, either there were no runs for the previous partition
        # or it was a success.
        yield run_request
Basically a schedule that only runs when the previous run/day was successful
j

Jakub Zgrzebnicki

02/14/2023, 9:33 AM
I don't know the use case, but I will get rid of offsets from partition definition if you are not using time just the pure date
t

Timo Klockow

02/14/2023, 9:34 AM
nah it should run at 7:15 UTC 🙂
j

Jakub Zgrzebnicki

02/14/2023, 9:34 AM
time partition and schedule date are two different things
if your partitions divide time by date you don't want the offset there
t

Timo Klockow

02/14/2023, 9:36 AM
You mean hardcode the
cron_schedule='15 7 * * *
instead and get rid of the offsets?
j

Jakub Zgrzebnicki

02/14/2023, 9:38 AM
I will prefer that, but that's just my opinion. I understand it this way that your partition is from 0:00 to 23:59 so it shouldn't have an offset. Only schedule is set for different one
t

Timo Klockow

02/14/2023, 9:39 AM
Ok thanks I will consider that option 👍 Either way thanks a lot for your help. Have a good one bye slide
👍 1