Alex Prykhodko
12/06/2022, 12:21 AM
s3_pickle_io_manager. Works as expected when using fs_io_manager (the upstream input argument is a dict keyed by partition).
Code:
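from dagster import OpExecutionContext, StaticPartitionsDefinition, asset

# Partitioned upstream asset; get_partitions() supplies the partition keys.
@asset(partitions_def=StaticPartitionsDefinition(get_partitions()))
def sa_metrics_normalized(context: OpExecutionContext, sa_metrics_raw):
    ...

# Non-partitioned downstream asset; by default it loads every upstream partition.
@asset
def sa_metrics_data_frame(context: OpExecutionContext, sa_metrics_normalized):
    ...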
Error:
dagster._check.CheckError: Failure condition: Tried to access partition key for input 'sa_metrics_normalized' of step 'sa_metrics_data_frame', but the step input has a partition range: '2014-01' to '2016-12'.
  File "/usr/local/lib/python3.8/site-packages/dagster/_core/execution/plan/utils.py", line 52, in solid_execution_error_boundary
    yield
  File "/usr/local/lib/python3.8/site-packages/dagster/_core/execution/plan/inputs.py", line 856, in _load_input_with_input_manager
    value = input_manager.load_input(context)
  File "/usr/local/lib/python3.8/site-packages/dagster_aws/s3/io_manager.py", line 72, in load_input
    key = self._get_path(context)
  File "/usr/local/lib/python3.8/site-packages/dagster_aws/s3/io_manager.py", line 33, in _get_path
    path = context.get_asset_identifier()
  File "/usr/local/lib/python3.8/site-packages/dagster/_core/execution/context/input.py", line 409, in get_asset_identifier
    return [*self.asset_key.path, self.asset_partition_key]
  File "/usr/local/lib/python3.8/site-packages/dagster/_core/execution/context/input.py", line 324, in asset_partition_key
    return self.step_context.asset_partition_key_for_input(self.name)
  File "/usr/local/lib/python3.8/site-packages/dagster/_core/execution/context/system.py", line 915, in asset_partition_key_for_input
    check.failed(
  File "/usr/local/lib/python3.8/site-packages/dagster/_check/__init__.py", line 1642, in failed
    raise CheckError(f"Failure condition: {desc}")
owen
12/06/2022, 12:49 AM
s3_pickle_io_manager does not currently support loading ranges of partitions (it can only read a single partition at a time). By default, when a non-partitioned asset is downstream of a partitioned asset, the assumption is that the non-partitioned asset needs to load ALL partitions of the upstream asset as input. If that's not the behavior you want, you can override it by setting partition_mappings on your downstream asset. For example, LastPartitionMapping will only load the most recent partition of the upstream asset, which should not cause an error in this case.
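To make the difference concrete, here is a rough sketch of what loading several partitions at once amounts to: one object per partition key, collected into a dict keyed by partition, which is the shape fs_io_manager is described as producing above. This is plain Python and not the dagster_aws implementation. The names FakeBucket, partition_object_keys and load_partitions, the "dagster" prefix, and the prefix/asset/partition key layout are all assumptions made for illustration, and an in-memory dict stands in for S3.

```python
import pickle
from typing import Any, Dict, Sequence

# Hypothetical stand-in for an S3 bucket: object key -> pickled bytes.
FakeBucket = Dict[str, bytes]


def partition_object_keys(
    prefix: str, asset_path: Sequence[str], partition_keys: Sequence[str]
) -> Dict[str, str]:
    """Map each partition key to the object key it is assumed to be stored under."""
    return {pk: "/".join([prefix, *asset_path, pk]) for pk in partition_keys}


def load_partitions(
    bucket: FakeBucket,
    prefix: str,
    asset_path: Sequence[str],
    partition_keys: Sequence[str],
) -> Dict[str, Any]:
    """Load every requested partition and return {partition_key: value}."""
    object_keys = partition_object_keys(prefix, asset_path, partition_keys)
    return {pk: pickle.loads(bucket[key]) for pk, key in object_keys.items()}


# Two upstream partitions written one object at a time, as a single-partition
# IO manager would write them.
bucket: FakeBucket = {
    "dagster/sa_metrics_normalized/2014-01": pickle.dumps({"rows": 10}),
    "dagster/sa_metrics_normalized/2014-02": pickle.dumps({"rows": 12}),
}

# The downstream asset asks for a range of partitions and gets a dict back.
loaded = load_partitions(
    bucket, "dagster", ["sa_metrics_normalized"], ["2014-01", "2014-02"]
)
```

A single-partition loader corresponds to calling load_partitions with exactly one key. The error above is raised because the input covers the range '2014-01' to '2016-12' while the code path asks for a single partition key.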
I believe multi-partition support should be a straightforward feature to add to the S3 IO manager, if you don't mind creating a GitHub issue!
Alex Prykhodko
12/06/2022, 12:54 AM