Giacomo Colombo
03/06/2024, 2:45 PM
dynamic_partitions_requests.append(files_partitions_def.build_add_request(['the_partition_name'])) without running the materialization.
So I don't want to get the latest materialization records for the asset's partitions, but to get the latest asset in which a partition was added. Is it possible to achieve this?
Scout
03/06/2024, 2:45 PM
@sensor decorator along with the DynamicPartitionsDefinition for your asset. The sensor can check for new partitions by looking at the environment, such as a directory for new files, and then use the DynamicPartitionsDefinition to build requests for adding those new partitions.
Here's an example from the documentation that demonstrates how to detect when new files are added to a folder and kick off runs to process just those new files. This example uses a DynamicPartitionsDefinition and a sensor to add a partition for each new file and request a run for it:
python
import os

from dagster import (
    DynamicPartitionsDefinition,
    asset,
    sensor,
    SensorResult,
    RunRequest,
    AssetSelection,
)

files_partitions_def = DynamicPartitionsDefinition(name="files")


@asset(partitions_def=files_partitions_def)
def files_metadata():
    ...


@sensor(
    asset_selection=AssetSelection.keys(
        files_metadata.key
    )
)
def file_sensor(context):
    # Files in the directory that are not yet registered as partitions
    new_files = [
        filename
        for filename in os.listdir(os.getenv("MY_DIRECTORY"))
        if not context.instance.has_dynamic_partition(
            files_partitions_def.name, filename
        )
    ]
    return SensorResult(
        # One run per new file
        run_requests=[
            RunRequest(partition_key=filename) for filename in new_files
        ],
        # Register the new files as partitions
        dynamic_partitions_requests=[
            files_partitions_def.build_add_request(new_files)
        ],
    )
In this example, the file_sensor checks for new files in a directory and then creates a SensorResult that includes both run_requests and dynamic_partitions_requests. The dynamic_partitions_requests are built using files_partitions_def.build_add_request(new_files), which adds new partitions corresponding to the new files.
If you want to only add partitions without triggering materialization, you can omit the run_requests and only include dynamic_partitions_requests in the SensorResult. This way, the sensor will only add the new partitions to the asset without starting any runs.
Remember to replace os.getenv("MY_DIRECTORY") with the actual path to the directory you are monitoring for new files, and adjust the logic inside the sensor to match your specific use case for detecting new partitions.