Nicholas Pezolano
02/22/2023, 12:59 AM
my_partition = DailyPartitionsDefinition(
    start_date="2023-01-12",
    fmt='%Y-%m-%d',
    timezone='America/New_York',
)
my_schedule = ScheduleDefinition(job=my_job, cron_schedule="5 8 * * 1-5", execution_timezone='America/New_York')
For some reason the partition for 2023-02-21 isn't showing up. Even though it's past 8am, it shows the next tick as 2023-02-22. This happens even if I use offsets in the partition.
claire
02/22/2023, 9:55 PM
Nicholas Pezolano
02/23/2023, 12:07 AM
TimeWindowPartitionDefinition as well, with a similar result as above.
I rely on context.partition_key for many assets to ensure backfills, so if an asset needs to run for today on an 8am schedule, it needs today's 2023-02-22 partition_key.
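The symptom described here is consistent with how daily time-window partitions are generally documented to behave: a day's partition becomes available only once that day has ended, and an end offset extends the set forward. The stdlib-only sketch below models that rule. `latest_daily_partition_key` is a hypothetical helper written for illustration, not a Dagster function, and it ignores `start_date` and time-zone conversion.

```python
from datetime import datetime, timedelta


def latest_daily_partition_key(now: datetime, end_offset: int = 0, fmt: str = "%Y-%m-%d") -> str:
    """Hypothetical helper: key of the newest daily partition visible at `now`.

    Assumes `now` is already expressed in the partitions' time zone.
    """
    # With no offset, the newest partition is the last day that has fully ended.
    last_completed_day = now.date() - timedelta(days=1)
    # Each unit of end_offset extends the partition set by one more day.
    return (last_completed_day + timedelta(days=end_offset)).strftime(fmt)


tick = datetime(2023, 2, 22, 8, 5)  # an 8:05am tick, matching the cron schedule above
print(latest_daily_partition_key(tick))                # 2023-02-21
print(latest_daily_partition_key(tick, end_offset=1))  # 2023-02-22
```

Under this model, an 8:05am tick sees yesterday as the newest partition unless the definition is extended by one day.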
claire
02/23/2023, 12:16 AM
end_offset=1 on your partitions definition to get the current day to be considered as a partition
Nicholas Pezolano
02/24/2023, 4:40 PM
end_offset=1 to see if it ran locally today with the scheduler, but I got the following error for all our jobs/ops:
dagster._check.CheckError: Invariant failed. Description: Tried to access partition_key for a non-partitioned run
All of the assets/jobs run fine manually.
Nicholas Pezolano
02/24/2023, 5:19 PM
dagster._core.errors.DagsterExecutionStepExecutionError: Error occurred while executing op "fed_asset":
File "/home/magic/anaconda3/envs/py39/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_plan.py", line 265, in dagster_event_sequence_for_step
for step_event in check.generator(step_events):
File "/home/magic/anaconda3/envs/py39/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 378, in core_dagster_event_sequence_for_step
for user_event in check.generator(
File "/home/magic/anaconda3/envs/py39/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 92, in _step_output_error_checked_user_event_sequence
for user_event in user_event_sequence:
File "/home/magic/anaconda3/envs/py39/lib/python3.9/site-packages/dagster/_core/execution/plan/compute.py", line 177, in execute_core_compute
for step_output in _yield_compute_results(step_context, inputs, compute_fn):
File "/home/magic/anaconda3/envs/py39/lib/python3.9/site-packages/dagster/_core/execution/plan/compute.py", line 145, in _yield_compute_results
for event in iterate_with_context(
File "/home/magic/anaconda3/envs/py39/lib/python3.9/site-packages/dagster/_utils/__init__.py", line 473, in iterate_with_context
return
File "/home/magic/anaconda3/envs/py39/lib/python3.9/contextlib.py", line 137, in __exit__
self.gen.throw(typ, value, traceback)
File "/home/magic/anaconda3/envs/py39/lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py", line 85, in op_execution_error_boundary
raise error_cls(
The above exception was caused by the following exception:
dagster._check.CheckError: Invariant failed. Description: Tried to access partition_key for a non-partitioned run
File "/home/magic/anaconda3/envs/py39/lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py", line 55, in op_execution_error_boundary
yield
File "/home/magic/anaconda3/envs/py39/lib/python3.9/site-packages/dagster/_utils/__init__.py", line 471, in iterate_with_context
next_output = next(iterator)
File "/home/magic/anaconda3/envs/py39/lib/python3.9/site-packages/dagster/_core/execution/plan/compute_generator.py", line 121, in _coerce_solid_compute_fn_to_iterator
result = invoke_compute_fn(
File "/home/magic/anaconda3/envs/py39/lib/python3.9/site-packages/dagster/_core/execution/plan/compute_generator.py", line 115, in invoke_compute_fn
return fn(context, **args_to_pass) if context_arg_provided else fn(**args_to_pass)
File "/home/magic/botat2/dagster/nmr_dagster/assets/fed_assets.py", line 17, in fed_asset
dt = context.partition_key
File "/home/magic/anaconda3/envs/py39/lib/python3.9/site-packages/dagster/_core/execution/context/compute.py", line 284, in partition_key
return self._step_execution_context.partition_key
File "/home/magic/anaconda3/envs/py39/lib/python3.9/site-packages/dagster/_core/execution/context/system.py", line 350, in partition_key
check.invariant(
File "/home/magic/anaconda3/envs/py39/lib/python3.9/site-packages/dagster/_check/__init__.py", line 1684, in invariant
raise CheckError(f"Invariant failed. Description: {desc}")
claire
02/24/2023, 5:42 PM
Nicholas Pezolano
02/24/2023, 5:43 PM
Nicholas Pezolano
02/24/2023, 5:43 PM
claire
02/24/2023, 5:44 PM
Nicholas Pezolano
02/24/2023, 5:48 PM
claire
02/24/2023, 9:57 PM
@schedule(
    job=my_job,
    cron_schedule="* * * * *",
)
def custom_schedule(context):
    # custom logic here to determine which partition to run
    request = my_job.run_request_for_partition(partition_key=my_partition.get_last_partition_key())
    yield request
Since it seems like you want your schedule to run, but not for every partition
claire
02/24/2023, 9:58 PM
build_schedule_from_partitioned_job that would just schedule a run for each partition
Frank Ferstler
03/09/2023, 2:15 PM
Frank Ferstler
03/09/2023, 2:27 PM
ScheduleDefinition(job=us_job, should_execute=nyse_holiday, cron_schedule="26 9 * * 1-5", execution_timezone='America/New_York')
But when Dagster follows this schedule, it throws this error:
dagster._check.CheckError: Failure condition: Tried to access partition_key_range for a non-partitioned run
File "/home/magic/anaconda3/envs/py39/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_plan.py", line 265, in dagster_event_sequence_for_step
for step_event in check.generator(step_events):
File "/home/magic/anaconda3/envs/py39/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 342, in core_dagster_event_sequence_for_step
for event_or_input_value in ensure_gen(
File "/home/magic/anaconda3/envs/py39/lib/python3.9/site-packages/dagster/_core/execution/plan/inputs.py", line 166, in load_input_object
load_input_context = step_context.for_input_manager(
File "/home/magic/anaconda3/envs/py39/lib/python3.9/site-packages/dagster/_core/execution/context/system.py", line 612, in for_input_manager
self.asset_partitions_subset_for_input(name)
File "/home/magic/anaconda3/envs/py39/lib/python3.9/site-packages/dagster/_core/execution/context/system.py", line 928, in asset_partitions_subset_for_input
self.asset_partition_key_range, dynamic_partitions_store=self.instance
File "/home/magic/anaconda3/envs/py39/lib/python3.9/site-packages/dagster/_core/execution/context/system.py", line 379, in asset_partition_key_range
check.failed("Tried to access partition_key_range for a non-partitioned run")
File "/home/magic/anaconda3/envs/py39/lib/python3.9/site-packages/dagster/_check/__init__.py", line 1699, in failed
raise CheckError(f"Failure condition: {desc}")
Frank Ferstler
03/09/2023, 2:30 PM
claire
03/09/2023, 10:13 PM
from dagster import AssetSelection, DailyPartitionsDefinition, asset, define_asset_job, schedule

daily_partitions_def = DailyPartitionsDefinition(start_date="2023-01-12", fmt='%Y-%m-%d')

@asset(partitions_def=daily_partitions_def)
def daily_asset():
    return 1

my_job = define_asset_job(
    "my_job", AssetSelection.assets(daily_asset), partitions_def=daily_partitions_def
)

@schedule(
    job=my_job,
    cron_schedule="0 17 * * *",  # Example cron schedule that runs at 5pm UTC
)
def custom_schedule(context):
    # Map the tick's scheduled time to the daily partition key that contains it
    partition_key = daily_partitions_def.get_partition_key_for_timestamp(
        context.scheduled_execution_time.timestamp()
    )
    request = my_job.run_request_for_partition(partition_key=partition_key)
    yield request
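A stdlib-only sketch of the timestamp-to-key mapping that the schedule above relies on, assuming daily partitions evaluated in UTC. `daily_key_for_timestamp` is a hypothetical stand-in written for illustration; it is not Dagster's implementation of `get_partition_key_for_timestamp`.

```python
from datetime import datetime, timezone


def daily_key_for_timestamp(timestamp: float, fmt: str = "%Y-%m-%d") -> str:
    """Hypothetical stand-in: the UTC calendar day containing a POSIX timestamp."""
    return datetime.fromtimestamp(timestamp, tz=timezone.utc).strftime(fmt)


# A tick scheduled for 5pm UTC on 2023-03-09 falls inside that day's window.
scheduled = datetime(2023, 3, 9, 17, 0, tzinfo=timezone.utc)
print(daily_key_for_timestamp(scheduled.timestamp()))  # 2023-03-09
```

Under this model the tick resolves to the current day's key. Whether that key is among the definition's partitions at tick time depends on end_offset, as discussed earlier in the thread.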
Basically, you'd have to set up a custom schedule that executes at the desired time and yields a run request for the job at that time.
claire
03/10/2023, 12:01 AM
daily_partitions_def = DailyPartitionsDefinition(start_date="2023-01-12", fmt='%Y-%m-%d')
my_job = define_asset_job(
    "my_job", AssetSelection.assets(daily_asset), partitions_def=daily_partitions_def
)
custom_schedule = build_schedule_from_partitioned_job(my_job, hour_of_day=2)
on build_schedule_from_partitioned_job you can override fields like the hour of execution to describe when the schedule should be executed