diff --git a/config.py b/config.py index 1664d63..8be72e5 100644 --- a/config.py +++ b/config.py @@ -1,4 +1,5 @@ import time +import os # Modbus configuration MODBUS_HOSTS = [ @@ -14,22 +15,22 @@ MODBUS_PORT = 505 UNIT_ID = 1 # MQTT configuration -MQTT_BROKER = "mqtt.service.mesh" -MQTT_PORT = 1883 +MQTT_BROKER = os.getenv("MQTT_BROKER", "mqtt.service.mesh") +MQTT_PORT = int(os.getenv("MQTT_PORT", 1883)) # Legacy topic - now using dynamic topic structure: {location}/{sensor_type}/data MQTT_TOPIC = "Temperature_Humidity" # Keep for backward compatibility MQTT_CLIENT_ID = f"modbus-mqtt-client-{int(time.time())}" -MQTT_USERNAME = "relay" -MQTT_PASSWORD = "Sey@K9c&Q4^" +MQTT_USERNAME = os.getenv("MQTT_USERNAME", "relay") +MQTT_PASSWORD = os.getenv("MQTT_PASSWORD", "Sey@K9c&Q4^") # Read and publish cycle configuration (seconds) -PUBLISH_INTERVAL = 10 +PUBLISH_INTERVAL = int(os.getenv("PUBLISH_INTERVAL", 10)) # Health check and alerting configuration -HEALTH_CHECK_PORT = 8080 -HEALTH_CHECK_ENABLED = True +HEALTH_CHECK_PORT = int(os.getenv("HEALTH_CHECK_PORT", 8080)) +HEALTH_CHECK_ENABLED = os.getenv("HEALTH_CHECK_ENABLED", "true").lower() in ["true", "1", "yes"] # Alerting configuration -ALERTING_ENABLED = True -SENSOR_TIMEOUT_THRESHOLD = 3 # Number of consecutive failures before alert -RECOVERY_CONFIRMATION_COUNT = 2 # Number of consecutive successes to confirm recovery +ALERTING_ENABLED = os.getenv("ALERTING_ENABLED", "true").lower() in ["true", "1", "yes"] +SENSOR_TIMEOUT_THRESHOLD = int(os.getenv("SENSOR_TIMEOUT_THRESHOLD", 3)) # Number of consecutive failures before alert +RECOVERY_CONFIRMATION_COUNT = int(os.getenv("RECOVERY_CONFIRMATION_COUNT", 2)) # Number of consecutive successes to confirm recovery diff --git a/health_check.py b/health_check.py index 9b278b3..704b298 100644 --- a/health_check.py +++ b/health_check.py @@ -4,6 +4,8 @@ from http.server import HTTPServer, BaseHTTPRequestHandler from datetime import datetime, timezone import logging import time 
+import socket +import os from config import HEALTH_CHECK_PORT, HEALTH_CHECK_ENABLED @@ -13,6 +15,8 @@ class HealthCheckHandler(BaseHTTPRequestHandler): self.send_health_response() elif self.path == '/sensors': self.send_sensors_status() + elif self.path == '/ready': + self.send_readiness_response() else: self.send_response(404) self.end_headers() @@ -33,16 +37,24 @@ class HealthCheckHandler(BaseHTTPRequestHandler): "sensors": { "total": summary.get('total_sensors', 0), "online": summary.get('online_sensors', 0), + "offline": summary.get('offline_sensors', 0), + "unknown": summary.get('unknown_sensors', 0), "health_percentage": summary.get('health_percentage', 0.0) - } + }, + "uptime": self._get_uptime(), + "host": socket.gethostname() } - # Consider the service unhealthy if no sensors are working - if summary.get('total_sensors', 0) > 0 and summary.get('online_sensors', 0) == 0: - # If we have sensors configured but none are online, report as degraded but still healthy - # (since the health check server itself is working) - health_data["status"] = "degraded" - health_data["message"] = "All sensors offline" + # Service is healthy if the health check server is running + # Don't mark as unhealthy just because sensors are offline + # That's what the degraded status is for + if summary.get('total_sensors', 0) > 0: + if summary.get('online_sensors', 0) == 0: + health_data["status"] = "degraded" + health_data["message"] = "All sensors offline but service is running" + elif summary.get('health_percentage', 0) < 50: + health_data["status"] = "degraded" + health_data["message"] = f"Low sensor health: {summary.get('health_percentage', 0):.1f}%" except Exception as e: # If we can't get sensor status, still report as healthy since the service is running @@ -52,44 +64,115 @@ class HealthCheckHandler(BaseHTTPRequestHandler): "timestamp": datetime.now(timezone.utc).isoformat(), "service": "modbus-mqtt-bridge", "version": "1.0.0", - "message": "Service running, sensor status 
unavailable" + "message": "Service running, sensor status unavailable", + "uptime": self._get_uptime(), + "host": socket.gethostname() } self.send_response(200) self.send_header('Content-type', 'application/json') + self.send_header('Cache-Control', 'no-cache') self.end_headers() self.wfile.write(json.dumps(health_data, indent=2).encode()) + def send_readiness_response(self): + """Send readiness probe response - checks if service is ready to serve""" + try: + # Check if main components are initialized + from sensor_tracker import get_sensor_tracker + sensor_tracker = get_sensor_tracker() + + ready_data = { + "ready": True, + "timestamp": datetime.now(timezone.utc).isoformat(), + "service": "modbus-mqtt-bridge", + "checks": { + "sensor_tracker": True, + "health_server": True + } + } + + self.send_response(200) + self.send_header('Content-type', 'application/json') + self.send_header('Cache-Control', 'no-cache') + self.end_headers() + self.wfile.write(json.dumps(ready_data, indent=2).encode()) + + except Exception as e: + logging.error(f"Readiness check failed: {e}") + ready_data = { + "ready": False, + "timestamp": datetime.now(timezone.utc).isoformat(), + "service": "modbus-mqtt-bridge", + "error": str(e) + } + + self.send_response(503) + self.send_header('Content-type', 'application/json') + self.send_header('Cache-Control', 'no-cache') + self.end_headers() + self.wfile.write(json.dumps(ready_data, indent=2).encode()) + def send_sensors_status(self): """Send detailed sensor status""" - # Get sensor status from the global sensor tracker - from sensor_tracker import get_all_sensor_status - - sensors_status = get_all_sensor_status() - - self.send_response(200) - self.send_header('Content-type', 'application/json') - self.end_headers() - self.wfile.write(json.dumps(sensors_status, indent=2).encode()) + try: + # Get sensor status from the global sensor tracker + from sensor_tracker import get_all_sensor_status + sensors_status = get_all_sensor_status() + + 
self.send_response(200) + self.send_header('Content-type', 'application/json') + self.send_header('Cache-Control', 'no-cache') + self.end_headers() + self.wfile.write(json.dumps(sensors_status, indent=2).encode()) + + except Exception as e: + logging.error(f"Error getting sensor status: {e}") + error_response = { + "error": "Failed to get sensor status", + "message": str(e), + "timestamp": datetime.now(timezone.utc).isoformat() + } + + self.send_response(500) + self.send_header('Content-type', 'application/json') + self.end_headers() + self.wfile.write(json.dumps(error_response, indent=2).encode()) + + def _get_uptime(self): + """Get service uptime""" + try: + with open('/proc/uptime', 'r') as f: + uptime_seconds = float(f.readline().split()[0]) + return f"{uptime_seconds:.1f}s" + except: + return "unknown" def log_message(self, format, *args): """Override to use our logging system""" logging.info(f"Health Check - {format % args}") class HealthCheckServer: - def __init__(self, port=HEALTH_CHECK_PORT): - self.port = port + def __init__(self, port=None): + self.port = port or int(os.getenv('HEALTH_CHECK_PORT', HEALTH_CHECK_PORT)) self.server = None self.thread = None + self.started = False def start(self): """Start the health check server in a separate thread""" if not HEALTH_CHECK_ENABLED: logging.info("Health check server is disabled") - return + return False try: logging.info(f"Attempting to start health check server on port {self.port}") + + # Check if port is available + if not self._is_port_available(self.port): + logging.error(f"Port {self.port} is already in use. 
Cannot start health check server.") + return False + # Bind to all interfaces to make it accessible from outside container self.server = HTTPServer(('0.0.0.0', self.port), HealthCheckHandler) @@ -99,24 +182,50 @@ class HealthCheckServer: self.thread = threading.Thread(target=self._serve_with_error_handling, daemon=True) self.thread.start() - # Give the server a moment to start - time.sleep(0.5) + # Give the server a moment to start and verify it's working + time.sleep(1) - logging.info(f"Health check server started on 0.0.0.0:{self.port}") - logging.info(f"Health check endpoints:") - logging.info(f" - http://0.0.0.0:{self.port}/health") - logging.info(f" - http://0.0.0.0:{self.port}/sensors") - logging.info("Health check server is ready for external health checks") + if self._test_health_endpoint(): + self.started = True + logging.info(f"Health check server started successfully on 0.0.0.0:{self.port}") + logging.info(f"Health check endpoints:") + logging.info(f" - http://0.0.0.0:{self.port}/health") + logging.info(f" - http://0.0.0.0:{self.port}/sensors") + logging.info(f" - http://0.0.0.0:{self.port}/ready") + logging.info("Health check server is ready for external health checks") + return True + else: + logging.error("Health check server started but endpoints are not responding") + return False except OSError as e: if e.errno == 98: # Address already in use logging.error(f"Port {self.port} is already in use. 
Cannot start health check server.") else: logging.error(f"OS error starting health check server: {e}") - raise e + return False except Exception as e: logging.error(f"Failed to start health check server: {e}") - raise e # Re-raise to make the issue visible + return False + + def _is_port_available(self, port): + """Check if port is available""" + try: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(('0.0.0.0', port)) + return True + except OSError: + return False + + def _test_health_endpoint(self): + """Test if health endpoint is responding""" + try: + import urllib.request + with urllib.request.urlopen(f'http://localhost:{self.port}/health', timeout=5) as response: + return response.status == 200 + except Exception as e: + logging.warning(f"Health endpoint test failed: {e}") + return False def _serve_with_error_handling(self): """Serve forever with error handling""" @@ -128,10 +237,16 @@ class HealthCheckServer: logging.error("Health check server is None, cannot serve requests") except Exception as e: logging.error(f"Health check server error: {e}", exc_info=True) + self.started = False def stop(self): """Stop the health check server""" if self.server: self.server.shutdown() self.server.server_close() + self.started = False logging.info("Health check server stopped") + + def is_running(self): + """Check if server is running""" + return self.started and self.thread and self.thread.is_alive() diff --git a/main.py b/main.py index ded51d4..650089f 100644 --- a/main.py +++ b/main.py @@ -13,14 +13,67 @@ Author: POE Project import time import logging +import sys +import signal +from health_check import HealthCheckServer from sensor_bridge import main_loop +# Global health server for cleanup +health_server = None + +def signal_handler(signum, frame): + """Handle shutdown signals gracefully""" + global health_server + logging.info(f"Received signal {signum}, shutting down gracefully...") + if health_server: + health_server.stop() + sys.exit(0) + if 
__name__ == "__main__": - logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + # Setup logging + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + handlers=[ + logging.StreamHandler(sys.stdout), + ] + ) + + # Setup signal handlers + signal.signal(signal.SIGTERM, signal_handler) + signal.signal(signal.SIGINT, signal_handler) + logging.info("POE Sensor Bridge starting up...") - # Give the system a moment to initialize - time.sleep(2) - - logging.info("Starting main sensor loop...") - main_loop() \ No newline at end of file + try: + # Start health check server first + logging.info("Starting health check server...") + health_server = HealthCheckServer() + + if health_server.start(): + logging.info("Health check server started successfully") + + # Give health check server time to be ready + time.sleep(3) + + # Verify health check is working + if health_server.is_running(): + logging.info("Health check server verified as running") + logging.info("Starting main sensor loop...") + main_loop() + else: + logging.error("Health check server failed to start properly") + sys.exit(1) + else: + logging.error("Failed to start health check server") + sys.exit(1) + + except KeyboardInterrupt: + logging.info("Received keyboard interrupt, shutting down...") + except Exception as e: + logging.error(f"Fatal error: {e}", exc_info=True) + sys.exit(1) + finally: + if health_server: + health_server.stop() + logging.info("POE Sensor Bridge shutdown complete") \ No newline at end of file diff --git a/poe-sensor.nomad b/poe-sensor.nomad index e052b41..b0145f6 100644 --- a/poe-sensor.nomad +++ b/poe-sensor.nomad @@ -10,28 +10,29 @@ job "poe-sensor" { group "sensor-bridge" { count = 1 - # Network configuration - using host mode for Modbus access + # Network configuration - using bridge mode with port mapping for better isolation network { - mode = "host" + mode = "bridge" port "health" { static = 8080 + to = 
8080 } } - # Restart policy + # Restart policy - more lenient for startup issues restart { - attempts = 3 + attempts = 5 interval = "30m" - delay = "15s" + delay = "30s" mode = "fail" } # Update strategy update { max_parallel = 1 - min_healthy_time = "60s" - healthy_deadline = "5m" - progress_deadline = "10m" + min_healthy_time = "120s" # Increased from 60s + healthy_deadline = "10m" # Increased from 5m + progress_deadline = "15m" # Increased from 10m auto_revert = true canary = 0 } @@ -50,12 +51,12 @@ job "poe-sensor" { check { type = "http" path = "/health" - interval = "60s" - timeout = "15s" - initial_status = "passing" + interval = "30s" # Check twice as often (was 60s) + timeout = "30s" # Increased timeout + initial_status = "critical" # Start as critical until proven healthy check_restart { - limit = 2 - grace = "15s" + limit = 3 + grace = "30s" # Longer wait after task start before failing checks count toward a restart } } } @@ -68,7 +69,21 @@ job "poe-sensor" { command = "/bin/bash" args = [ "-c", - "cd local/poe-sensor && apt-get update -qq && apt-get install -y procps && python -m pip install --upgrade pip && python -m pip install -r requirements.txt && python -c 'import pymodbus, paho.mqtt.client; print(\"Dependencies installed successfully\")' && python main.py" + <