Jonathan Phoon
03/03/2024, 1:17 PMScout
03/03/2024, 1:17 PMpython from dagster import ( DynamicPartitionsDefinition, asset, sensor, SensorResult, RunRequest, AssetSelection, ) files_partitions_def = DynamicPartitionsDefinition(name="files") @asset(partitions_def=files_partitions_def) def files_metadata(): ... @sensor( asset_selection=AssetSelection.keys( files_metadata.key ) ) def file_sensor(context): new_files = [ filename for filename in os.listdir(os.getenv("MY_DIRECTORY")) if not context.instance.has_dynamic_partition( files_partitions_def.name, filename ) ] return SensorResult( run_requests=[ RunRequest(partition_key=filename) for filename in new_files ], dynamic_partitions_requests=[ files_partitions_def.build_add_request(new_files) ], )
In this example, the file_sensor
sensor is used to add new partitions based on the presence of new files in a directory. However, there is a mention in the discussions that suggests it might be possible to add dynamic partitions programmatically without using sensors, by calling a method on the DagsterInstance
object: python DagsterInstance.get().add_dynamic_partition
This method is mentioned in the context of a discussion where a user wanted to update partition keys automatically without materializing them, and they found a way to do so by creating a job and using the add_dynamic_partition
method. While the documentation primarily illustrates the use of sensors for managing dynamic partitions, this discussion suggests that there may be alternative ways to programmatically manage dynamic partitions. However, the documentation does not provide a detailed example or further clarification on this method, so it's recommended to explore the Dagster codebase or reach out to the Dagster community for more specific guidance on this approach.