Jeff
02/01/2023, 7:57 PMIOManager
with MultiPartitionsDefinition
I think was caused changes in the latest release. Details in 🧵context.asset_partition_key
I see that context.asset_partitions_def
is an instance of MultiPartitionsDefinition
,
however, context.asset_partition_key
is a string of format key_1|key_2
and not MultiPartitionKey
. This was previously working, but after I upgraded from 1.10 to 1.14, it does this behavior now.jamie
02/01/2023, 8:23 PMJeff
02/01/2023, 8:23 PMfrom dagster import StaticPartitionsDefinition, ResourceDefinition, asset, AssetIn, materialize, io_manager, IOManager, MultiPartitionKey, MultiPartitionsDefinition
dummy_multi_partition = MultiPartitionsDefinition(
{
"part_a": StaticPartitionsDefinition(["abc", "def"]),
"part_b": StaticPartitionsDefinition(["123", "456"]),
}
)
@io_manager()
def mock_delta_io_manager(_):
class TestIOManager(IOManager):
def handle_output(self, context, obj):
print(f"TYPE: {type(context.asset_partition_key)}")
print(context.asset_partition_key)
assert isinstance(context.asset_partition_key, MultiPartitionKey)
def load_input(self, context):
pass
return TestIOManager()
@asset(
name="dummy_asset_upstream",
metadata={"dagster_partition_cols": ["foo", "date"], "additional_partition_cols": ["test_col"]},
io_manager_def=mock_delta_io_manager,
partitions_def=dummy_multi_partition
)
def dummy_asset_upstream(context):
return 1234
materialize(
[dummy_asset_upstream], partition_key=MultiPartitionKey({"part_a": "abc", "part_b": "123"})
)
partition: abc|123
in addition to partition/part_a: abc
and partition/part_b: 123
.
In 1.10, I only see partition/part_a: abc
and partition/part_b: 123
.claire
02/02/2023, 12:58 AMMoulay Chihani
04/25/2023, 3:04 PM