Ben Kimpel
08/02/2023, 6:04 PMbuild_schedule_from_partitioned_asset_job
to run at a particular time. We don't want to shift our definition of a partition start/end time to match when a file drops.
from dagster import asset, define_asset_job, schedule, TimeWindowPartitionsDefinition
weekday_partition = TimeWindowPartitionsDefinition(
start="2022-01-01",
fmt="%Y-%m-%d",
timezone="America/New_York",
cron_schedule="0 0 * * 1-5",
end_offset=1,
)
@asset(partitions_def=weekday_partition)
def testing(context):
return [1, 2, 3]
testing_job = define_asset_job("testing_job", partitions_def=weekday_partition, selection=[testing])
@schedule(
"30 16 * * 1-5",
execution_timezone="America/New_York",
job=testing_job,
)
def testing_schedule(context):
return testing_job.run_request_for_partition(
partition_key=context.scheduled_execution_time.format("YYYY-MM-DD"),
run_key=context.scheduled_execution_time.format("YYYY-MM-DD"),
current_time=context.scheduled_execution_time,
)
chris
08/02/2023, 6:51 PM@schedule
decorator - is there a problem with that approach for you?Ben Kimpel
08/02/2023, 6:54 PMbuild_schedule_from_partitioned_asset_job
as noted.
second, we have to repeat our partition definitions if we have to do it this because the start_date is different. we have about 30 or so assets with different start dates. we could make a helper, but it still feels like we're fighting the system here for some reason.
third, we're unsure why we need to pass a partition to define_asset_job at all since the asset is already partitioned and all partitions must match for it to run anyway?Ben Kimpel
08/02/2023, 6:54 PMBen Kimpel
08/02/2023, 6:55 PMBen Kimpel
08/02/2023, 6:55 PMBen Kimpel
08/02/2023, 7:14 PMchris
08/02/2023, 10:14 PMbuild_schedule_from_partitioned_asset_job
work for you? You should, at least theoretically, be able to use the hour_of_day
and minute_of_hour
params in order to run the schedule at 4:30 every day.
2. In the case of having many different partitions definitions with different start dates, the best way is indeed to have a helper. I agree this is a bit clunky.
3. You also shouldn’t need to pass the partitions definition to define_asset_job
- it’s an optional argument. Are you hitting an error if it’s not provided?
Sorry you’re running into these rough edges