# ask-community
b
Hi, what is the best way to do Fan-In using Partitions and Assets? I want to scan S3 files using daily and hourly partition schemes, but then merge them on a daily partition basis. I need to leverage multiple partitioning schemes to process data and combine/merge at day granularity.
s
So if I understand correctly, you have two multi-partitioned assets. The upstream one is partitioned by `("users", "hour")` and the downstream one is partitioned by `("users", "date")`. @claire has recently worked on some partition mappings for multi-partitioned assets. Claire - thoughts on how to handle this?
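(For illustration: a minimal sketch of the setup described above. The asset names, user keys, and start dates are hypothetical, and the hour-to-date dependency on the second asset is exactly the mapping this thread concludes is missing.)

```python
from dagster import (
    DailyPartitionsDefinition,
    HourlyPartitionsDefinition,
    MultiPartitionsDefinition,
    StaticPartitionsDefinition,
    asset,
)

# Hypothetical dimensions for the two multi-partitioned assets.
users = StaticPartitionsDefinition(["user_a", "user_b"])

hourly_by_user = MultiPartitionsDefinition(
    {"users": users, "hour": HourlyPartitionsDefinition(start_date="2023-01-01-00:00")}
)
daily_by_user = MultiPartitionsDefinition(
    {"users": users, "date": DailyPartitionsDefinition(start_date="2023-01-01")}
)

@asset(partitions_def=hourly_by_user)
def s3_hourly_scan():
    ...

# Fan-in: each ("users", date) partition should depend on that day's 24
# ("users", hour) partitions -- the hour-to-date mapping discussed below.
@asset(partitions_def=daily_by_user)
def daily_merge(s3_hourly_scan):
    ...
```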
c
Hi Binoy. I think we need a multi-to-multi partition mapping that can use the `TimeWindowPartitionMapping` to map different time dimensions to each other, which does not currently exist. Would you mind filing an issue for this?
s
hi y'all - did an issue ever get filed for this one? I haven't been able to find it, and would love to follow along. We have a similar setup, where our upstream asset is `("account_id", "day")`, our downstream asset is `("account_id", "month")`, and we'd like the downstream "month" asset to depend on the last upstream day of that month's asset. I'm currently working on a custom mapping class for this (seems like it should be straightforward?), but a natively supported multi-mapping would probably avoid the trial and error I'm about to go through learning this API 🙂
c
Hi Sri! No issue yet to my knowledge--would you mind filing one? I can try to take it on sometime next week, agree that this would be a good value add
s
i'll do it right now 🙂
b
Hi @claire, would this issue title be appropriate: “Add support for multi-to-multi partition mapping with TimeWindowPartitions”?
c
yes that makes sense to me
s
@Binoy Shah if you're filing an issue right now, i'll hold off -- would rather not create duplicates, and I can just chime in on the one you create!
b
Adding as we speak..
c
Thanks!
b
BTW, I asked the question again with more details and surrounding context here, just FYI: https://dagster.slack.com/archives/C01U954MEER/p1677853956896529
Thanks Sandy and Claire for resurfacing this
s
fyi - i managed to work around this for a little while (we were ok with recomputing the dependent asset as part of computing the rollup, at least for one use case, so we managed to solve it purely with sensors), but ran into another where that recomputation is prohibitively expensive. it's a slightly trickier one (but should fit into the abstraction i saw in the PR):
• upstream is partitioned by (`account_id`, `month`)
• downstream is also partitioned by (`account_id`, `month`), but this time depends on all preceding months [the difference between "all" and "all preceding" isn't important here -- we don't expect to recompute old stuff very often]
thoughts / other ideas for workarounds while y'all are solidifying the new functionality here?
c
For now the identity partition mapping is a solid workaround since you don't expect to recompute old partitions, but to support the preceding partitions we'd ideally add some sort of `PrecedingPartitionMapping` and allow you to specify that in your multipartition mapping (if this PR makes it through)
s
@claire maybe i'm not quite understanding - how will the downstream job know to load all relevant partitions of the upstream job? it seems like it's currently just loading the one matching partition, which isn't what i want
c
The upstream partitions that an asset's partition depends on are defined by the asset's partition mapping object, which defines a function like the code snippet below.
```python
def get_upstream_partitions_for_partitions(
    self,
    downstream_partitions_subset: Optional[PartitionsSubset],
    upstream_partitions_def: PartitionsDefinition,
    dynamic_partitions_store: Optional[DynamicPartitionsStore] = None,
) -> PartitionsSubset:
    partition_key = downstream_partitions_subset.get_partition_keys()[0]  # assume only one key
    return upstream_partitions_def.empty_subset().with_partition_key_range(
        PartitionKeyRange(
            upstream_partitions_def.get_first_partition_key(), partition_key
        )
    )
```
If we created a custom `PrecedingPartitionMapping` that contained a function like the one above, we'd be able to define that the downstream asset's partition `X` depends on all upstream partitions between the first partition and `X`.
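(A hedged sketch of what that hypothetical `PrecedingPartitionMapping` could look like as a class, wrapping the function above. The base class and import paths are assumptions -- the `PartitionMapping` interface and module locations have shifted across Dagster versions, and a real subclass also has to implement the downstream direction:)

```python
from typing import Optional

# NOTE: import paths are assumptions; they have moved between Dagster versions.
from dagster import PartitionKeyRange, PartitionsDefinition
from dagster._core.definitions.partition import PartitionsSubset
from dagster._core.definitions.partition_mapping import PartitionMapping
from dagster._core.instance import DynamicPartitionsStore


class PrecedingPartitionMapping(PartitionMapping):
    """Hypothetical: downstream partition X depends on every upstream
    partition from the first partition through X."""

    def get_upstream_partitions_for_partitions(
        self,
        downstream_partitions_subset: Optional[PartitionsSubset],
        upstream_partitions_def: PartitionsDefinition,
        dynamic_partitions_store: Optional[DynamicPartitionsStore] = None,
    ) -> PartitionsSubset:
        # As in the snippet above, assume a single downstream key.
        partition_key = list(downstream_partitions_subset.get_partition_keys())[0]
        return upstream_partitions_def.empty_subset().with_partition_key_range(
            PartitionKeyRange(
                upstream_partitions_def.get_first_partition_key(), partition_key
            )
        )

    def get_downstream_partitions_for_partitions(self, *args, **kwargs):
        # The reverse mapping (which downstream partitions an upstream
        # partition feeds) is omitted in this sketch.
        raise NotImplementedError
```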
s
sorry, let me clarify - I understand how `PrecedingPartitionMapping` would help here! I don't think I understand, however, how using `IdentityPartitionMapping` is a workaround, since it doesn't specify the dependency between a downstream partition and preceding upstream partitions. (Maybe you're saying "it should work if you never compute downstream assets before their corresponding upstreams", which I agree with?)
c
Ah my bad! I meant to say that using `AllPartitionMapping` is a workaround if you don't recompute old partitions very frequently: if you materialize new partitions as they come into existence, they should only incorporate the preceding months up to the month of the partition.
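(A minimal sketch of that `AllPartitionMapping` workaround, with hypothetical names. The downstream declares a dependency on every upstream partition, which approximates "all preceding" as long as old partitions aren't re-materialized after later ones exist:)

```python
from dagster import (
    AllPartitionMapping,
    AssetIn,
    MonthlyPartitionsDefinition,
    asset,
)

months = MonthlyPartitionsDefinition(start_date="2023-01-01")

@asset(partitions_def=months)
def monthly_totals():
    ...

# Each cumulative partition depends on *all* monthly_totals partitions;
# materialized as soon as a new month exists, only the preceding months
# are present, which matches the "all preceding" intent.
@asset(
    partitions_def=months,
    ins={"monthly_totals": AssetIn(partition_mapping=AllPartitionMapping())},
)
def cumulative_totals(monthly_totals):
    ...
```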
s
oh, cool, that makes sense! thanks