Lech

11/28/2022, 1:24 PM
Hello, I'm trying to make my schedule launch runs for all partitions of a StaticPartitionsDefinition on a daily basis, but I end up with just one partition scheduled. Here's my code. Partition creation:
partitions_def= StaticPartitionsDefinition(key_list)
partitioned scheduler:
def get_schedule_def(partitions_def, cron_schedule, job, execution_timezone):
    @schedule(name=f'{job.name}_scheduler',
        cron_schedule=cron_schedule,
        job=job,
        default_status=DefaultScheduleStatus.STOPPED,
        execution_timezone=execution_timezone)
    def schedule_def():
        partition_keys = partitions_def.get_partition_keys()
        if len(partition_keys) == 0:
            return SkipReason("The job's PartitionsDefinition has no partitions")
        for key in partition_keys:
            yield job.run_request_for_partition(partition_key=key, run_key=key)
    return schedule_def
partitioned job creation:
job = asset.to_job(name=f"Job_{job_name}", resource_defs=resource_defs,
                    config=config, partitions_def=partitions_def, executor_def=in_process_executor,)
The job variable and the result of get_schedule_def then go into the repository function.
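A side note on the schedule function above: because its body contains `yield`, Python treats the whole function as a generator, so the value in `return SkipReason(...)` is never handed to whoever iterates over it. The sketch below shows this in plain Python. `SkipReason` here is a hypothetical stand-in class, not Dagster's, and how Dagster itself reacts to an empty generator is not covered.

```python
from dataclasses import dataclass
from typing import Iterator, List, Union


@dataclass
class SkipReason:
    """Hypothetical stand-in for a skip message; not the Dagster class."""
    message: str


def returns_skip(keys: List[str]) -> Iterator[Union[str, SkipReason]]:
    # Same shape as the schedule body above: `return` inside a generator.
    if len(keys) == 0:
        return SkipReason("no partitions")  # value is not yielded to the caller
    for key in keys:
        yield key


def yields_skip(keys: List[str]) -> Iterator[Union[str, SkipReason]]:
    # Yield the skip message first, then stop the generator explicitly.
    if len(keys) == 0:
        yield SkipReason("no partitions")
        return
    for key in keys:
        yield key


if __name__ == "__main__":
    print(list(returns_skip([])))  # the skip message is lost
    print(list(yields_skip([])))   # the skip message is delivered
```

With a non-empty key list both variants behave the same, so this only matters when the partition list is empty.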

owen

11/28/2022, 5:43 PM
hi @Lech! by "just one partition scheduled", does that mean that each day, only one partition gets executed for each of the assets you've created a schedule for using `get_schedule_def`? Also, how is key_list being updated? One thing I noticed is that you've set `run_key=key` inside the call to `job.run_request_for_partition`, which means that once a schedule launches a run for a given partition key, it will never launch a run for that partition key again. If this is not desired behavior, then you can set `run_key=None`.

Lech

11/28/2022, 7:57 PM
The key list is taken from config and it is static (e.g. ['a','b','c']). I want to schedule all partitions daily. So I thought that, as in the documentation, if we yield each partition key in the schedule function, I would get all job partitions requested to run at the desired hour.

owen

11/28/2022, 8:02 PM
hi yep that should work as you're describing, but I believe you'll need to `yield job.run_request_for_partition(partition_key=key, run_key=None)` (right now, you have `run_key=key`). Run keys are unique identifiers for runs, and you can't create multiple runs from the same schedule or sensor that share the same run key. That feature doesn't seem to be useful for what you're trying to accomplish, so you can just skip that logic by setting run_key to None (but keeping the partition_key).
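To make the run key point concrete, here is a toy model in plain Python of the deduplication described above. It is only a sketch under assumptions: `ToyScheduler`, `evaluate_schedule`, and the tuple-based request shape are hypothetical names invented for illustration and are not Dagster APIs or Dagster's actual implementation.

```python
from typing import Iterator, List, Optional, Set, Tuple

# Hypothetical request shape: (partition_key, run_key).
Request = Tuple[str, Optional[str]]


def evaluate_schedule(
    partition_keys: List[str], use_partition_as_run_key: bool
) -> Iterator[Request]:
    """Yield one request per partition, like the schedule body in this thread."""
    for key in partition_keys:
        yield (key, key if use_partition_as_run_key else None)


class ToyScheduler:
    """Remembers run keys it has already launched and skips repeats."""

    def __init__(self) -> None:
        self.seen_run_keys: Set[str] = set()

    def tick(self, requests: Iterator[Request]) -> List[str]:
        launched: List[str] = []
        for partition_key, run_key in requests:
            if run_key is not None:
                if run_key in self.seen_run_keys:
                    continue  # same run key seen before: no new run
                self.seen_run_keys.add(run_key)
            launched.append(partition_key)
        return launched


if __name__ == "__main__":
    keys = ["a", "b", "c"]
    with_keys = ToyScheduler()
    print(with_keys.tick(evaluate_schedule(keys, True)))   # first tick
    print(with_keys.tick(evaluate_schedule(keys, True)))   # second tick
    without_keys = ToyScheduler()
    print(without_keys.tick(evaluate_schedule(keys, False)))
    print(without_keys.tick(evaluate_schedule(keys, False)))
```

In this model, reusing the partition key as the run key launches every partition on the first tick and nothing on later ticks, while a run key of None launches every partition on every tick.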

Lech

11/28/2022, 8:11 PM
I've replaced `run_key=key` with `run_key=None`, but in the "Schedules" tab I don't see more jobs, and when I open the partitioned job I see `Partition Set - None`. My schedule is written with slight modifications based on `build_schedule_from_partitioned_job` (but that works only with `TimeWindowPartitionsDefinition`).

owen

11/28/2022, 8:17 PM
can you share your repository code? the code inside the schedule itself won't impact what you see in the schedules tab. the schedules just show the list of schedules you've defined, so if you have something like
@repository
def my_repo():
    return [
        get_schedule_def(partitions_def, "0 0 * * *", my_job, "some-timezone"),
        ...
    ]
then I'd only expect to see a single schedule on that tab, regardless of what partitions that schedule is going to kick off

Lech

11/28/2022, 8:33 PM
I've sent you a private message with the repository code.