# Listing header captured with this file (not code): 235 lines, 9.7 KiB, Python
import json
|
|
import threading
|
|
from datetime import datetime, timezone
|
|
from collections import defaultdict
|
|
import logging
|
|
|
|
from config import (
|
|
MODBUS_HOSTS, SENSOR_TIMEOUT_THRESHOLD, RECOVERY_CONFIRMATION_COUNT,
|
|
ALERTING_ENABLED
|
|
)
|
|
|
|
class SensorTracker:
    """Track read health for each configured sensor and raise MQTT alerts.

    One status record (a plain dict) is kept per sensor, keyed by
    ``"<ip>_<location>"``.  ``record_success`` and ``record_failure`` update
    that record while holding ``self.lock``.  A failure alert is sent when a
    sensor reaches ``SENSOR_TIMEOUT_THRESHOLD`` consecutive failures, and a
    recovery alert once an alerted sensor reaches
    ``RECOVERY_CONFIRMATION_COUNT`` consecutive successes.

    MQTT publishing happens after ``self.lock`` has been released and works
    on a copy of the record, so a slow broker cannot block other threads that
    want to read or update sensor status.
    """

    def __init__(self):
        """Create one "unknown" status record per entry in ``MODBUS_HOSTS``."""
        self.sensor_status = {}
        # NOTE(review): nothing in this class updates these two counters;
        # they are kept in case external code still reads them.
        self.failure_counts = defaultdict(int)
        self.success_counts = defaultdict(int)
        # Ids of sensors for which a failure alert is currently outstanding.
        self.alerted_sensors = set()
        self.lock = threading.Lock()

        for host in MODBUS_HOSTS:
            self.sensor_status[self._sensor_id(host)] = self._new_status_record(
                host["ip"], host["location"], host["type"]
            )

    # ------------------------------------------------------------------
    # Small private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _sensor_id(host_info):
        """Build the ``"<ip>_<location>"`` key used in ``sensor_status``."""
        return f"{host_info['ip']}_{host_info['location']}"

    @staticmethod
    def _new_status_record(ip, location, device_type):
        """Return a fresh status record for a sensor that has no readings yet."""
        return {
            "ip": ip,
            "location": location,
            "type": device_type,
            "status": "unknown",
            "last_success": None,
            "last_failure": None,
            "consecutive_failures": 0,
            "consecutive_successes": 0,
            "total_failures": 0,
            "total_successes": 0,
            "uptime_percentage": 0.0,
            # Present from the start so every record has the same keys.
            "last_error": None,
        }

    @staticmethod
    def _topic_sensor_type(device_type):
        """Map a configured device type to its MQTT topic segment."""
        return "CO2-gas" if device_type == "cwt_co2" else "temperature-humidity"

    @staticmethod
    def _refresh_uptime(sensor):
        """Recompute ``uptime_percentage`` from the success/failure totals."""
        attempts = sensor["total_successes"] + sensor["total_failures"]
        if attempts > 0:
            sensor["uptime_percentage"] = sensor["total_successes"] / attempts * 100

    def _tracked_sensor(self, sensor_id, host_info):
        """Return the record for ``sensor_id``, registering it if unknown.

        The caller must hold ``self.lock``.  Hosts that were not part of
        ``MODBUS_HOSTS`` at construction time are added on first use instead
        of raising ``KeyError``.
        """
        sensor = self.sensor_status.get(sensor_id)
        if sensor is None:
            logging.warning(
                "Sensor %s was not in the configured host list; tracking it from now on",
                sensor_id,
            )
            sensor = self._new_status_record(
                host_info["ip"], host_info["location"], host_info.get("type")
            )
            self.sensor_status[sensor_id] = sensor
        return sensor

    def _count_with_status(self, status):
        """Count records whose ``status`` equals ``status`` (lock must be held)."""
        return sum(1 for record in self.sensor_status.values() if record["status"] == status)

    # ------------------------------------------------------------------
    # Recording results
    # ------------------------------------------------------------------

    def record_success(self, host_info, mqtt_client=None):
        """Record a successful sensor reading.

        Args:
            host_info: Mapping with at least ``"ip"`` and ``"location"``
                (``"type"`` is used if the sensor has to be registered).
            mqtt_client: Optional MQTT client used to publish a recovery
                alert; when falsy no alert is published.
        """
        sensor_id = self._sensor_id(host_info)
        timestamp = datetime.now(timezone.utc).isoformat()

        with self.lock:
            sensor = self._tracked_sensor(sensor_id, host_info)
            sensor["status"] = "online"
            sensor["last_success"] = timestamp
            sensor["consecutive_failures"] = 0
            sensor["consecutive_successes"] += 1
            sensor["total_successes"] += 1
            self._refresh_uptime(sensor)

            # Recovery is only announced for sensors that were alerted on.
            recovered = (
                sensor_id in self.alerted_sensors
                and sensor["consecutive_successes"] >= RECOVERY_CONFIRMATION_COUNT
            )
            if recovered:
                self.alerted_sensors.discard(sensor_id)
            # Copy while locked so the alert reports a consistent record.
            snapshot = dict(sensor) if recovered else None

        # Network I/O is done without the lock held.
        if recovered:
            self._send_recovery_alert(sensor_id, snapshot, mqtt_client)
            logging.info(
                "Sensor %s (%s) has recovered", snapshot["location"], snapshot["ip"]
            )

        # Note: Status is now published as part of sensor data, not separately

    def record_failure(self, host_info, error_message, mqtt_client=None):
        """Record a failed sensor reading.

        Args:
            host_info: Mapping with ``"ip"``, ``"location"`` and ``"type"``.
            error_message: Description of the failure; stored as
                ``last_error`` and included in the failure alert.
            mqtt_client: Optional MQTT client used for the failure alert and
                the offline status topics; when falsy nothing is published.
        """
        sensor_id = self._sensor_id(host_info)
        timestamp = datetime.now(timezone.utc).isoformat()

        with self.lock:
            sensor = self._tracked_sensor(sensor_id, host_info)
            sensor["status"] = "offline"
            sensor["last_failure"] = timestamp
            sensor["last_error"] = error_message
            sensor["consecutive_successes"] = 0
            sensor["consecutive_failures"] += 1
            sensor["total_failures"] += 1
            self._refresh_uptime(sensor)

            # Alert once per outage: only when the threshold is reached and
            # no alert is already outstanding for this sensor.
            newly_offline = (
                sensor_id not in self.alerted_sensors
                and sensor["consecutive_failures"] >= SENSOR_TIMEOUT_THRESHOLD
            )
            if newly_offline:
                self.alerted_sensors.add(sensor_id)
            # Copy while locked so publishing sees a consistent record.
            snapshot = dict(sensor)

        # Network I/O is done without the lock held.
        if newly_offline:
            self._send_failure_alert(sensor_id, snapshot, mqtt_client)
            logging.warning(
                "Sensor %s (%s) is now considered offline",
                snapshot["location"],
                snapshot["ip"],
            )

        # Publish offline status using new topic structure.
        # NOTE(review): reconstructed as running on every failed read (not
        # only when the alert threshold is crossed) - confirm against the
        # original indentation.
        if mqtt_client and ALERTING_ENABLED:
            self._publish_offline_status(host_info, snapshot, mqtt_client)

    # ------------------------------------------------------------------
    # MQTT publishing
    # ------------------------------------------------------------------

    def _alert_topic(self, sensor):
        """Return ``Location/{location_name}/{sensor_type}/alerts`` for a record."""
        topic_type = self._topic_sensor_type(sensor["type"])
        return f"Location/{sensor['location']}/{topic_type}/alerts"

    def _publish_alert(self, mqtt_client, topic, message, kind, location):
        """Publish one JSON alert; failures are logged, never raised.

        ``kind`` is ``"failure"`` or ``"recovery"`` and only affects log text.
        """
        try:
            result = mqtt_client.publish(topic, json.dumps(message))
            result.wait_for_publish()
            logging.info("Sent %s alert for sensor %s to '%s'", kind, location, topic)
        except Exception as e:  # best effort: alerting must not break polling
            logging.error("Failed to send %s alert: %s", kind, e)

    def _send_failure_alert(self, sensor_id, sensor, mqtt_client):
        """Send failure alert to MQTT (no-op without client or when disabled)."""
        if not mqtt_client or not ALERTING_ENABLED:
            return

        alert_message = {
            "alert_type": "sensor_failure",
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "sensor_id": sensor_id,
            "sensor_ip": sensor["ip"],
            "sensor_location": sensor["location"],
            "sensor_type": sensor["type"],
            "consecutive_failures": sensor["consecutive_failures"],
            # Covers both a missing key and an empty/None stored error.
            "last_error": sensor.get("last_error") or "Unknown error",
            "severity": "critical",
        }
        self._publish_alert(
            mqtt_client, self._alert_topic(sensor), alert_message, "failure", sensor["location"]
        )

    def _send_recovery_alert(self, sensor_id, sensor, mqtt_client):
        """Send recovery alert to MQTT (no-op without client or when disabled)."""
        if not mqtt_client or not ALERTING_ENABLED:
            return

        alert_message = {
            "alert_type": "sensor_recovery",
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "sensor_id": sensor_id,
            "sensor_ip": sensor["ip"],
            "sensor_location": sensor["location"],
            "sensor_type": sensor["type"],
            "consecutive_successes": sensor["consecutive_successes"],
            "severity": "info",
        }
        self._publish_alert(
            mqtt_client, self._alert_topic(sensor), alert_message, "recovery", sensor["location"]
        )

    def _publish_offline_status(self, host_info, sensor, mqtt_client):
        """Publish ``Time`` and ``Status`` = "offline" under the sensor's base topic.

        ``sensor`` is accepted for signature compatibility and is not read.
        Every error is logged and swallowed so a publish problem on one topic
        does not prevent the other from being attempted.
        """
        try:
            topic_type = self._topic_sensor_type(host_info["type"])
            base_topic = f"Location/{host_info['location']}/{topic_type}"
            current_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")

            topics_data = [
                (f"{base_topic}/Time", current_time),
                (f"{base_topic}/Status", "offline"),
            ]

            for topic, value in topics_data:
                try:
                    payload = str(value)
                    result = mqtt_client.publish(topic, payload)
                    result.wait_for_publish()
                    logging.info("Published offline status to '%s': %s", topic, payload)
                except Exception as e:
                    logging.error("Error publishing offline status to '%s': %s", topic, e)

        except Exception as e:
            logging.error("Failed to publish offline status: %s", e)

    # ------------------------------------------------------------------
    # Read-only views
    # ------------------------------------------------------------------

    def get_sensor_status(self, sensor_id):
        """Return a copy of one sensor's record, or ``{}`` if it is unknown."""
        with self.lock:
            return self.sensor_status.get(sensor_id, {}).copy()

    def get_all_sensor_status(self):
        """Return a timestamped report with counts and a copy of every record.

        Each per-sensor dict is copied, so the caller can read or modify the
        result without touching the tracker's live state.
        """
        with self.lock:
            return {
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "total_sensors": len(self.sensor_status),
                "online_sensors": self._count_with_status("online"),
                "offline_sensors": self._count_with_status("offline"),
                "sensors": {
                    sensor_id: dict(record)
                    for sensor_id, record in self.sensor_status.items()
                },
            }

    def get_summary(self):
        """Return aggregate counts and the percentage of sensors online."""
        with self.lock:
            total = len(self.sensor_status)
            online = self._count_with_status("online")

            return {
                "total_sensors": total,
                "online_sensors": online,
                "offline_sensors": self._count_with_status("offline"),
                "unknown_sensors": self._count_with_status("unknown"),
                "health_percentage": (online / total * 100) if total > 0 else 0,
                "alerted_sensors": len(self.alerted_sensors),
            }
|
|
|
|
# Global sensor tracker instance
|
|
# Built once when this module is first imported, so every importer shares
# this one tracker (and the single lock it creates in __init__).
_sensor_tracker = SensorTracker()
|
|
|
|
def get_sensor_tracker() -> "SensorTracker":
    """Return the module-wide ``SensorTracker`` shared by all callers."""
    return _sensor_tracker
|
|
|
|
def get_all_sensor_status():
    """Return the shared tracker's full status report.

    Convenience wrapper (used by the health check) around
    ``SensorTracker.get_all_sensor_status`` on the module-wide instance.
    """
    status_report = _sensor_tracker.get_all_sensor_status()
    return status_report
|