Giacomo Colombo

03/06/2024, 2:31 PM
how define a sensor that is triggered when a new partition is added to an asset?


03/06/2024, 2:31 PM
To define a sensor that is triggered when a new partition is added to an asset, you can use the
decorator provided by Dagster. This decorator allows you to monitor multiple assets, including partitioned assets, and trigger runs based on materializations of those assets. Here's an example from the documentation that shows how to define a sensor that monitors an asset with daily partitions:
from dagster import asset, multi_asset_sensor, DailyPartitionsDefinition, RunRequest

def my_daily_partitioned_asset():
    # Asset computation logic
    return 1

def my_sensor(context):
    # Get the latest materialization records for the asset's partitions
    materialization_records = context.latest_materialization_records_by_partition(my_daily_partitioned_asset.key)
    # Check if there are new materializations and trigger a run if needed
    for partition_key, event_log_record in materialization_records.items():
        if event_log_record:
            # Trigger a run for the new partition
            yield RunRequest(run_key=partition_key)
In this example,
is a partitioned asset with daily partitions starting from "2022-07-01". The
function is decorated with
and specifies the asset key of
. Inside the sensor function,
is used to get the latest materialization records for each partition of the monitored asset. If there are new materializations, the sensor yields a
for each new partition. Please note that the example provided is a simplified version, and you may need to adjust the logic to fit your specific use case, such as handling partition keys more dynamically or incorporating additional logic to determine when to trigger a run.

Giacomo Colombo

03/06/2024, 2:39 PM
no in my specific case, the new partition is added dynamically without running a materialization! i want to triggere the sensor when a new partition is added, not when the last materialization happened.