# ask-community
i
hello everyone. doing some experimentation with Dagster partitioned assets, and getting very strange behavior on the local dagster instance. sometimes the assets manage to materialize individual partitions, other times they keep failing with the following error on several partitioned assets:
dagster._check.CheckError: Invariant failed. Description: Can't supply a PartitionedConfig for 'config' with a different PartitionsDefinition than supplied for 'partitions_def'.
this is how the asset is initialized:
partition=MultiPartitionsDefinition(
        {
            # "date": DailyPartitionsDefinition(start_date="2022-01-01"),
            "name": StaticPartitionsDefinition(names_lst),
            
            # partition each flow into N partitions
            "i": StaticPartitionsDefinition([str(i) for i in range(0, 10)])
        })

@asset(
    partitions_def=partition
)
def tester(context):

    print(">>>>>>>>>")
    context.log.info(context.partition_key.keys_by_dimension)
any ideas on why this error message keeps coming up intermittently? when it works, it works well for some period of time. any guidance is appreciated. :)
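For reference, a two-dimensional definition like the one above yields one partition per combination of the two dimensions. A minimal pure-Python sketch of that cross product, written without importing Dagster; the `names_lst` values are hypothetical placeholders (the list is not shown in this message) and `enumerate_partitions` is a made-up helper name:

```python
from itertools import product

# Hypothetical placeholder values; the real names_lst is not shown in this message.
names_lst = ["name1", "name2", "name3"]
indices = [str(i) for i in range(0, 10)]


def enumerate_partitions(names, idxs):
    """Return one dict per (name, i) combination, keyed by dimension name."""
    return [{"name": n, "i": i} for n, i in product(names, idxs)]


partitions = enumerate_partitions(names_lst, indices)
print(len(partitions))  # 3 names x 10 indices = 30
```

Each dict is meant to mirror the per-dimension mapping that the asset body logs via `context.partition_key.keys_by_dimension`.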
s
Hi Ivan, this sounds like a bug, but it would be helpful to get to something reproducible. Are you encountering this error with just this asset in your definitions? How are you materializing the assets? Through dagit?
i
hello @sean. thank you very much for getting back to me 🙂 encountering it with a few test assets. one of them always succeeds, the second one usually succeeds on the first or second automatic retry, and the third one almost always fails. locally with `dagster dev` it always fails. do you think it would help me bypass this bug for the immediate future by constructing PartitionedConfig explicitly, not just the definition? since that mapping between definitions and configs seems to be what's becoming inconsistent. also, for now I'm materializing asset partitions only via the dagit UI.
s
do you think it would help me bypass this bug for the immediate future by constructing PartitionedConfig explicitly, not just the definition? since that mapping between definitions and configs seems to be what's becoming inconsistent.
I’m not sure yet. Are you able to provide a self-contained example (like just a single partitioned asset in a `Definitions` object) and tell me which of the behaviors you listed above it is manifesting?
i
asset is defined as in `tester_assets.py`:
from dagster import MultiPartitionsDefinition, StaticPartitionsDefinition, asset

names_lst = ["name1", "name2", "name3"]

partition_tester = MultiPartitionsDefinition(
    {
        "name": StaticPartitionsDefinition(names_lst),
        "partition_i": StaticPartitionsDefinition([str(i) for i in range(0, 10)])
    })

@asset(
    description='''some description''',
    required_resource_keys={"aws_secrets_manager", "db"},
    io_manager_key="some_io_manager",
    partitions_def=partition_tester,
)
def tester(context):
    print("works...")
and loaded in `__init__.py`:
from dagster import Definitions, load_assets_from_modules

import tester_assets

defs = Definitions(
    assets=load_assets_from_modules([
        tester_assets,
    ]),
    resources=resources_map[deployment_name]
)
when materializing partitions of this asset, I'm usually seeing the first few attempts of a run fail with the following error:
dagster._check.CheckError: Invariant failed. Description: Can't supply a PartitionedConfig for 'config' with a different PartitionsDefinition than supplied for 'partitions_def'.
  File "/usr/local/lib/python3.10/dist-packages/dagster/_grpc/impl.py", line 137, in core_execute_run
    yield from execute_run_iterator(
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/execution/api.py", line 880, in __iter__
    yield from self.execution_context_manager.prepare_context()
  File "/usr/local/lib/python3.10/dist-packages/dagster/_utils/__init__.py", line 491, in generate_setup_events
    obj = next(self.generator)
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/execution/context_creation_job.py", line 293, in orchestration_context_event_generator
    context_creation_data = create_context_creation_data(
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/execution/context_creation_job.py", line 105, in create_context_creation_data
    job_def = job.get_definition()
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/definitions/reconstruct.py", line 243, in get_definition
    return self.repository.get_definition().get_maybe_subset_job_def(
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/definitions/repository_definition/repository_definition.py", line 278, in get_maybe_subset_job_def
    return defn.get_job_def_for_subset_selection(op_selection, asset_selection)
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/definitions/job_definition.py", line 729, in get_job_def_for_subset_selection
    return self._get_job_def_for_asset_selection(asset_selection)
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/definitions/job_definition.py", line 765, in _get_job_def_for_asset_selection
    new_job = build_asset_selection_job(
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/definitions/asset_layer.py", line 786, in build_asset_selection_job
    asset_job = build_source_asset_observation_job(
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/definitions/assets_job.py", line 312, in build_source_asset_observation_job
    return graph.to_job(
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/definitions/graph_definition.py", line 631, in to_job
    return JobDefinition.dagster_internal_init(
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/definitions/job_definition.py", line 285, in dagster_internal_init
    return JobDefinition(
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/definitions/job_definition.py", line 224, in __init__
    self._partitioned_config = PartitionedConfig.from_flexible_config(
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/definitions/partition.py", line 670, in from_flexible_config
    check.invariant(
  File "/usr/local/lib/python3.10/dist-packages/dagster/_check/__init__.py", line 1654, in invariant
    raise CheckError(f"Invariant failed. Description: {desc}")
eventually the task usually succeeds, sometimes on the 2nd run, sometimes on the 5th, but sometimes it fails on every retry and eventually is marked as permanently failed. I do have a few other partitioned assets in other modules in other directories, each with its own partition definition. some of them have the same behavior and error message, one of them is always succeeding.
s
Thanks for providing more detail. The non-determinism here is really baffling. The example still isn’t self-contained though, because I don’t have access to `resources_map`, so I can’t run this. The `tester` asset you show doesn’t actually use these resources. Does the problem still manifest if you drop the resources from `Definitions` and `required_resource_keys` from `tester`?
h
Sorry to contribute further non-reproducible evidence. I am blocked by this same error when working with static partitions. I battled with the non-determinism and wasn't able to put together something reproducible (I will try again when I find the time).