Ivan Trajkovic
05/09/2023, 8:15 PM
dagster._check.CheckError: Invariant failed. Description: Can't supply a PartitionedConfig for 'config' with a different PartitionsDefinition than supplied for 'partitions_def'.
this is how the asset is initialized:
partition = MultiPartitionsDefinition(
    {
        # "date": DailyPartitionsDefinition(start_date="2022-01-01"),
        "name": StaticPartitionsDefinition(names_lst),
        # partition each flow into N partitions
        "i": StaticPartitionsDefinition([str(i) for i in range(0, 10)])
    }
)

@asset(
    partitions_def=partition
)
def tester(context):
    print(">>>>>>>>>")
    context.log.info(context.partition_key.keys_by_dimension)
any ideas on why this error message keeps coming up periodically?
when it does work, it keeps working well for some period of time.
any guidance is appreciated. :)
sean
05/09/2023, 9:33 PM
Ivan Trajkovic
05/09/2023, 10:44 PM
dagster dev
it always fails.
do you think constructing the PartitionedConfig explicitly, not just the definition, would help me bypass this bug for the immediate future? the mapping between definitions and configs seems to be what's becoming inconsistent.
also, for now I'm materializing asset partitions only via the dagit UI.
sean
05/10/2023, 12:16 PM
> do you think it would help me bypass this bug for the immediate future by constructing PartitionedConfig explicitly, not just the definition? since that mapping between definitions and configs seems to be whats becoming inconsistent.
I’m not sure yet-- are you able to provide a self-contained example (like just a single partitioned asset in a `Definitions` object) and tell me which of the behaviors you listed above it is manifesting?
Ivan Trajkovic
05/10/2023, 12:57 PM
names_lst = ["name1", "name2", "name3"]
partition_tester = MultiPartitionsDefinition(
    {
        "name": StaticPartitionsDefinition(names_lst),
        "partition_i": StaticPartitionsDefinition([str(i) for i in range(0, 10)])
    }
)

@asset(
    description='''some description''',
    required_resource_keys={"aws_secrets_manager", "db"},
    io_manager_key="some_io_manager",
    partitions_def=partition_tester,
)
def tester(context):
    print("works...")
and loaded in `__init__.py`:
import tester_assets

defs = Definitions(
    assets=load_assets_from_modules([
        tester_assets,
    ]),
    resources=resources_map[deployment_name]
)
when materializing partitions of this asset, the first few attempts of a run usually fail with the following error:
dagster._check.CheckError: Invariant failed. Description: Can't supply a PartitionedConfig for 'config' with a different PartitionsDefinition than supplied for 'partitions_def'.
  File "/usr/local/lib/python3.10/dist-packages/dagster/_grpc/impl.py", line 137, in core_execute_run
    yield from execute_run_iterator(
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/execution/api.py", line 880, in __iter__
    yield from self.execution_context_manager.prepare_context()
  File "/usr/local/lib/python3.10/dist-packages/dagster/_utils/__init__.py", line 491, in generate_setup_events
    obj = next(self.generator)
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/execution/context_creation_job.py", line 293, in orchestration_context_event_generator
    context_creation_data = create_context_creation_data(
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/execution/context_creation_job.py", line 105, in create_context_creation_data
    job_def = job.get_definition()
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/definitions/reconstruct.py", line 243, in get_definition
    return self.repository.get_definition().get_maybe_subset_job_def(
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/definitions/repository_definition/repository_definition.py", line 278, in get_maybe_subset_job_def
    return defn.get_job_def_for_subset_selection(op_selection, asset_selection)
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/definitions/job_definition.py", line 729, in get_job_def_for_subset_selection
    return self._get_job_def_for_asset_selection(asset_selection)
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/definitions/job_definition.py", line 765, in _get_job_def_for_asset_selection
    new_job = build_asset_selection_job(
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/definitions/asset_layer.py", line 786, in build_asset_selection_job
    asset_job = build_source_asset_observation_job(
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/definitions/assets_job.py", line 312, in build_source_asset_observation_job
    return graph.to_job(
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/definitions/graph_definition.py", line 631, in to_job
    return JobDefinition.dagster_internal_init(
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/definitions/job_definition.py", line 285, in dagster_internal_init
    return JobDefinition(
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/definitions/job_definition.py", line 224, in __init__
    self._partitioned_config = PartitionedConfig.from_flexible_config(
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/definitions/partition.py", line 670, in from_flexible_config
    check.invariant(
  File "/usr/local/lib/python3.10/dist-packages/dagster/_check/__init__.py", line 1654, in invariant
    raise CheckError(f"Invariant failed. Description: {desc}")
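(for anyone reading along: a rough toy model of what the last frames of that traceback seem to be checking, in plain Python with no dagster import. ToyPartitionsDef, ToyPartitionedConfig and build_job are made-up stand-ins, not Dagster's classes, and comparing definitions by their keys is an assumption made only for illustration. it does not explain why the check trips intermittently here.)

```python
from dataclasses import dataclass
from typing import Tuple


class CheckError(Exception):
    """Stand-in for dagster._check.CheckError (hypothetical, simplified)."""


@dataclass(frozen=True)
class ToyPartitionsDef:
    """Hypothetical partitions definition; two are equal iff their keys match in order."""
    keys: Tuple[str, ...]


@dataclass(frozen=True)
class ToyPartitionedConfig:
    """Hypothetical config object that carries its own partitions definition."""
    partitions_def: ToyPartitionsDef


def build_job(config: ToyPartitionedConfig, partitions_def: ToyPartitionsDef) -> str:
    # Assumed shape of the invariant: the definition inside the config
    # must equal the one passed separately as partitions_def.
    if config.partitions_def != partitions_def:
        raise CheckError(
            "Invariant failed. Description: Can't supply a PartitionedConfig for 'config' "
            "with a different PartitionsDefinition than supplied for 'partitions_def'."
        )
    return "job built"


names = ("name1", "name2", "name3")
config = ToyPartitionedConfig(ToyPartitionsDef(names))

# Same keys on both sides: the check passes.
ok = build_job(config, ToyPartitionsDef(names))

# Any difference between the two definitions (here, one extra key) trips it.
try:
    build_job(config, ToyPartitionsDef(names + ("name4",)))
    failed = False
except CheckError:
    failed = True
```

in this toy version the error only depends on the two definitions comparing unequal at the moment the job object is built, which matches the wording of the message but says nothing about where the mismatch comes from in the real code.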
eventually the task usually succeeds, sometimes on the 2nd run, sometimes on the 5th, but sometimes it fails on every retry and eventually is marked as permanently failed.
I do have a few other partitioned assets in other modules in other directories, each with its own partition definition. some of them have the same behavior and error message, one of them always succeeds.
sean
05/10/2023, 1:19 PM
`resource_map` so I can’t run this. The `tester` asset you show doesn’t actually use these resources-- does the problem still manifest if you drop the resources from `Definitions` and `required_resource_keys` from the `tester`?
Harry James
06/07/2023, 12:39 PM