Andres Crucetta
04/06/2023, 3:49 PM
import os
from datetime import datetime

from azure.storage.blob import ContainerClient
from dagster import RunRequest, sensor

# file_exists, get_last_run_time, and upload_data are not shown in this thread.
@sensor(job=upload_data)
def azure_blob_file_sensor(context):
    """
    Sensor that checks if a file matching a regex pattern exists in a folder in an Azure Blob Storage container.
    """
    regex_pattern: str = context.config_schema["regex_pattern"]
    folder_path: str = context.config_schema["folder_path"]
    time_threshold: int = context.config_schema["time_threshold"]  # Time threshold in seconds
    container_client = ContainerClient.from_connection_string(
        os.environ.get("AZURE_CONNECTION_STRING"), os.environ.get("AZURE_CONTAINER_NAME")
    )
    matched_files = file_exists(container_client, regex_pattern, folder_path)
    if matched_files:
        last_run_time = get_last_run_time(context)
        if last_run_time:
            time_since_last_run = (datetime.utcnow() - last_run_time).total_seconds()
            if time_since_last_run < time_threshold:
                context.log.info("Job ran recently, skipping execution.")
                return
        for matched_file in matched_files:
            context.log.info(f"Sensed file: {matched_file.name} at {matched_file.properties.last_modified}")
            yield RunRequest(run_key=matched_file.name)
    else:
        context.log.info("No matching files found.")
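The `file_exists` helper used above is not shown in the thread. As a rough sketch of the matching step only, assuming it filters blob names under the folder by the regex: `match_blob_names` is a hypothetical name, and it works on plain strings so it runs without Azure. In real code the names would likely come from something like `container_client.list_blobs(name_starts_with=folder_path)`.

```python
import re
from typing import Iterable, List


def match_blob_names(names: Iterable[str], regex_pattern: str, folder_path: str) -> List[str]:
    """Return names under folder_path whose remaining path fully matches regex_pattern."""
    folder = folder_path.strip("/")
    prefix = folder + "/" if folder else ""
    pattern = re.compile(regex_pattern)
    matched = []
    for name in names:
        if not name.startswith(prefix):
            continue
        file_name = name[len(prefix):]
        # fullmatch, so r"report_\d+\.csv" does not also accept "report_1.csv.bak"
        if pattern.fullmatch(file_name):
            matched.append(name)
    return matched
```

Using `fullmatch` rather than `search` is a design choice here: it keeps a pattern from accidentally matching backup or temporary files that merely contain the expected name.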
sandy
04/06/2023, 6:31 PM
def make_sensor(file_name):
    @sensor(name=f"{file_name}_sensor")
    def _sensor():
        ...

    return _sensor
Andres Crucetta
04/06/2023, 7:07 PM
sandy
04/06/2023, 8:47 PM
make_sensor in a for loop on all your files that you want to monitor. Would that work for you?
Andres Crucetta
04/07/2023, 4:16 PM
sandy
04/07/2023, 4:33 PM
"Would I have to build each sensor in this definitions call? With some form of list comprehension"
Exactly