# ask-community
y
Hello, I have a Multipartition with a Static Partition and a Dynamic Partition
Copy code
from dagster import (
    DynamicPartitionsDefinition,
    MultiPartitionsDefinition,
    StaticPartitionsDefinition,
)

year_partitions_def = StaticPartitionsDefinition(YEARS)
sfdr_metric_partitions_def = DynamicPartitionsDefinition(name="sfdr_metric")

sfdr_metric_year_partitions_def = MultiPartitionsDefinition(
    {
        "year": year_partitions_def,
        "sfdr_metric": sfdr_metric_partitions_def,
    }
)
The upstream asset uses the MultiPartition and a downstream asset uses the
year_partitions_def
I get an error when I have the 2 assets in a job, though.
Copy code
sfdr_alignment_job = define_asset_job(
    "sfdr_alignment_job",
    selection=asset_names,
    executor_def=executor
)
error message: Multiple partitioned assets exist in assets job 'sfdr_alignment_job'. Selected assets must have the same partitions definitions, but the selected assets have different partitions definitions:
Seems like I need to make a partition-mapping but I'm not sure how to do that. Are there more docs or an example? Thanks!
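(For later readers: the cross-partitions-def dependency itself can be declared with Dagster's experimental MultiToSingleDimensionPartitionMapping; a minimal sketch, assuming a Dagster version that ships it and the asset key that appears in the traceback below. Note this governs how input partitions are mapped, not which assets may share a job.)
Copy code
from dagster import AssetIn, MultiToSingleDimensionPartitionMapping, asset

# Sketch: each "year" partition of the downstream asset reads every
# (sfdr_metric, year) upstream partition with a matching year.
@asset(
    partitions_def=year_partitions_def,
    ins={
        "sfdr_metric": AssetIn(
            "idealratings_sfdr_metric",  # assumed upstream asset key
            partition_mapping=MultiToSingleDimensionPartitionMapping(
                partition_dimension_name="year"
            ),
        )
    },
)
def sfdr_alignment(sfdr_metric):
    ...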
I tried with a single Dynamic Partition and the downstream asset having no partition. I use the
gcs_pickle_io_manager
It doesn't seem to support merging all the partitions of the upstream asset. I deploy with k8s, so I don't think I can use the default io manager. How can I modify this to work, or is there another io_manager I should use? Thanks!
Copy code
dagster._check.CheckError: Failure condition: Tried to access partition key for asset 'AssetKey(['idealratings_sfdr_metric'])', but the number of input partitions != 1: 'DefaultPartitionsSubset(subset={'e_pillar', 's_pillar', 'g_pillar', 'passes_e_screen'}, partitions_def=Dynamic partitions: "sfdr_metric")'.
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster/_core/execution/plan/utils.py", line 54, in op_execution_error_boundary
    yield
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster/_core/execution/plan/inputs.py", line 826, in _load_input_with_input_manager
    value = input_manager.load_input(context)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster_gcp/gcs/io_manager.py", line 62, in load_input
    key = self._get_path(context)
          ^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster_gcp/gcs/io_manager.py", line 31, in _get_path
    path = context.get_asset_identifier()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster/_core/execution/context/input.py", line 459, in get_asset_identifier
    return [*self.asset_key.path, self.asset_partition_key]
Is it implemented for the bigquery io manager? I could use that instead
@claire @sandy sorry to bug you guys
Hi, I'm trying it now with
bigquery_pandas_io_manager
but now I get this error
Copy code
dagster._check.CheckError: Failure condition: The instance is not available to load partitions. You may be seeing this error when using dynamic partitions with a version of dagit or dagster-cloud that is older than 1.1.18.
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster/_core/errors.py", line 204, in user_code_error_boundary
    yield
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster/_grpc/impl.py", line 475, in get_partition_tags
    tags = partitioned_config.get_tags_for_partition_key(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster/_core/definitions/partition.py", line 755, in get_tags_for_partition_key
    partition = self._key_to_partition(partition_key, current_time, instance)
I'm running
Copy code
dagster dev -f esg/regulatory_frameworks/regulatory_frameworks.py
trying it with dagit instead
it didn't work....
c
hey yang, apologies for this error. This is a known issue (a regression introduced in 1.2.4): https://github.com/dagster-io/dagster/issues/13309, which will be fixed in this week's release. Downgrading to 1.2.3 should resolve it.
Yep, I think the
gcs_pickle_io_manager
currently only supports loading from a single partition or an unpartitioned asset. I believe the bigquery IO manager supports loading from multiple partitions.
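(For reference, a minimal sketch of swapping in the BigQuery IO manager, with hypothetical project/dataset values; see the dagster-gcp-pandas docs for the full config options:)
Copy code
from dagster import Definitions
from dagster_gcp_pandas import bigquery_pandas_io_manager

# Route asset IO through BigQuery tables instead of GCS pickles.
defs = Definitions(
    assets=[sfdr_metric, sfdr_alignment],  # the assets from this thread
    resources={
        "io_manager": bigquery_pandas_io_manager.configured(
            {"project": "my-gcp-project", "dataset": "sfdr"}  # hypothetical
        )
    },
)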
error message: Multiple partitioned assets exist in assets job 'sfdr_alignment_job'. Selected assets must have the same partitions definitions, but the selected assets have different partitions definitions:
This error is raised when assets with different partitions defs are put in the same job defined via
define_asset_job
. Dagster currently only supports jobs with assets that use the same partitions def, or are unpartitioned.
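(A minimal sketch of that split, assuming hypothetical per-partitions-def selections based on the asset names in this thread:)
Copy code
from dagster import define_asset_job

# One job per partitions definition: the multi-partitioned upstream and the
# year-partitioned downstream cannot share a job, so give each its own.
sfdr_metric_job = define_asset_job(
    "sfdr_metric_job",
    selection=["idealratings_sfdr_metric"],  # multi-partitioned asset
    executor_def=executor,
)
sfdr_alignment_job = define_asset_job(
    "sfdr_alignment_job",
    selection=["idealratings_sfdr_alignment"],  # year-partitioned asset
    executor_def=executor,
)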
y
oh OK, but I can put them in separate jobs and that should work?
c
yes
y
oooh got it, thanks I'll try that!
oh no, I'm getting this other error now.
Copy code
AttributeError: 'Series' object has no attribute 'iteritems'
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster/_core/execution/plan/utils.py", line 54, in op_execution_error_boundary
    yield
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster/_utils/__init__.py", line 472, in iterate_with_context
    next_output = next(iterator)
                  ^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster/_core/execution/plan/execute_step.py", line 595, in _gen_fn
    gen_output = output_manager.handle_output(output_context, output.value)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster/_core/storage/db_io_manager.py", line 138, in handle_output
    self._handlers_by_type[obj_type].handle_output(context, table_slice, obj, conn)
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster_gcp_pandas/bigquery/bigquery_pandas_type_handler.py", line 58, in handle_output
    for name, dtype in obj.dtypes.iteritems()
                       ^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/pandas/core/generic.py", line 5989, in __getattr__
    return object.__getattribute__(self, name)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Well I guess I can wait for the release this week. I just want to make sure, does the bigquery io manager handle Multi-partitions, too? My upstream asset has a static partition and a dynamic partition. Then the downstream asset only has the static partition. The static partition doesn't have a name, though. That's ok?
Copy code
year_partitions_def = StaticPartitionsDefinition(YEARS)

sfdr_metric_year_partitions_def = MultiPartitionsDefinition(
    {
        "year": year_partitions_def,
        "sfdr_metric": sfdr_metric_partitions_def,
    }
)
c
I believe it handles multipartitions. @jamie do you know why the error above is raised?
j
yeah the bigquery io manager handles multi partitions. Are you returning a pandas Series from your op/asset instead of a pandas DataFrame? the bigquery IO manager expects DataFrames, and it looks like a series is being passed instead
y
no I'm actually passing a DataFrame. I tried 1.2.2 and I get the same error
Ok, great that it handles multi-partitions! What do I put for
partition_expr
for multi-partitions? An array?
j
ah ok good to know. what pandas version are you using? i think iteritems has been marked for deprecation (we need to update our code), so if your pandas version is newer it might be fully gone
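(Context: pandas deprecated Series.iteritems in 1.5 and removed it in 2.0, so the library-side fix is switching to Series.items, e.g.:)
Copy code
import pandas as pd

df = pd.DataFrame({"orgpermid": [1, 2, 3]})
# pandas 2.x: Series.items() replaces the removed Series.iteritems()
for name, dtype in df.dtypes.items():
    print(name, dtype)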
for partition expr you’ll put a dictionary of the partition name to the expression. here’s an example https://docs.dagster.io/integrations/bigquery/reference#storing-multi-partitioned-assets
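(i.e. something like this on the multi-partitioned asset, with hypothetical column names; the dict keys are the dimension names from the MultiPartitionsDefinition:)
Copy code
from dagster import asset

@asset(
    partitions_def=sfdr_metric_year_partitions_def,
    metadata={
        "partition_expr": {
            "sfdr_metric": "METRIC",  # column holding the metric dimension
            "year": "YEAR",  # column holding the year dimension
        }
    },
)
def idealratings_sfdr_metric(context):
    ...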
y
I'm using pandas 2.0.0
Oh yeah, so about partition expr. It seems like static partitions don't have names though? How do I add them to the dict? Thanks!
It actually worked with a single partition when I downgraded pandas to 1.5.3, go figure. Thanks for the clue, though! I will try now with a multi-partition.
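(i.e. pinning pandas below 2.0 works around the iteritems removal until a dagster-gcp-pandas fix ships:)
Copy code
pip install "pandas==1.5.3"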
Hm, I'm getting this error now for the downstream asset that is defined with one of the StaticPartitions. The upstream is a MultiPartition with 2 StaticPartitions. Is this something that will be resolved with the release this week?
Copy code
dagster._check.CheckError: Failure condition: Tried to access partition key for asset 'AssetKey(['idealratings_sfdr_metric'])', but the number of input partitions != 1: 'DefaultPartitionsSubset(subset={'e_screen|2022', 'e_pillar|2022'}, partitions_def=Multi-partitioned, with dimensions:
Sfdr_metric: 'e_pillar', 'e_screen'
Year: '2016', '2017', '2018', '2019', '2020', '2021', '2022')'.
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster/_core/execution/plan/utils.py", line 55, in op_execution_error_boundary
    yield
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster/_core/execution/plan/inputs.py", line 836, in _load_input_with_input_manager
    value = input_manager.load_input(context)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster/_core/storage/db_io_manager.py", line 165, in load_input
    table_slice = self._get_table_slice(context, cast(OutputContext, context.upstream_output))
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster/_core/storage/db_io_manager.py", line 206, in _get_table_slice
    MultiPartitionKey, context.asset_partition_key
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster/_core/execution/context/input.py", line 348, in asset_partition_key
    check.failed(
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster/_check/__init__.py", line 1699, in failed
    raise CheckError(f"Failure condition: {desc}")
The upstream and downstream assets. I'm using dagster 1.2.2 now.
Copy code
    @asset(
        name=metric_asset_name,
        partitions_def=sfdr_metric_year_partitions_def,
        metadata={
            "partition_expr": {
                "sfdr_metric": "METRIC",
                "year": "YEAR"}
            }
    )
    def sfdr_metric(context):
        """metric per year, sfdr_metric
        """
        partition_keys = context.partition_key.keys_by_dimension
        metric_name = partition_keys['sfdr_metric']
        year = partition_keys["year"]
        # metric_name = context.partition_key
        context.log.info(f"get metric data for {metric_name}")
        
        mdata = pd.DataFrame({"orgpermid": [len(metric_name), 2, 3]})
        mdata.loc[:, "metric"] = metric_name
        mdata.loc[:, "year"] = year
        return Output(
            mdata,
            metadata={"num_rows": len(mdata)})

    alignment_asset_name = f"{dataset}_sfdr_alignment"
    @asset(
        name=alignment_asset_name,
        ins={
            "sfdr_metric": AssetIn(metric_asset_name)
        },
        partitions_def=year_partitions_def,
        metadata={
            "partition_expr": {
                "sfdr_metric": "METRIC",
                "year": "YEAR"}
            }
    )
    def sfdr_alignment(context, sfdr_metric):
        """compute alignment score
        
        sfdr_metric will be a dict of all the partitions
        """
        context.log.info(sfdr_metric)
        context.log.debug(list(sfdr_metric))
        
        data = pd.DataFrame({"orgpermid": [1, 2, 3]})
        data.loc[:, "year"] = context.partition_key
        return Output(
            data,
            metadata={"num_rows": len(data)})
j
can you share how you’re defining your multi partitions? i’m pretty sure each sub-partition should have a name associated with it
c
I think the issue is probably that the db IO manager assumes that there's only one upstream multipartition key, when there could be multiple: https://github.com/dagster-io/dagster/blob/5496617a52d32e7508ac86b79fc225bd8dbfbc9[…]3/python_modules/dagster/dagster/_core/storage/db_io_manager.py
y
Yes, the upstream asset with the multi-partition runs fine with the partition_expr. With the downstream asset, I tried it with only the
year_partitions_def
or with no partition, and they both generate the error
number of input partitions != 1:
It's 1.2.2, though. So maybe it will be resolved soon?
Copy code
year_partitions_def = StaticPartitionsDefinition(["2021", "2022"])
sfdr_metric_partitions_def = StaticPartitionsDefinition(
    ["e_pillar", "e_screen"])

sfdr_metric_year_partitions_def = MultiPartitionsDefinition(
    {
        "year": year_partitions_def,
        "sfdr_metric": sfdr_metric_partitions_def,
    }
)
Hi! @jamie I tried with the dagster update and I still got this error. I'm using the 2 StaticPartitions right now
Copy code
dagster._check.CheckError: Failure condition: Tried to access partition key for asset 'AssetKey(['idealratings_sfdr_metric'])', but the number of input partitions != 1: 'DefaultPartitionsSubset(subset={'e_screen|2022', 'e_pillar|2022'}, partitions_def=Multi-partitioned, with dimensions: 
Sfdr_metric: 'e_pillar', 'e_screen' 
Year: '2021', '2022')'.
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster/_core/execution/plan/utils.py", line 54, in op_execution_error_boundary
    yield
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster/_core/execution/plan/inputs.py", line 819, in _load_input_with_input_manager
    value = input_manager.load_input(context)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster/_core/storage/db_io_manager.py", line 165, in load_input
    table_slice = self._get_table_slice(context, cast(OutputContext, context.upstream_output))
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster/_core/storage/db_io_manager.py", line 206, in _get_table_slice
    MultiPartitionKey, context.asset_partition_key
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster/_core/execution/context/input.py", line 347, in asset_partition_key
    check.failed(
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster/_check/__init__.py", line 1669, in failed
    raise CheckError(f"Failure condition: {desc}")
It works with the default fs io_manager, though! fyi, @claire @jamie
j
just so i’m on the same page as you, you have assets that are partitioned with a Multi-Partition containing two sub Static Partitions. Then are you trying to materialize multiple partitions at once? That specific case (materializing multiple partitions of a MultiPartition) isn’t supported by the db io managers (I was a bit confused earlier in the thread since a MultiPartition and materializing multiple partitions sound so similar)
y
I have an upstream asset that is the MultiPartition, and then the downstream asset uses one of the partitions (year). So the upstream ones materialize fine (e.g. a1|b1, a2|b1), but I can't materialize the downstream one (b1) that would take in a1|b1, a2|b1, etc. of the upstream one. But it does work with the file system io_manager
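(With the fs io manager, that fan-in arrives as a dict mapping upstream partition keys like "e_pillar|2022" to their stored values; a sketch of consuming it, assuming pandas DataFrames:)
Copy code
import pandas as pd

def combine_metric_partitions(sfdr_metric: dict) -> pd.DataFrame:
    # sfdr_metric maps multi-partition keys (e.g. "e_pillar|2022") to that
    # partition's DataFrame; concatenate them to compute the alignment score.
    return pd.concat(list(sfdr_metric.values()), ignore_index=True)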
j
ok. I think it’s likely the same issue since two partitions are feeding into a single partition. it’s on my list of things to fix
y
Oh! What's your timeline for addressing this? Hopefully for the release next week?
j
unlikely that it’ll be next week - i’ve got a couple other high priority things to get done first. I can carve out some time the week after though
y
Ok great! That works for me, thanks!!
Hello, sorry to bug you. I'm wondering what the timeline is on this. If you have a ticket I can follow it. Thanks!