# ask-community

Nicholas Pezolano

02/22/2023, 12:59 AM
I have a partitioned asset with the following partitions definition, running on the following schedule:
```python
from dagster import DailyPartitionsDefinition, ScheduleDefinition

my_partition = DailyPartitionsDefinition(start_date="2023-01-12", fmt="%Y-%m-%d",
                                         timezone="America/New_York")

# 8:05 AM New York time, Monday through Friday (my_job is defined elsewhere)
my_schedule = ScheduleDefinition(job=my_job, cron_schedule="5 8 * * 1-5",
                                 execution_timezone="America/New_York")
```
For some reason the partition for 2023-02-21 isn't showing up. Even though it's past 8 AM, the next tick is shown as 2023-02-22, and this happens even if I use offsets in the partition.

claire

02/22/2023, 9:55 PM
Hi Nicholas. In Dagster, a partition won't show up until its full time window has passed. So if the current day is 2023-02-21, the 2023-02-21 partition won't appear in the daily partitions definition, because that day is not yet complete.
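The rule above can be illustrated with a minimal pure-Python model. It is a sketch, not Dagster's implementation: it assumes a daily partition covers the half-open window from midnight to the next midnight, and it uses naive datetimes that are taken to already be in the partitions definition's timezone. `daily_window` and `is_partition_available` are hypothetical helpers.

```python
from datetime import date, datetime, time, timedelta


def daily_window(partition_key: str) -> tuple[datetime, datetime]:
    """Return the [start, end) window covered by a daily partition key."""
    start = datetime.combine(date.fromisoformat(partition_key), time.min)
    return start, start + timedelta(days=1)


def is_partition_available(partition_key: str, now: datetime) -> bool:
    """Model of the rule: a partition appears only once its whole window has passed."""
    _, end = daily_window(partition_key)
    return now >= end


# 8:05 AM on 2023-02-21, expressed in the partitions definition's timezone
now = datetime(2023, 2, 21, 8, 5)
print(is_partition_available("2023-02-20", now))  # True: that day is over
print(is_partition_available("2023-02-21", now))  # False: the day is still in progress
```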

Nicholas Pezolano

02/23/2023, 12:07 AM
Is there a way to get the partition's time window to start at the first second of that day? I tried using a `TimeWindowPartitionsDefinition` as well, with a similar result. I rely on `context.partition_key` in many assets so that backfills work, so if an asset run happens today on the 8 AM schedule, it needs today's `2023-02-22` partition key.

claire

02/23/2023, 12:16 AM
I'd recommend setting `end_offset=1` on your partitions definition so that the current day is treated as a partition.
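To see what `end_offset=1` changes, here is a minimal pure-Python model (not Dagster's code). It assumes each unit of `end_offset` adds one more daily window past the last complete one, so with `end_offset=1` the in-progress day becomes the last partition. Naive datetimes are assumed to be in the partitions definition's timezone, and `partition_keys` is a hypothetical helper.

```python
from datetime import date, datetime, timedelta


def partition_keys(start_date: str, now: datetime, end_offset: int = 0) -> list[str]:
    """Model of daily partition keys: complete days up to `now`, plus `end_offset` extra days."""
    day = date.fromisoformat(start_date)
    # The last complete day is the one before `now`; end_offset pushes the end forward.
    last = now.date() - timedelta(days=1) + timedelta(days=end_offset)
    keys = []
    while day <= last:
        keys.append(day.isoformat())
        day += timedelta(days=1)
    return keys


now = datetime(2023, 2, 22, 8, 5)  # the 8:05 AM tick on 2023-02-22
print(partition_keys("2023-01-12", now)[-1])                # 2023-02-21
print(partition_keys("2023-01-12", now, end_offset=1)[-1])  # 2023-02-22
```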

Nicholas Pezolano

02/24/2023, 4:40 PM
Hi @claire, I tried setting `end_offset=1` to see whether it would run locally today with the scheduler, but I got the following error for all of our jobs/ops:
dagster._check.CheckError: Invariant failed. Description: Tried to access partition_key for a non-partitioned run
All of the assets/jobs run fine manually.
This is the full stack trace:
```
dagster._core.errors.DagsterExecutionStepExecutionError: Error occurred while executing op "fed_asset":
  File "/home/magic/anaconda3/envs/py39/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_plan.py", line 265, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/home/magic/anaconda3/envs/py39/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 378, in core_dagster_event_sequence_for_step
    for user_event in check.generator(
  File "/home/magic/anaconda3/envs/py39/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 92, in _step_output_error_checked_user_event_sequence
    for user_event in user_event_sequence:
  File "/home/magic/anaconda3/envs/py39/lib/python3.9/site-packages/dagster/_core/execution/plan/compute.py", line 177, in execute_core_compute
    for step_output in _yield_compute_results(step_context, inputs, compute_fn):
  File "/home/magic/anaconda3/envs/py39/lib/python3.9/site-packages/dagster/_core/execution/plan/compute.py", line 145, in _yield_compute_results
    for event in iterate_with_context(
  File "/home/magic/anaconda3/envs/py39/lib/python3.9/site-packages/dagster/_utils/__init__.py", line 473, in iterate_with_context
    return
  File "/home/magic/anaconda3/envs/py39/lib/python3.9/contextlib.py", line 137, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/home/magic/anaconda3/envs/py39/lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py", line 85, in op_execution_error_boundary
    raise error_cls(
The above exception was caused by the following exception:
dagster._check.CheckError: Invariant failed. Description: Tried to access partition_key for a non-partitioned run
  File "/home/magic/anaconda3/envs/py39/lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py", line 55, in op_execution_error_boundary
    yield
  File "/home/magic/anaconda3/envs/py39/lib/python3.9/site-packages/dagster/_utils/__init__.py", line 471, in iterate_with_context
    next_output = next(iterator)
  File "/home/magic/anaconda3/envs/py39/lib/python3.9/site-packages/dagster/_core/execution/plan/compute_generator.py", line 121, in _coerce_solid_compute_fn_to_iterator
    result = invoke_compute_fn(
  File "/home/magic/anaconda3/envs/py39/lib/python3.9/site-packages/dagster/_core/execution/plan/compute_generator.py", line 115, in invoke_compute_fn
    return fn(context, **args_to_pass) if context_arg_provided else fn(**args_to_pass)
  File "/home/magic/botat2/dagster/nmr_dagster/assets/fed_assets.py", line 17, in fed_asset
    dt = context.partition_key
  File "/home/magic/anaconda3/envs/py39/lib/python3.9/site-packages/dagster/_core/execution/context/compute.py", line 284, in partition_key
    return self._step_execution_context.partition_key
  File "/home/magic/anaconda3/envs/py39/lib/python3.9/site-packages/dagster/_core/execution/context/system.py", line 350, in partition_key
    check.invariant(
  File "/home/magic/anaconda3/envs/py39/lib/python3.9/site-packages/dagster/_check/__init__.py", line 1684, in invariant
    raise CheckError(f"Invariant failed. Description: {desc}")
```

claire

02/24/2023, 5:42 PM
Did you hit this error by executing on Dagit and clicking "run as a partition range in a single run"?

Nicholas Pezolano

02/24/2023, 5:43 PM
No, this is when I had the scheduler enabled and the scheduler materialized the asset.
If I run it manually in Dagit, it runs fine.

claire

02/24/2023, 5:44 PM
What version of dagster are you running?

Nicholas Pezolano

02/24/2023, 5:48 PM
1.1.19

claire

02/24/2023, 9:57 PM
Ah, I see the issue. Your schedule definition is returning a run request that isn't partitioned, so `context.partition_key` fails inside the asset. I think you want something like this instead:
```python
from dagster import schedule


@schedule(
    job=my_job,
    cron_schedule="* * * * *",  # every minute; substitute your own cron, e.g. "5 8 * * 1-5"
)
def custom_schedule(context):
    # Custom logic here to determine which partition to run
    request = my_job.run_request_for_partition(partition_key=my_partition.get_last_partition_key())
    yield request
```
This is for the case where you want the schedule to run, but not for every partition.
If you want to run the schedule once for each partition, there's a built-in function, `build_schedule_from_partitioned_job`, that schedules a run for each partition.
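The "one run per partition" behaviour can be modelled in plain Python. This is a sketch under an assumption, not Dagster's implementation: it assumes the schedule built by `build_schedule_from_partitioned_job` for a daily partitions definition with the default `end_offset=0` fires once per day and targets the partition that has just finished, so consecutive ticks cover every partition exactly once. `daily_ticks` and `partition_for_tick` are hypothetical helpers.

```python
from datetime import date, datetime, time, timedelta


def daily_ticks(first_day: date, days: int, hour_of_day: int) -> list[datetime]:
    """Tick times of a once-a-day schedule at a fixed hour (like cron '0 <hour> * * *')."""
    return [datetime.combine(first_day + timedelta(days=i), time(hour_of_day)) for i in range(days)]


def partition_for_tick(tick: datetime) -> str:
    """Assumed rule: each tick targets the most recently completed daily partition."""
    return (tick.date() - timedelta(days=1)).isoformat()


ticks = daily_ticks(date(2023, 3, 6), days=5, hour_of_day=2)
targets = [partition_for_tick(t) for t in ticks]
print(targets)
# ['2023-03-05', '2023-03-06', '2023-03-07', '2023-03-08', '2023-03-09']
```

By contrast, a custom schedule that always requests the last partition key only picks one partition per tick, which is why it suits "run for the latest partition at a fixed time" rather than "cover every partition".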

Frank Ferstler

03/09/2023, 2:15 PM
Hi @claire, I'm running into issues implementing what you provided. To clarify: I'm trying to get a job to run for the current partition (the current day) at a specified time every day. I don't want it to run for a window, only for the current day. What's the best way to accomplish this?
Originally, we had the schedule set up like this:
```python
from dagster import ScheduleDefinition
ScheduleDefinition(job=us_job, should_execute=nyse_holiday, cron_schedule="26 9 * * 1-5", execution_timezone='America/New_York')
```
But when Dagster runs this schedule, it throws this error:
```
dagster._check.CheckError: Failure condition: Tried to access partition_key_range for a non-partitioned run
  File "/home/magic/anaconda3/envs/py39/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_plan.py", line 265, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/home/magic/anaconda3/envs/py39/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 342, in core_dagster_event_sequence_for_step
    for event_or_input_value in ensure_gen(
  File "/home/magic/anaconda3/envs/py39/lib/python3.9/site-packages/dagster/_core/execution/plan/inputs.py", line 166, in load_input_object
    load_input_context = step_context.for_input_manager(
  File "/home/magic/anaconda3/envs/py39/lib/python3.9/site-packages/dagster/_core/execution/context/system.py", line 612, in for_input_manager
    self.asset_partitions_subset_for_input(name)
  File "/home/magic/anaconda3/envs/py39/lib/python3.9/site-packages/dagster/_core/execution/context/system.py", line 928, in asset_partitions_subset_for_input
    self.asset_partition_key_range, dynamic_partitions_store=self.instance
  File "/home/magic/anaconda3/envs/py39/lib/python3.9/site-packages/dagster/_core/execution/context/system.py", line 379, in asset_partition_key_range
    check.failed("Tried to access partition_key_range for a non-partitioned run")
  File "/home/magic/anaconda3/envs/py39/lib/python3.9/site-packages/dagster/_check/__init__.py", line 1699, in failed
    raise CheckError(f"Failure condition: {desc}")
```
In the UI, when I select the job, click Materialize, and choose the latest partition (the current day shows up for us because we set `end_offset=1`), it runs successfully with no issues.

claire

03/09/2023, 10:13 PM
Hi Frank. Here's an example of how you'd be able to do this:
```python
from dagster import AssetSelection, DailyPartitionsDefinition, asset, define_asset_job, schedule

daily_partitions_def = DailyPartitionsDefinition(start_date="2023-01-12", fmt='%Y-%m-%d')


@asset(partitions_def=daily_partitions_def)
def daily_asset():
    return 1


my_job = define_asset_job(
    "my_job", AssetSelection.assets(daily_asset), partitions_def=daily_partitions_def
)


@schedule(
    job=my_job,
    cron_schedule="0 17 * * *",  # Example cron schedule that runs at 5pm UTC
)
def custom_schedule(context):
    # Map the tick's scheduled execution time to the daily partition that contains it
    partition_key = daily_partitions_def.get_partition_key_for_timestamp(
        context.scheduled_execution_time.timestamp()
    )
    request = my_job.run_request_for_partition(partition_key=partition_key)
    yield request
```
Basically, you set up a custom schedule that executes at the desired time and yields a run request for the partition that contains that time.
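Because the partition key is derived from the scheduled execution time, the partitions definition's timezone matters. The sketch below is plain Python, not Dagster code, and assumes `get_partition_key_for_timestamp` returns the key of the daily window that contains the timestamp. The example partitions definition has no `timezone`, so it works in UTC, while the original one used `America/New_York`. `partition_key_for_timestamp` is a hypothetical helper, and New York is approximated by a fixed UTC-5 offset, which holds for these March 2023 dates (before daylight saving time began on 2023-03-12) but not year-round.

```python
from datetime import datetime, timedelta, timezone


def partition_key_for_timestamp(ts: float, tz: timezone, fmt: str = "%Y-%m-%d") -> str:
    """Format the calendar day that contains `ts` in the given timezone."""
    return datetime.fromtimestamp(ts, tz=tz).strftime(fmt)


UTC = timezone.utc
EST = timezone(timedelta(hours=-5), "EST")  # fixed offset; ignores DST (assumption)

# A 5pm UTC tick on 2023-03-09 is midday in New York: same key either way
tick = datetime(2023, 3, 9, 17, 0, tzinfo=UTC).timestamp()
print(partition_key_for_timestamp(tick, UTC))  # 2023-03-09
print(partition_key_for_timestamp(tick, EST))  # 2023-03-09

# A 2am UTC tick on 2023-03-10 is still the evening of 2023-03-09 in New York
late = datetime(2023, 3, 10, 2, 0, tzinfo=UTC).timestamp()
print(partition_key_for_timestamp(late, UTC))  # 2023-03-10
print(partition_key_for_timestamp(late, EST))  # 2023-03-09
```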
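Ah, actually, here's a more straightforward way of doing the above:
```python
from dagster import AssetSelection, DailyPartitionsDefinition, build_schedule_from_partitioned_job, define_asset_job

daily_partitions_def = DailyPartitionsDefinition(start_date="2023-01-12", fmt='%Y-%m-%d')

# daily_asset is the partitioned asset defined in the previous example
my_job = define_asset_job(
    "my_job", AssetSelection.assets(daily_asset), partitions_def=daily_partitions_def
)

custom_schedule = build_schedule_from_partitioned_job(my_job, hour_of_day=2)
```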
On `build_schedule_from_partitioned_job`, you can override fields such as the hour of execution (`hour_of_day`) to control when the schedule runs.