# ask-community
y
Hi! It's been a while. I'm looking at using DynamicPartitions for an upstream asset. A downstream asset would consume all the partitions of the upstream asset. Will it do this by default, or do I need a PartitionMapping? Thanks!
🤖 1
A related question... Can I have another upstream asset that creates the DynamicPartitions, or do I need to use a sensor like in the docs?
c
Hi Yang. By default, if an unpartitioned asset is downstream of a partitioned asset, the unpartitioned asset consumes all partitions of the upstream asset. If your downstream asset is partitioned, you'll have to specify a partition mapping.
you could add dynamic partitions in another asset, e.g.:
Copy code
from dagster import asset

@asset
def adds_partitions(context):
    # Register new partition keys on the instance; `partitions_def_name`
    # and `partition_keys` are assumed to be defined elsewhere
    context.instance.add_dynamic_partitions(partitions_def_name, partition_keys)
    ...
One caveat here is that unexpected behaviors may occur if you mutate the dynamic partitions def during a run for one of its partitions: i.e., if you added partitions in an upstream dynamic-partitioned asset and attempted to read all partitions in a downstream unpartitioned asset in the same run. So I would recommend not mutating the partitions def within one of its own runs.
y
oh great! yes, the downstream asset will be unpartitioned. But how exactly will each partition be referenced? I saw somewhere it would be like a dictionary, e.g. `upstream_asset['part1']`, is that right?
c
depends on what your IO manager is. The default `fs_io_manager` stores outputs per-partition, so it returns a `Dict[str, obj]` that maps partition key to the output object. But if you had something like the Snowflake IO manager, which can load ranges of partitions, you would just get back a single object.
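For concreteness, a minimal sketch of that default `fs_io_manager` behavior; the partitions def and asset names here are hypothetical, not from this thread:
Copy code
from dagster import DynamicPartitionsDefinition, asset

# Hypothetical dynamic partitions def for illustration
metrics_partitions_def = DynamicPartitionsDefinition(name="metrics")

@asset(partitions_def=metrics_partitions_def)
def upstream_asset(context) -> str:
    # One stored output per partition key
    return f"value for {context.partition_key}"

@asset
def downstream_asset(upstream_asset) -> list:
    # Unpartitioned downstream of a partitioned asset: fs_io_manager
    # loads a dict of partition key -> stored output
    return [f"{key}: {value}" for key, value in upstream_asset.items()]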
y
oohh I see, I'm using the `gcs_pickle_io_manager`
Can Dynamic Partitions be Multipartitions?
Looks like a no, but I just want to confirm, and I'm wondering if it will be supported in the future. Thanks!
t
I just saw in the release notes of 1.2.3 that "dynamic partitions can now exist as dimensions of multi-partition definitions" (though that may be slightly different than what you are looking for?)
y
Oh??? I think that should be what I'm looking for, thanks!
Yes I see it, that's awesome. In that case, @claire, how would I have the downstream asset take all partitions of one of the dimensions? What I mean is that the multipartition would have one dimension that is dynamic, say `name`, and the other one is `year`. Then the downstream asset would use the `year` partition and combine all the `names`. Does that work automatically, and would the input asset also look like a dict with the `names` as the keys? Thanks!
r
Where did you see that @Timo Vink? I couldn't find that in the changelog.
c
By default, if a single-dimension asset is upstream/downstream of a multipartitioned asset and the single-dimension partitions def is a dimension of the multipartitions def, Dagster auto-infers a dependency relationship.
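A minimal sketch of that auto-inferred relationship, assuming a dynamic `name` dimension and a static `year` dimension as in the question above (all definition names here are hypothetical):
Copy code
from dagster import (
    DynamicPartitionsDefinition,
    MultiPartitionsDefinition,
    StaticPartitionsDefinition,
    asset,
)

# Hypothetical dimensions for illustration
names_partitions_def = DynamicPartitionsDefinition(name="name")
years_partitions_def = StaticPartitionsDefinition(["2021", "2022", "2023"])
multipartitions_def = MultiPartitionsDefinition(
    {"name": names_partitions_def, "year": years_partitions_def}
)

@asset(partitions_def=multipartitions_def)
def upstream(context):
    ...

# Partitioned by year only: Dagster auto-infers that each year partition
# depends on all (name, year) multipartitions sharing that year
@asset(partitions_def=years_partitions_def)
def downstream(upstream):
    ...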
In terms of what the IO manager returns, it depends on the IO manager. For the default `fs_io_manager`, you would get a dict mapping the partition key path to the object. Currently the path puts each dimension in a different folder, so if you wanted to get back the multipartition key to the object, that would look like this:
Copy code
from dagster import DailyPartitionsDefinition, MultiPartitionKey, asset

# `multipartitions_def` is assumed to be the upstream asset's
# MultiPartitionsDefinition, defined elsewhere

@asset(partitions_def=DailyPartitionsDefinition("2023-01-01"))
def downstream_of_multipartitions_asset(upstream):
    def _get_key_from_multipartition_path(path: str) -> MultiPartitionKey:
        # fs_io_manager nests one folder per dimension, ordered by dimension name
        dimension_names = sorted(multipartitions_def.partition_dimension_names)
        keys_ordered_by_dimension = path.split("/")
        return MultiPartitionKey(dict(zip(dimension_names, keys_ordered_by_dimension)))

    obj_by_key = {
        _get_key_from_multipartition_path(path): obj for path, obj in upstream.items()
    }
    ...
Not super ergonomic at the moment; I'd recommend filing an issue to return a mapping of multipartition key -> object instead of the filepath dict.
y
Thanks, that's really helpful! Do you happen to know what it is for `gcs_pickle_io_manager`?
@Rubén Briones https://github.com/dagster-io/dagster/blob/master/CHANGES.md#new-1 • (experimental) Dynamic partitions definitions can now exist as dimensions of multi-partitions definitions.
❤️ 1
Hi @claire, sorry, I have another question. I'm looking at adding dynamic partitions in an asset instead of a sensor. I'm wondering if this still applies for the asset context:
Copy code
context.instance.add_dynamic_partitions
c
yes it does, it's the same method on the instance
🎉 1
y
How do I get the existing list of partitions? I was trying to do this but it didn't work.
Copy code
# existing_partitions = sfdr_metric_partitions_def.get_partitions()
t
I believe you can do:
Copy code
context.instance.get_dynamic_partitions(sfdr_metric_partitions_def.name)
🌈 1
y
oh thank you!
yes, verified
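Putting the two instance methods from this thread together, a minimal sketch of registering only the missing partition keys (the partitions def name and key list are hypothetical):
Copy code
from dagster import DynamicPartitionsDefinition, asset

# Hypothetical partitions def mirroring the thread's sfdr_metric_partitions_def
sfdr_metric_partitions_def = DynamicPartitionsDefinition(name="sfdr_metric")

@asset
def register_metric_partitions(context):
    # Partition keys already registered on the instance
    existing = set(
        context.instance.get_dynamic_partitions(sfdr_metric_partitions_def.name)
    )
    # In practice these keys would come from e.g. the yaml metric config
    desired = ["metric_a", "metric_b"]
    new_keys = [key for key in desired if key not in existing]
    if new_keys:
        context.instance.add_dynamic_partitions(
            sfdr_metric_partitions_def.name, new_keys
        )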
c
Hey Yang, reviving this thread as we're thinking about API improvements for adding dynamic partitions within assets. What's your use case for doing something like this? Are there any assets downstream of your asset that adds the dynamic partitions?
y
Hi! Oh great. No, I can have a separate job that adds the dynamic partitions, then another job that materializes them. The use case is a configurable scoring system where metrics are configured in a yaml file. Each different metric is a DynamicPartition, and a downstream asset loads all the metric partitions and formats an output.
These should be multipartitions, where the other dimension alongside the metric is year.
Ideally I'd like to have partitions only materialize if they're missing in that job.
c
Ah ok, thanks! this makes sense to me