Benjamin Weise
03/22/2023, 12:16 AMclaire
03/22/2023, 5:43 PMids = StaticPartitionsDefinition(["1", "2", ...])
@asset(partitions_def=ids)
def equipment_by_id(context):
...
@asset(partitions_def=StaticPartitionsDefinition(["94109", "94101", ...]))
def weather_by_postal_code(context):
...
@asset(
partitions_def=ids,
ins={
"weather_by_postal_code": AssetIn(
partition_mapping=StaticPartitionMapping(
downstream_partition_keys_by_upstream_partition_key={"94109": "1", "94101": "2", ...}
),
),
},
)
def equipment_transformed_by_weather_by_id(context, equipment_by_id, weather_by_postal_code):
...
The static partition mapping defined on the downstream asset contains the mapping of postal code -> ID, so the equipment_by_id
input represents the equipment for a certain id partition, and the weather_by_postal_code
input represents the weather for the postal code of that equipment IDBenjamin Weise
03/22/2023, 9:17 PMclaire
03/22/2023, 9:30 PMdownstream_partition_keys_by_upstream_partition_key
mapping. When your Definitions
object is loaded, this function is called and the partition mapping is instantiated from there.
But Dagster doesn't currently support PartitionMapping
objects that just accept a user-defined function, mostly because calling out to this user-defined function during job execution may be costly.Benjamin Weise
03/22/2023, 9:33 PMPartitionMapping
to see if I can get it to do what I want, but I imagine their are plenty of potential pitfalls with that approach.claire
03/22/2023, 9:38 PMPartitionMapping
class because the APIs are in flux. I'd recommend filing an issue for this use case in the meantime 🌈Benjamin Weise
03/23/2023, 12:40 AMBenjamin Weise
03/24/2023, 12:21 AM{
"postcode1": [
"equipment1",
"equipment2"
]
}