Files
POE-sensor/sensor_tracker.py
2025-06-24 09:31:26 +00:00

218 lines
9.1 KiB
Python

import json
import threading
from datetime import datetime, timezone
from collections import defaultdict
import logging
from config import (
MODBUS_HOSTS, SENSOR_TIMEOUT_THRESHOLD, RECOVERY_CONFIRMATION_COUNT,
ALERTING_ENABLED, LOCAL_TIMEZONE
)
class SensorTracker:
    """Track per-sensor health and publish MQTT alerts on state changes.

    Sensors are keyed by ``"{ip}_{location}"`` and seeded from
    ``MODBUS_HOSTS``. Each poll result is reported via :meth:`record_success`
    or :meth:`record_failure`, which update the counters under ``self.lock``.
    A failure alert is sent once a sensor reaches ``SENSOR_TIMEOUT_THRESHOLD``
    consecutive failures; a recovery alert is sent once an alerted sensor
    reaches ``RECOVERY_CONFIRMATION_COUNT`` consecutive successes.

    MQTT publishing calls ``wait_for_publish()``, which can block, so it is
    always done after ``self.lock`` has been released. Readers such as the
    health check therefore never wait on network I/O.
    """

    def __init__(self):
        self.sensor_status = {}
        # NOTE(review): not read or written anywhere in this class; kept only
        # in case external code references them.
        self.failure_counts = defaultdict(int)
        self.success_counts = defaultdict(int)
        # Sensors that have had a failure alert but no recovery alert yet.
        self.alerted_sensors = set()
        self.lock = threading.Lock()
        for host in MODBUS_HOSTS:
            self.sensor_status[self._sensor_id(host)] = self._new_status_record(host)

    @staticmethod
    def _sensor_id(host_info):
        """Return the tracker key ``"{ip}_{location}"`` for a host entry."""
        return f"{host_info['ip']}_{host_info['location']}"

    @staticmethod
    def _new_status_record(host_info):
        """Return a fresh status record for a sensor that has not been polled yet."""
        return {
            "ip": host_info["ip"],
            "location": host_info["location"],
            "type": host_info["type"],
            "status": "unknown",
            "last_success": None,
            "last_failure": None,
            "consecutive_failures": 0,
            "consecutive_successes": 0,
            "total_failures": 0,
            "total_successes": 0,
            "uptime_percentage": 0.0,
        }

    def _get_or_register(self, host_info):
        """Return ``(sensor_id, record)`` for *host_info*, creating the record if needed.

        A host that was not in ``MODBUS_HOSTS`` at construction time is
        registered on first use (with a warning) instead of raising
        ``KeyError``. The caller must hold ``self.lock``.
        """
        sensor_id = self._sensor_id(host_info)
        sensor = self.sensor_status.get(sensor_id)
        if sensor is None:
            logging.warning(f"Registering untracked sensor {sensor_id}")
            sensor = self._new_status_record(host_info)
            self.sensor_status[sensor_id] = sensor
        return sensor_id, sensor

    @staticmethod
    def _update_uptime(sensor):
        """Recompute ``uptime_percentage`` from the running success/failure totals."""
        total_attempts = sensor["total_successes"] + sensor["total_failures"]
        if total_attempts > 0:
            sensor["uptime_percentage"] = (sensor["total_successes"] / total_attempts) * 100

    def record_success(self, host_info, mqtt_client=None):
        """Record a successful sensor reading.

        Args:
            host_info: Host entry with ``ip``, ``location`` and ``type`` keys.
            mqtt_client: Optional MQTT client. It is used only to send a
                recovery alert once an alerted sensor is confirmed back online.
        """
        current_time = datetime.now(LOCAL_TIMEZONE)
        recovery_snapshot = None
        with self.lock:
            sensor_id, sensor = self._get_or_register(host_info)
            sensor["status"] = "online"
            sensor["last_success"] = current_time.isoformat()
            sensor["consecutive_failures"] = 0
            sensor["consecutive_successes"] += 1
            sensor["total_successes"] += 1
            self._update_uptime(sensor)
            if (sensor["consecutive_successes"] >= RECOVERY_CONFIRMATION_COUNT
                    and sensor_id in self.alerted_sensors):
                # Clear the alert state while locked so exactly one caller
                # reports the recovery.
                self.alerted_sensors.discard(sensor_id)
                # Copy so the alert payload is stable after the lock is released.
                recovery_snapshot = dict(sensor)
        # Online status itself is published as part of the sensor data;
        # only the recovery alert is sent from here.
        if recovery_snapshot is not None:
            self._send_recovery_alert(sensor_id, recovery_snapshot, mqtt_client)
            logging.info(
                f"Sensor {recovery_snapshot['location']} ({recovery_snapshot['ip']}) has recovered"
            )

    def record_failure(self, host_info, error_message, mqtt_client=None):
        """Record a failed sensor reading.

        Args:
            host_info: Host entry with ``ip``, ``location`` and ``type`` keys.
            error_message: Description of the failure; stored as ``last_error``.
            mqtt_client: Optional MQTT client used for the failure alert and
                for publishing offline status.
        """
        current_time = datetime.now(LOCAL_TIMEZONE)
        alert_snapshot = None
        with self.lock:
            sensor_id, sensor = self._get_or_register(host_info)
            sensor["status"] = "offline"
            sensor["last_failure"] = current_time.isoformat()
            sensor["last_error"] = error_message
            sensor["consecutive_successes"] = 0
            sensor["consecutive_failures"] += 1
            sensor["total_failures"] += 1
            self._update_uptime(sensor)
            if (sensor["consecutive_failures"] >= SENSOR_TIMEOUT_THRESHOLD
                    and sensor_id not in self.alerted_sensors):
                # Mark as alerted while still locked so concurrent failures
                # for the same sensor cannot send duplicate alerts.
                self.alerted_sensors.add(sensor_id)
                alert_snapshot = dict(sensor)
            status_snapshot = dict(sensor)

        # Network I/O below happens without holding self.lock.
        if alert_snapshot is not None:
            self._send_failure_alert(sensor_id, alert_snapshot, mqtt_client)
            logging.warning(
                f"Sensor {alert_snapshot['location']} ({alert_snapshot['ip']}) is now considered offline"
            )
        # NOTE(review): offline status is published on every failed poll, not
        # only when the alert threshold is crossed. Confirm this is intended.
        if mqtt_client and ALERTING_ENABLED:
            self._publish_offline_status(host_info, status_snapshot, mqtt_client)

    def _publish_alert(self, sensor, alert_message, kind, mqtt_client):
        """Publish *alert_message* as JSON to ``Location/{location}/alerts``.

        This is best-effort: publish errors are logged, not raised. It does
        nothing when there is no client or alerting is disabled. *kind* is
        used only in log messages.
        """
        if not mqtt_client or not ALERTING_ENABLED:
            return
        alert_topic = f"Location/{sensor['location']}/alerts"
        try:
            result = mqtt_client.publish(alert_topic, json.dumps(alert_message))
            result.wait_for_publish()
            logging.info(f"Sent {kind} alert for sensor {sensor['location']} to '{alert_topic}'")
        except Exception as e:
            logging.error(f"Failed to send {kind} alert: {e}")

    def _send_failure_alert(self, sensor_id, sensor, mqtt_client):
        """Send a critical ``sensor_failure`` alert for *sensor*.

        *sensor_id* is unused; it is kept for signature compatibility.
        """
        alert_message = {
            "alert_type": "sensor_failure",
            "sensor_location": sensor["location"],
            "sensor_type": sensor["type"],
            "consecutive_failures": sensor["consecutive_failures"],
            "last_error": sensor.get("last_error"),
            "severity": "critical",
        }
        self._publish_alert(sensor, alert_message, "failure", mqtt_client)

    def _send_recovery_alert(self, sensor_id, sensor, mqtt_client):
        """Send an informational ``sensor_recovery`` alert for *sensor*.

        *sensor_id* is unused; it is kept for signature compatibility.
        """
        alert_message = {
            "alert_type": "sensor_recovery",
            "sensor_location": sensor["location"],
            "sensor_type": sensor["type"],
            "consecutive_successes": sensor["consecutive_successes"],
            "severity": "info",
        }
        self._publish_alert(sensor, alert_message, "recovery", mqtt_client)

    def _publish_offline_status(self, host_info, sensor, mqtt_client):
        """Publish offline markers under ``Location/{location}/...``.

        It uses the same topic layout as the online sensor data. This is
        best-effort: each failed publish is logged and the remaining topics
        are still attempted. *sensor* is unused; it is kept for signature
        compatibility.
        """
        try:
            location = host_info["location"]
            base_topic = f"Location/{location}"
            current_time = datetime.now(LOCAL_TIMEZONE).strftime("%Y-%m-%d %H:%M:%S")
            # Every entry must be a (topic, value) pair. Measurement topics get
            # a placeholder so subscribers can see the reading is unavailable.
            # NOTE(review): confirm "offline" is what downstream consumers of
            # the measurement topics expect (numeric parsers may not).
            topics_data = [
                (f"{base_topic}/Time", current_time),
                (f"{base_topic}/Status", "offline"),
                (f"{base_topic}/Temperature/temperature", "offline"),
                (f"{base_topic}/Humidity/humidity", "offline"),
            ]
            # CO2 sensors also publish a CO2 topic.
            if host_info["type"] == "cwt_co2":
                topics_data.append((f"{base_topic}/CO2/CO2", "offline"))
            for topic, value in topics_data:
                try:
                    payload = str(value)
                    result = mqtt_client.publish(topic, payload)
                    result.wait_for_publish()
                    logging.info(f"Published offline status to '{topic}': {payload}")
                except Exception as e:
                    logging.error(f"Error publishing offline status to '{topic}': {e}")
        except Exception as e:
            logging.error(f"Failed to publish offline status: {e}")

    def get_sensor_status(self, sensor_id):
        """Return a copy of one sensor's status record, or ``{}`` if unknown."""
        with self.lock:
            return self.sensor_status.get(sensor_id, {}).copy()

    def get_all_sensor_status(self):
        """Return a snapshot of all sensors plus online/offline counts.

        Each per-sensor record is copied under the lock. The result is
        therefore independent of later updates and safe to serialise from
        another thread.
        """
        with self.lock:
            sensors = {sid: dict(record) for sid, record in self.sensor_status.items()}
        statuses = [record["status"] for record in sensors.values()]
        return {
            "timestamp": datetime.now(LOCAL_TIMEZONE).isoformat(),
            "total_sensors": len(sensors),
            "online_sensors": statuses.count("online"),
            "offline_sensors": statuses.count("offline"),
            "sensors": sensors,
        }

    def get_summary(self):
        """Return aggregate health counts.

        ``health_percentage`` is the share of sensors currently online, or
        0 when no sensors are tracked.
        """
        with self.lock:
            statuses = [record["status"] for record in self.sensor_status.values()]
            alerted = len(self.alerted_sensors)
        total = len(statuses)
        online = statuses.count("online")
        return {
            "total_sensors": total,
            "online_sensors": online,
            "offline_sensors": statuses.count("offline"),
            "unknown_sensors": statuses.count("unknown"),
            "health_percentage": (online / total * 100) if total > 0 else 0,
            "alerted_sensors": alerted,
        }
# Module-level singleton; obtain it through get_sensor_tracker().
_sensor_tracker: SensorTracker = SensorTracker()
def get_sensor_tracker():
    """Return the shared module-level ``SensorTracker`` instance."""
    tracker = _sensor_tracker
    return tracker
def get_all_sensor_status():
    """Return the status of every tracked sensor.

    Convenience wrapper around the singleton tracker, intended for the
    health check.
    """
    tracker = get_sensor_tracker()
    return tracker.get_all_sensor_status()