How to set a PartitionMapping mapping the last 10 ...
# ask-ai
r
How to set a PartitionMapping mapping the last 10 time partitions of a parent asset to the child asset ?
s
To set a `PartitionMapping` that maps the last 10 time partitions of a parent asset to a child asset, you would typically use a `TimeWindowPartitionMapping` with specific offsets. As of the last update from the documentation, there isn't a built-in `PartitionMapping` dedicated to "the last 10 partitions" of an upstream asset; the window has to be expressed through offsets. `TimeWindowPartitionMapping` lets you specify `start_offset` and `end_offset`, which shift the start and end of the upstream window relative to each downstream partition. For example, if you have daily partitions and want to map the previous day's partition to the current partition, you could use a `start_offset` and `end_offset` of -1. Here is an example from the documentation on how to use `TimeWindowPartitionMapping` with offsets:
```python
from dagster import (
    AssetIn,
    DailyPartitionsDefinition,
    TimeWindowPartitionMapping,
    asset,
)

partitions_def = DailyPartitionsDefinition(start_date="2023-01-21")


@asset(partitions_def=partitions_def)
def events():
    ...


@asset(
    partitions_def=partitions_def,
    ins={
        "events": AssetIn(
            partition_mapping=TimeWindowPartitionMapping(
                start_offset=-1, end_offset=-1
            ),
        )
    },
)
def yesterday_event_stats(events):
    ...
```
However, this example only maps a single partition. To map 10 partitions, widen the window: keeping `end_offset=-1` and setting `start_offset=-10` maps the 10 partitions that precede the current one, while `start_offset=-9` with `end_offset=0` maps the current partition and the 9 before it. There are caveats: `TimeWindowPartitionMapping` expresses the window as offsets relative to each downstream partition rather than as an explicit partition count, it is designed for the case where the child asset is itself time-partitioned, and a wide window might not be the most performant or straightforward solution, especially if the partitions definition isn't a basic daily or hourly one. If offsets don't fit your case, you might need to implement a custom solution, such as a custom sensor or logic within your asset code to determine which partitions to depend on. The documentation does not provide a direct example of a `PartitionMapping` for this specific use case. For the most accurate and up-to-date information, please refer to the [Dagster documentation](https://docs.dagster.io/concepts/partitions-schedules-sensors/partitioning-assets) or consider reaching out on the Dagster GitHub Discussions or Issues for further assistance.
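To make the offset arithmetic concrete, here is a minimal plain-Python sketch of which daily upstream keys an offset pair selects for one downstream partition. It is only an illustration of the window semantics described above: `upstream_daily_keys` is a hypothetical helper, not a Dagster API, and it assumes daily keys in `YYYY-MM-DD` form.

```python
from datetime import date, timedelta


def upstream_daily_keys(downstream_key: str, start_offset: int, end_offset: int) -> list[str]:
    """Plain-Python illustration (not a Dagster API) of which daily upstream
    keys an offset pair selects for one downstream partition key."""
    day = date.fromisoformat(downstream_key)
    # Both offsets are inclusive, so the window holds end_offset - start_offset + 1 keys.
    return [
        (day + timedelta(days=offset)).isoformat()
        for offset in range(start_offset, end_offset + 1)
    ]


single = upstream_daily_keys("2023-02-10", -1, -1)         # yesterday only
previous_ten = upstream_daily_keys("2023-02-10", -10, -1)  # 10 days before today
trailing_ten = upstream_daily_keys("2023-02-10", -9, 0)    # today and the 9 days before
print(single, len(previous_ten), trailing_ten[0], trailing_ten[-1])
```

Which of the two 10-partition windows is right depends on whether the child partition should see the parent partition for the same day or only earlier ones.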
r
@Scout How to create an unpartitioned child asset based on the last 10 hourly partitions of a parent asset?
s
Creating an unpartitioned child asset that depends on the last 10 hourly partitions of a parent asset is not directly supported by a built-in `PartitionMapping` in Dagster. However, you can achieve this by writing custom logic within your asset computation function to reference the specific partitions of the upstream asset that you are interested in. Here's a conceptual example of how you might approach this:

1. Define your hourly partitioned parent asset.
2. Define your unpartitioned child asset.
3. Within the computation function of your child asset, use custom logic to determine the last 10 partition keys of the parent asset and retrieve the corresponding data.

Here's a rough sketch of what this might look like in code:
```python
from dagster import Definitions, HourlyPartitionsDefinition, asset, define_asset_job

# Define hourly partitions for the parent asset (hourly keys look like "2023-01-01-00:00")
hourly_partitions_def = HourlyPartitionsDefinition(start_date="2023-01-01-00:00")


@asset(partitions_def=hourly_partitions_def)
def hourly_partitioned_asset(context):
    # Your logic to produce the data for context.partition_key
    ...


def get_partitioned_data(partition_key):
    # Placeholder, not a Dagster API: replace with a query against your own storage
    raise NotImplementedError(f"load data for partition {partition_key}")


# Define your unpartitioned child asset. `deps` records the lineage without
# asking the IO manager to load every upstream partition as an input.
@asset(deps=[hourly_partitioned_asset])
def unpartitioned_child_asset(context):
    # get_partition_keys() returns the keys in chronological order up to the
    # current time, so the last 10 entries are the 10 most recent partitions
    partition_keys = hourly_partitions_def.get_partition_keys()[-10:]

    # Retrieve data for the last 10 partitions
    data_for_last_10_partitions = [
        get_partitioned_data(partition_key) for partition_key in partition_keys
    ]

    # Your logic to process the data from the last 10 partitions
    ...


# Define your job to materialize the assets
assets_job = define_asset_job(
    "my_assets_job", selection=[hourly_partitioned_asset, unpartitioned_child_asset]
)

defs = Definitions(
    assets=[hourly_partitioned_asset, unpartitioned_child_asset],
    jobs=[assets_job],
)
```

In this example, `get_partitioned_data` is a placeholder for whatever mechanism you have to retrieve the data for a given partition key. Dagster does not provide a function like this out of the box, so you would need to implement this logic yourself, potentially by querying your data storage system directly. Please note that this example is conceptual and would need to be adapted to fit the specifics of your environment and data storage system. The key takeaway is that you would need to manually handle the logic to determine which partitions of the parent asset you want to depend on and then retrieve and process that data within your unpartitioned child asset's computation function.
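If you compute the last 10 hourly keys by hand instead of asking the partitions definition for them, note that iterating over hour-of-day values breaks whenever the window crosses midnight. A minimal sketch using plain datetime arithmetic is below. It makes several assumptions: `last_n_hourly_keys` is a hypothetical helper, the keys use the `%Y-%m-%d-%H:%M` format, timestamps are in the same timezone as the partitions definition, and only hours that have fully elapsed count as partitions.

```python
from datetime import datetime, timedelta

HOURLY_KEY_FORMAT = "%Y-%m-%d-%H:%M"  # assumed key format for hourly partitions


def last_n_hourly_keys(now: datetime, n: int = 10) -> list[str]:
    """Return the keys of the n most recent completed hourly windows before `now`."""
    # Truncate to the top of the hour; the window starting there is still open.
    current_hour = now.replace(minute=0, second=0, microsecond=0)
    # Step back whole hours with timedelta so date rollovers are handled correctly.
    return [
        (current_hour - timedelta(hours=back)).strftime(HOURLY_KEY_FORMAT)
        for back in range(n, 0, -1)
    ]


# A window that spans midnight: 03:30 on Jan 2 reaches back into Jan 1.
keys = last_n_hourly_keys(datetime(2023, 1, 2, 3, 30))
print(keys[0], keys[-1])
```

The keys come back oldest first, which matches the chronological order you would typically want when concatenating or aggregating the loaded partitions.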