# ask-community
l
Hello, what’s the best way to deal with “subset” daily partitioned assets? E.g., asset B (partitions start on Oct 31 2022) now depends on asset A (partitions start on April 14 2023), and I get errors because the “old” partitions of B (before April 14 2023) cannot be materialized. Can we provide a default/fallback?
s
@claire - I believe this is relevant to the change you've been working on. Interested in your thoughts.
c
Hey Louis, what behavior would you want here? Would the Oct 31 2022 to April 14 2023 partitions just depend on no upstream partitions? In my mind, if that's the case, the expected input would not exist, so those downstream partitions can't be materialized. What's your use case for materializing them?
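To make the gap concrete, here is a small stdlib-only sketch that lists the downstream daily partition keys with no same-day upstream partition. It assumes daily UTC partitions keyed as `YYYY-MM-DD` and a one-to-one (same-day) mapping; `partitions_without_upstream` is a hypothetical helper, not a Dagster API.

```python
from datetime import date, timedelta
from typing import List


def partitions_without_upstream(downstream_start: date, upstream_start: date) -> List[str]:
    """Daily keys of the downstream asset that predate the upstream asset's first partition.

    Assumes both assets are daily-partitioned and each downstream day maps to the same upstream day.
    """
    keys = []
    day = downstream_start
    while day < upstream_start:
        keys.append(day.isoformat())
        day += timedelta(days=1)
    return keys


missing = partitions_without_upstream(date(2022, 10, 31), date(2023, 4, 14))
print(len(missing), missing[0], missing[-1])  # 165 2022-10-31 2023-04-13
```

In this thread's example that is 165 daily partitions that would have nothing upstream to load.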
l
Hi Claire! Our use case is that we want to process two assets (A and B) jointly to produce C. Initially it was only A -> C, but we recently changed the logic to add this dependency on B.

Asset B has only existed since April 14, but we’d still like to materialize C before that date; otherwise we’d lose a lot of history.

I agree that in most cases processing can’t happen (C can’t be materialized) if any upstream asset is missing, so the current behavior is fine. However, there are cases where a missing input is acceptable, and we should be able to provide a safe fallback.

The way I currently achieve this is as follows:

1. Tweak our IO manager and add this:
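```python
def load_input(self, context: InputContext) -> Any:
    # Fall back to None instead of failing when the upstream asset has no partition for this key
    if context.has_asset_partitions and not context.asset_partitions_def.has_partition_key(context.partition_key):
        context.log.warning("missing partition")
        return None
    # ... existing loading logic continues here
```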
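2. Tweak the definition of asset C:

```python
def asset_c(context: OpExecutionContext, asset_a: pd.DataFrame, asset_b: Optional[set[str]]) -> Output[pd.DataFrame]:
    # asset_b is None when the IO manager found no upstream partition; fall back to an empty set
    if asset_b is None:
        asset_b = set()
```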
c
Hey Louis. That makes sense; one solution that comes to mind is to make `TimeWindowPartitionMapping` configurable so that it filters out nonexistent partitions. For the Oct 31 2022 to April 14 2023 partitions in the downstream asset, the partition mapping would then return an empty list of upstream partition dependencies. With no upstream partitions, the asset would just receive a None/empty dict input.
Would you mind filing an issue for this? I'll try and update the backend portion in the coming week. The UI does its own upstream partition resolution to display the warnings, which we should unify. But for now you can just ignore the warnings and kick off the run.
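A rough, Dagster-free sketch of the filtering behavior proposed above, assuming a same-day mapping between daily partitions: the mapping keeps only upstream keys that exist, and loading an empty list of keys yields an empty dict. `map_to_existing_upstream`, `load_mapped_input`, and the in-memory `storage` dict are hypothetical stand-ins, not Dagster's API.

```python
from datetime import date
from typing import Any, Dict, List


def map_to_existing_upstream(downstream_key: str, upstream_start: date) -> List[str]:
    """Same-day mapping for daily partitions that drops upstream keys that don't exist yet."""
    day = date.fromisoformat(downstream_key)
    # Before the upstream asset's first partition there is nothing to depend on.
    return [downstream_key] if day >= upstream_start else []


def load_mapped_input(upstream_keys: List[str], storage: Dict[str, Any]) -> Dict[str, Any]:
    """Load every mapped upstream partition; no mapped keys yields an empty dict."""
    return {key: storage[key] for key in upstream_keys}


upstream_start = date(2023, 4, 14)
storage = {"2023-04-14": {"user_1", "user_2"}}  # hypothetical stored upstream output

for key in ["2023-03-22", "2023-04-14"]:
    loaded = load_mapped_input(map_to_existing_upstream(key, upstream_start), storage)
    # Downstream code can treat an empty dict like the None fallback in the workaround.
    print(key, loaded or "no upstream partition")
```

The design choice is to resolve "missing upstream" in the mapping rather than in the IO manager, so the IO manager never has to guess whether a partition key is valid.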
l
Sure, I can try and file an issue later this week, thanks for your answer
Hello @claire, I’ll file the issue tomorrow. We tried to upgrade to 1.3.5 today, and now the workaround doesn’t work because the backfill is prevented from running:
```
dagster._core.errors.DagsterInvalidInvocationError: Provided time windows [TimeWindow(start=DateTime(2023, 3, 22, 0, 0, 0, tzinfo=Timezone('UTC')), end=DateTime(2023, 3, 23, 0, 0, 0, tzinfo=Timezone('UTC')))] contain invalid time windows for partitions definition Daily, starting 2023-04-14 UTC.
  File "/usr/local/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_plan.py", line 262, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/usr/local/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 335, in core_dagster_event_sequence_for_step
    for event_or_input_value in step_input.source.load_input_object(step_context, input_def):
  File "/usr/local/lib/python3.9/site-packages/dagster/_core/execution/plan/inputs.py", line 166, in load_input_object
    load_input_context = step_context.for_input_manager(
  File "/usr/local/lib/python3.9/site-packages/dagster/_core/execution/context/system.py", line 632, in for_input_manager
    self.asset_partitions_subset_for_input(name)
  File "/usr/local/lib/python3.9/site-packages/dagster/_core/execution/context/system.py", line 967, in asset_partitions_subset_for_input
    return partition_mapping.get_upstream_partitions_for_partitions(
  File "/usr/local/lib/python3.9/site-packages/dagster/_core/definitions/time_window_partition_mapping.py", line 114, in get_upstream_partitions_for_partitions
    return self._map_partitions(
  File "/usr/local/lib/python3.9/site-packages/dagster/_core/definitions/time_window_partition_mapping.py", line 244, in _map_partitions
    raise DagsterInvalidInvocationError(
```
I guess we’ll have to tweak `TimeWindowPartitionMapping`.
c
Hey Louis, I didn't get around to enabling `TimeWindowPartitionMapping` to filter out nonexistent partitions, but I'm going to work on a fix, hopefully for next week's release. In 1.3.4 we put in a change to `TimeWindowPartitionMapping` that raises this error when the upstream partitions don't exist. Until we add the fix above, which lets you filter out nonexistent partitions instead of raising an error, I think you will need to keep your Dagster version at 1.3.3 or earlier. Sorry about that!
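The difference between the 1.3.4+ behavior and the proposed fix can be sketched as a validation step with an opt-in escape hatch. This is a simplified illustration assuming day-aligned UTC windows; `Window`, `validate_upstream_windows`, and the `allow_nonexistent` flag are hypothetical stand-ins, not Dagster's actual classes or parameters.

```python
from datetime import datetime, timezone
from typing import List, NamedTuple


class Window(NamedTuple):
    start: datetime
    end: datetime


def validate_upstream_windows(
    windows: List[Window], upstream_start: datetime, allow_nonexistent: bool = False
) -> List[Window]:
    """Check mapped upstream windows against the upstream partitions' first start time.

    Strict mode raises (like the error in the traceback); lenient mode drops the invalid windows.
    """
    valid = [w for w in windows if w.start >= upstream_start]
    if len(valid) != len(windows) and not allow_nonexistent:
        raise ValueError(
            f"Provided time windows {windows} contain invalid time windows "
            f"for partitions starting {upstream_start:%Y-%m-%d}"
        )
    return valid


upstream_start = datetime(2023, 4, 14, tzinfo=timezone.utc)
march_22 = Window(datetime(2023, 3, 22, tzinfo=timezone.utc), datetime(2023, 3, 23, tzinfo=timezone.utc))

print(validate_upstream_windows([march_22], upstream_start, allow_nonexistent=True))  # []
try:
    validate_upstream_windows([march_22], upstream_start)
except ValueError as err:
    print("strict mode:", err)
```

Keeping strict mode as the default preserves the 1.3.4 safety check for the common case, while the flag covers cases like this one where a missing upstream partition is expected.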
l
No worries, thanks for all the hard work
c