Timo Klockow
02/14/2023, 9:05 AM
scheduled_execution_time translates like this:
2023-02-14 07:15:00 +0000 - dagster - INFO - data_platform_repo - context.scheduled_execution_time=DateTime(2023, 2, 14, 7, 15, 0, tzinfo=Timezone('UTC'))
2023-02-14 07:15:00 +0000 - dagster - INFO - data_platform_repo -
current_partition_key='2023-02-14'
current_partition_tag={'dagster/partition': '2023-02-14'}
previous_partition_key='2023-02-13'
previous_partition_tag={'dagster/partition': '2023-02-13'}
But in tests, when using
with instance_for_test() as instance:
in combination with
.evaluate_tick(build_schedule_context(dagster_instance, datetime(2023, 1, 3, 2, 0, tzinfo=Timezone('UTC'))))
for the same code, I get:
2023-02-14 09:52:29 +0100 - dagster - INFO - context.scheduled_execution_time=datetime.datetime(2023, 1, 3, 2, 0, tzinfo=Timezone('UTC'))
2023-02-14 09:52:29 +0100 - dagster - INFO -
current_partition_key='2023-01-02'
current_partition_tag={'dagster/partition': '2023-01-02'}
previous_partition_key='2023-01-01'
previous_partition_tag={'dagster/partition': '2023-01-01'}
Here the current_partition_key does not match the date of the scheduled_execution_time, unlike in production.
Code:
logger = DagsterLogger(context)
logger.info(f'{context.scheduled_execution_time=}')
current_partition_key = partitions_def.get_partition_key_for_timestamp(
    partitions_def.get_current_timestamp(context.scheduled_execution_time)
)
current_partition_tag: Mapping[
    str, str
] = partitions_def.get_tags_for_partition_key(current_partition_key)
previous_partition_key = partitions_def.get_partition_key_for_timestamp(
    partitions_def.get_current_timestamp(
        context.scheduled_execution_time - timedelta(days=1)
    )
)
previous_partition_tag: Mapping[
    str, str
] = partitions_def.get_tags_for_partition_key(previous_partition_key)
logger.info(
    f'''
{current_partition_key=}
{current_partition_tag=}
{previous_partition_key=}
{previous_partition_tag=}
'''
)
Jakub Zgrzebnicki
02/14/2023, 9:15 AM
Timo Klockow
02/14/2023, 9:22 AM
@daily_partitioned_config(start_date=BASE_START_DATE, minute_offset=15, hour_offset=7)
def daily_partitioning(start: datetime, end: datetime) -> dict:
    ...
def build_schedule() -> ScheduleDefinition:
    partitions_def: TimeWindowPartitionsDefinition = daily_partitioning.partitions_def
    cron_schedule = partitions_def.cron_schedule
    time_window_partitions_def: TimeWindowPartitionsDefinition = cast(
        TimeWindowPartitionsDefinition, partitions_def
    )
    @schedule(
        cron_schedule=cron_schedule,
        job=reports_job,
        default_status=DefaultScheduleStatus.RUNNING,
        execution_timezone=time_window_partitions_def.timezone,
        name='SCHEDULE',
    )
    def schedule_def(context: ScheduleEvaluationContext) -> RunRequest:
        logger = DagsterLogger(context)
        logger.info(f'{context.scheduled_execution_time=}')
        current_partition_key = partitions_def.get_partition_key_for_timestamp(
            partitions_def.get_current_timestamp(context.scheduled_execution_time)
        )
        current_partition_tag: Mapping[
            str, str
        ] = partitions_def.get_tags_for_partition_key(current_partition_key)
        previous_partition_key = partitions_def.get_partition_key_for_timestamp(
            partitions_def.get_current_timestamp(
                context.scheduled_execution_time - timedelta(days=1)
            )
        )
        previous_partition_tag: Mapping[
            str, str
        ] = partitions_def.get_tags_for_partition_key(previous_partition_key)
        logger.info(
            f'''
{current_partition_key=}
{current_partition_tag=}
{previous_partition_key=}
{previous_partition_tag=}
'''
        )
        yield RunRequest(...)
    return schedule_def
Jakub Zgrzebnicki
02/14/2023, 9:25 AM
minute_offset=15, hour_offset=7 is the problem. Basically, everything before 7:15 falls into the previous day's partition, and everything from 7:15 onward falls into the new day's partition.
Timo Klockow
02/14/2023, 9:25 AM
Jakub Zgrzebnicki
02/14/2023, 9:31 AM
Timo Klockow
02/14/2023, 9:32 AM
...
run_request: RunRequest = RunRequest(
    run_key=None,
    job_name=job.name,
    tags={
        'dagster/asset_partition_range_start': current_partition_key,
        'dagster/asset_partition_range_end': current_partition_key,
    },
    run_config=run_config,
)
runs: list[RunRecord] = get_runs(
    context.instance, tags=previous_partition_tag
)
if runs and runs[0].dagster_run.is_failure:
    # Raise if the latest run for the previous partition is a failure.
    raise DagsterScheduleException(runs[0].dagster_run)
# Otherwise, either there were no runs for the previous partition
# or it was a success.
yield run_request
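A minimal sketch of the window arithmetic Jakub describes earlier in the thread, written in plain Python rather than with Dagster. The helper partition_key_for is hypothetical, and it assumes each daily partition covers [D 07:15, D+1 07:15) in UTC and is keyed by the date D. That assumption is consistent with the logs earlier in the thread but is not taken from Dagster's implementation:

```python
from datetime import datetime, timedelta, timezone


def partition_key_for(ts: datetime, hour_offset: int = 7, minute_offset: int = 15) -> str:
    """Hypothetical helper: key of the daily window that contains ts.

    Assumes the window for key D runs from D hh:mm (inclusive) to
    D+1 hh:mm (exclusive), where hh:mm is the configured offset.
    """
    # Shifting back by the offset maps every instant in a window onto date D.
    shifted = ts - timedelta(hours=hour_offset, minutes=minute_offset)
    return shifted.strftime('%Y-%m-%d')


# Tick times taken from the logs in this thread.
prod_tick = datetime(2023, 2, 14, 7, 15, tzinfo=timezone.utc)
test_tick = datetime(2023, 1, 3, 2, 0, tzinfo=timezone.utc)
# Assumed alternative: a test tick aligned with the 07:15 offset.
aligned_test_tick = datetime(2023, 1, 3, 7, 15, tzinfo=timezone.utc)

print(partition_key_for(prod_tick))          # 2023-02-14
print(partition_key_for(test_tick))          # 2023-01-02
print(partition_key_for(aligned_test_tick))  # 2023-01-03
```

Under this assumption, the 02:00 test tick lands in the previous day's window, while a tick at 07:15 yields the key that matches its own date.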
Jakub Zgrzebnicki
02/14/2023, 9:33 AM
Timo Klockow
02/14/2023, 9:34 AM
Jakub Zgrzebnicki
02/14/2023, 9:34 AM
Timo Klockow
02/14/2023, 9:36 AM
cron_schedule='15 7 * * *' instead and get rid of the offsets?
Jakub Zgrzebnicki
02/14/2023, 9:38 AM
Timo Klockow
02/14/2023, 9:39 AM