Casper Weiss Bang
02/03/2023, 5:21 AMScheduleDefinition(
job=define_asset_job(
name="partitioned_ingress_job",
selection=AssetSelection.groups(
assets.source_system.ingress_incremental.GROUP_NAME
),
partitions_def=assets.source_system.ingress_incremental.PARTITIONS_DEF,
),
cron_schedule="0 1 * * *",
name="daily_partitioned_ingress",
),
However when it runs all assets throws a
dagster._check.CheckError: Failure condition: Tried to access partition_key_range for a non-partitioned run
jamie
02/03/2023, 3:30 PMCasper Weiss Bang
02/03/2023, 3:40 PMjamie
02/03/2023, 4:09 PMCasper Weiss Bang
02/03/2023, 4:27 PMInvariant violation for parameter freshness_policies_by_key. Description: FreshnessPolicies are currently unsupported for partitioned assets.
Gotchaclaire
02/06/2023, 7:25 PMCasper Weiss Bang
02/07/2023, 5:38 AMAJ Floersch
02/20/2023, 4:30 PMrun_request_for_partition
.Casper Weiss Bang
02/21/2023, 6:46 AMdef create_daily_schedule_run_for_monthly_partition(
job: JobDefinition,
name: str,
default_status: DefaultScheduleStatus | None = None,
at_hour: int = 3,
at_minute: int = 0,
):
"""Creates a schedule that runs daily and triggers a job that has monthly partitioning"""
@schedule(
job=job,
cron_schedule=f"{at_minute} {at_hour} * * *",
name=name,
default_status=default_status or DefaultScheduleStatus.STOPPED,
)
def daily_schedule(context: ScheduleEvaluationContext):
current_date = context.scheduled_execution_time.strftime("%Y-%m-%d")
start_of_month = (
context.scheduled_execution_time + relativedelta(day=1)
).strftime("%Y-%m-%d")
request = job.run_request_for_partition(
partition_key=start_of_month, run_key=current_date
)
yield request
return daily_schedule