# ask-community
n
My data is set up such that a dataset has many images. I have a partition mapping that allows me to take a dataset-partitioned asset and generate image-partitioned assets. Now, I want to take those image-partitioned assets and aggregate them into a single dataset-partitioned asset. Is there a way to do that? I.e., my partition mapping for sending a dataset-partitioned asset as an input to generate an image-partitioned asset is `DatasetToImagePartitionMapping(PartitionMapping)`. The problem is that when I go in the reverse direction, the input to the dataset-partitioned asset I want to generate needs to be a list of image-partitioned assets.
c
Hi Nima. What types of partitions definitions are you using? If you're using static partitions for both the dataset and image partitions definitions, you should be able to define a different `StaticPartitionMapping` for the dataset -> image dependency and another for the image -> dataset dependency.
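A minimal sketch of what those two mappings could look like (the partition keys here are made up for illustration):
```python
from dagster import StaticPartitionMapping

# dataset -> image: each dataset partition fans out to its image partitions
dataset_to_image = StaticPartitionMapping(
    downstream_partition_keys_by_upstream_partition_key={
        "dataset0": ["image_a", "image_b"],
        "dataset1": ["image_c"],
    }
)

# image -> dataset: each image partition rolls up to one dataset partition
image_to_dataset = StaticPartitionMapping(
    downstream_partition_keys_by_upstream_partition_key={
        "image_a": "dataset0",
        "image_b": "dataset0",
        "image_c": "dataset1",
    }
)
```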
n
They're dynamic partitions but I know the mapping
But let's say I'm going from images->dataset. My dataset-level asset will take in a list of image-level assets?
```python
from typing import List

from dagster import AssetIn, asset

@asset(
    partitions_def=dataset_partitions_def,
    name="dataset_asset",
    ins={
        "image_asset": AssetIn(partition_mapping=ImageToDatasetPartitionMapping())
    }
)
def dataset_asset(context, image_asset: List[dict]) -> dict:
    ...
```
Like how do I make the `image_asset` that goes into the `dataset_asset` plural?
c
Do you know the full set of input `image_asset`s at definition time? If so, you could list them all as dependencies:
```python
@asset(...)
def dataset_asset(context, image_asset_1, image_asset_2, ...):
    ...
```
If you don't know the full set of input `image_asset`s, you could use an asset factory: https://github.com/dagster-io/dagster/discussions/11045
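A rough sketch of that factory pattern (the helper names are made up, and this assumes your Dagster version accepts `**kwargs` for dynamically defined `ins`):
```python
from dagster import AssetIn, asset

def build_dataset_asset(dataset_name: str, image_asset_names: list):
    # One dataset-level asset whose inputs are a known list of image assets
    @asset(
        name=f"{dataset_name}_asset",
        ins={name: AssetIn(name) for name in image_asset_names},
    )
    def _dataset_asset(**image_assets) -> dict:
        # image_assets maps each input name to that image asset's output
        return dict(image_assets)

    return _dataset_asset

# get_image_asset_names_by_dataset() is a hypothetical listing function
dataset_assets = [
    build_dataset_asset(dataset_name, image_asset_names)
    for dataset_name, image_asset_names in get_image_asset_names_by_dataset().items()
]
```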
n
The image and datasets are dynamic partitions, so they're not known at definition time. The partition key for an image is `"dataset0 /path/to/image.jpeg"` and the partition key for a dataset is `"dataset0"`, so it's very easy to get the partition mapping between the two. E.g., `Image<"dataset0 /path/to/image1.jpeg">` -> `Dataset<"dataset0">`, `Image<"dataset1 /path/to/image2.jpeg">` -> `Dataset<"dataset1">`.
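For concreteness, a minimal sketch of that key translation (helper names are made up):
```python
from typing import List

def image_key_to_dataset_key(image_key: str) -> str:
    # "dataset0 /path/to/image.jpeg" -> "dataset0"
    return image_key.split(" ", 1)[0]

def dataset_key_to_image_keys(dataset_key: str, all_image_keys: List[str]) -> List[str]:
    # Inverse direction: every image key that starts with this dataset key
    return [k for k in all_image_keys if image_key_to_dataset_key(k) == dataset_key]
```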
Is there no way to just use the `AssetIn(partition_mapping=)` method? I noticed `PartitionMapping` has `get_upstream_mapped_partitions_result_for_partitions` and `get_downstream_partitions_for_partitions`. I currently only use `get_upstream_mapped_partitions_result_for_partitions` (dataset -> image), but can I use `get_downstream_partitions_for_partitions` somehow to get the image -> dataset direction?
c
Yes, `get_downstream_partitions_for_partitions` should just be the inverse of `get_upstream_mapped_partitions_result_for_partitions`. So if you have `get_upstream_mapped_partitions_result_for_partitions` defined, you should be able to use `get_downstream_partitions_for_partitions`.
n
But then how do I have a plural asset input, since each dataset has many images?
c
Let me make sure I understand correctly: you have an upstream `datasets` asset that is dynamically partitioned by dataset, a downstream `images` asset that is dynamically partitioned by image, and then you want to add a third `datasets_2` asset that is downstream of `images` and also partitioned by dataset. If you're using IO managers, when you execute a partition `dataset_partition` of `datasets_2`, the IO manager will automatically call `get_upstream_mapped_partitions_result_for_partitions` with `dataset_partition` to fetch the image partitions upstream of `dataset_partition`, and then load back the persisted outputs for each of those partitions. So if you were using the default IO manager, `fs_io_manager`, the `fs_io_manager` would return a mapping of partition key -> output for each image partition. So your `dataset_2` asset definition would be the following:
```python
@asset(
    partitions_def=dataset_partitions_def,
    name="dataset_2",
    ins={
        "image_asset": AssetIn(partition_mapping=ImageToDatasetPartitionMapping())
    }
)
def dataset_2(context, image_asset: Dict[str, <whatever type your image output is>]) -> dict:
    ...
```
n
Ooo nice! Yes, I think your understanding is on the money. So basically Dagster is smart enough to make this `image_asset: Dict[str, <whatever type your image output is>]` dict where the key is the image partition key?

One more minor question: I already have `DatasetToImagePartitionMapping`; do I have to also have `ImageToDatasetPartitionMapping`, or can I just use `DatasetToImagePartitionMapping` and Dagster will be smart enough to call the `get_downstream_partitions_for_partitions` function instead of `get_upstream_mapped_partitions_result_for_partitions`?
c
Yes, exactly. The `Dict[str, <whatever type your image output is>]` is just the return type for the default `fs_io_manager`, though. It could be different depending on what contents are being loaded; for example, because the Snowflake IO manager can load multiple partitions' contents in a single dataframe, a single dataframe would be the return type there.
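For illustration, the body of `dataset_2` under the default `fs_io_manager` might look like this (the decorator is the same as above, and the aggregation logic is just an assumption):
```python
from typing import Dict

def dataset_2(context, image_asset: Dict[str, dict]) -> dict:
    # image_asset: image partition key -> that image partition's persisted output
    aggregated = {key: output for key, output in image_asset.items()}
    context.log.info(f"Aggregated {len(aggregated)} image partitions")
    return aggregated
```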
As for your second question, whether you also need `ImageToDatasetPartitionMapping` or whether Dagster will call `get_downstream_partitions_for_partitions` on `DatasetToImagePartitionMapping` for you: I believe you would have to define a different partition mapping in that case.
But also, defining custom partition mappings is unsupported since the type signatures are in flux, so I'm surprised that you've been able to execute your assets as expected. We do have an issue for defining custom mappings between dynamic partitions defs: https://github.com/dagster-io/dagster/issues/13139
n
Oof, that may explain why it's buggy. I went down this rabbit-hole because that's what Sean advised.
It works when I generate one partition at a time
But when I backfill more than one partition, the partition mapping isn't getting applied
c
Ah yeah... sorry about sending you down a rabbit hole; I can update the documentation to emphasize that this is unsupported. During backfill execution, calling out to custom user-defined functions in partition mappings is especially expensive, so we load a serializable representation of partition mappings instead, which means only built-in partition mappings are supported.
If it's possible for you to use static partitions definitions instead, you could do that and define a `StaticPartitionMapping`. Then, when new partitions exist, you'd have to reload the code location to pick up those new partitions.
n
That's nearly impossible. We have a lot of datasets and each dataset has lots of images; it would be very weird to hard-code them. Given all the context you have, do you have any other recommendations for how to organize this?
c
You don't need to hard-code them; if you had a function that could fetch the list of datasets, you could do something like:
```python
from dagster import StaticPartitionsDefinition

# get_list_of_datasets() is your own function that returns the dataset names
dataset_partitions_def = StaticPartitionsDefinition(get_list_of_datasets())
```
n
Oh neat. And then what would the mapping look like? `StaticPartitionMapping`?
c
Yes, you would just have to provide a dictionary, i.e. `StaticPartitionMapping(downstream_partition_keys_by_upstream_partition_key={"image1": "dataset1", ...})`, but similar to the partitions def, it would be better to define a custom function that returns that mapping.
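A minimal sketch of such a function, assuming the `"<dataset> <image path>"` key scheme from earlier (the listing function is hypothetical):
```python
from typing import Dict, List

from dagster import StaticPartitionMapping

def build_image_to_dataset_mapping(image_keys: List[str]) -> Dict[str, str]:
    # "dataset0 /path/to/image.jpeg" -> "dataset0"
    return {key: key.split(" ", 1)[0] for key in image_keys}

image_to_dataset_mapping = StaticPartitionMapping(
    downstream_partition_keys_by_upstream_partition_key=build_image_to_dataset_mapping(
        get_list_of_image_keys()  # hypothetical, like get_list_of_datasets()
    )
)
```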
n
Mm, actually the other issue with this is that we have new datasets come in every day. So we'd have to restart the prod Dagster deployment to reinit the partitions def?
c
You'd have to reload the code location, so you can click the "redeploy" button.
n
Ok, we'll go with that
Thanks 🍻
IOU a beer
c
np, happy to help 🍺
n
https://dagster.slack.com/archives/C01U954MEER/p1692743141657789?thread_ts=1692661377.527099&cid=C01U954MEER One quick follow-up to this. I have a `DatabaseResource(ConfigurableResource)`. Is it possible to somehow get that resource to the `get_list_of_datasets()` function, so that function can query the DB through the resource? Or is it too early in the setup process for the resource to exist?
c
The resource hasn't been instantiated yet at that time. Resources are instantiated when a process spins up to execute a given asset, not at definition time.
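One workaround, sketched under the assumption of a Postgres-style client (the library, env var, and query are all assumptions): skip the resource machinery at definition time and query the database with a plain function.
```python
import os

import psycopg2  # assumed client; use whatever your DatabaseResource wraps

def get_list_of_datasets():
    # Runs at definition time, before any Dagster resources exist
    conn = psycopg2.connect(os.environ["DATABASE_URL"])
    try:
        with conn.cursor() as cur:
            cur.execute("SELECT name FROM datasets")  # assumed schema
            return [row[0] for row in cur.fetchall()]
    finally:
        conn.close()
```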
n
Ah ok, thanks 🙏