Pezhman Zarabadi-Poor
01/27/2023, 5:14 PM
I'm trying to use `StaticPartitionsDefinition` within the following context to process a series of files. I'm using Dagster 1.1.13 and `duckdb-polars-io-manager` (similar to the `pandas` one, but it handles `polars` DataFrames). My code is based on the following snippet:
```python
import polars as pl
from dagster import asset, StaticPartitionsDefinition


def get_my_list():
    # One static partition key per input file: my_file_0 ... my_file_9
    return [f'my_file_{i}' for i in range(0, 10)]


@asset(
    partitions_def=StaticPartitionsDefinition(partition_keys=get_my_list()),
)
def process_output(context):
    # Key of the partition being materialized, e.g. "my_file_3"
    partition_string = context.asset_partition_key_for_output()
    pq_path = f'/path/to/data/{partition_string}.parquet'
    return pl.read_parquet(pq_path)
```
However, when I materialize the asset, I get the following error:
```
ValueError: Tried to get asset partitions for an output that correponds to a partitioned asset that is not partitioned with a TimeWindowPartitionsDefinition.
  File "/home/pezhman/repos/Cloud/dagster-selfdeps-poc/.venv/lib/python3.10/site-packages/dagster/_core/execution/plan/utils.py", line 55, in op_execution_error_boundary
    yield
  File "/home/pezhman/repos/Cloud/dagster-selfdeps-poc/.venv/lib/python3.10/site-packages/dagster/_utils/__init__.py", line 457, in iterate_with_context
    next_output = next(iterator)
  File "/home/pezhman/repos/Cloud/dagster-selfdeps-poc/.venv/lib/python3.10/site-packages/dagster/_core/execution/plan/execute_step.py", line 621, in _gen_fn
    gen_output = output_manager.handle_output(output_context, output.value)
  File "/home/pezhman/repos/Cloud/dagster-selfdeps-poc/.venv/lib/python3.10/site-packages/dagster/_core/storage/db_io_manager.py", line 97, in handle_output
    table_slice = self._get_table_slice(context, context)
  File "/home/pezhman/repos/Cloud/dagster-selfdeps-poc/.venv/lib/python3.10/site-packages/dagster/_core/storage/db_io_manager.py", line 171, in _get_table_slice
    context.asset_partitions_time_window if context.has_asset_partitions else None
  File "/home/pezhman/repos/Cloud/dagster-selfdeps-poc/.venv/lib/python3.10/site-packages/dagster/_core/execution/context/output.py", line 480, in asset_partitions_time_window
    return self.step_context.asset_partitions_time_window_for_output(self.name)
  File "/home/pezhman/repos/Cloud/dagster-selfdeps-poc/.venv/lib/python3.10/site-packages/dagster/_core/execution/context/system.py", line 1004, in asset_partitions_time_window_for_output
    raise ValueError(
```
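To make the failure mode concrete, here is a simplified, dependency-free model of the branch in the traceback. `FakeOutputContext` and `get_table_slice_partition` are hypothetical stand-ins, not Dagster classes; they only mimic the line `context.asset_partitions_time_window if context.has_asset_partitions else None` from `_get_table_slice`, which treats every partitioned asset as if it were time-partitioned.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class FakeOutputContext:
    """Hypothetical stand-in for Dagster's output context (illustration only)."""
    has_asset_partitions: bool
    # Only set when the asset uses a time-window partitions definition.
    time_window: Optional[Tuple[str, str]] = None

    @property
    def asset_partitions_time_window(self) -> Tuple[str, str]:
        # Mimics the check at the bottom of the traceback: asking for a time
        # window on an asset without time-window partitions raises ValueError.
        if self.time_window is None:
            raise ValueError(
                "output is not partitioned with a TimeWindowPartitionsDefinition"
            )
        return self.time_window


def get_table_slice_partition(
    context: FakeOutputContext,
) -> Optional[Tuple[str, str]]:
    # Same shape as the traceback line: any partitioned asset is assumed to
    # have a time window, so a static partition key never gets a chance.
    return context.asset_partitions_time_window if context.has_asset_partitions else None
```

An unpartitioned asset yields `None`, a time-partitioned one yields its window, and a statically partitioned one (like `process_output`) raises the `ValueError` seen above.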
Based on my reading, the issue seems to come from the I/O manager not knowing how to handle `StaticPartitionsDefinition` in this case: `_get_table_slice` in `db_io_manager.py` asks for `asset_partitions_time_window` whenever the asset has partitions. I assume I should implement that logic in my I/O manager, but I wanted to share it here and ask for your opinions on best practices for handling such cases.
Thanks.