From 9b0f4f623623883f56d2d5573cd902287192d37e Mon Sep 17 00:00:00 2001 From: Naab2k3 Date: Mon, 23 Jun 2025 13:46:59 +0700 Subject: [PATCH] Refactor health check functionality in POE project. Removed readiness endpoint from health_check.py and improved error handling for sensor status retrieval. Updated logging to reduce noise and adjusted health check server startup process in main.py. Modified Nomad job configuration for network mode and resource allocation, enhancing overall system performance and stability. --- health_check.py | 208 ++++++++++++----------------------------------- main.py | 65 ++++----------- poe-sensor.nomad | 57 +++++-------- sensor_bridge.py | 157 ++++++++++++++--------------------- 4 files changed, 148 insertions(+), 339 deletions(-) diff --git a/health_check.py b/health_check.py index 704b298..b78f697 100644 --- a/health_check.py +++ b/health_check.py @@ -15,8 +15,6 @@ class HealthCheckHandler(BaseHTTPRequestHandler): self.send_health_response() elif self.path == '/sensors': self.send_sensors_status() - elif self.path == '/ready': - self.send_readiness_response() else: self.send_response(404) self.end_headers() @@ -24,110 +22,64 @@ class HealthCheckHandler(BaseHTTPRequestHandler): def send_health_response(self): """Send basic health check response""" try: - # Get basic application status - from sensor_tracker import get_sensor_tracker - sensor_tracker = get_sensor_tracker() - summary = sensor_tracker.get_summary() - - health_data = { - "status": "healthy", - "timestamp": datetime.now(timezone.utc).isoformat(), - "service": "modbus-mqtt-bridge", - "version": "1.0.0", - "sensors": { - "total": summary.get('total_sensors', 0), - "online": summary.get('online_sensors', 0), - "offline": summary.get('offline_sensors', 0), - "unknown": summary.get('unknown_sensors', 0), - "health_percentage": summary.get('health_percentage', 0.0) - }, - "uptime": self._get_uptime(), - "host": socket.gethostname() - } - - # Service is healthy if the health check server is running - # Don't mark as unhealthy just because sensors are offline - # That's what the degraded status is for - if summary.get('total_sensors', 0) > 0: - if summary.get('online_sensors', 0) == 0: - health_data["status"] = "degraded" - health_data["message"] = "All sensors offline but service is running" - elif summary.get('health_percentage', 0) < 50: - health_data["status"] = "degraded" - health_data["message"] = f"Low sensor health: {summary.get('health_percentage', 0):.1f}%" - - except Exception as e: - # If we can't get sensor status, still report as healthy since the service is running - logging.warning(f"Could not get sensor status for health check: {e}") - health_data = { - "status": "healthy", - "timestamp": datetime.now(timezone.utc).isoformat(), - "service": "modbus-mqtt-bridge", - "version": "1.0.0", - "message": "Service running, sensor status unavailable", - "uptime": self._get_uptime(), - "host": socket.gethostname() - } - - self.send_response(200) - self.send_header('Content-type', 'application/json') - self.send_header('Cache-Control', 'no-cache') - self.end_headers() - self.wfile.write(json.dumps(health_data, indent=2).encode()) - - def send_readiness_response(self): - """Send readiness probe response - checks if service is ready to serve""" - try: - # Check if main components are initialized - from sensor_tracker import get_sensor_tracker - sensor_tracker = get_sensor_tracker() - - ready_data = { - "ready": True, - "timestamp": datetime.now(timezone.utc).isoformat(), - "service": "modbus-mqtt-bridge", - "checks": { - "sensor_tracker": True, - "health_server": True + # Try to get sensor status + try: + from sensor_tracker import get_sensor_tracker + sensor_tracker = get_sensor_tracker() + summary = sensor_tracker.get_summary() + + health_data = { + "status": "healthy", + "timestamp": datetime.now(timezone.utc).isoformat(), + "service": "modbus-mqtt-bridge", + "version": "1.0.0", + "sensors": { + "total": summary.get('total_sensors', 0), + "online": summary.get('online_sensors', 0), + "health_percentage": summary.get('health_percentage', 0.0) + } } - } - - self.send_response(200) - self.send_header('Content-type', 'application/json') - self.send_header('Cache-Control', 'no-cache') - self.end_headers() - self.wfile.write(json.dumps(ready_data, indent=2).encode()) - + + # Mark as degraded if no sensors online but service is running + if summary.get('total_sensors', 0) > 0 and summary.get('online_sensors', 0) == 0: + health_data["status"] = "degraded" + health_data["message"] = "All sensors offline" + + except Exception as e: + # If sensor tracker not available, still report healthy + health_data = { + "status": "healthy", + "timestamp": datetime.now(timezone.utc).isoformat(), + "service": "modbus-mqtt-bridge", + "version": "1.0.0", + "message": "Service running, sensor status unavailable" + } + except Exception as e: - logging.error(f"Readiness check failed: {e}") - ready_data = { - "ready": False, + health_data = { + "status": "unhealthy", "timestamp": datetime.now(timezone.utc).isoformat(), "service": "modbus-mqtt-bridge", "error": str(e) } - - self.send_response(503) - self.send_header('Content-type', 'application/json') - self.send_header('Cache-Control', 'no-cache') - self.end_headers() - self.wfile.write(json.dumps(ready_data, indent=2).encode()) + + self.send_response(200) + self.send_header('Content-type', 'application/json') + self.end_headers() + self.wfile.write(json.dumps(health_data, indent=2).encode()) def send_sensors_status(self): """Send detailed sensor status""" try: - # Get sensor status from the global sensor tracker from sensor_tracker import get_all_sensor_status sensors_status = get_all_sensor_status() self.send_response(200) self.send_header('Content-type', 'application/json') - self.send_header('Cache-Control', 'no-cache') self.end_headers() self.wfile.write(json.dumps(sensors_status, indent=2).encode()) except Exception as e: - logging.error(f"Error getting sensor status: {e}") error_response = { "error": "Failed to get sensor status", "message": str(e), @@ -139,18 +91,9 @@ class HealthCheckHandler(BaseHTTPRequestHandler): self.end_headers() self.wfile.write(json.dumps(error_response, indent=2).encode()) - def _get_uptime(self): - """Get service uptime""" - try: - with open('/proc/uptime', 'r') as f: - uptime_seconds = float(f.readline().split()[0]) - return f"{uptime_seconds:.1f}s" - except: - return "unknown" - def log_message(self, format, *args): - """Override to use our logging system""" - logging.info(f"Health Check - {format % args}") + """Override to reduce noise""" + pass class HealthCheckServer: def __init__(self, port=None): @@ -160,87 +103,40 @@ class HealthCheckServer: self.started = False def start(self): - """Start the health check server in a separate thread""" + """Start the health check server""" if not HEALTH_CHECK_ENABLED: logging.info("Health check server is disabled") return False try: - logging.info(f"Attempting to start health check server on port {self.port}") - - # Check if port is available - if not self._is_port_available(self.port): - logging.error(f"Port {self.port} is already in use. Cannot start health check server.") - return False - - # Bind to all interfaces to make it accessible from outside container + logging.info(f"Starting health check server on port {self.port}") self.server = HTTPServer(('0.0.0.0', self.port), HealthCheckHandler) - # Test if the server can actually bind to the port - logging.info(f"Successfully bound to 0.0.0.0:{self.port}") - - self.thread = threading.Thread(target=self._serve_with_error_handling, daemon=True) + self.thread = threading.Thread(target=self._serve, daemon=True) self.thread.start() - # Give the server a moment to start and verify it's working + # Give server time to start time.sleep(1) - if self._test_health_endpoint(): - self.started = True - logging.info(f"Health check server started successfully on 0.0.0.0:{self.port}") - logging.info(f"Health check endpoints:") - logging.info(f" - http://0.0.0.0:{self.port}/health") - logging.info(f" - http://0.0.0.0:{self.port}/sensors") - logging.info(f" - http://0.0.0.0:{self.port}/ready") - logging.info("Health check server is ready for external health checks") - return True - else: - logging.error("Health check server started but endpoints are not responding") - return False + self.started = True + logging.info(f"Health check server running on http://0.0.0.0:{self.port}/health") + return True - except OSError as e: - if e.errno == 98: # Address already in use - logging.error(f"Port {self.port} is already in use. Cannot start health check server.") - else: - logging.error(f"OS error starting health check server: {e}") - return False except Exception as e: logging.error(f"Failed to start health check server: {e}") return False - def _is_port_available(self, port): - """Check if port is available""" + def _serve(self): + """Serve requests""" try: - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.bind(('0.0.0.0', port)) - return True - except OSError: - return False - - def _test_health_endpoint(self): - """Test if health endpoint is responding""" - try: - import urllib.request - with urllib.request.urlopen(f'http://localhost:{self.port}/health', timeout=5) as response: - return response.status == 200 - except Exception as e: - logging.warning(f"Health endpoint test failed: {e}") - return False - - def _serve_with_error_handling(self): - """Serve forever with error handling""" - try: - logging.info("Health check server thread started, beginning to serve requests") if self.server: self.server.serve_forever() - else: - logging.error("Health check server is None, cannot serve requests") except Exception as e: - logging.error(f"Health check server error: {e}", exc_info=True) + logging.error(f"Health check server error: {e}") self.started = False def stop(self): - """Stop the health check server""" + """Stop the server""" if self.server: self.server.shutdown() self.server.server_close() diff --git a/main.py b/main.py index 650089f..a1376cb 100644 --- a/main.py +++ b/main.py @@ -14,66 +14,31 @@ Author: POE Project import time import logging import sys -import signal -from health_check import HealthCheckServer -from sensor_bridge import main_loop - -# Global health server for cleanup -health_server = None - -def signal_handler(signum, frame): - """Handle shutdown signals gracefully""" - global health_server - logging.info(f"Received signal {signum}, shutting down gracefully...") - if health_server: - health_server.stop() - sys.exit(0) +import os if __name__ == "__main__": - # Setup logging + # Setup logging first logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', - handlers=[ - logging.StreamHandler(sys.stdout), - ] + handlers=[logging.StreamHandler(sys.stdout)] ) - # Setup signal handlers - signal.signal(signal.SIGTERM, signal_handler) - signal.signal(signal.SIGINT, signal_handler) - logging.info("POE Sensor Bridge starting up...") try: - # Start health check server first - logging.info("Starting health check server...") - health_server = HealthCheckServer() + # Import after logging is setup to avoid circular import issues + from sensor_bridge import main_loop - if health_server.start(): - logging.info("Health check server started successfully") - - # Give health check server time to be ready - time.sleep(3) - - # Verify health check is working - if health_server.is_running(): - logging.info("Health check server verified as running") - logging.info("Starting main sensor loop...") - main_loop() - else: - logging.error("Health check server failed to start properly") - sys.exit(1) - else: - logging.error("Failed to start health check server") - sys.exit(1) - - except KeyboardInterrupt: - logging.info("Received keyboard interrupt, shutting down...") + # Give the system a moment to initialize + time.sleep(2) + + logging.info("Starting main sensor loop...") + main_loop() + + except ImportError as e: + logging.error(f"Import error: {e}") + sys.exit(1) except Exception as e: logging.error(f"Fatal error: {e}", exc_info=True) - sys.exit(1) - finally: - if health_server: - health_server.stop() - logging.info("POE Sensor Bridge shutdown complete") \ No newline at end of file + sys.exit(1) \ No newline at end of file diff --git a/poe-sensor.nomad b/poe-sensor.nomad index b0145f6..1a56c41 100644 --- a/poe-sensor.nomad +++ b/poe-sensor.nomad @@ -10,29 +10,28 @@ job "poe-sensor" { group "sensor-bridge" { count = 1 - # Network configuration - using bridge mode with port mapping for better isolation + # Network configuration - using host mode for better compatibility network { - mode = "bridge" + mode = "host" port "health" { static = 8080 - to = 8080 } } - # Restart policy - more lenient for startup issues + # Restart policy restart { - attempts = 5 + attempts = 3 interval = "30m" - delay = "30s" + delay = "15s" mode = "fail" } # Update strategy update { max_parallel = 1 - min_healthy_time = "120s" # Increased from 60s - healthy_deadline = "10m" # Increased from 5m - progress_deadline = "15m" # Increased from 10m + min_healthy_time = "90s" + healthy_deadline = "8m" + progress_deadline = "12m" auto_revert = true canary = 0 } @@ -51,12 +50,12 @@ job "poe-sensor" { check { type = "http" path = "/health" - interval = "30s" # Reduced frequency - timeout = "30s" # Increased timeout - initial_status = "critical" # Start as critical until proven healthy + interval = "30s" + timeout = "20s" + initial_status = "critical" check_restart { - limit = 3 - grace = "30s" # More time for graceful shutdown + limit = 2 + grace = "20s" } } } @@ -69,25 +68,11 @@ job "poe-sensor" { command = "/bin/bash" args = [ "-c", - < 32767: # Negative temperature in 2's complement + if raw_temperature > 32767: temperature = (raw_temperature - 65536) / 10.0 else: temperature = raw_temperature / 10.0 - # CO2: 1ppm resolution for standard sensor co2_ppm = raw_co2 - logging.info(f"Processed CWT values from {host_info['ip']} - Humidity: {humidity:.1f}%RH, Temperature: {temperature:.1f}°C, CO2: {co2_ppm}ppm") + logging.info(f"CWT from {host_info['ip']} - Temp: {temperature:.1f}°C, Humidity: {humidity:.1f}%RH, CO2: {co2_ppm}ppm") - # Prepare new topic structure: Location/{location_name}/{type}/... + # Publish data location = host_info["location"] sensor_type = "CO2-gas" current_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") - # Create base topic path base_topic = f"Location/{location}/{sensor_type}" - # Publish to individual topics topics_data = [ (f"{base_topic}/Time", current_time), (f"{base_topic}/Status", "online"), - (f"{base_topic}/Data/co2", co2_ppm), (f"{base_topic}/Data/temperature", round(temperature, 1)), - (f"{base_topic}/Data/humidity", round(humidity, 1)) + (f"{base_topic}/Data/humidity", round(humidity, 1)), + (f"{base_topic}/Data/co2", co2_ppm) ] - # Publish all data all_published = True for topic, value in topics_data: try: - # Convert value to string for MQTT payload = str(value) result = mqtt_client.publish(topic, payload) result.wait_for_publish() if result.is_published(): - logging.info(f"Published to '{topic}': {payload}") + logging.debug(f"Published to '{topic}': {payload}") else: logging.error(f"Failed to publish to '{topic}': {payload}") all_published = False @@ -197,23 +170,19 @@ def read_and_publish_cwt_co2(mqtt_client, modbus_client, host_info): logging.error(f"Error publishing to '{topic}': {e}") all_published = False - if all_published: - logging.info(f"Successfully published all CO2-gas data for {location}") - return True - else: - logging.error(f"Failed to publish some CO2-gas data from {host_info['ip']}") - return False - - except ModbusException as e: - logging.error(f"Modbus error from CWT sensor {host_info['ip']}: {e}", exc_info=True) - return False + return all_published + except Exception as e: - logging.error(f"Unexpected error from CWT sensor {host_info['ip']}: {e}", exc_info=True) + logging.error(f"Error in CWT CO2 reading: {e}") return False def main_loop(): """Main function to connect and publish data in cycles""" - # Initialize sensor tracker and health check server + # Import here to avoid circular import + from sensor_tracker import get_sensor_tracker + from health_check import HealthCheckServer + + # Initialize components sensor_tracker = get_sensor_tracker() health_server = HealthCheckServer() @@ -227,7 +196,11 @@ def main_loop(): try: # Start health check server - health_server.start() + logging.info("Starting health check server...") + if health_server.start(): + logging.info("Health check server started successfully") + else: + logging.warning("Health check server failed to start, continuing anyway...") # Connect to MQTT broker logging.info(f"Connecting to MQTT broker {MQTT_BROKER}:{MQTT_PORT}...") @@ -235,76 +208,68 @@ def main_loop(): mqtt_client.loop_start() logging.info(f"Starting monitoring of {len(MODBUS_HOSTS)} sensors") - logging.info("System status can be monitored at:") - logging.info(f" - Health: http://0.0.0.0:8080/health") - logging.info(f" - Sensors: http://0.0.0.0:8080/sensors") + if health_server.is_running(): + logging.info("Health check available at: http://0.0.0.0:8080/health") - # Main loop to read and publish data from all hosts + # Main loop while True: for host_info in MODBUS_HOSTS: modbus_client = None - error_message = None try: modbus_client = ModbusTcpClient( host=host_info["ip"], port=MODBUS_PORT, - timeout=10 # Reduced timeout from 30 to 10 seconds + timeout=10 ) - logging.info(f"Processing channel {host_info['location']} at {host_info['ip']}:{MODBUS_PORT}") + logging.info(f"Processing {host_info['location']} at {host_info['ip']}") success = read_and_publish_data(mqtt_client, modbus_client, host_info) if success: - # Record successful reading sensor_tracker.record_success(host_info, mqtt_client) - logging.info(f"Successfully processed {host_info['location']} ({host_info['ip']})") + logging.info(f"Successfully processed {host_info['location']}") else: error_message = f"Failed to read/publish data from {host_info['ip']}" sensor_tracker.record_failure(host_info, error_message, mqtt_client) - logging.warning(f"Failed to process {host_info['location']} ({host_info['ip']}), will retry next cycle.") + logging.warning(f"Failed to process {host_info['location']}") except Exception as e: error_message = f"Exception processing {host_info['ip']}: {str(e)}" sensor_tracker.record_failure(host_info, error_message, mqtt_client) - logging.error(f"Error processing {host_info['location']} ({host_info['ip']}): {e}", exc_info=True) + logging.error(f"Error processing {host_info['location']}: {e}") finally: - # Ensure modbus connection is always closed if modbus_client: try: modbus_client.close() - logging.debug(f"Closed connection to {host_info['ip']}") except Exception as close_error: - logging.warning(f"Error closing connection to {host_info['ip']}: {close_error}") - finally: - modbus_client = None # Explicit cleanup + logging.warning(f"Error closing connection: {close_error}") - # Add small delay between processing each sensor time.sleep(1) - # Log system summary every cycle + # Log summary summary = sensor_tracker.get_summary() - logging.info(f"Cycle completed - Online: {summary['online_sensors']}/{summary['total_sensors']} sensors " - f"({summary['health_percentage']:.1f}% health), " - f"Alerts: {summary['alerted_sensors']}") + logging.info(f"Cycle completed - Online: {summary['online_sensors']}/{summary['total_sensors']} sensors") - # Force garbage collection every cycle to prevent memory buildup gc.collect() - - logging.info(f"Waiting {PUBLISH_INTERVAL} seconds until next cycle...") + logging.info(f"Waiting {PUBLISH_INTERVAL} seconds...") time.sleep(PUBLISH_INTERVAL) except KeyboardInterrupt: - logging.info("Received stop signal from user, shutting down...") + logging.info("Received stop signal, shutting down...") except Exception as e: logging.error(f"Unexpected error in main loop: {e}", exc_info=True) finally: # Cleanup try: - health_server.stop() + if health_server: + health_server.stop() except: pass - mqtt_client.loop_stop() - mqtt_client.disconnect() - logging.info("Successfully closed all connections.") + try: + mqtt_client.loop_stop() + mqtt_client.disconnect() + except: + pass + logging.info("Shutdown complete") if __name__ == "__main__": main_loop()