Thread: In implementing an IOManager's `load_input...
# ask-community
j
In implementing an IOManager's `load_input` method, what is the right way to check if the upstream and downstream assets share a partition definition?
This was working for me in ad hoc runs, but fails during a partitioned job.
if context.has_partition_key and context.has_asset_partitions:
    shared_partition = context.partition_key in context.asset_partitions_def.get_partition_keys()
My understanding was that `context.partition_key` is the downstream key and `context.asset_partitions_def` is the upstream partitions definition.
s
I would try:
if context.has_partition_key and context.has_asset_partitions:
    shared_partition = context.partition_key in context.asset_partition_keys
`context.partition_key` corresponds to the run-wide partition key, which ends up being equal to the downstream key.
`context.asset_partition_keys` returns all upstream partition keys that correspond to the input being loaded.
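For reference, the check suggested above can be wrapped in a small helper. This is only a sketch: `shares_partition` is a hypothetical name, and `fake_context` is a stand-in object exposing just the four attributes mentioned in this thread, not a real Dagster `InputContext`.

```python
from types import SimpleNamespace


def shares_partition(context) -> bool:
    """Return True if the run's partition key is one of the upstream keys for this input."""
    # Unpartitioned runs or unpartitioned upstream assets cannot share a key.
    if not (context.has_partition_key and context.has_asset_partitions):
        return False
    return context.partition_key in context.asset_partition_keys


# Hypothetical stand-in for the context passed to load_input.
fake_context = SimpleNamespace(
    has_partition_key=True,
    has_asset_partitions=True,
    partition_key="2023-01-15",
    asset_partition_keys=["2023-01-14", "2023-01-15"],
)

print(shares_partition(fake_context))
```

Inside an IO manager, the same helper would be called with the `context` argument of `load_input`.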
j
Thanks Sandy!
Hey, so this fixed the asset that was erring, but now I get another one:
AttributeError: 'StaticPartitionsDefinition' object has no attribute 'timezone'
It's this line:
shared_partition = context.partition_key in context.asset_partition_keys
Is there a way to check that the upstream keys are strings and not datetimes?
s
yikes - mind sharing the full stack trace?
do both the upstream and downstream assets have StaticPartitionsDefinitions?
j
No, the downstream assets are daily-partitioned.
AttributeError: 'StaticPartitionsDefinition' object has no attribute 'timezone'
  File "/home/john/.pyenv/versions/3.10.8/envs/dagster/lib/python3.10/site-packages/dagster/_core/execution/plan/utils.py", line 47, in solid_execution_error_boundary
    yield
  File "/home/john/.pyenv/versions/3.10.8/envs/dagster/lib/python3.10/site-packages/dagster/_core/execution/plan/inputs.py", line 867, in _load_input_with_input_manager
    value = input_manager.load_input(context)
  File "/home/john/repos/dagster/nba/sports_data/duckdb_io_manager.py", line 250, in load_input
    ret = self._input_handlers_by_type[obj_type].load_input(context, conn=conn)
  File "/home/john/repos/dagster/nba/sports_data/duckdb_io_manager.py", line 77, in load_input
    shared_partition = context.partition_key in context.asset_partition_keys
  File "/home/john/.pyenv/versions/3.10.8/envs/dagster/lib/python3.10/site-packages/dagster/_core/execution/context/input.py", line 339, in asset_partition_keys
    return self.asset_partitions_def.get_partition_keys_in_range(self.asset_partition_key_range)
  File "/home/john/.pyenv/versions/3.10.8/envs/dagster/lib/python3.10/site-packages/dagster/_core/execution/context/input.py", line 330, in asset_partition_key_range
    return self.step_context.asset_partition_key_range_for_input(self.name)
  File "/home/john/.pyenv/versions/3.10.8/envs/dagster/lib/python3.10/site-packages/dagster/_core/execution/context/system.py", line 813, in asset_partition_key_range_for_input
    return get_upstream_partitions_for_partition_range(
  File "/home/john/.pyenv/versions/3.10.8/envs/dagster/lib/python3.10/site-packages/dagster/_core/definitions/asset_partitions.py", line 24, in get_upstream_partitions_for_partition_range
    return downstream_partition_mapping.get_upstream_partitions_for_partition_range(
  File "/home/john/.pyenv/versions/3.10.8/envs/dagster/lib/python3.10/site-packages/dagster/_core/definitions/time_window_partition_mapping.py", line 42, in get_upstream_partitions_for_partition_range
    return self._map_partitions(
  File "/home/john/.pyenv/versions/3.10.8/envs/dagster/lib/python3.10/site-packages/dagster/_core/definitions/time_window_partition_mapping.py", line 74, in _map_partitions
    if to_partitions_def.timezone != from_partitions_def.timezone:
s
for day X, what static partitions do you want to load? all of them? a particular one?
j
My understanding was that by default I should get all of them, so I was going to just do that. I would need a mapper to do fancier logic, right?
As I think about it, though, I think I'm just doing it wrong.
Ultimately I will need different paths for the load input and write output methods, since the partition reference is different.
I was trying to reuse get_path for both.
s
Gotcha - if you want to get all of them, you need to use the `AllPartitionMapping()`, because Dagster doesn't know what to do when a date-partitioned asset depends on a static-partitioned asset.
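To illustrate why there is no obvious default here, the following plain-Python sketch models the question a partition mapping answers: given one downstream key, which upstream keys should be loaded? The functions `all_partitions` and `same_key` and the list `STATIC_KEYS` are hypothetical stand-ins for illustration, not Dagster's classes; in Dagster itself the "all" behaviour is what `AllPartitionMapping()` makes explicit, and how it is attached to the input depends on the Dagster version.

```python
from typing import List

# Hypothetical upstream static partition keys.
STATIC_KEYS = ["team_a", "team_b", "team_c"]


def all_partitions(downstream_key: str, upstream_keys: List[str]) -> List[str]:
    """Every upstream partition feeds every downstream partition."""
    return list(upstream_keys)


def same_key(downstream_key: str, upstream_keys: List[str]) -> List[str]:
    """Only the upstream partition whose key is identical to the downstream key."""
    return [key for key in upstream_keys if key == downstream_key]


# A daily downstream key has no counterpart among the static upstream keys,
# so a key-matching rule finds nothing, while the "all" rule returns every key.
print(same_key("2023-01-15", STATIC_KEYS))
print(all_partitions("2023-01-15", STATIC_KEYS))
```

With the "all" rule, `load_input` for any given day would iterate over every static key rather than looking for a matching one.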
j
I’ll take a look!
That worked -- making it explicit with the `AllPartitionMapping`. I had for some reason thought it would automatically do it that way.