https://dagster.io/ logo
Title
j

John Sears

11/19/2022, 10:37 PM
Thread: In implementing an IOManager's
load_input
method, what is the right way to check if the upstream and downstream assets share a partition definition?
This was working for me in ad hoc runs, but fails during a partitioned job.
if context.has_partition_key and context.has_asset_partitions:
    shared_partition = context.partition_key in context.asset_partitions_def.get_partition_keys()
My understanding was
context.partition_key
is the downstream key and
context.asset_partitions_def
is the upstream partitions definition
s

sandy

11/21/2022, 4:10 PM
I would try:
if context.has_partition_key and context.has_asset_partitions:
    shared_partition = context.partition_key in context.asset_partition_keys
context.partition_key
corresponds to the run-wide partition key, which ends up being equal to the downstream key
context.asset_partition_keys
returns all upstream partition keys that correspond to the input being loaded
j

John Sears

11/29/2022, 5:56 PM
Thanks Sandy!
Hey, so this fixed the asset that was erring, but now I get another one:
AttributeError: 'StaticPartitionsDefinition' object has no attribute 'timezone'
It's this line:
shared_partition = context.partition_key in context.asset_partition_keys
Is there a way to check that the upstream keys are string and not datetimes?
s

sandy

12/02/2022, 12:33 AM
yikes - mind sharing the full stack trace?
do both the upstream and downstream assets have StaticPartitionsDefinitions?
j

John Sears

12/02/2022, 12:33 AM
No, the downstream are daily
AttributeError: 'StaticPartitionsDefinition' object has no attribute 'timezone'
  File "/home/john/.pyenv/versions/3.10.8/envs/dagster/lib/python3.10/site-packages/dagster/_core/execution/plan/utils.py", line 47, in solid_execution_error_boundary
    yield
  File "/home/john/.pyenv/versions/3.10.8/envs/dagster/lib/python3.10/site-packages/dagster/_core/execution/plan/inputs.py", line 867, in _load_input_with_input_manager
    value = input_manager.load_input(context)
  File "/home/john/repos/dagster/nba/sports_data/duckdb_io_manager.py", line 250, in load_input
    ret = self._input_handlers_by_type[obj_type].load_input(context, conn=conn)
  File "/home/john/repos/dagster/nba/sports_data/duckdb_io_manager.py", line 77, in load_input
    shared_partition = context.partition_key in context.asset_partition_keys
  File "/home/john/.pyenv/versions/3.10.8/envs/dagster/lib/python3.10/site-packages/dagster/_core/execution/context/input.py", line 339, in asset_partition_keys
    return self.asset_partitions_def.get_partition_keys_in_range(self.asset_partition_key_range)
  File "/home/john/.pyenv/versions/3.10.8/envs/dagster/lib/python3.10/site-packages/dagster/_core/execution/context/input.py", line 330, in asset_partition_key_range
    return self.step_context.asset_partition_key_range_for_input(self.name)
  File "/home/john/.pyenv/versions/3.10.8/envs/dagster/lib/python3.10/site-packages/dagster/_core/execution/context/system.py", line 813, in asset_partition_key_range_for_input
    return get_upstream_partitions_for_partition_range(
  File "/home/john/.pyenv/versions/3.10.8/envs/dagster/lib/python3.10/site-packages/dagster/_core/definitions/asset_partitions.py", line 24, in get_upstream_partitions_for_partition_range
    return downstream_partition_mapping.get_upstream_partitions_for_partition_range(
  File "/home/john/.pyenv/versions/3.10.8/envs/dagster/lib/python3.10/site-packages/dagster/_core/definitions/time_window_partition_mapping.py", line 42, in get_upstream_partitions_for_partition_range
    return self._map_partitions(
  File "/home/john/.pyenv/versions/3.10.8/envs/dagster/lib/python3.10/site-packages/dagster/_core/definitions/time_window_partition_mapping.py", line 74, in _map_partitions
    if to_partitions_def.timezone != from_partitions_def.timezone:
s

sandy

12/02/2022, 4:26 PM
for day X, what static partitions do you want to load? all of them? a particular one?
j

John Sears

12/02/2022, 4:33 PM
My understanding was by default I should get all of them, so was going to just do that. I would need a mapper to do fancier logic right?
As I think about it though I think I’m just doing it wrong…
Ultimately will need different paths for load input and write output methods
Since the partition reference is different
I was trying to reuse get_path
s

sandy

12/02/2022, 4:41 PM
Gotcha - if you want to get all of them, you need to use the
AllPartitionMapping()
, because Dagster doesn't know what to do when a date-partitioned asset depends on a static-partitioned asset.
👍 1
j

John Sears

12/02/2022, 4:41 PM
I’ll take a look!
That worked -- making it explicit with the
AllPartitionMapping
. I had for some reason thought it would automatically do it that way