Yang
03/29/2023, 8:58 PMYang
03/29/2023, 9:00 PMclaire
03/29/2023, 11:03 PMclaire
03/29/2023, 11:13 PM@asset
def adds_partitions(context):
context.instance.add_dynamic_partitions(partitions_def_name, partition_keys)
....
One caveat here is that unexpected behaviors may occur if you mutate the dynamic partitions def during run for one of its partitions: i.e. if you added partitions in an upstream dynamic-partitioned asset, and attempted to read all partitions in in a downstream unpartitioned asset in the same run. So I would recommend not mutating the partitions def in one of its runs.Yang
03/29/2023, 11:28 PMupstream_asset['part1']
Yang
03/29/2023, 11:28 PMclaire
03/29/2023, 11:31 PMfs_io_manager
stores outputs per-partition, so returns a Dict[str, obj]
which maps partition key to the output object.
But if you had something like the snowflake IO manager which can load ranges of partitions, you would just get back a single objectYang
03/29/2023, 11:37 PMYang
03/30/2023, 12:12 AMYang
03/30/2023, 12:13 AMTimo Vink
03/30/2023, 1:06 AMYang
03/30/2023, 1:20 AMYang
03/30/2023, 4:36 AMname
. And the other one is year
. Then the downstream asset would be using the partition year
and combining all the names
. Does that work automatically, and would the input asset also look like a dict with the names
as the keys? Thanks!Rubén Briones
03/30/2023, 7:11 AMclaire
03/30/2023, 4:36 PMclaire
03/30/2023, 4:37 PMfs_io_manager
, you would get a dict containing the partition key path to the object. Currently the path orders each dimension in a different folder, if you wanted to get back the multipartition key to the object that would look like this:
@asset(partitions_def=DailyPartitionsDefinition("2023-01-01"))
def downstream_of_multipartitions_asset(upstream):
def _get_key_from_multipartition_path(path: str) -> MultiPartitionKey:
dimension_names = sorted(multipartitions_def.partition_dimension_names)
keys_ordered_by_dimension = path.split("/")
return MultiPartitionKey(
{tup[0]: tup[1] for tup in list(zip(dimension_names, keys_ordered_by_dimension))}
)
object_by_key = {_get_key_from_multipartition_path(path): object for path, object in upstream.items()}
...
claire
03/30/2023, 4:38 PMYang
03/30/2023, 6:37 PMgcs_pickle_io_manager
?Yang
03/30/2023, 6:41 PMYang
03/30/2023, 8:54 PMcontext.instance.add_dynamic_partitions
claire
03/30/2023, 8:55 PMYang
03/30/2023, 11:59 PM# existing_partitions = sfdr_metric_partitions_def.get_partitions()
Timo Vink
03/31/2023, 12:01 AMcontext.instance.get_dynamic_partitions(sfdr_metric_partitions_def.name)
Yang
03/31/2023, 12:04 AMYang
03/31/2023, 6:23 AMclaire
04/28/2023, 3:10 PMYang
04/28/2023, 11:28 PMYang
04/28/2023, 11:29 PMYang
04/28/2023, 11:29 PMclaire
05/01/2023, 1:32 PM