Nima Rahnemoon
08/21/2023, 11:42 PM
`DatasetToImagePartitionMapping(PartitionMapping):`. The problem is that when I go in the reverse direction, the input to the dataset-partitioned asset I want to generate needs to be a list of image-partitioned assets.
claire
08/22/2023, 9:33 PM
Nima Rahnemoon
08/22/2023, 9:35 PM
Nima Rahnemoon
08/22/2023, 9:36 PM
Nima Rahnemoon
08/22/2023, 9:37 PM
from typing import List
from dagster import AssetIn, asset

@asset(
    partitions_def=dataset_partitions_def,
    name="dataset_asset",
    ins={
        "image_asset": AssetIn(partition_mapping=ImageToDatasetPartitionMapping())
    },
)
def dataset_asset(context, image_asset: List[dict]) -> dict:
    ...
Nima Rahnemoon
08/22/2023, 9:38 PM
`image_asset` that goes into the `dataset_asset` plural?
claire
08/22/2023, 9:41 PM
@asset(...)
def dataset_asset(context, image_asset_1, image_asset_2, ...):
    ...
If you don't know the full set of input `image_asset`s, you could use an asset factory: https://github.com/dagster-io/dagster/discussions/11045
Nima Rahnemoon
08/22/2023, 9:43 PM
Nima Rahnemoon
08/22/2023, 9:45 PM
`"dataset0 /path/to/image.jpeg"` and the partition key for dataset is "dataset0", so it's very easy to get the partition mapping between the two. E.g., `Image<"dataset0 /path/to/image1.jpeg"> -> Dataset<"dataset0">`, `Image<"dataset1 /path/to/image2.jpeg"> -> Dataset<"dataset1">`.
Nima Rahnemoon
08/22/2023, 9:45 PM
`AssetIn(partition_mapping=)` method?
Nima Rahnemoon
08/22/2023, 9:50 PM
`get_upstream_mapped_partitions_result_for_partitions` and `get_downstream_partitions_for_partitions` - I currently only use `get_upstream_mapped_partitions_result_for_partitions` (dataset->image) but can I use `get_downstream_partitions_for_partitions` somehow to get the image->dataset?
claire
08/22/2023, 9:51 PM
`get_downstream_partitions_for_partitions` should just be the inverse method of `get_upstream_mapped_partitions_result_for_partitions`. So if you have `get_upstream_mapped_partitions_result_for_partitions` defined, you should be able to use `get_downstream_partitions_for_partitions`
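To make the inverse relationship concrete, here is a minimal plain-Python sketch; it is not the Dagster `PartitionMapping` API itself. It assumes the key convention described earlier in the thread (an image key is the dataset key, a space, then the image path). The helper names `dataset_key_for_image`, `upstream_image_keys`, and `downstream_dataset_keys` are hypothetical, and the sample paths are made up. A custom `PartitionMapping` subclass would wrap logic like this in the two methods named above, whose exact signatures should be checked against the Dagster version in use.

```python
from typing import Iterable, Set


def dataset_key_for_image(image_key: str) -> str:
    """Return the dataset key embedded in an image partition key.

    Assumes keys look like "dataset0 /path/to/image.jpeg".
    """
    return image_key.split(" ", 1)[0]


def upstream_image_keys(dataset_keys: Iterable[str], all_image_keys: Iterable[str]) -> Set[str]:
    """dataset -> image: every image key that belongs to one of the datasets."""
    wanted = set(dataset_keys)
    return {key for key in all_image_keys if dataset_key_for_image(key) in wanted}


def downstream_dataset_keys(image_keys: Iterable[str]) -> Set[str]:
    """image -> dataset: the dataset key of each image key."""
    return {dataset_key_for_image(key) for key in image_keys}


# Hypothetical image partition keys, for illustration only.
all_images = [
    "dataset0 /path/to/image1.jpeg",
    "dataset0 /path/to/image3.jpeg",
    "dataset1 /path/to/image2.jpeg",
]

images_for_dataset0 = upstream_image_keys(["dataset0"], all_images)
# Mapping those images back downstream recovers the dataset we started from.
round_trip = downstream_dataset_keys(images_for_dataset0)
```

Because both directions are derived from the same key-parsing helper, they cannot drift apart, which is the property the "inverse" description relies on.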
Nima Rahnemoon
08/22/2023, 9:53 PM
claire
08/22/2023, 9:59 PM
`datasets` asset that is dynamically partitioned by dataset, a downstream `images` asset that is dynamically partitioned by image, and then you want to add a third `datasets_2` asset that is downstream of `images` and also partitioned by dataset.
If you're using IO managers, when you execute a partition `dataset_partition` of `datasets_2`, the IO manager will automatically call `get_upstream_mapped_partitions_result_for_partitions` with `dataset_partition` to fetch the image partitions upstream of `dataset_partition`, and then load back the persisted outputs for each of those partitions.
So if you were using the default IO manager `fs_io_manager`, the `fs_io_manager` would return a mapping of partition key -> output for image partition. So your `dataset_2` asset definition would be the following:
@asset(
    partitions_def=dataset_partitions_def,
    name="dataset_asset",
    ins={
        "image_asset": AssetIn(partition_mapping=ImageToDatasetPartitionMapping())
    },
)
def dataset_2(context, image_asset: Dict[str, <whatever type your image output is>]) -> dict:
    ...
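As an illustration of what the body of such an asset might do with that input, here is a plain-Python sketch with no Dagster imports. It assumes each image output is a dict and that the loaded input is keyed by image partition key, as described above for `fs_io_manager`. The function name `combine_image_outputs`, the `width`/`height` fields, and the sample data are made up for the example.

```python
from typing import Dict


def combine_image_outputs(image_asset: Dict[str, dict]) -> dict:
    """Fold per-image outputs (keyed by image partition key) into one dataset-level dict."""
    return {
        "num_images": len(image_asset),
        "image_partition_keys": sorted(image_asset),
        "total_pixels": sum(out["width"] * out["height"] for out in image_asset.values()),
    }


# Hypothetical loaded input for the "dataset0" partition.
loaded = {
    "dataset0 /path/to/image1.jpeg": {"width": 4, "height": 3},
    "dataset0 /path/to/image3.jpeg": {"width": 2, "height": 2},
}
summary = combine_image_outputs(loaded)
```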
Nima Rahnemoon
08/22/2023, 10:01 PM
Nima Rahnemoon
08/22/2023, 10:01 PM
`image_asset: Dict[str, <whatever type your image output is>]` dict thing where the key is the image partition key?
Nima Rahnemoon
08/22/2023, 10:03 PM
One more minor question: I already have `DatasetToImagePartitionMapping`, do I have to also have `ImageToDatasetPartitionMapping` or can I just use `DatasetToImagePartitionMapping` and dagster will be smart enough to call the `get_downstream_partitions_for_partitions` instead of the `get_upstream_mapped_partitions_result_for_partitions` function?
claire
08/22/2023, 10:03 PM
`Dict[str, <whatever type your image output is>]` is just the return type for the default `fs_io_manager` though. It could be different depending on what contents are being loaded, so for example, because the snowflake IO manager can load multiple partitions' contents in a single dataframe, a single dataframe would be the return type
claire
08/22/2023, 10:05 PM
"One more minor question: I already have `DatasetToImagePartitionMapping`, do I have to also have `ImageToDatasetPartitionMapping` or can I just use `DatasetToImagePartitionMapping` and dagster will be smart enough to call the `get_downstream_partitions_for_partitions` instead of the `get_upstream_mapped_partitions_result_for_partitions` function?"
I believe you would have to define a different partition mapping in that case.
claire
08/22/2023, 10:07 PM
Nima Rahnemoon
08/22/2023, 10:11 PM
Nima Rahnemoon
08/22/2023, 10:12 PM
Nima Rahnemoon
08/22/2023, 10:12 PM
claire
08/22/2023, 10:23 PM
claire
08/22/2023, 10:24 PM
`StaticPartitionsMapping`. Then, when new partitions exist, you'd have to reload the code location to load those new partitions
Nima Rahnemoon
08/22/2023, 10:24 PM
Nima Rahnemoon
08/22/2023, 10:25 PM
claire
08/22/2023, 10:25 PM
dataset_partitions_def = StaticPartitionsDefinition(get_list_of_datasets())
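`get_list_of_datasets` is not defined anywhere in this thread. A minimal sketch of what it could look like, assuming the datasets can be discovered as subdirectories of a root directory; the directory layout and the `root` parameter are assumptions for illustration (the call above takes no arguments), not something from the conversation. The returned list is what would be handed to `StaticPartitionsDefinition`, and it is only recomputed when the code location is reloaded.

```python
import tempfile
from pathlib import Path
from typing import List


def get_list_of_datasets(root: Path) -> List[str]:
    """Return one partition key per dataset directory under `root`, sorted for stability."""
    return sorted(p.name for p in root.iterdir() if p.is_dir())


# Demo against a throwaway directory tree.
with tempfile.TemporaryDirectory() as tmp:
    for name in ("dataset1", "dataset0"):
        (Path(tmp) / name).mkdir()
    dataset_keys = get_list_of_datasets(Path(tmp))

# In the code location this list would be used along the lines of:
#   dataset_partitions_def = StaticPartitionsDefinition(get_list_of_datasets(root))
```

Sorting the keys keeps the partition order stable between reloads, which makes the partition list easier to read in the UI.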
Nima Rahnemoon
08/22/2023, 10:25 PM
Nima Rahnemoon
08/22/2023, 10:26 PM
Nima Rahnemoon
08/22/2023, 10:26 PM
`StaticPartitionMapping`?
claire
08/22/2023, 10:28 PM
StaticPartitionMapping(downstream_partition_keys_by_upstream_partition_key={"image1": "dataset1", ...})
but similar to the partitions def, it would be better to define a custom function that returns that mapping
Nima Rahnemoon
08/22/2023, 10:28 PM
Nima Rahnemoon
08/22/2023, 10:28 PM
claire
08/22/2023, 10:29 PM
Nima Rahnemoon
08/22/2023, 10:30 PM
Nima Rahnemoon
08/22/2023, 10:30 PM
Nima Rahnemoon
08/22/2023, 10:30 PM
claire
08/22/2023, 10:31 PM
Nima Rahnemoon
08/22/2023, 10:41 PM
`DatabaseResource(ConfigurableResource)`
Nima Rahnemoon
08/22/2023, 10:42 PM
`get_list_of_datasets()` function
Nima Rahnemoon
08/22/2023, 10:42 PM
Nima Rahnemoon
08/22/2023, 10:42 PM
claire
08/23/2023, 4:11 PM
Nima Rahnemoon
08/23/2023, 7:37 PM