
Albert

10/24/2022, 2:18 PM
Hi Dagsterinos, Dagster newbie here. I’m having an issue where I’d like a monthly partitioned asset to be run on a daily schedule. I was under the impression this was possible, but it’s a bit unclear from the docs how I would do it. Is there a simple example in the docs for this? If I create a schedule using
build_schedule_from_partitioned_job
, it is a monthly schedule because it’s a monthly partition. When I try to do something like:
my_month_partitioned_job = define_asset_job(name="job_name", selection="my_monthly_partitioned_asset", partitions_def=monthly_partition_def)
my_schedule = ScheduleDefinition(job=my_month_partitioned_job, cron_schedule="21 16 * * *")
I get an error like:
dagster._check.CheckError: Invariant failed. Description: Tried to access partition_key for a non-partitioned run
If I leave out the job partition definition:
my_month_partitioned_job = define_asset_job(name="job_name", selection="my_monthly_partitioned_asset")
my_schedule = ScheduleDefinition(job=my_month_partitioned_job, cron_schedule="21 16 * * *")
I get the same error as above:
dagster._check.CheckError: Invariant failed. Description: Tried to access partition_key for a non-partitioned run
Part of what’s going on might be that my monthly partitioned asset is accessing the context to do what it needs to do:
@asset(partitions_def=monthly_partition_def)
def my_monthly_partitioned_asset(context):

    partition_key = context.asset_partition_key_for_output()
    # ... use the partition key to work out which month this partition covers ...
    return stuff
stack trace looks like this:
Stack Trace:
  File "lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py", line 47, in solid_execution_error_boundary
    yield
  File "lib/python3.9/site-packages/dagster/_utils/__init__.py", line 430, in iterate_with_context
    next_output = next(iterator)
  File "lib/python3.9/site-packages/dagster/_core/execution/plan/compute_generator.py", line 73, in _coerce_solid_compute_fn_to_iterator
    result = fn(context, **kwargs) if context_arg_provided else fn(**kwargs)
  File "assets.py", line 39, in my_monthly_partitioned_asset
    context.log.info(context.asset_partition_key_for_output())
  File "lib/python3.9/site-packages/dagster/_core/execution/context/compute.py", line 370, in asset_partition_key_for_output
    return self._step_execution_context.asset_partition_key_for_output(output_name)
  File "lib/python3.9/site-packages/dagster/_core/execution/context/system.py", line 811, in asset_partition_key_for_output
    start, end = self.asset_partition_key_range_for_output(output_name)
  File "lib/python3.9/site-packages/dagster/_core/execution/context/system.py", line 806, in asset_partition_key_range_for_output
    return PartitionKeyRange(self.partition_key, self.partition_key)
  File "lib/python3.9/site-packages/dagster/_core/execution/context/system.py", line 334, in partition_key
    check.invariant(
  File "lib/python3.9/site-packages/dagster/_check/__init__.py", line 1470, in invariant
    raise CheckError(f"Invariant failed. Description: {desc}")

jamie

10/24/2022, 3:21 PM
Hi @Albert I think the issue here is that when a plain
ScheduleDefinition
starts a run, Dagster doesn’t “know” which partition of the job should be run. Additionally, the context that is created for the job doesn’t contain information about the partition (this is why you’re getting the error when trying to access the partition keys within the op). I think you should be able to achieve what you are trying to do, but you will need to write a bit more custom code. Here’s a sketch of what it might look like:
from dagster import schedule

@schedule(job=my_month_partitioned_job, cron_schedule="21 16 * * *")
def my_daily_schedule(context):
    # datetime of the schedule tick being evaluated
    time_now = context.scheduled_execution_time
    # code to get the partition_key (string) you want to execute based on the time_now 
    # you might need to manually construct the key (like getting the month from time_now)
    # or you might be able to do something like this
    partition_key = my_month_partitioned_job.partitions_def.get_partition_keys(time_now)[-1]

    yield my_month_partitioned_job.run_request_for_partition(
        partition_key=partition_key, run_key=partition_key
    )
I haven’t tried running this, but something like it will probably work
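For the "manually construct the key" route, here is a minimal stdlib-only sketch of what that could look like. It assumes the monthly keys are formatted as `%Y-%m-%d` and name the first day of the month (I believe that is the default for a monthly partitions definition, but check yours). `current_month_key` and `previous_month_key` are hypothetical helper names, not Dagster APIs.

```python
from datetime import datetime

# Assumption: partition keys look like "2022-10-01" (first day of the month).
# Change KEY_FORMAT if your partitions definition uses a different format.
KEY_FORMAT = "%Y-%m-%d"


def current_month_key(scheduled_time: datetime) -> str:
    """Key for the month that contains scheduled_time (may still be in progress)."""
    return scheduled_time.replace(day=1).strftime(KEY_FORMAT)


def previous_month_key(scheduled_time: datetime) -> str:
    """Key for the most recent month that fully ended before scheduled_time."""
    year, month = scheduled_time.year, scheduled_time.month
    if month == 1:
        # January rolls back to December of the previous year.
        year, month = year - 1, 12
    else:
        month -= 1
    return datetime(year, month, 1).strftime(KEY_FORMAT)
```

Inside the schedule function you would pass `context.scheduled_execution_time` to one of these and use the result as `partition_key`. As far as I know, `get_partition_keys(time_now)[-1]` gives the last month that has fully ended, so if you want the in-progress month you would need something like `current_month_key`, and that key also has to exist in your partitions definition.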

Albert

10/24/2022, 5:41 PM
ok, cool, thanks so much for the quick reply! I’ll give it a shot.