Albert
10/24/2022, 2:18 PM
With `build_schedule_from_partitioned_job`, the schedule is monthly because the partitions are monthly.
When I try to do something like:
```python
my_month_partitioned_job = define_asset_job(
    name="job_name",
    selection="my_monthly_partitioned_asset",
    partitions_def=monthly_partition_def,
)
my_schedule = ScheduleDefinition(job=my_month_partitioned_job, cron_schedule="21 16 * * *")
```
I get an error like:
```
dagster._check.CheckError: Invariant failed. Description: Tried to access partition_key for a non-partitioned run
```
If I leave out the job partition definition:
```python
my_month_partitioned_job = define_asset_job(name="job_name", selection="my_monthly_partitioned_asset")
my_schedule = ScheduleDefinition(job=my_month_partitioned_job, cron_schedule="21 16 * * *")
```
I get the same error as above:
```
dagster._check.CheckError: Invariant failed. Description: Tried to access partition_key for a non-partitioned run
```
Part of what's going on might be that my monthly partitioned asset reads the partition key from the context to do what it needs to do:
```python
from dagster import asset


@asset(partitions_def=monthly_partition_def)
def my_monthly_partitioned_asset(context):
    partition_key = context.asset_partition_key_for_output()
    # ... use partition key to understand what month partition it is ...
    return stuff
```
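To make the "use partition key to understand what month partition it is" step concrete, here is a minimal stdlib-only sketch that turns a monthly partition key into the month it covers. It assumes the keys are month-start dates formatted as `%Y-%m-%d` (e.g. `2022-10-01`), which I believe is the default for Dagster's monthly partitions; `month_window` is a hypothetical helper, not a Dagster API.

```python
from datetime import date, datetime
from typing import Tuple


def month_window(partition_key: str, fmt: str = "%Y-%m-%d") -> Tuple[date, date]:
    """Return the [start, end) dates of the month a monthly partition key covers.

    Assumption: keys are month-start dates such as "2022-10-01"; pass a
    different fmt if your partitions definition uses another format.
    """
    start = datetime.strptime(partition_key, fmt).date()
    # The window ends (exclusive) on the first day of the following month.
    if start.month == 12:
        end = date(start.year + 1, 1, 1)
    else:
        end = date(start.year, start.month + 1, 1)
    return start, end


start, end = month_window("2022-10-01")
print(start, end)  # 2022-10-01 2022-11-01
```

Inside the asset, you would call it on the value returned by `context.asset_partition_key_for_output()`, which only works when the run actually carries a partition key.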
The stack trace looks like this:
```
Stack Trace:
  File "lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py", line 47, in solid_execution_error_boundary
    yield
  File "lib/python3.9/site-packages/dagster/_utils/__init__.py", line 430, in iterate_with_context
    next_output = next(iterator)
  File "lib/python3.9/site-packages/dagster/_core/execution/plan/compute_generator.py", line 73, in _coerce_solid_compute_fn_to_iterator
    result = fn(context, **kwargs) if context_arg_provided else fn(**kwargs)
  File "assets.py", line 39, in my_monthly_partitioned_asset
    context.log.info(context.asset_partition_key_for_output())
  File "lib/python3.9/site-packages/dagster/_core/execution/context/compute.py", line 370, in asset_partition_key_for_output
    return self._step_execution_context.asset_partition_key_for_output(output_name)
  File "lib/python3.9/site-packages/dagster/_core/execution/context/system.py", line 811, in asset_partition_key_for_output
    start, end = self.asset_partition_key_range_for_output(output_name)
  File "lib/python3.9/site-packages/dagster/_core/execution/context/system.py", line 806, in asset_partition_key_range_for_output
    return PartitionKeyRange(self.partition_key, self.partition_key)
  File "lib/python3.9/site-packages/dagster/_core/execution/context/system.py", line 334, in partition_key
    check.invariant(
  File "lib/python3.9/site-packages/dagster/_check/__init__.py", line 1470, in invariant
    raise CheckError(f"Invariant failed. Description: {desc}")
```
jamie
10/24/2022, 3:21 PM
When a `ScheduleDefinition` starts a run, Dagster doesn't "know" which partition of the job should be run. Additionally, the context created for the run doesn't contain any information about the partition (this is why you get the error when accessing the partition key within the op). I think you should be able to achieve what you're trying to do, but you'll need to write a bit more custom code. Here's a sketch of what it might look like:
```python
from dagster import schedule


@schedule(job=my_month_partitioned_job, cron_schedule="21 16 * * *")
def my_daily_schedule(context):
    time_now = context.scheduled_execution_time
    # code to get the partition_key (string) you want to execute based on time_now
    # you might need to manually construct the key (like getting the month from time_now)
    # or you might be able to do something like this:
    partition_key = my_month_partitioned_job.partitions_def.get_partition_keys(time_now)[-1]
    yield my_month_partitioned_job.run_request_for_partition(
        partition_key=partition_key, run_key=partition_key
    )
```
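For the "manually construct the key" route, here is a minimal stdlib-only sketch of deriving a monthly partition key from `context.scheduled_execution_time`. It assumes month-start keys formatted as `%Y-%m-%d`; `current_month_key` and `previous_month_key` are hypothetical helpers, not Dagster APIs. Which one you want depends on your setup: as far as I know, with default settings a monthly partition only shows up in `get_partition_keys(...)` once its month has ended, so the `[-1]` approach would return the previous month (like `previous_month_key`), while `current_month_key` targets the month in progress, which may not be a valid key yet.

```python
from datetime import datetime


def current_month_key(scheduled_time: datetime, fmt: str = "%Y-%m-%d") -> str:
    """Key for the month containing scheduled_time (assumes month-start keys)."""
    return scheduled_time.replace(day=1).strftime(fmt)


def previous_month_key(scheduled_time: datetime, fmt: str = "%Y-%m-%d") -> str:
    """Key for the last fully completed month before scheduled_time."""
    first_of_month = scheduled_time.replace(day=1)
    # Step back one month, wrapping January to December of the prior year.
    if first_of_month.month == 1:
        prev = first_of_month.replace(year=first_of_month.year - 1, month=12)
    else:
        prev = first_of_month.replace(month=first_of_month.month - 1)
    return prev.strftime(fmt)


t = datetime(2022, 10, 24, 16, 21)
print(current_month_key(t))   # 2022-10-01
print(previous_month_key(t))  # 2022-09-01
```

Either string would then be passed as `partition_key` to `run_request_for_partition` in the schedule above.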
I haven't tried running this, but something like it will probably work.
Albert
10/24/2022, 5:41 PM