# ask-community
Benjamin:
Hi, I'm still trying to get a handle on the way asset-backed pipelines work, and was hoping to get some help on how to think about this particular job. Essentially, I have two partitioned assets that I want to combine in a workflow:
• equipment — partitioned on the equipment ID (we have a large amount of information per piece of equipment)
• per-postcode weather forecasts — partitioned with the postcode as key
Each piece of equipment is located in a region, so I would like to end up with a mapping of equipment -> postcode that then runs a bunch of processing of the data and outputs an asset for each piece of equipment. This seems logical enough to me, but I'm unsure how this would translate into dagster abstractions. Can anyone set me straight on this?
Claire:
Hi Benjamin. One way that you could do this is to make the final "equipment partitioned by ID, transformed by weather by postcode" asset downstream of both the equipment-by-ID asset and the weather-by-postcode asset. Something like this:
```python
from dagster import (
    AssetIn,
    StaticPartitionMapping,
    StaticPartitionsDefinition,
    asset,
)

ids = StaticPartitionsDefinition(["1", "2", ...])


@asset(partitions_def=ids)
def equipment_by_id(context):
    ...


@asset(partitions_def=StaticPartitionsDefinition(["94109", "94101", ...]))
def weather_by_postal_code(context):
    ...


@asset(
    partitions_def=ids,
    ins={
        "weather_by_postal_code": AssetIn(
            partition_mapping=StaticPartitionMapping(
                downstream_partition_keys_by_upstream_partition_key={"94109": "1", "94101": "2", ...}
            ),
        ),
    },
)
def equipment_transformed_by_weather_by_id(context, equipment_by_id, weather_by_postal_code):
    ...
```
The static partition mapping defined on the downstream asset contains the mapping of postal code -> ID, so the `equipment_by_id` input represents the equipment for a certain ID partition, and the `weather_by_postal_code` input represents the weather for the postal code of that equipment ID.
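To make that resolution concrete, here's a plain-Python sketch of what the mapping does when the equipment partition "1" is materialized. This doesn't use Dagster at all — the helper function is hypothetical and just illustrates the lookup direction:

```python
# The same postal code -> equipment ID dict passed to StaticPartitionMapping above.
postal_code_to_equipment_id = {"94109": "1", "94101": "2"}


def upstream_weather_partition(equipment_id):
    """Invert the postal code -> equipment ID mapping to answer:
    which upstream weather partition feeds this equipment partition?"""
    for postal_code, eq_id in postal_code_to_equipment_id.items():
        if eq_id == equipment_id:
            return postal_code
    raise KeyError(f"no weather partition mapped to equipment {equipment_id}")


# Materializing equipment partition "1" would load the "94109" weather partition.
print(upstream_weather_partition("1"))  # -> 94109
```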
Benjamin:
Hi Claire, thanks for the response! I'll try to digest and implement that, but it looks like a great starting point. One thing I'm mulling over is how (or whether it is possible) to do that mapping more dynamically - as in, being able to look up the location of an object and use that in the mapping (rather than as a static definition). But that is something for way down the line when I've got a better handle on dagster generally 🙂
Claire:
One option is to create a function that performs the lookup you mentioned and returns the `downstream_partition_keys_by_upstream_partition_key` mapping. When your `Definitions` object is loaded, this function is called and the partition mapping is instantiated from there. But Dagster doesn't currently support `PartitionMapping` objects that just accept a user-defined function, mostly because calling out to this user-defined function during job execution may be costly.
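A minimal sketch of that approach — the helper names are hypothetical, and where the `{equipment_id: postcode}` data comes from (database, API, etc.) is up to you; only the `StaticPartitionMapping` argument name is real Dagster API:

```python
def invert_to_upstream_mapping(equipment_to_postcode):
    """Invert a {equipment_id: postcode} lookup into the
    postcode -> [equipment_ids] shape expected by StaticPartitionMapping's
    downstream_partition_keys_by_upstream_partition_key argument."""
    mapping = {}
    for equipment_id, postcode in equipment_to_postcode.items():
        mapping.setdefault(postcode, []).append(equipment_id)
    return mapping


# Example: two pieces of equipment share a postcode.
locations = {"1": "94109", "2": "94109", "3": "94101"}
print(invert_to_upstream_mapping(locations))
# -> {'94109': ['1', '2'], '94101': ['3']}

# At Definitions load time this result would be passed along, e.g.:
# StaticPartitionMapping(
#     downstream_partition_keys_by_upstream_partition_key=invert_to_upstream_mapping(locations)
# )
```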
Benjamin:
Thanks - I was thinking of something along those lines, with the caveat that both partition definitions are dynamic (we don't know what object IDs we have until data has been loaded). I might go down the route of trying to subclass `PartitionMapping` to see if I can get it to do what I want, but I imagine there are plenty of potential pitfalls with that approach.
Claire:
I definitely can see the use case for wanting some sort of custom partition mapping between two dynamic partitions definitions. I think this partition mapping class would most logically contain a function defined in user code, as Dagster doesn't know what partitions exist at definition time. At the moment we don't support subclassing the `PartitionMapping` class because the APIs are in flux. I'd recommend filing an issue for this use case in the meantime 🌈
Benjamin:
Thanks, will do that!
Issue created here. One thing I also found was that, for the static mapping I'm using in the interim, you can pass a list of values in the dict e.g.
```python
{
  "postcode1": [
    "equipment1",
    "equipment2"
  ]
}
```
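In other words, with a one-to-many entry like that, one upstream postcode partition maps to several downstream equipment partitions. A quick plain-Python illustration of the behavior (the helper is hypothetical, not a Dagster API):

```python
# A mapping where one postcode feeds multiple equipment partitions.
mapping = {"postcode1": ["equipment1", "equipment2"]}


def downstream_partitions(upstream_key):
    """Return the downstream equipment partitions for an upstream postcode.
    Normalizes so both single-string and list values in the dict work."""
    value = mapping.get(upstream_key, [])
    return [value] if isinstance(value, str) else list(value)


print(downstream_partitions("postcode1"))
# -> ['equipment1', 'equipment2']
```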