diff --git a/health_check.py b/health_check.py index 704b298..b78f697 100644 --- a/health_check.py +++ b/health_check.py @@ -15,8 +15,6 @@ class HealthCheckHandler(BaseHTTPRequestHandler): self.send_health_response() elif self.path == '/sensors': self.send_sensors_status() - elif self.path == '/ready': - self.send_readiness_response() else: self.send_response(404) self.end_headers() @@ -24,110 +22,64 @@ class HealthCheckHandler(BaseHTTPRequestHandler): def send_health_response(self): """Send basic health check response""" try: - # Get basic application status - from sensor_tracker import get_sensor_tracker - sensor_tracker = get_sensor_tracker() - summary = sensor_tracker.get_summary() - - health_data = { - "status": "healthy", - "timestamp": datetime.now(timezone.utc).isoformat(), - "service": "modbus-mqtt-bridge", - "version": "1.0.0", - "sensors": { - "total": summary.get('total_sensors', 0), - "online": summary.get('online_sensors', 0), - "offline": summary.get('offline_sensors', 0), - "unknown": summary.get('unknown_sensors', 0), - "health_percentage": summary.get('health_percentage', 0.0) - }, - "uptime": self._get_uptime(), - "host": socket.gethostname() - } - - # Service is healthy if the health check server is running - # Don't mark as unhealthy just because sensors are offline - # That's what the degraded status is for - if summary.get('total_sensors', 0) > 0: - if summary.get('online_sensors', 0) == 0: - health_data["status"] = "degraded" - health_data["message"] = "All sensors offline but service is running" - elif summary.get('health_percentage', 0) < 50: - health_data["status"] = "degraded" - health_data["message"] = f"Low sensor health: {summary.get('health_percentage', 0):.1f}%" - - except Exception as e: - # If we can't get sensor status, still report as healthy since the service is running - logging.warning(f"Could not get sensor status for health check: {e}") - health_data = { - "status": "healthy", - "timestamp": datetime.now(timezone.utc).isoformat(), - "service": "modbus-mqtt-bridge", - "version": "1.0.0", - "message": "Service running, sensor status unavailable", - "uptime": self._get_uptime(), - "host": socket.gethostname() - } - - self.send_response(200) - self.send_header('Content-type', 'application/json') - self.send_header('Cache-Control', 'no-cache') - self.end_headers() - self.wfile.write(json.dumps(health_data, indent=2).encode()) - - def send_readiness_response(self): - """Send readiness probe response - checks if service is ready to serve""" - try: - # Check if main components are initialized - from sensor_tracker import get_sensor_tracker - sensor_tracker = get_sensor_tracker() - - ready_data = { - "ready": True, - "timestamp": datetime.now(timezone.utc).isoformat(), - "service": "modbus-mqtt-bridge", - "checks": { - "sensor_tracker": True, - "health_server": True + # Try to get sensor status + try: + from sensor_tracker import get_sensor_tracker + sensor_tracker = get_sensor_tracker() + summary = sensor_tracker.get_summary() + + health_data = { + "status": "healthy", + "timestamp": datetime.now(timezone.utc).isoformat(), + "service": "modbus-mqtt-bridge", + "version": "1.0.0", + "sensors": { + "total": summary.get('total_sensors', 0), + "online": summary.get('online_sensors', 0), + "health_percentage": summary.get('health_percentage', 0.0) + } } - } - - self.send_response(200) - self.send_header('Content-type', 'application/json') - self.send_header('Cache-Control', 'no-cache') - self.end_headers() - self.wfile.write(json.dumps(ready_data, indent=2).encode()) - + + # Mark as degraded if no sensors online but service is running + if summary.get('total_sensors', 0) > 0 and summary.get('online_sensors', 0) == 0: + health_data["status"] = "degraded" + health_data["message"] = "All sensors offline" + + except Exception as e: + # If sensor tracker not available, still report healthy + health_data = { + "status": "healthy", + "timestamp": datetime.now(timezone.utc).isoformat(), + "service": "modbus-mqtt-bridge", + "version": "1.0.0", + "message": "Service running, sensor status unavailable" + } + except Exception as e: - logging.error(f"Readiness check failed: {e}") - ready_data = { - "ready": False, + health_data = { + "status": "unhealthy", "timestamp": datetime.now(timezone.utc).isoformat(), "service": "modbus-mqtt-bridge", "error": str(e) } - - self.send_response(503) - self.send_header('Content-type', 'application/json') - self.send_header('Cache-Control', 'no-cache') - self.end_headers() - self.wfile.write(json.dumps(ready_data, indent=2).encode()) + + self.send_response(200) + self.send_header('Content-type', 'application/json') + self.end_headers() + self.wfile.write(json.dumps(health_data, indent=2).encode()) def send_sensors_status(self): """Send detailed sensor status""" try: - # Get sensor status from the global sensor tracker from sensor_tracker import get_all_sensor_status sensors_status = get_all_sensor_status() self.send_response(200) self.send_header('Content-type', 'application/json') - self.send_header('Cache-Control', 'no-cache') self.end_headers() self.wfile.write(json.dumps(sensors_status, indent=2).encode()) except Exception as e: - logging.error(f"Error getting sensor status: {e}") error_response = { "error": "Failed to get sensor status", "message": str(e), @@ -139,18 +91,9 @@ class HealthCheckHandler(BaseHTTPRequestHandler): self.end_headers() self.wfile.write(json.dumps(error_response, indent=2).encode()) - def _get_uptime(self): - """Get service uptime""" - try: - with open('/proc/uptime', 'r') as f: - uptime_seconds = float(f.readline().split()[0]) - return f"{uptime_seconds:.1f}s" - except: - return "unknown" - def log_message(self, format, *args): - """Override to use our logging system""" - logging.info(f"Health Check - {format % args}") + """Override to reduce noise""" + pass class HealthCheckServer: def __init__(self, port=None): @@ -160,87 +103,40 @@ class HealthCheckServer: self.started = False def start(self): - """Start the health check server in a separate thread""" + """Start the health check server""" if not HEALTH_CHECK_ENABLED: logging.info("Health check server is disabled") return False try: - logging.info(f"Attempting to start health check server on port {self.port}") - - # Check if port is available - if not self._is_port_available(self.port): - logging.error(f"Port {self.port} is already in use. Cannot start health check server.") - return False - - # Bind to all interfaces to make it accessible from outside container + logging.info(f"Starting health check server on port {self.port}") self.server = HTTPServer(('0.0.0.0', self.port), HealthCheckHandler) - # Test if the server can actually bind to the port - logging.info(f"Successfully bound to 0.0.0.0:{self.port}") - - self.thread = threading.Thread(target=self._serve_with_error_handling, daemon=True) + self.thread = threading.Thread(target=self._serve, daemon=True) self.thread.start() - # Give the server a moment to start and verify it's working + # Give server time to start time.sleep(1) - if self._test_health_endpoint(): - self.started = True - logging.info(f"Health check server started successfully on 0.0.0.0:{self.port}") - logging.info(f"Health check endpoints:") - logging.info(f" - http://0.0.0.0:{self.port}/health") - logging.info(f" - http://0.0.0.0:{self.port}/sensors") - logging.info(f" - http://0.0.0.0:{self.port}/ready") - logging.info("Health check server is ready for external health checks") - return True - else: - logging.error("Health check server started but endpoints are not responding") - return False + self.started = True + logging.info(f"Health check server running on http://0.0.0.0:{self.port}/health") + return True - except OSError as e: - if e.errno == 98: # Address already in use - logging.error(f"Port {self.port} is already in use. Cannot start health check server.") - else: - logging.error(f"OS error starting health check server: {e}") - return False except Exception as e: logging.error(f"Failed to start health check server: {e}") return False - def _is_port_available(self, port): - """Check if port is available""" + def _serve(self): + """Serve requests""" try: - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.bind(('0.0.0.0', port)) - return True - except OSError: - return False - - def _test_health_endpoint(self): - """Test if health endpoint is responding""" - try: - import urllib.request - with urllib.request.urlopen(f'http://localhost:{self.port}/health', timeout=5) as response: - return response.status == 200 - except Exception as e: - logging.warning(f"Health endpoint test failed: {e}") - return False - - def _serve_with_error_handling(self): - """Serve forever with error handling""" - try: - logging.info("Health check server thread started, beginning to serve requests") if self.server: self.server.serve_forever() - else: - logging.error("Health check server is None, cannot serve requests") except Exception as e: - logging.error(f"Health check server error: {e}", exc_info=True) + logging.error(f"Health check server error: {e}") self.started = False def stop(self): - """Stop the health check server""" + """Stop the server""" if self.server: self.server.shutdown() self.server.server_close() diff --git a/main.py b/main.py index 650089f..a1376cb 100644 --- a/main.py +++ b/main.py @@ -14,66 +14,31 @@ Author: POE Project import time import logging import sys -import signal -from health_check import HealthCheckServer -from sensor_bridge import main_loop - -# Global health server for cleanup -health_server = None - -def signal_handler(signum, frame): - """Handle shutdown signals gracefully""" - global health_server - logging.info(f"Received signal {signum}, shutting down gracefully...") - if health_server: - health_server.stop() - sys.exit(0) +import os if __name__ == "__main__": - # Setup logging + # Setup logging first logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', - handlers=[ - logging.StreamHandler(sys.stdout), - ] + handlers=[logging.StreamHandler(sys.stdout)] ) - # Setup signal handlers - signal.signal(signal.SIGTERM, signal_handler) - signal.signal(signal.SIGINT, signal_handler) - logging.info("POE Sensor Bridge starting up...") try: - # Start health check server first - logging.info("Starting health check server...") - health_server = HealthCheckServer() + # Import after logging is setup to avoid circular import issues + from sensor_bridge import main_loop - if health_server.start(): - logging.info("Health check server started successfully") - - # Give health check server time to be ready - time.sleep(3) - - # Verify health check is working - if health_server.is_running(): - logging.info("Health check server verified as running") - logging.info("Starting main sensor loop...") - main_loop() - else: - logging.error("Health check server failed to start properly") - sys.exit(1) - else: - logging.error("Failed to start health check server") - sys.exit(1) - - except KeyboardInterrupt: - logging.info("Received keyboard interrupt, shutting down...") + # Give the system a moment to initialize + time.sleep(2) + + logging.info("Starting main sensor loop...") + main_loop() + + except ImportError as e: + logging.error(f"Import error: {e}") + sys.exit(1) except Exception as e: logging.error(f"Fatal error: {e}", exc_info=True) - sys.exit(1) - finally: - if health_server: - health_server.stop() - logging.info("POE Sensor Bridge shutdown complete") \ No newline at end of file + sys.exit(1) \ No newline at end of file diff --git a/poe-sensor.nomad b/poe-sensor.nomad index b0145f6..1a56c41 100644 --- a/poe-sensor.nomad +++ b/poe-sensor.nomad @@ -10,29 +10,28 @@ job "poe-sensor" { group "sensor-bridge" { count = 1 - # Network configuration - using bridge mode with port mapping for better isolation + # Network configuration - using host mode for better compatibility network { - mode = "bridge" + mode = "host" port "health" { static = 8080 - to = 8080 } } - # Restart policy - more lenient for startup issues + # Restart policy restart { - attempts = 5 + attempts = 3 interval = "30m" - delay = "30s" + delay = "15s" mode = "fail" } # Update strategy update { max_parallel = 1 - min_healthy_time = "120s" # Increased from 60s - healthy_deadline = "10m" # Increased from 5m - progress_deadline = "15m" # Increased from 10m + min_healthy_time = "90s" + healthy_deadline = "8m" + progress_deadline = "12m" auto_revert = true canary = 0 } @@ -51,12 +50,12 @@ job "poe-sensor" { check { type = "http" path = "/health" - interval = "30s" # Reduced frequency - timeout = "30s" # Increased timeout - initial_status = "critical" # Start as critical until proven healthy + interval = "30s" + timeout = "20s" + initial_status = "critical" check_restart { - limit = 3 - grace = "30s" # More time for graceful shutdown + limit = 2 + grace = "20s" } } } @@ -69,25 +68,11 @@ job "poe-sensor" { command = "/bin/bash" args = [ "-c", - < 32767: # Negative temperature in 2's complement + if raw_temperature > 32767: temperature = (raw_temperature - 65536) / 10.0 else: temperature = raw_temperature / 10.0 - # CO2: 1ppm resolution for standard sensor co2_ppm = raw_co2 - logging.info(f"Processed CWT values from {host_info['ip']} - Humidity: {humidity:.1f}%RH, Temperature: {temperature:.1f}°C, CO2: {co2_ppm}ppm") + logging.info(f"CWT from {host_info['ip']} - Temp: {temperature:.1f}°C, Humidity: {humidity:.1f}%RH, CO2: {co2_ppm}ppm") - # Prepare new topic structure: Location/{location_name}/{type}/... + # Publish data location = host_info["location"] sensor_type = "CO2-gas" current_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") - # Create base topic path base_topic = f"Location/{location}/{sensor_type}" - # Publish to individual topics topics_data = [ (f"{base_topic}/Time", current_time), (f"{base_topic}/Status", "online"), - (f"{base_topic}/Data/co2", co2_ppm), (f"{base_topic}/Data/temperature", round(temperature, 1)), - (f"{base_topic}/Data/humidity", round(humidity, 1)) + (f"{base_topic}/Data/humidity", round(humidity, 1)), + (f"{base_topic}/Data/co2", co2_ppm) ] - # Publish all data all_published = True for topic, value in topics_data: try: - # Convert value to string for MQTT payload = str(value) result = mqtt_client.publish(topic, payload) result.wait_for_publish() if result.is_published(): - logging.info(f"Published to '{topic}': {payload}") + logging.debug(f"Published to '{topic}': {payload}") else: logging.error(f"Failed to publish to '{topic}': {payload}") all_published = False @@ -197,23 +170,19 @@ def read_and_publish_cwt_co2(mqtt_client, modbus_client, host_info): logging.error(f"Error publishing to '{topic}': {e}") all_published = False - if all_published: - logging.info(f"Successfully published all CO2-gas data for {location}") - return True - else: - logging.error(f"Failed to publish some CO2-gas data from {host_info['ip']}") - return False - - except ModbusException as e: - logging.error(f"Modbus error from CWT sensor {host_info['ip']}: {e}", exc_info=True) - return False + return all_published + except Exception as e: - logging.error(f"Unexpected error from CWT sensor {host_info['ip']}: {e}", exc_info=True) + logging.error(f"Error in CWT CO2 reading: {e}") return False def main_loop(): """Main function to connect and publish data in cycles""" - # Initialize sensor tracker and health check server + # Import here to avoid circular import + from sensor_tracker import get_sensor_tracker + from health_check import HealthCheckServer + + # Initialize components sensor_tracker = get_sensor_tracker() health_server = HealthCheckServer() @@ -227,7 +196,11 @@ def main_loop(): try: # Start health check server - health_server.start() + logging.info("Starting health check server...") + if health_server.start(): + logging.info("Health check server started successfully") + else: + logging.warning("Health check server failed to start, continuing anyway...") # Connect to MQTT broker logging.info(f"Connecting to MQTT broker {MQTT_BROKER}:{MQTT_PORT}...") @@ -235,76 +208,68 @@ def main_loop(): mqtt_client.loop_start() logging.info(f"Starting monitoring of {len(MODBUS_HOSTS)} sensors") - logging.info("System status can be monitored at:") - logging.info(f" - Health: http://0.0.0.0:8080/health") - logging.info(f" - Sensors: http://0.0.0.0:8080/sensors") + if health_server.is_running(): + logging.info("Health check available at: http://0.0.0.0:8080/health") - # Main loop to read and publish data from all hosts + # Main loop while True: for host_info in MODBUS_HOSTS: modbus_client = None - error_message = None try: modbus_client = ModbusTcpClient( host=host_info["ip"], port=MODBUS_PORT, - timeout=10 # Reduced timeout from 30 to 10 seconds + timeout=10 ) - logging.info(f"Processing channel {host_info['location']} at {host_info['ip']}:{MODBUS_PORT}") + logging.info(f"Processing {host_info['location']} at {host_info['ip']}") success = read_and_publish_data(mqtt_client, modbus_client, host_info) if success: - # Record successful reading sensor_tracker.record_success(host_info, mqtt_client) - logging.info(f"Successfully processed {host_info['location']} ({host_info['ip']})") + logging.info(f"Successfully processed {host_info['location']}") else: error_message = f"Failed to read/publish data from {host_info['ip']}" sensor_tracker.record_failure(host_info, error_message, mqtt_client) - logging.warning(f"Failed to process {host_info['location']} ({host_info['ip']}), will retry next cycle.") + logging.warning(f"Failed to process {host_info['location']}") except Exception as e: error_message = f"Exception processing {host_info['ip']}: {str(e)}" sensor_tracker.record_failure(host_info, error_message, mqtt_client) - logging.error(f"Error processing {host_info['location']} ({host_info['ip']}): {e}", exc_info=True) + logging.error(f"Error processing {host_info['location']}: {e}") finally: - # Ensure modbus connection is always closed if modbus_client: try: modbus_client.close() - logging.debug(f"Closed connection to {host_info['ip']}") except Exception as close_error: - logging.warning(f"Error closing connection to {host_info['ip']}: {close_error}") - finally: - modbus_client = None # Explicit cleanup + logging.warning(f"Error closing connection: {close_error}") - # Add small delay between processing each sensor time.sleep(1) - # Log system summary every cycle + # Log summary summary = sensor_tracker.get_summary() - logging.info(f"Cycle completed - Online: {summary['online_sensors']}/{summary['total_sensors']} sensors " - f"({summary['health_percentage']:.1f}% health), " - f"Alerts: {summary['alerted_sensors']}") + logging.info(f"Cycle completed - Online: {summary['online_sensors']}/{summary['total_sensors']} sensors") - # Force garbage collection every cycle to prevent memory buildup gc.collect() - - logging.info(f"Waiting {PUBLISH_INTERVAL} seconds until next cycle...") + logging.info(f"Waiting {PUBLISH_INTERVAL} seconds...") time.sleep(PUBLISH_INTERVAL) except KeyboardInterrupt: - logging.info("Received stop signal from user, shutting down...") + logging.info("Received stop signal, shutting down...") except Exception as e: logging.error(f"Unexpected error in main loop: {e}", exc_info=True) finally: # Cleanup try: - health_server.stop() + if health_server: + health_server.stop() except: pass - mqtt_client.loop_stop() - mqtt_client.disconnect() - logging.info("Successfully closed all connections.") + try: + mqtt_client.loop_stop() + mqtt_client.disconnect() + except: + pass + logging.info("Shutdown complete") if __name__ == "__main__": main_loop()