Josh
02/26/2024, 1:40 PM
Scout
02/26/2024, 1:40 PM
`@sensor` decorator with the `asset_selection` parameter to target a selection of assets. When new files are detected, the sensor can generate a `RunRequest` for each new file, treating each file as a new partition. Here's a simplified version of the example from the documentation:
```python
from dagster import sensor, SensorResult, RunRequest, AssetSelection


@sensor(
    asset_selection=AssetSelection.keys("my_asset_key")
)
def new_partition_sensor(context):
    # Logic to detect new partitions goes here
    new_partitions = [...]  # List of new partition keys
    return SensorResult(
        run_requests=[
            RunRequest(partition_key=partition_key) for partition_key in new_partitions
        ]
    )
```
In this example, `new_partitions` would be a list of partition keys representing the newly added partitions that the sensor has detected. The sensor then creates a `RunRequest` for each new partition key, which can trigger a run to process that partition.
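The "logic to detect new partitions" step could, for a file-based source, look something like the sketch below. It is only an illustration under stated assumptions: the helper name `find_new_partition_keys` is hypothetical, it assumes each file name in a local directory serves as a partition key, and it leaves open where the set of already known keys comes from.

```python
import os
import tempfile


def find_new_partition_keys(directory: str, known_keys: set) -> list:
    """Return names of files in `directory` that are not in `known_keys`, sorted.

    Hypothetical helper: assumes one file per partition, with the file name
    used as the partition key.
    """
    return sorted(
        name
        for name in os.listdir(directory)
        if os.path.isfile(os.path.join(directory, name)) and name not in known_keys
    )


# Usage: two files on disk, one of which is already known.
with tempfile.TemporaryDirectory() as tmp:
    for name in ("2024-02-25.csv", "2024-02-26.csv"):
        open(os.path.join(tmp, name), "w").close()
    new_partitions = find_new_partition_keys(tmp, known_keys={"2024-02-25.csv"})
    print(new_partitions)
```

The returned list would take the place of the `new_partitions` placeholder in the sensor. If the asset uses a `DynamicPartitionsDefinition`, the new keys would presumably also have to be registered, for example through the `dynamic_partitions_requests` argument of `SensorResult`.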
Please note that the actual logic for detecting new partitions depends on your specific use case and data source. The example provided is a conceptual illustration based on the documentation's guidance.
Josh
02/26/2024, 1:43 PM
Scout
02/26/2024, 1:43 PM
`@multi_asset_sensor` decorator. While `@multi_asset_sensor` is typically used to monitor changes or updates to existing assets, you could potentially use a similar approach to check for the presence of new assets by comparing the current set of assets with a previously known set. However, this would require maintaining some state or record of known assets and comparing it against the current state within the sensor's logic. Here is a conceptual example based on the documentation's guidance on sensors. It keeps the known set in the sensor's cursor, which is a string read from `context.cursor` and written with `context.update_cursor`:
```python
import json

from dagster import sensor, AssetSelection, SensorEvaluationContext, RunRequest


# The target is a placeholder; a sensor that yields RunRequests needs a job or asset selection.
@sensor(asset_selection=AssetSelection.keys("my_asset_key"))
def new_asset_monitoring_sensor(context: SensorEvaluationContext):
    current_assets = get_current_assets()  # Retrieve the current set of assets
    # Retrieve the known assets from the sensor cursor (None on the first evaluation)
    known_assets = set(json.loads(context.cursor)) if context.cursor else set()
    # Determine if there are any new assets by comparing with the known set
    new_assets = current_assets - known_assets
    if new_assets:
        # Update the cursor with the new set of known assets
        context.update_cursor(json.dumps(sorted(current_assets)))
        # Logic to handle new assets (e.g., trigger a run, log an event, etc.)
        for new_asset in sorted(new_assets):
            # Example: create a RunRequest for each new asset
            yield RunRequest(run_key=new_asset)


def get_current_assets() -> set:
    # Implement logic to retrieve the current set of asset names (strings)
    return set()
```
In this example, `get_current_assets` would be a function you implement to retrieve the current set of assets. The sensor compares this set against a known set of assets stored in the sensor's state. If new assets are detected, the sensor updates its state and can perform actions such as triggering runs or logging events. Please note that this is a conceptual example and would require implementation details specific to your environment and how you track and manage assets. The actual implementation of `get_current_assets` and how you handle new assets would depend on your particular use case and infrastructure.
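The comparison step described above can be pulled out into a plain function, which makes it testable without running a sensor. The sketch below is a rough illustration, not Dagster API: `diff_against_cursor` is a hypothetical name, and it assumes the known set is persisted as a JSON string (the form a sensor cursor would take). Unlike simply overwriting the state with the current set, it stores the union of known and current names, so an asset that disappears and reappears is not reported twice. That is a design choice and may not suit every case.

```python
import json
from typing import Optional, Set, Tuple


def diff_against_cursor(current: Set[str], cursor: Optional[str]) -> Tuple[Set[str], str]:
    """Return (new items, updated cursor string) for one evaluation.

    Hypothetical helper: `cursor` is None on the first evaluation, otherwise
    a JSON list of the names seen so far.
    """
    known = set(json.loads(cursor)) if cursor else set()
    new_items = current - known
    # Persist the union so temporarily missing items are not reported again later.
    updated_cursor = json.dumps(sorted(known | current))
    return new_items, updated_cursor


# First evaluation: no cursor yet, so every asset counts as new.
new_items, cursor = diff_against_cursor({"orders", "users"}, None)
print(sorted(new_items), cursor)

# Second evaluation: only the asset added since then is reported.
new_items, cursor = diff_against_cursor({"orders", "users", "events"}, cursor)
print(sorted(new_items), cursor)
```

Inside a sensor, the second return value would be the string to persist for the next evaluation, and the first would drive whatever action is taken for each new asset.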