#ask-ai

Giacomo Colombo

03/06/2024, 2:45 PM
How do I define a sensor that is triggered when a new partition is added to an asset? In my case, the partition is added with
Copy code
dynamic_partitions_requests.append(
    files_partitions_def.build_add_request(["the_partition_name"]))
without running the materialization. So I don't want to get the latest materialization records for the asset's partitions; I want to get the partitions that were most recently added to the asset. Is it possible to achieve this?

Scout

03/06/2024, 2:45 PM
To define a sensor that works with newly added partitions of an asset, you can use the `@sensor` decorator along with the `DynamicPartitionsDefinition` for your asset. The sensor can check for new partitions by looking at the environment, such as a directory for new files, and then use the `DynamicPartitionsDefinition` to build requests for adding those new partitions. Here's an example from the documentation that demonstrates how to detect when new files are added to a folder and kick off runs to process just those new files. This example uses a `DynamicPartitionsDefinition` and a sensor to add the new partitions and request one run per new file:
Copy code
python
import os

from dagster import (
    AssetSelection,
    DynamicPartitionsDefinition,
    RunRequest,
    SensorResult,
    asset,
    sensor,
)

files_partitions_def = DynamicPartitionsDefinition(name="files")


@asset(partitions_def=files_partitions_def)
def files_metadata():
    ...


@sensor(asset_selection=AssetSelection.keys(files_metadata.key))
def file_sensor(context):
    # Files in the watched directory that are not yet registered as partitions.
    new_files = [
        filename
        for filename in os.listdir(os.getenv("MY_DIRECTORY"))
        if not context.instance.has_dynamic_partition(
            files_partitions_def.name, filename
        )
    ]

    return SensorResult(
        # One run per new file; omit run_requests to add partitions only.
        run_requests=[
            RunRequest(partition_key=filename) for filename in new_files
        ],
        # Register the new files as partitions of files_partitions_def.
        dynamic_partitions_requests=[
            files_partitions_def.build_add_request(new_files)
        ],
    )
In this example, the `file_sensor` checks for new files in a directory and then creates a `SensorResult` that includes both `run_requests` and `dynamic_partitions_requests`. The `dynamic_partitions_requests` are built using `files_partitions_def.build_add_request(new_files)`, which adds new partitions corresponding to the new files.

If you want to only add partitions without triggering materialization, you can omit the `run_requests` and only include `dynamic_partitions_requests` in the `SensorResult`. This way, the sensor will only add the new partitions to the asset without starting any runs.

Remember to replace `os.getenv("MY_DIRECTORY")` with the actual path to the directory you are monitoring for new files, and adjust the logic inside the sensor to match your specific use case for detecting new partitions.
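
For the case in the question, where partitions are added elsewhere and a separate sensor should react to them, one possible approach is to compare the currently registered partition keys against the keys recorded in the sensor's cursor. The following is only a minimal sketch of that comparison in plain Python. The helper name `detect_new_partitions` and the JSON cursor format are assumptions made for illustration; they are not part of Dagster.

```python
import json
from typing import Iterable, List, Optional, Tuple


def detect_new_partitions(
    current_keys: Iterable[str], cursor: Optional[str]
) -> Tuple[List[str], str]:
    """Return (keys not seen before, updated cursor).

    Hypothetical helper: the cursor is assumed to be a JSON-encoded list of
    every partition key seen so far; None or "" means nothing was seen yet.
    """
    seen = set(json.loads(cursor)) if cursor else set()
    new_keys = []
    # Keep the order in which keys were supplied and skip duplicates.
    for key in current_keys:
        if key not in seen:
            seen.add(key)
            new_keys.append(key)
    return new_keys, json.dumps(sorted(seen))
```

Inside a sensor, this could presumably be called with `context.instance.get_dynamic_partitions(files_partitions_def.name)` and `context.cursor`, returning `SensorResult(run_requests=[RunRequest(partition_key=k) for k in new_keys], cursor=next_cursor)`. Note that under this scheme the cursor grows with the number of partitions, and a partition that is deleted and later re-added would not be reported again.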