Yang
03/31/2023, 11:41 PM
year_partitions_def = StaticPartitionsDefinition(YEARS)
sfdr_metric_partitions_def = DynamicPartitionsDefinition(name="sfdr_metric")
sfdr_metric_year_partitions_def = MultiPartitionsDefinition(
    {
        "year": year_partitions_def,
        "sfdr_metric": sfdr_metric_partitions_def,
    }
)
The upstream asset uses the multi-partitions definition and the downstream asset uses year_partitions_def.
I get an error when I put the two assets in one job, though.
sfdr_alignment_job = define_asset_job(
    "sfdr_alignment_job",
    selection=asset_names,
    executor_def=executor,
)
error message: Multiple partitioned assets exist in assets job 'sfdr_alignment_job'. Selected assets must have the same partitions definitions, but the selected assets have different partitions definitions:
Yang
03/31/2023, 11:53 PM
Yang
04/01/2023, 4:07 PM
gcs_pickle_io_manager
It doesn't seem to support merging all the partitions of the upstream asset. I deploy with k8s, so I don't think I can use the default io manager. How can I modify this to work, or is there another io_manager I should use? Thanks!
dagster._check.CheckError: Failure condition: Tried to access partition key for asset 'AssetKey(['idealratings_sfdr_metric'])', but the number of input partitions != 1: 'DefaultPartitionsSubset(subset={'e_pillar', 's_pillar', 'g_pillar', 'passes_e_screen'}, partitions_def=Dynamic partitions: "sfdr_metric")'.
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster/_core/execution/plan/utils.py", line 54, in op_execution_error_boundary
    yield
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster/_core/execution/plan/inputs.py", line 826, in _load_input_with_input_manager
    value = input_manager.load_input(context)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster_gcp/gcs/io_manager.py", line 62, in load_input
    key = self._get_path(context)
          ^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster_gcp/gcs/io_manager.py", line 31, in _get_path
    path = context.get_asset_identifier()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster/_core/execution/context/input.py", line 459, in get_asset_identifier
    return [*self.asset_key.path, self.asset_partition_key]
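For context on the failure above: the traceback shows `_get_path` building one storage path from a single `asset_partition_key`, which cannot work when the input spans several upstream partitions. Below is a minimal sketch of the fan-in idea in plain Python, not a real IO manager. `partition_paths`, `load_partitions`, the `read_object` callback, and the in-memory bucket are hypothetical names made up for illustration; in an actual Dagster IO manager the key list would presumably come from `context.asset_partition_keys`.

```python
from typing import Any, Callable, Dict, Sequence


def partition_paths(asset_path: Sequence[str], partition_keys: Sequence[str]) -> Dict[str, str]:
    """Map each partition key to its own storage path (one object per partition)."""
    return {key: "/".join([*asset_path, key]) for key in partition_keys}


def load_partitions(
    asset_path: Sequence[str],
    partition_keys: Sequence[str],
    read_object: Callable[[str], Any],
) -> Dict[str, Any]:
    """Load every requested partition and return a dict keyed by partition key."""
    paths = partition_paths(asset_path, partition_keys)
    return {key: read_object(path) for key, path in paths.items()}


# Hypothetical in-memory stand-in for a bucket; real code would read from GCS.
fake_bucket = {
    "idealratings_sfdr_metric/e_pillar": [1, 2, 3],
    "idealratings_sfdr_metric/s_pillar": [4, 5, 6],
}

loaded = load_partitions(
    ["idealratings_sfdr_metric"],
    ["e_pillar", "s_pillar"],
    fake_bucket.__getitem__,
)
```

The downstream asset would then receive a dict with one entry per upstream partition instead of a single unpickled object.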
Yang
04/01/2023, 8:08 PM
Yang
04/03/2023, 4:15 PM
Yang
04/03/2023, 10:01 PM
bigquery_pandas_io_manager
but now I get this error:
dagster._check.CheckError: Failure condition: The instance is not available to load partitions. You may be seeing this error when using dynamic partitions with a version of dagit or dagster-cloud that is older than 1.1.18.
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster/_core/errors.py", line 204, in user_code_error_boundary
    yield
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster/_grpc/impl.py", line 475, in get_partition_tags
    tags = partitioned_config.get_tags_for_partition_key(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster/_core/definitions/partition.py", line 755, in get_tags_for_partition_key
    partition = self._key_to_partition(partition_key, current_time, instance)
Yang
04/03/2023, 10:02 PM
dagster dev -f esg/regulatory_frameworks/regulatory_frameworks.py
Yang
04/03/2023, 10:03 PM
Yang
04/03/2023, 10:07 PM
claire
04/03/2023, 10:40 PM
claire
04/03/2023, 10:44 PM
gcs_pickle_io_manager currently only supports loading from a single partition or an unpartitioned asset. I believe the bigquery IO manager supports loading from multiple partitions.
claire
04/03/2023, 10:45 PM
error message: Multiple partitioned assets exist in assets job 'sfdr_alignment_job'. Selected assets must have the same partitions definitions, but the selected assets have different partitions definitions:
This error is raised when assets with different partitions defs are put in the same job defined via define_asset_job. Dagster currently only supports jobs whose assets all use the same partitions def or are unpartitioned.
Yang
04/03/2023, 10:49 PM
claire
04/03/2023, 10:49 PM
Yang
04/03/2023, 10:49 PM
Yang
04/03/2023, 11:57 PM
AttributeError: 'Series' object has no attribute 'iteritems'
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster/_core/execution/plan/utils.py", line 54, in op_execution_error_boundary
    yield
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster/_utils/__init__.py", line 472, in iterate_with_context
    next_output = next(iterator)
                  ^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster/_core/execution/plan/execute_step.py", line 595, in _gen_fn
    gen_output = output_manager.handle_output(output_context, output.value)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster/_core/storage/db_io_manager.py", line 138, in handle_output
    self._handlers_by_type[obj_type].handle_output(context, table_slice, obj, conn)
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster_gcp_pandas/bigquery/bigquery_pandas_type_handler.py", line 58, in handle_output
    for name, dtype in obj.dtypes.iteritems()
                       ^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/pandas/core/generic.py", line 5989, in __getattr__
    return object.__getattribute__(self, name)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
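The `AttributeError` above is consistent with pandas 2.0, which removed `Series.iteritems()` (deprecated since 1.5); `Series.items()` is the drop-in replacement. Here is a small sketch of the same dtype loop written with `.items()`. The DataFrame contents are invented for illustration, and the thread does not confirm which dagster-gcp-pandas release, if any, contains this change.

```python
import pandas as pd

# Illustrative frame; explicit dtypes keep the result platform-independent.
df = pd.DataFrame(
    {
        "orgpermid": pd.Series([1, 2, 3], dtype="int64"),
        "year": pd.Series([2021.0, 2022.0, 2022.0], dtype="float64"),
    }
)

# df.dtypes is a Series indexed by column name, so .items() yields
# (column name, dtype) pairs, just as .iteritems() did before pandas 2.0.
column_types = {name: str(dtype) for name, dtype in df.dtypes.items()}
```

Until the library code uses `.items()`, pinning pandas below 2.0 in the environment is the usual stopgap for this class of error.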
Yang
04/04/2023, 12:00 AM
year_partitions_def = StaticPartitionsDefinition(YEARS)
sfdr_metric_year_partitions_def = MultiPartitionsDefinition(
    {
        "year": year_partitions_def,
        "sfdr_metric": sfdr_metric_partitions_def,
    }
)
claire
04/04/2023, 12:02 AM
jamie
04/04/2023, 2:27 PM
Yang
04/04/2023, 3:22 PM
Yang
04/04/2023, 3:49 PM
partition_expr for multi-partitions? An array?
jamie
04/04/2023, 4:46 PM
jamie
04/04/2023, 4:47 PM
Yang
04/04/2023, 5:28 PM
Yang
04/04/2023, 5:30 PM
Yang
04/04/2023, 6:39 PM
Yang
04/04/2023, 7:23 PM
dagster._check.CheckError: Failure condition: Tried to access partition key for asset 'AssetKey(['idealratings_sfdr_metric'])', but the number of input partitions != 1: 'DefaultPartitionsSubset(subset={'e_screen|2022', 'e_pillar|2022'}, partitions_def=Multi-partitioned, with dimensions:
Sfdr_metric: 'e_pillar', 'e_screen'
Year: '2016', '2017', '2018', '2019', '2020', '2021', '2022')'.
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster/_core/execution/plan/utils.py", line 55, in op_execution_error_boundary
    yield
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster/_core/execution/plan/inputs.py", line 836, in _load_input_with_input_manager
    value = input_manager.load_input(context)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster/_core/storage/db_io_manager.py", line 165, in load_input
    table_slice = self._get_table_slice(context, cast(OutputContext, context.upstream_output))
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster/_core/storage/db_io_manager.py", line 206, in _get_table_slice
    MultiPartitionKey, context.asset_partition_key
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster/_core/execution/context/input.py", line 348, in asset_partition_key
    check.failed(
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster/_check/__init__.py", line 1699, in failed
    raise CheckError(f"Failure condition: {desc}")
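The subset in the error lists two upstream keys, `e_screen|2022` and `e_pillar|2022`, for a single downstream year, which is why a singular partition key cannot be resolved. Below is a plain-Python sketch of that fan-in. It assumes multi-partition keys are the dimension values joined by `|` in alphabetical order of dimension name, as the keys in the error message suggest. `multi_keys` and `upstream_keys_for_year` are hypothetical helpers, not Dagster APIs.

```python
from itertools import product
from typing import Dict, List

# Dimension values taken from the partitions definitions in the thread.
DIMENSIONS: Dict[str, List[str]] = {
    "year": ["2021", "2022"],
    "sfdr_metric": ["e_pillar", "e_screen"],
}


def multi_keys(dimensions: Dict[str, List[str]]) -> List[str]:
    """Enumerate all composite keys, ordering dimensions by name (assumption)."""
    names = sorted(dimensions)  # ["sfdr_metric", "year"]
    return ["|".join(values) for values in product(*(dimensions[n] for n in names))]


def upstream_keys_for_year(dimensions: Dict[str, List[str]], year: str) -> List[str]:
    """Return every composite key whose 'year' component equals the given year."""
    year_index = sorted(dimensions).index("year")
    return [key for key in multi_keys(dimensions) if key.split("|")[year_index] == year]


keys_2022 = upstream_keys_for_year(DIMENSIONS, "2022")
```

With these dimensions, one downstream year always maps to as many upstream partitions as there are metrics, so any loader that expects exactly one input partition will fail.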
Yang
04/04/2023, 7:24 PM
@asset(
    name=metric_asset_name,
    partitions_def=sfdr_metric_year_partitions_def,
    metadata={
        "partition_expr": {
            "sfdr_metric": "METRIC",
            "year": "YEAR",
        }
    },
)
def sfdr_metric(context):
    """metric per year, sfdr_metric"""
    partition_keys = context.partition_key.keys_by_dimension
    metric_name = partition_keys["sfdr_metric"]
    year = partition_keys["year"]
    # metric_name = context.partition_key
    context.log.info(f"get metric data for {metric_name}")
    mdata = pd.DataFrame({"orgpermid": [len(metric_name), 2, 3]})
    mdata.loc[:, "metric"] = metric_name
    mdata.loc[:, "year"] = year
    return Output(mdata, metadata={"num_rows": len(mdata)})


alignment_asset_name = f"{dataset}_sfdr_alignment"


@asset(
    name=alignment_asset_name,
    ins={"sfdr_metric": AssetIn(metric_asset_name)},
    partitions_def=year_partitions_def,
    metadata={
        "partition_expr": {
            "sfdr_metric": "METRIC",
            "year": "YEAR",
        }
    },
)
def sfdr_alignment(context, sfdr_metric):
    """compute alignment score

    sfdr_metric will be a dict of all the partitions
    """
    context.log.info(sfdr_metric)
    context.log.debug(list(sfdr_metric))
    data = pd.DataFrame({"orgpermid": [1, 2, 3]})
    data.loc[:, "year"] = context.partition_key
    return Output(data, metadata={"num_rows": len(data)})
jamie
04/04/2023, 8:00 PM
claire
04/04/2023, 8:03 PM
Yang
04/04/2023, 8:10 PM
year_partition_def
or with no partition, and they both generate the error "number of input partitions != 1".
It's 1.2.2, though. So maybe it will be resolved soon?
year_partitions_def = StaticPartitionsDefinition(["2021", "2022"])
sfdr_metric_partitions_def = StaticPartitionsDefinition(["e_pillar", "e_screen"])
sfdr_metric_year_partitions_def = MultiPartitionsDefinition(
    {
        "year": year_partitions_def,
        "sfdr_metric": sfdr_metric_partitions_def,
    }
)
Yang
04/06/2023, 7:13 PM
dagster._check.CheckError: Failure condition: Tried to access partition key for asset 'AssetKey(['idealratings_sfdr_metric'])', but the number of input partitions != 1: 'DefaultPartitionsSubset(subset={'e_screen|2022', 'e_pillar|2022'}, partitions_def=Multi-partitioned, with dimensions:
Sfdr_metric: 'e_pillar', 'e_screen'
Year: '2021', '2022')'.
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster/_core/execution/plan/utils.py", line 54, in op_execution_error_boundary
    yield
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster/_core/execution/plan/inputs.py", line 819, in _load_input_with_input_manager
    value = input_manager.load_input(context)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster/_core/storage/db_io_manager.py", line 165, in load_input
    table_slice = self._get_table_slice(context, cast(OutputContext, context.upstream_output))
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster/_core/storage/db_io_manager.py", line 206, in _get_table_slice
    MultiPartitionKey, context.asset_partition_key
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster/_core/execution/context/input.py", line 347, in asset_partition_key
    check.failed(
  File "/opt/homebrew/Caskroom/miniforge/base/envs/py311/lib/python3.11/site-packages/dagster/_check/__init__.py", line 1669, in failed
    raise CheckError(f"Failure condition: {desc}")
Yang
04/06/2023, 7:14 PM
jamie
04/06/2023, 7:17 PM
Yang
04/06/2023, 7:20 PM
jamie
04/06/2023, 7:21 PM
Yang
04/06/2023, 7:22 PM
jamie
04/06/2023, 7:23 PM
Yang
04/06/2023, 7:24 PM
Yang
04/28/2023, 11:43 PM