diff --git a/config.py b/config.py index ab751e6..1664d63 100644 --- a/config.py +++ b/config.py @@ -33,46 +33,3 @@ HEALTH_CHECK_ENABLED = True ALERTING_ENABLED = True SENSOR_TIMEOUT_THRESHOLD = 3 # Number of consecutive failures before alert RECOVERY_CONFIRMATION_COUNT = 2 # Number of consecutive successes to confirm recovery - -# Sensor status tracking - now integrated into data payload -# New topic structure: Location/{location_name}/{sensor_type}/{data_type} -# Where sensor_type is: "temperature-humidity" or "CO2-gas" -# Where data_type is: "data" or "alerts" -# Example topics: -# - Location/Warehouse-B/temperature-humidity/data (contains temp, humidity, status) -# - Location/Office/CO2-gas/data (contains co2, temp, humidity, status) -# - Location/Warehouse-B/temperature-humidity/alerts (contains failure/recovery alerts) - -# New MQTT Topic Structure: -# Base: Location/{location_name}/{sensor_type}/{data_type} -# -# Data topics: Location/{location_name}/{sensor_type}/data -# Alert topics: Location/{location_name}/{sensor_type}/alerts -# -# Payload format for data (JSON): -# { -# "timestamp": "2024-01-01 12:00:00", -# "location": "Warehouse-B", -# "sensor_type": "temperature-humidity", -# "ip": "10.84.60.31", -# "status": "online|offline", -# "data": { -# "temperature": 25.5, -# "humidity": 60.2 -# } -# } -# -# Payload format for alerts (JSON): -# { -# "alert_type": "sensor_failure|sensor_recovery", -# "timestamp": "2024-01-01T12:00:00.000000+00:00", -# "sensor_id": "10.84.60.31_Warehouse-B", -# "sensor_ip": "10.84.60.31", -# "sensor_location": "Warehouse-B", -# "sensor_type": "temperature_humidity", -# "consecutive_failures": 3, -# "last_error": "Failed to read data", -# "severity": "critical|info" -# } - - diff --git a/health_check.py b/health_check.py index d7ca481..de5f4fa 100644 --- a/health_check.py +++ b/health_check.py @@ -59,15 +59,17 @@ class HealthCheckServer: return try: - self.server = HTTPServer(('', self.port), HealthCheckHandler) + # Bind to all interfaces to make it accessible from outside container + self.server = HTTPServer(('0.0.0.0', self.port), HealthCheckHandler) self.thread = threading.Thread(target=self.server.serve_forever, daemon=True) self.thread.start() - logging.info(f"Health check server started on port {self.port}") + logging.info(f"Health check server started on 0.0.0.0:{self.port}") logging.info(f"Health check endpoints:") - logging.info(f" - http://localhost:{self.port}/health") - logging.info(f" - http://localhost:{self.port}/sensors") + logging.info(f" - http://0.0.0.0:{self.port}/health") + logging.info(f" - http://0.0.0.0:{self.port}/sensors") except Exception as e: logging.error(f"Failed to start health check server: {e}") + raise e # Re-raise to make the issue visible def stop(self): """Stop the health check server""" diff --git a/poe-sensor.nomad b/poe-sensor.nomad index 7c5bef8..21c6b3f 100644 --- a/poe-sensor.nomad +++ b/poe-sensor.nomad @@ -85,6 +85,8 @@ job "poe-sensor" { env { LOG_LEVEL = "INFO" PYTHONUNBUFFERED = "1" + PYTHONDONTWRITEBYTECODE = "1" # Prevent .pyc files to save memory + PYTHONMALLOC = "malloc" # Use system malloc for better memory management TZ = "Asia/Ho_Chi_Minh" # MQTT configuration (can be overridden by config.py) MQTT_BROKER = "mqtt.service.mesh" @@ -96,7 +98,7 @@ job "poe-sensor" { # Resource allocation resources { cpu = 256 - memory = 256 + memory = 512 } # Logs configuration diff --git a/sensor_bridge.py b/sensor_bridge.py index 08e2f05..0a9e3d9 100644 --- a/sensor_bridge.py +++ b/sensor_bridge.py @@ -1,6 +1,7 @@ import logging import json import time +import gc # Add garbage collection from datetime import datetime, timezone from pymodbus.client import ModbusTcpClient from pymodbus.exceptions import ModbusException @@ -31,7 +32,7 @@ def on_publish(client, userdata, mid): def read_and_publish_data(mqtt_client, modbus_client, host_info): """Read data from Modbus and publish to MQTT""" try: - # Check and establish Modbus connection + # Check and establish Modbus connection with explicit timeout if not modbus_client.is_socket_open(): logging.info(f"Attempting to connect to {host_info['ip']}:{MODBUS_PORT}") connection_result = modbus_client.connect() @@ -52,6 +53,13 @@ def read_and_publish_data(mqtt_client, modbus_client, host_info): except Exception as e: logging.error(f"Unexpected error from {host_info['ip']}: {e}", exc_info=True) return False + finally: + # Ensure connection is closed after each operation to prevent resource leaks + try: + if modbus_client and modbus_client.is_socket_open(): + modbus_client.close() + except Exception as e: + logging.warning(f"Error closing modbus connection for {host_info['ip']}: {e}") def read_and_publish_temperature_humidity(mqtt_client, modbus_client, host_info): """Read temperature and humidity sensors""" @@ -74,39 +82,45 @@ def read_and_publish_temperature_humidity(mqtt_client, modbus_client, host_info) humidity = raw_hum * 100 / 1000 # Correct formula: 0% to 100% RH logging.info(f"Raw humidity from {host_info['ip']}: {raw_hum}, Corrected: {humidity:.1f}%RH") - # Prepare new topic structure: Location/{location_name}/{sensor_type}/data + # Prepare new topic structure: Location/{location_name}/{type}/... location = host_info["location"] sensor_type = "temperature-humidity" current_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") - # Create topic for combined data - data_topic = f"Location/{location}/{sensor_type}/data" + # Create base topic path + base_topic = f"Location/{location}/{sensor_type}" - # Create JSON payload with all data and status - payload = { - "timestamp": current_time, - "location": location, - "sensor_type": sensor_type, - "ip": host_info["ip"], - "status": "online", - "data": { - "temperature": round(temperature, 1), - "humidity": round(humidity, 1) - } - } + # Publish to individual topics + topics_data = [ + (f"{base_topic}/Time", current_time), + (f"{base_topic}/Status", "online"), + (f"{base_topic}/Data/temperature", round(temperature, 1)), + (f"{base_topic}/Data/humidity", round(humidity, 1)) + ] - # Publish combined data as JSON - result = mqtt_client.publish(data_topic, json.dumps(payload)) - result.wait_for_publish() + # Publish all data + all_published = True + for topic, value in topics_data: + try: + # Convert value to string for MQTT + payload = str(value) + result = mqtt_client.publish(topic, payload) + result.wait_for_publish() + + if result.is_published(): + logging.info(f"Published to '{topic}': {payload}") + else: + logging.error(f"Failed to publish to '{topic}': {payload}") + all_published = False + except Exception as e: + logging.error(f"Error publishing to '{topic}': {e}") + all_published = False - logging.info(f"Published to '{data_topic}': {json.dumps(payload, indent=2)}") - - # Check if published successfully - if result.is_published(): - logging.info(f"Successfully published temperature-humidity data for {location}") + if all_published: + logging.info(f"Successfully published all temperature-humidity data for {location}") return True else: - logging.error(f"Failed to publish temperature-humidity data from {host_info['ip']}") + logging.error(f"Failed to publish some temperature-humidity data from {host_info['ip']}") return False except ModbusException as e: @@ -148,40 +162,46 @@ def read_and_publish_cwt_co2(mqtt_client, modbus_client, host_info): logging.info(f"Processed CWT values from {host_info['ip']} - Humidity: {humidity:.1f}%RH, Temperature: {temperature:.1f}°C, CO2: {co2_ppm}ppm") - # Prepare new topic structure: Location/{location_name}/{sensor_type}/data + # Prepare new topic structure: Location/{location_name}/{type}/... location = host_info["location"] sensor_type = "CO2-gas" current_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") - # Create topic for combined data - data_topic = f"Location/{location}/{sensor_type}/data" + # Create base topic path + base_topic = f"Location/{location}/{sensor_type}" - # Create JSON payload with all data and status - payload = { - "timestamp": current_time, - "location": location, - "sensor_type": sensor_type, - "ip": host_info["ip"], - "status": "online", - "data": { - "co2": co2_ppm, - "temperature": round(temperature, 1), - "humidity": round(humidity, 1) - } - } + # Publish to individual topics + topics_data = [ + (f"{base_topic}/Time", current_time), + (f"{base_topic}/Status", "online"), + (f"{base_topic}/Data/co2", co2_ppm), + (f"{base_topic}/Data/temperature", round(temperature, 1)), + (f"{base_topic}/Data/humidity", round(humidity, 1)) + ] - # Publish combined data as JSON - result = mqtt_client.publish(data_topic, json.dumps(payload)) - result.wait_for_publish() + # Publish all data + all_published = True + for topic, value in topics_data: + try: + # Convert value to string for MQTT + payload = str(value) + result = mqtt_client.publish(topic, payload) + result.wait_for_publish() + + if result.is_published(): + logging.info(f"Published to '{topic}': {payload}") + else: + logging.error(f"Failed to publish to '{topic}': {payload}") + all_published = False + except Exception as e: + logging.error(f"Error publishing to '{topic}': {e}") + all_published = False - logging.info(f"Published to '{data_topic}': {json.dumps(payload, indent=2)}") - - # Check if published successfully - if result.is_published(): - logging.info(f"Successfully published CO2-gas data for {location}") + if all_published: + logging.info(f"Successfully published all CO2-gas data for {location}") return True else: - logging.error(f"Failed to publish CO2-gas data from {host_info['ip']}") + logging.error(f"Failed to publish some CO2-gas data from {host_info['ip']}") return False except ModbusException as e: @@ -222,6 +242,7 @@ def main_loop(): # Main loop to read and publish data from all hosts while True: for host_info in MODBUS_HOSTS: + modbus_client = None error_message = None try: modbus_client = ModbusTcpClient( @@ -246,11 +267,15 @@ def main_loop(): sensor_tracker.record_failure(host_info, error_message, mqtt_client) logging.error(f"Error processing {host_info['location']} ({host_info['ip']}): {e}", exc_info=True) finally: - try: - modbus_client.close() - logging.debug(f"Closed connection to {host_info['ip']}") - except: - pass + # Ensure modbus connection is always closed + if modbus_client: + try: + modbus_client.close() + logging.debug(f"Closed connection to {host_info['ip']}") + except Exception as close_error: + logging.warning(f"Error closing connection to {host_info['ip']}: {close_error}") + finally: + modbus_client = None # Explicit cleanup # Add small delay between processing each sensor time.sleep(1) @@ -260,6 +285,9 @@ def main_loop(): logging.info(f"Cycle completed - Online: {summary['online_sensors']}/{summary['total_sensors']} sensors " f"({summary['health_percentage']:.1f}% health), " f"Alerts: {summary['alerted_sensors']}") + + # Force garbage collection every cycle to prevent memory buildup + gc.collect() logging.info(f"Waiting {PUBLISH_INTERVAL} seconds until next cycle...") time.sleep(PUBLISH_INTERVAL) diff --git a/sensor_tracker.py b/sensor_tracker.py index 6a52f49..81e2414 100644 --- a/sensor_tracker.py +++ b/sensor_tracker.py @@ -166,24 +166,26 @@ class SensorTracker: else: sensor_type = "temperature-humidity" - # Create topic for offline status using new structure: Location/{location_name}/{sensor_type}/data - data_topic = f"Location/{location}/{sensor_type}/data" + # Create base topic path + base_topic = f"Location/{location}/{sensor_type}" + current_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") - # Create JSON payload with offline status - payload = { - "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"), - "location": location, - "sensor_type": sensor_type, - "ip": host_info["ip"], - "status": "offline", - "error": sensor.get("last_error", "Unknown error"), - "consecutive_failures": sensor["consecutive_failures"], - "uptime_percentage": round(sensor["uptime_percentage"], 2) - } + # Publish offline status to individual topics + topics_data = [ + (f"{base_topic}/Time", current_time), + (f"{base_topic}/Status", "offline") + ] - result = mqtt_client.publish(data_topic, json.dumps(payload)) - result.wait_for_publish() - logging.info(f"Published offline status to '{data_topic}'") + # Publish offline status + for topic, value in topics_data: + try: + payload = str(value) + result = mqtt_client.publish(topic, payload) + result.wait_for_publish() + logging.info(f"Published offline status to '{topic}': {payload}") + except Exception as e: + logging.error(f"Error publishing offline status to '{topic}': {e}") + except Exception as e: logging.error(f"Failed to publish offline status: {e}")