#ask-community

Ricardo Da Silva

03/13/2024, 3:42 PM
Hello, I am currently trying to schedule a partitioned asset to run the missing partitions this way:
from dagster import (
    AssetSelection,
    Definitions,
    build_schedule_from_partitioned_job,
    define_asset_job,
)

# Partitioned job (JOBS, braintree_assets and resources are defined elsewhere)
braintree_disputes_job = define_asset_job(
    "braintree_extracts_job",
    selection=AssetSelection.groups(JOBS["braintree_job"]["group_name"]),
)

# Partitioned schedule
schedule = build_schedule_from_partitioned_job(
    job=braintree_disputes_job, cron_schedule="0 10 * * 1-5"
)

# Definitions
defs1 = Definitions(
    assets=[*braintree_assets],
    resources=resources,
    schedules=[schedule],
)
However, when I look at the schedule for this job, it always shows as 00:00 UTC. Any idea why that might be? I can see in the docs that `build_schedule_from_partitioned_job` does take a `cron_schedule` argument, but they also say “The schedule executes at the cadence specified by the time partitioning of the job or assets.” So I don't know whether this usage is correct or not.

Chris Roth

03/13/2024, 5:54 PM
Have you tried setting `end_offset` in the partition to something other than 0? If this doesn't work, I would try building your own scheduling method. Below is a modified copy of one that I use on a multi-partition to give the control I need.
from dagster import MultiPartitionKey, RunRequest, ScheduleDefinition

# `partition_def` (a multi-partition definition) is defined earlier in the
# original file; `SCHEDULE_STATUS` is presumably defined there as well.


def create_daily_schedule_definition(job):
    """Create a daily schedule definition for the given job."""

    def daily_schedule_execution_fn(context):
        """Execution_fn that provides partition and run context."""
        object_partition = partition_def.secondary_dimension.partitions_def
        time_partition_def = partition_def.time_window_dimension.partitions_def
        # Newest time partition available as of this schedule tick
        most_recent_data_day = time_partition_def.get_last_partition_key(
            context.scheduled_execution_time
        )
        # One run per key of the non-time dimension
        for obj in object_partition.get_partition_keys():
            context.log.info(
                f"Adding job for {obj=} ending at {most_recent_data_day}"
            )
            yield RunRequest(
                partition_key=MultiPartitionKey(
                    {"end_date": most_recent_data_day, "object": obj}
                ),
                run_key=f"{most_recent_data_day}|{obj}",
            )

    return ScheduleDefinition(
        cron_schedule="0 9 * * *",
        default_status=SCHEDULE_STATUS,
        execution_timezone="UTC",
        job=job,
        name=f"{job.name}_schedule",
        execution_fn=daily_schedule_execution_fn,
    )
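For readers following along, the way `end_offset` and `get_last_partition_key` interact for a daily partition can be modeled without Dagster. The sketch below is a simplified stand-in, not Dagster's implementation: the function name `last_daily_partition_key` is hypothetical, and it assumes daily partitions aligned to midnight UTC with keys formatted as YYYY-MM-DD.

```python
from datetime import datetime, timedelta, timezone


def last_daily_partition_key(now, end_offset=0):
    """Hypothetical model of the newest daily partition key as of `now`.

    Assumes partitions start at midnight and cover one day each.
    """
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    # With end_offset=0, the newest partition is the last fully elapsed day;
    # each extra unit of end_offset extends the set by one more day.
    last_start = midnight - timedelta(days=1) + timedelta(days=end_offset)
    return last_start.strftime("%Y-%m-%d")


# A schedule tick at 10:00 UTC on a Wednesday
tick = datetime(2024, 3, 13, 10, 0, tzinfo=timezone.utc)
print(last_daily_partition_key(tick))                # 2024-03-12
print(last_daily_partition_key(tick, end_offset=1))  # 2024-03-13
```

Under these assumptions, a 10:00 tick on March 13 targets the March 12 partition by default, while `end_offset=1` would also expose the still-open March 13 partition.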

Ricardo Da Silva

03/13/2024, 5:55 PM
thanks for this, let me try it

Chris Roth

03/13/2024, 5:58 PM
I did some edits to the names on the fly... `partition_def` is defined earlier in this file. We are using a factory like this as we have many similar jobs to wrap in the same schedule. If you don't need the factory pattern, you could drop the inner code out and just use it.

Ricardo Da Silva

03/18/2024, 11:54 AM
@Chris Roth I applied this today and it works now. Thanks for the help, I can now see the changes reflected in the schedule.

Chris Roth

03/18/2024, 8:13 PM
Great news!