# ask-community
c
Is there a bug with running a partitioned job via schedules? I have a schedule defined like so:
```python
ScheduleDefinition(
    job=define_asset_job(
        name="partitioned_ingress_job",
        selection=AssetSelection.groups(
            assets.source_system.ingress_incremental.GROUP_NAME
        ),
        partitions_def=assets.source_system.ingress_incremental.PARTITIONS_DEF,
    ),
    cron_schedule="0 1 * * *",
    name="daily_partitioned_ingress",
)
```
However, when it runs, every asset throws:
```
dagster._check.CheckError: Failure condition: Tried to access partition_key_range for a non-partitioned run
```
🙌 1
j
Hi @Casper Weiss Bang, the API for defining schedules for partitioned assets is slightly different. Can you try following the example here and let me know if you run into the same issue? https://docs.dagster.io/concepts/partitions-schedules-sensors/schedules#schedules-from-partitioned-assets-and-jobs (it should be the second code block after the anchor link)
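For reference, a minimal sketch of that documented pattern using `build_schedule_from_partitioned_job`; the group name and start date below are placeholders, not values from the thread:
```python
from dagster import (
    AssetSelection,
    MonthlyPartitionsDefinition,
    build_schedule_from_partitioned_job,
    define_asset_job,
)

# Placeholder group name and start date; substitute your own.
partitioned_ingress_job = define_asset_job(
    name="partitioned_ingress_job",
    selection=AssetSelection.groups("ingress_incremental"),
    partitions_def=MonthlyPartitionsDefinition(start_date="2018-01-01"),
)

# The cron schedule is derived from the job's partitions definition, so a
# monthly-partitioned job yields a schedule that fires once a month.
monthly_ingress_schedule = build_schedule_from_partitioned_job(
    partitioned_ingress_job,
    hour_of_day=1,
)
```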
c
Problem is that that gives me a schedule that runs monthly if the partition is monthly. I want the current month to run every morning, not once a month. It's a bit of a special case, I know. I wanted to use a daily partition, but since we have very scattered data going 5+ years back, it's a pain to have 1000+ partitions, and difficult to do a complete backfill with the current Dagster codebase. So rather than having a lot of scattered partitions, I just have the big ones and reload the current one. It's also cheaper in storage and requests than having Parquet files with 1-2 rows in them.
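A sketch of the trade-off being described (the 2018 start date is an assumption): over five years, a daily definition produces roughly 1800 partitions while a monthly one produces about 60.
```python
from dagster import DailyPartitionsDefinition, MonthlyPartitionsDefinition

# ~1800 partitions over five years, many holding only a handful of rows
daily = DailyPartitionsDefinition(start_date="2018-01-01")

# ~60 partitions over the same span; only the current one is reloaded each morning
monthly = MonthlyPartitionsDefinition(start_date="2018-01-01")
```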
j
Ah, I see. @claire is there a way to do this?
c
Do freshness policies fully support partitions? Maybe I can do something there.
I never got an answer here (which is fine - I'm hoping I eventually will), but does anyone know of a trick to more quickly backfill a daily partition that goes years back? Do I simply need to run ~1000 jobs?
🤖 1
Also, is there any relation between freshness and partitions? How does that work?
🤖 1
Invariant violation for parameter freshness_policies_by_key. Description: FreshnessPolicies are currently unsupported for partitioned assets.
Gotcha
I think the current best solution is to create a normal schedule, as for regular jobs, and then trigger the job with the correct partition key 🔐
c
Hi Casper, apologies for the late response. In response to running the monthly partition daily, I think you're right that the easiest solution is to create a daily schedule that executes the job for the monthly partition key. Unfortunately, freshness policies currently do not support partitions, but this is something we're working on right now and it should be out in the next month or so.
🎉 1
In response to backfilling many partitions, one thing you could try is kicking off a single run that backfills a range of partitions: https://github.com/dagster-io/dagster/discussions/11653 But the above is contingent on having an IO manager that can execute on partition ranges. If that doesn't work, the easiest solution is probably just to create a backfill.
👍 1
thisisfineparrot 1
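A rough sketch of what "execute on partition ranges" means on the asset side, per the linked discussion; the asset name and partitions definition are placeholders:
```python
from dagster import DailyPartitionsDefinition, OpExecutionContext, asset

@asset(partitions_def=DailyPartitionsDefinition(start_date="2018-01-01"))
def ingress_incremental(context: OpExecutionContext):
    # For a run launched over a range of partitions, the context exposes
    # the whole range rather than a single key, so one run (and one IO
    # manager invocation) can cover years of partitions at once.
    key_range = context.asset_partition_key_range
    context.log.info(f"Loading data from {key_range.start} to {key_range.end}")
```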
c
Oh, that might solve it instead and enable us to simply use a daily partition. That'd be better too 😮 Thank you, Claire!
Uh, I didn't see this button before!
a
@claire can you clarify the proper way to schedule a partitioned asset at a different cadence than the partition itself? Similar to Casper's scenario, I have assets with daily partitions that I'm trying to schedule to update hourly. However, I can't seem to find anything on the best way to do this. Disregard - I believe I found the way to do this using `run_request_for_partition`.
🌈 1
c
@AJ Floersch I created a factory function for this exact purpose
```python
from dagster import (
    DefaultScheduleStatus,
    JobDefinition,
    ScheduleEvaluationContext,
    schedule,
)
from dateutil.relativedelta import relativedelta


def create_daily_schedule_run_for_monthly_partition(
    job: JobDefinition,
    name: str,
    default_status: DefaultScheduleStatus | None = None,
    at_hour: int = 3,
    at_minute: int = 0,
):
    """Creates a schedule that runs daily and triggers a job that has monthly partitioning."""

    @schedule(
        job=job,
        cron_schedule=f"{at_minute} {at_hour} * * *",
        name=name,
        default_status=default_status or DefaultScheduleStatus.STOPPED,
    )
    def daily_schedule(context: ScheduleEvaluationContext):
        # Used as the run key, so each calendar day yields at most one run.
        current_date = context.scheduled_execution_time.strftime("%Y-%m-%d")

        # relativedelta(day=1) pins the date to the first of the current month,
        # which matches how Dagster names monthly partition keys.
        start_of_month = (
            context.scheduled_execution_time + relativedelta(day=1)
        ).strftime("%Y-%m-%d")

        request = job.run_request_for_partition(
            partition_key=start_of_month, run_key=current_date
        )
        yield request

    return daily_schedule
```
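Hypothetical usage, wiring the factory up to the job from the top of the thread:
```python
daily_partitioned_ingress = create_daily_schedule_run_for_monthly_partition(
    job=partitioned_ingress_job,
    name="daily_partitioned_ingress",
    at_hour=1,
)
```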