
Binoy Shah

03/01/2023, 2:01 PM
Hi, what is the best way to do Fan-In using Partitions and Assets ? I want to scan S3 files using Daily and Hourly Partition Scheme, but then Merge them on a Daily Partition basis. I need to leverage multiple partitioning schemes to process data and combine/merge according to day granularity

sandy

03/03/2023, 12:45 AM
So if I understand correctly, you have two multi-partitioned assets. The upstream one is partitioned by ("users", "hour") and the downstream one is partitioned by ("users", "date"). @claire has recently worked on some partition mappings for multi-partitioned assets. Claire - thoughts on how to handle this?
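For concreteness, the hour-to-day fan-in on the time dimension can be sketched in plain Python, ignoring the static "users" dimension. The key formats below are assumptions for illustration, not necessarily Dagster's exact partition-key strings:

```python
from datetime import datetime

def upstream_hours_for_day(day_key: str) -> list[str]:
    """Map one daily partition key (e.g. '2023-03-01') to the 24
    hourly partition keys (e.g. '2023-03-01-00:00') it fans in from."""
    day = datetime.strptime(day_key, "%Y-%m-%d")
    return [f"{day:%Y-%m-%d}-{hour:02d}:00" for hour in range(24)]

hours = upstream_hours_for_day("2023-03-01")
```

A real partition mapping would express this same relationship through Dagster's partition-mapping API rather than raw key strings.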

claire

03/03/2023, 5:24 PM
Hi Binoy. I think we need a multi-to-multi partition mapping that can use the `TimeWindowPartitionMapping` to map different time dimensions to each other, which doesn't currently exist. Would you mind filing an issue for this?

sri raghavan

03/10/2023, 11:01 PM
hi y'all - did an issue ever get filed for this one? I haven't been able to find it, and would love to follow along. We have a similar setup, where our upstream asset is `("account_id", "day")`, our downstream asset is `("account_id", "month")`, and we'd like the downstream "month" asset to depend on the last upstream day of that month's asset. I'm currently working on a custom mapping class for this (seems like it should be straightforward?), but a natively-supported multi-mapping probably avoids the trial-and-error I'm about to go through learning this API 🙂
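The last-day-of-month lookup that mapping needs can be sketched in plain Python (the key formats are assumed for illustration, not necessarily Dagster's):

```python
import calendar

def last_day_key(month_key: str) -> str:
    """For a monthly partition key like '2023-02', return the daily
    partition key of the last day of that month ('2023-02-28')."""
    year, month = (int(p) for p in month_key.split("-"))
    # monthrange returns (weekday of first day, number of days in month)
    n_days = calendar.monthrange(year, month)[1]
    return f"{year:04d}-{month:02d}-{n_days:02d}"
```

A custom mapping class would apply this per-key logic to the time dimension while passing the `account_id` dimension through unchanged.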

claire

03/10/2023, 11:02 PM
Hi Sri! No issue yet to my knowledge -- would you mind filing one? I can try to take it on sometime next week; agree that this would be a good value add

sri raghavan

03/10/2023, 11:02 PM
i'll do it right now 🙂

Binoy Shah

03/10/2023, 11:06 PM
Hi @claire would this issue topic be appropriate “Add support for multi-to-multi partition mapping with TimeWindowPartitions” ?

claire

03/10/2023, 11:06 PM
yes that makes sense to me

sri raghavan

03/10/2023, 11:07 PM
@Binoy Shah if you're filing an issue right now, i'll hold off -- would rather not create duplicates, and I can just chime in on the one you create!

Binoy Shah

03/10/2023, 11:07 PM
Adding as we speak..
👍🏾 1

claire

03/10/2023, 11:10 PM
Thanks!

Binoy Shah

03/10/2023, 11:14 PM
BTW, I had asked the question again with more details and surrounding context here, just FYI: https://dagster.slack.com/archives/C01U954MEER/p1677853956896529
Thanks Sandy and Claire for resurfacing this

sri raghavan

03/17/2023, 10:07 AM
fyi - i managed to work around this for a little while (we were ok with recomputing the dependent asset as part of computing the rollup, at least for one use case, so we managed to solve it purely with sensors), but ran into another case where that recomputation is prohibitively expensive. it's a slightly trickier one (but should fit into the abstraction i saw in the PR):
• upstream is partitioned by (`account_id`, `month`)
• downstream is also partitioned by (`account_id`, `month`), but this time depends on all preceding months [the difference between "all" and "all preceding" isn't important here -- we don't expect to recompute old stuff very often]
thoughts / other ideas for workarounds while y'all are solidifying the new functionality here?

claire

03/17/2023, 4:55 PM
For now the identity partition mapping is a solid workaround since you don't expect to recompute old partitions, but to support the preceding partitions we'd ideally add some sort of `PrecedingPartitionMapping` and allow you to specify that in your multipartition mapping (if this PR makes it through)

sri raghavan

03/17/2023, 11:52 PM
@claire maybe i'm not quite understanding - how will the downstream job know to load all relevant partitions of the upstream job? it seems like it's currently just loading the one matching partition, which isn't what i want

claire

03/20/2023, 9:39 PM
The upstream partitions that an asset's partition depends on are defined by the asset's partition mapping object, which defines a function like the snippet below.
```
def get_upstream_partitions_for_partitions(
    self,
    downstream_partitions_subset: Optional[PartitionsSubset],
    upstream_partitions_def: PartitionsDefinition,
    dynamic_partitions_store: Optional[DynamicPartitionsStore] = None,
) -> PartitionsSubset:
    # Assume the downstream subset contains only one partition key
    partition_key = downstream_partitions_subset.get_partition_keys()[0]
    # Depend on every upstream partition from the first key up to that key
    return upstream_partitions_def.empty_subset().with_partition_key_range(
        PartitionKeyRange(
            upstream_partitions_def.get_first_partition_key(), partition_key
        )
    )
```
If we created a custom `PrecedingPartitionMapping` that contained a function like the one above, we'd be able to define that the downstream asset's partition `X` depends on all upstream partitions between the first partition and `X`.
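The selection logic itself is simple; here is a plain-Python sketch over sorted month keys, independent of Dagster's actual `PartitionsSubset` API (key format is an assumption for illustration):

```python
def preceding_partitions(all_keys: list[str], downstream_key: str) -> list[str]:
    """Return every upstream key from the first key up to and
    including downstream_key (keys assumed to be sorted)."""
    return all_keys[: all_keys.index(downstream_key) + 1]

months = ["2023-01", "2023-02", "2023-03", "2023-04"]
deps = preceding_partitions(months, "2023-03")
```

In the real mapping this range would be expressed as a `PartitionKeyRange` rather than a Python list slice.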

sri raghavan

03/20/2023, 11:31 PM
sorry, let me clarify - I understand how `PrecedingPartitionsMapping` would help here! I don't think I understand, however, how using `IdentityPartitionsMapping` is a workaround, since it doesn't specify the dependency between a downstream partition and preceding upstream partitions. (Maybe you're saying "it should work if you never compute downstream assets before their corresponding upstreams", which I agree with?)

claire

03/20/2023, 11:35 PM
Ah my bad! I meant to say that using the `AllPartitionsMapping` is a workaround if you don't recompute old partitions very frequently -- if you materialize new partitions right after they exist, they will only incorporate the preceding months up to the month of that partition.
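A tiny plain-Python sketch of why the two coincide when you only materialize the newest partition (the mapping names and key formats here are illustrative, not Dagster API):

```python
def upstream_for(existing_keys: list[str], downstream_key: str, mapping: str) -> list[str]:
    # "all": depend on every partition that exists at materialization time.
    # "preceding": depend only on keys at or before the downstream key.
    if mapping == "all":
        return sorted(existing_keys)
    return sorted(k for k in existing_keys if k <= downstream_key)

# When the newest month is materialized, only months up to it exist,
# so the two mappings select the same upstream set.
existing = ["2023-01", "2023-02", "2023-03"]
same = upstream_for(existing, "2023-03", "all") == upstream_for(existing, "2023-03", "preceding")
```

Once later months exist, recomputing an old month under the "all" mapping would pull in future months too, which is where the workaround breaks down.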

sri raghavan

03/20/2023, 11:53 PM
oh, cool, that makes sense! thanks