"""Per-sensor health tracking for the configured Modbus hosts.

Counts successful and failed readings per sensor, derives online/offline
status and uptime percentage, and (when ``ALERTING_ENABLED`` and an MQTT
client is supplied) publishes failure/recovery alerts and offline status.
"""
import json
import threading
from datetime import datetime, timezone
from collections import defaultdict
import logging

from config import (
    MODBUS_HOSTS,
    SENSOR_TIMEOUT_THRESHOLD,
    RECOVERY_CONFIRMATION_COUNT,
    ALERTING_ENABLED,
    LOCAL_TIMEZONE,
)


def _make_sensor_id(host_info):
    """Return the tracker key for *host_info*: ``"<ip>_<location>"``."""
    return f"{host_info['ip']}_{host_info['location']}"


def _new_sensor_record(host_info):
    """Return a fresh status record for *host_info* with zeroed counters."""
    return {
        "ip": host_info["ip"],
        "location": host_info["location"],
        "type": host_info["type"],
        "status": "unknown",
        "last_success": None,
        "last_failure": None,
        "consecutive_failures": 0,
        "consecutive_successes": 0,
        "total_failures": 0,
        "total_successes": 0,
        "uptime_percentage": 0.0,
    }


class SensorTracker:
    """Registry of sensor health keyed by ``"<ip>_<location>"``.

    ``sensor_status`` and ``alerted_sensors`` are only read and written while
    holding ``self.lock``. MQTT publishing is done after the lock has been
    released, so a slow broker does not block other threads using the tracker.
    """

    # Payload sent to measurement topics (temperature/humidity/CO2) while a
    # sensor is offline.
    # NOTE(review): the original code tried to publish these topics but had no
    # value to send; "offline" mirrors the Status topic. Confirm what
    # downstream consumers expect.
    OFFLINE_MEASUREMENT_PAYLOAD = "offline"

    def __init__(self):
        self.sensor_status = {}
        # Not used within this module; kept so external users of these
        # attributes keep working.
        self.failure_counts = defaultdict(int)
        self.success_counts = defaultdict(int)
        # Sensors that have had a failure alert sent and no recovery yet.
        self.alerted_sensors = set()
        self.lock = threading.Lock()

        # Initialize sensor status for every configured host.
        for host in MODBUS_HOSTS:
            self.sensor_status[_make_sensor_id(host)] = _new_sensor_record(host)

    def _get_or_register(self, sensor_id, host_info):
        """Return the record for *sensor_id*, creating it if it is unknown.

        The caller must hold ``self.lock``. Previously an unknown host raised
        ``KeyError`` here; it is now registered on first sight instead.
        """
        sensor = self.sensor_status.get(sensor_id)
        if sensor is None:
            logging.warning(f"Registering previously unknown sensor {sensor_id}")
            sensor = _new_sensor_record(host_info)
            self.sensor_status[sensor_id] = sensor
        return sensor

    @staticmethod
    def _update_uptime(sensor):
        """Recompute ``uptime_percentage`` from the success/failure totals."""
        total_attempts = sensor["total_successes"] + sensor["total_failures"]
        if total_attempts > 0:
            sensor["uptime_percentage"] = (sensor["total_successes"] / total_attempts) * 100

    def record_success(self, host_info, mqtt_client=None):
        """Record a successful sensor reading.

        Marks the sensor online and resets its failure streak. Once the sensor
        reaches ``RECOVERY_CONFIRMATION_COUNT`` consecutive successes after a
        failure alert, a recovery alert is sent.

        Args:
            host_info: Host mapping with ``ip`` and ``location`` (and ``type``
                if the host was not in ``MODBUS_HOSTS``).
            mqtt_client: Optional MQTT client used for the recovery alert.
        """
        sensor_id = _make_sensor_id(host_info)
        current_time = datetime.now(LOCAL_TIMEZONE)
        recovered = None  # snapshot of the record when a recovery alert is due

        with self.lock:
            sensor = self._get_or_register(sensor_id, host_info)
            sensor["status"] = "online"
            sensor["last_success"] = current_time.isoformat()
            sensor["consecutive_failures"] = 0
            sensor["consecutive_successes"] += 1
            sensor["total_successes"] += 1
            self._update_uptime(sensor)

            # Decide under the lock so only one thread claims the recovery.
            if (sensor["consecutive_successes"] >= RECOVERY_CONFIRMATION_COUNT
                    and sensor_id in self.alerted_sensors):
                self.alerted_sensors.discard(sensor_id)
                recovered = dict(sensor)

        # Network I/O happens outside the lock, on a snapshot.
        if recovered is not None:
            self._send_recovery_alert(sensor_id, recovered, mqtt_client)
            logging.info(f"Sensor {recovered['location']} ({recovered['ip']}) has recovered")
        # Note: Status is now published as part of sensor data, not separately

    def record_failure(self, host_info, error_message, mqtt_client=None):
        """Record a failed sensor reading.

        Marks the sensor offline and stores *error_message*. Once the sensor
        reaches ``SENSOR_TIMEOUT_THRESHOLD`` consecutive failures, a single
        failure alert is sent. Offline status topics are published on every
        failure when alerting is enabled and a client is given.

        Args:
            host_info: Host mapping with ``ip``, ``location`` and ``type``.
            error_message: Description of the failure, stored as ``last_error``.
            mqtt_client: Optional MQTT client used for alerts and status.
        """
        sensor_id = _make_sensor_id(host_info)
        current_time = datetime.now(LOCAL_TIMEZONE)
        alert_due = False

        with self.lock:
            sensor = self._get_or_register(sensor_id, host_info)
            sensor["status"] = "offline"
            sensor["last_failure"] = current_time.isoformat()
            sensor["last_error"] = error_message
            sensor["consecutive_successes"] = 0
            sensor["consecutive_failures"] += 1
            sensor["total_failures"] += 1
            self._update_uptime(sensor)

            # Decide under the lock so only one thread sends the alert.
            if (sensor["consecutive_failures"] >= SENSOR_TIMEOUT_THRESHOLD
                    and sensor_id not in self.alerted_sensors):
                self.alerted_sensors.add(sensor_id)
                alert_due = True
            snapshot = dict(sensor)

        # Network I/O happens outside the lock, on a snapshot.
        if alert_due:
            self._send_failure_alert(sensor_id, snapshot, mqtt_client)
            logging.warning(f"Sensor {snapshot['location']} ({snapshot['ip']}) is now considered offline")

        # Publish offline status using new topic structure
        if mqtt_client and ALERTING_ENABLED:
            self._publish_offline_status(host_info, snapshot, mqtt_client)

    @staticmethod
    def _publish_alert(mqtt_client, sensor, alert_message, kind):
        """Publish *alert_message* as JSON to ``Location/<location>/alerts``.

        Errors are logged rather than raised, so alerting is best-effort.
        *kind* ("failure" / "recovery") is used only in log messages.
        """
        alert_topic = f"Location/{sensor['location']}/alerts"
        try:
            result = mqtt_client.publish(alert_topic, json.dumps(alert_message))
            result.wait_for_publish()
            logging.info(f"Sent {kind} alert for sensor {sensor['location']} to '{alert_topic}'")
        except Exception as e:
            logging.error(f"Failed to send {kind} alert: {e}")

    def _send_failure_alert(self, sensor_id, sensor, mqtt_client):
        """Send a critical ``sensor_failure`` alert for *sensor* over MQTT."""
        if not mqtt_client or not ALERTING_ENABLED:
            return
        alert_message = {
            "alert_type": "sensor_failure",
            "sensor_location": sensor["location"],
            "sensor_type": sensor["type"],
            "consecutive_failures": sensor["consecutive_failures"],
            "last_error": sensor.get("last_error"),
            "severity": "critical",
        }
        self._publish_alert(mqtt_client, sensor, alert_message, "failure")

    def _send_recovery_alert(self, sensor_id, sensor, mqtt_client):
        """Send an informational ``sensor_recovery`` alert for *sensor* over MQTT."""
        if not mqtt_client or not ALERTING_ENABLED:
            return
        alert_message = {
            "alert_type": "sensor_recovery",
            "sensor_location": sensor["location"],
            "sensor_type": sensor["type"],
            "consecutive_successes": sensor["consecutive_successes"],
            "severity": "info",
        }
        self._publish_alert(mqtt_client, sensor, alert_message, "recovery")

    def _publish_offline_status(self, host_info, sensor, mqtt_client):
        """Publish offline status under ``Location/<location>/...``.

        Writes the current local time, ``Status=offline``, and a placeholder
        payload to each measurement topic (CO2 only for ``cwt_co2`` sensors).
        Publishing is best-effort: each failure is logged and the loop continues.
        """
        try:
            location = host_info["location"]
            # Create base topic path - same structure as online sensors
            base_topic = f"Location/{location}"
            current_time = datetime.now(LOCAL_TIMEZONE).strftime("%Y-%m-%d %H:%M:%S")
            placeholder = self.OFFLINE_MEASUREMENT_PAYLOAD

            # Every entry must be a (topic, value) pair. The measurement
            # entries used to be bare strings, so unpacking them raised
            # ValueError and the loop stopped after the Status topic.
            topics_data = [
                (f"{base_topic}/Time", current_time),
                (f"{base_topic}/Status", "offline"),
                (f"{base_topic}/Temperature/temperature", placeholder),
                (f"{base_topic}/Humidity/humidity", placeholder),
            ]
            # Add CO2 topic for CO2 sensors
            if host_info["type"] == "cwt_co2":
                topics_data.append((f"{base_topic}/CO2/CO2", placeholder))

            for topic, value in topics_data:
                payload = str(value)
                try:
                    result = mqtt_client.publish(topic, payload)
                    result.wait_for_publish()
                    logging.info(f"Published offline status to '{topic}': {payload}")
                except Exception as e:
                    logging.error(f"Error publishing offline status to '{topic}': {e}")
        except Exception as e:
            logging.error(f"Failed to publish offline status: {e}")

    def get_sensor_status(self, sensor_id):
        """Return a copy of one sensor's record, or ``{}`` if it is unknown."""
        with self.lock:
            return self.sensor_status.get(sensor_id, {}).copy()

    def get_all_sensor_status(self):
        """Return a timestamped snapshot of every sensor with status counts."""
        with self.lock:
            statuses = [s["status"] for s in self.sensor_status.values()]
            return {
                "timestamp": datetime.now(LOCAL_TIMEZONE).isoformat(),
                "total_sensors": len(self.sensor_status),
                "online_sensors": statuses.count("online"),
                "offline_sensors": statuses.count("offline"),
                # Copy each record so callers never hold live dicts that
                # record_success/record_failure mutate from other threads.
                "sensors": {sid: dict(rec) for sid, rec in self.sensor_status.items()},
            }

    def get_summary(self):
        """Return aggregate health counts and the online percentage."""
        with self.lock:
            statuses = [s["status"] for s in self.sensor_status.values()]
            total = len(statuses)
            online = statuses.count("online")
            return {
                "total_sensors": total,
                "online_sensors": online,
                "offline_sensors": statuses.count("offline"),
                "unknown_sensors": statuses.count("unknown"),
                "health_percentage": (online / total * 100) if total > 0 else 0,
                "alerted_sensors": len(self.alerted_sensors),
            }


# Global sensor tracker instance
_sensor_tracker = SensorTracker()


def get_sensor_tracker():
    """Get the global sensor tracker instance"""
    return _sensor_tracker


def get_all_sensor_status():
    """Get status of all sensors (convenience function for health check)"""
    return _sensor_tracker.get_all_sensor_status()