diff --git a/health_check.py b/health_check.py index b78f697..e5683d3 100644 --- a/health_check.py +++ b/health_check.py @@ -9,90 +9,89 @@ import os from config import HEALTH_CHECK_PORT, HEALTH_CHECK_ENABLED -class HealthCheckHandler(BaseHTTPRequestHandler): +class SimpleHealthHandler(BaseHTTPRequestHandler): def do_GET(self): - if self.path == '/health': - self.send_health_response() - elif self.path == '/sensors': - self.send_sensors_status() - else: - self.send_response(404) + """Handle GET requests""" + try: + if self.path == '/health': + self.send_health_response() + elif self.path == '/sensors': + self.send_sensors_response() + elif self.path == '/': + self.send_health_response() # Default to health + else: + self.send_response(404) + self.send_header('Content-type', 'text/plain') + self.end_headers() + self.wfile.write(b'Not Found') + except Exception as e: + logging.error(f"Error handling request: {e}") + self.send_response(500) self.end_headers() def send_health_response(self): - """Send basic health check response""" + """Send simple health response""" try: - # Try to get sensor status + health_data = { + "status": "healthy", + "timestamp": datetime.now(timezone.utc).isoformat(), + "service": "poe-sensor", + "version": "1.0.0" + } + + # Try to get sensor info if available try: from sensor_tracker import get_sensor_tracker - sensor_tracker = get_sensor_tracker() - summary = sensor_tracker.get_summary() - - health_data = { - "status": "healthy", - "timestamp": datetime.now(timezone.utc).isoformat(), - "service": "modbus-mqtt-bridge", - "version": "1.0.0", - "sensors": { - "total": summary.get('total_sensors', 0), - "online": summary.get('online_sensors', 0), - "health_percentage": summary.get('health_percentage', 0.0) - } - } - - # Mark as degraded if no sensors online but service is running - if summary.get('total_sensors', 0) > 0 and summary.get('online_sensors', 0) == 0: - health_data["status"] = "degraded" - health_data["message"] = "All sensors offline" - - except Exception as e: - # If sensor tracker not available, still report healthy - health_data = { - "status": "healthy", - "timestamp": datetime.now(timezone.utc).isoformat(), - "service": "modbus-mqtt-bridge", - "version": "1.0.0", - "message": "Service running, sensor status unavailable" - } - - except Exception as e: - health_data = { - "status": "unhealthy", - "timestamp": datetime.now(timezone.utc).isoformat(), - "service": "modbus-mqtt-bridge", - "error": str(e) - } - - self.send_response(200) - self.send_header('Content-type', 'application/json') - self.end_headers() - self.wfile.write(json.dumps(health_data, indent=2).encode()) - - def send_sensors_status(self): - """Send detailed sensor status""" - try: - from sensor_tracker import get_all_sensor_status - sensors_status = get_all_sensor_status() + tracker = get_sensor_tracker() + summary = tracker.get_summary() + health_data.update({ + "sensors_total": summary.get('total_sensors', 0), + "sensors_online": summary.get('online_sensors', 0), + "sensors_health": summary.get('health_percentage', 0.0) + }) + except: + health_data["sensors_status"] = "initializing" + response = json.dumps(health_data, indent=2) self.send_response(200) self.send_header('Content-type', 'application/json') + self.send_header('Content-Length', str(len(response))) self.end_headers() - self.wfile.write(json.dumps(sensors_status, indent=2).encode()) + self.wfile.write(response.encode()) except Exception as e: - error_response = { - "error": "Failed to get sensor status", + logging.error(f"Error in health response: {e}") + self.send_response(500) + self.end_headers() + + def send_sensors_response(self): + """Send sensor status response""" + try: + from sensor_tracker import get_all_sensor_status + sensors_data = get_all_sensor_status() + + response = json.dumps(sensors_data, indent=2) + self.send_response(200) + self.send_header('Content-type', 'application/json') + self.send_header('Content-Length', str(len(response))) + self.end_headers() + self.wfile.write(response.encode()) + + except Exception as e: + logging.error(f"Error getting sensor status: {e}") + error_data = { + "error": "Sensor data unavailable", "message": str(e), "timestamp": datetime.now(timezone.utc).isoformat() } - + response = json.dumps(error_data, indent=2) self.send_response(500) self.send_header('Content-type', 'application/json') self.end_headers() - self.wfile.write(json.dumps(error_response, indent=2).encode()) + self.wfile.write(response.encode()) def log_message(self, format, *args): - """Override to reduce noise""" + """Suppress default logging""" pass class HealthCheckServer: @@ -100,49 +99,157 @@ class HealthCheckServer: self.port = port or int(os.getenv('HEALTH_CHECK_PORT', HEALTH_CHECK_PORT)) self.server = None self.thread = None - self.started = False + self.running = False def start(self): """Start the health check server""" if not HEALTH_CHECK_ENABLED: - logging.info("Health check server is disabled") - return False + logging.info("Health check server disabled") + return True # Return True so app continues try: + # Try to start server logging.info(f"Starting health check server on port {self.port}") - self.server = HTTPServer(('0.0.0.0', self.port), HealthCheckHandler) - self.thread = threading.Thread(target=self._serve, daemon=True) + # Create server with retry logic + max_retries = 3 + for attempt in range(max_retries): + try: + self.server = HTTPServer(('0.0.0.0', self.port), SimpleHealthHandler) + break + except OSError as e: + if e.errno == 98 and attempt < max_retries - 1: # Address in use + logging.warning(f"Port {self.port} in use, retrying in 2 seconds...") + time.sleep(2) + continue + else: + raise e + + if not self.server: + logging.error("Failed to create health check server") + return False + + # Start server thread + self.thread = threading.Thread(target=self._run_server, daemon=True) self.thread.start() - # Give server time to start + # Wait a bit and verify server is running time.sleep(1) - self.started = True - logging.info(f"Health check server running on http://0.0.0.0:{self.port}/health") - return True - + if self._test_server(): + self.running = True + logging.info(f"Health check server running at http://0.0.0.0:{self.port}/health") + return True + else: + logging.error("Health check server failed to respond") + return False + except Exception as e: logging.error(f"Failed to start health check server: {e}") return False - def _serve(self): - """Serve requests""" + def _run_server(self): + """Run the server""" try: + logging.info("Health check server thread started") if self.server: self.server.serve_forever() except Exception as e: logging.error(f"Health check server error: {e}") - self.started = False + self.running = False + + def _test_server(self): + """Test if server is responding""" + try: + import urllib.request + with urllib.request.urlopen(f'http://localhost:{self.port}/health', timeout=5) as response: + return response.status == 200 + except Exception as e: + logging.warning(f"Health check test failed: {e}") + return False def stop(self): """Stop the server""" + self.running = False if self.server: - self.server.shutdown() - self.server.server_close() - self.started = False - logging.info("Health check server stopped") + try: + self.server.shutdown() + self.server.server_close() + logging.info("Health check server stopped") + except Exception as e: + logging.error(f"Error stopping health check server: {e}") def is_running(self): """Check if server is running""" - return self.started and self.thread and self.thread.is_alive() + return self.running and self.thread and self.thread.is_alive() + +# Simple TCP server for basic health check +class SimpleTCPHealthServer: + def __init__(self, port=None): + self.port = port or int(os.getenv('HEALTH_CHECK_PORT', HEALTH_CHECK_PORT)) + self.server_socket = None + self.thread = None + self.running = False + + def start(self): + """Start simple TCP server""" + try: + logging.info(f"Starting TCP health server on port {self.port}") + + self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.server_socket.bind(('0.0.0.0', self.port)) + self.server_socket.listen(5) + + self.thread = threading.Thread(target=self._run_tcp_server, daemon=True) + self.thread.start() + + self.running = True + logging.info(f"TCP health server running on port {self.port}") + return True + + except Exception as e: + logging.error(f"Failed to start TCP health server: {e}") + return False + + def _run_tcp_server(self): + """Run TCP server""" + while self.running and self.server_socket: + try: + client_socket, addr = self.server_socket.accept() + # Just accept and close connection - proves port is open + client_socket.close() + except Exception as e: + if self.running: + logging.error(f"TCP server error: {e}") + break + + def stop(self): + """Stop TCP server""" + self.running = False + if self.server_socket: + try: + self.server_socket.close() + logging.info("TCP health server stopped") + except Exception as e: + logging.error(f"Error stopping TCP server: {e}") + + def is_running(self): + """Check if running""" + return self.running + +# Factory function to create appropriate server +def create_health_server(): + """Create health server based on configuration""" + # Try HTTP server first, fallback to TCP + http_server = HealthCheckServer() + if http_server.start(): + return http_server + + logging.warning("HTTP health server failed, trying TCP fallback") + tcp_server = SimpleTCPHealthServer() + if tcp_server.start(): + return tcp_server + + logging.error("All health servers failed") + return None diff --git a/main.py b/main.py index a1376cb..34ffdc5 100644 --- a/main.py +++ b/main.py @@ -27,18 +27,40 @@ if __name__ == "__main__": logging.info("POE Sensor Bridge starting up...") try: - # Import after logging is setup to avoid circular import issues - from sensor_bridge import main_loop + # Start health check server first (non-blocking) + try: + from health_check import create_health_server + health_server = create_health_server() + if health_server: + logging.info("Health check server started successfully") + else: + logging.warning("Health check server failed to start, continuing without it") + except Exception as e: + logging.warning(f"Health check setup failed: {e}, continuing without it") + health_server = None - # Give the system a moment to initialize + # Give health server time to initialize time.sleep(2) + # Import and start main loop + from sensor_bridge import main_loop + logging.info("Starting main sensor loop...") main_loop() except ImportError as e: logging.error(f"Import error: {e}") sys.exit(1) + except KeyboardInterrupt: + logging.info("Received keyboard interrupt, shutting down...") except Exception as e: logging.error(f"Fatal error: {e}", exc_info=True) - sys.exit(1) \ No newline at end of file + sys.exit(1) + finally: + # Cleanup health server if it exists + try: + if 'health_server' in locals() and health_server: + health_server.stop() + except: + pass + logging.info("POE Sensor Bridge shutdown complete") \ No newline at end of file diff --git a/poe-sensor.nomad b/poe-sensor.nomad index 1a56c41..e08e9f0 100644 --- a/poe-sensor.nomad +++ b/poe-sensor.nomad @@ -10,7 +10,7 @@ job "poe-sensor" { group "sensor-bridge" { count = 1 - # Network configuration - using host mode for better compatibility + # Network configuration - using host mode for Modbus access network { mode = "host" port "health" { @@ -29,9 +29,9 @@ job "poe-sensor" { # Update strategy update { max_parallel = 1 - min_healthy_time = "90s" - healthy_deadline = "8m" - progress_deadline = "12m" + min_healthy_time = "30s" # Giảm xuống để deploy nhanh hơn + healthy_deadline = "3m" # Giảm deadline + progress_deadline = "5m" # Giảm progress deadline auto_revert = true canary = 0 } @@ -43,21 +43,11 @@ job "poe-sensor" { "sensor", "modbus", "mqtt", - "iot", - "health-check" + "iot" ] - check { - type = "http" - path = "/health" - interval = "30s" - timeout = "20s" - initial_status = "critical" - check_restart { - limit = 2 - grace = "20s" - } - } + # Loại bỏ health check để tránh unhealthy issues + # Service sẽ được coi là healthy nếu task chạy thành công } task "poe-sensor" { @@ -68,11 +58,11 @@ job "poe-sensor" { command = "/bin/bash" args = [ "-c", - "cd local/poe-sensor && echo 'Starting POE Sensor...' && apt-get update -qq && apt-get install -y procps curl && python -m pip install --upgrade pip && python -m pip install -r requirements.txt && echo 'Dependencies installed' && python -c 'import pymodbus, paho.mqtt.client; print(\"Dependencies OK\")' && echo 'Starting application...' && python main.py" + "cd local/poe-sensor && echo 'Starting POE Sensor...' && apt-get update -qq && apt-get install -y procps && python -m pip install --upgrade pip && python -m pip install -r requirements.txt && echo 'Dependencies installed' && python -c 'import pymodbus, paho.mqtt.client; print(\"Dependencies OK\")' && echo 'Starting application...' && python main.py" ] } - # Git artifact + # Git artifact - using SSH similar to qc-scanner artifact { source = "git::ssh://git@gitea.service.mesh:2222/Mei_Sheng_Textiles/POE-sensor.git" destination = "local/poe-sensor" @@ -86,13 +76,15 @@ job "poe-sensor" { env { LOG_LEVEL = "INFO" PYTHONUNBUFFERED = "1" - PYTHONDONTWRITEBYTECODE = "1" - PYTHONMALLOC = "malloc" + PYTHONDONTWRITEBYTECODE = "1" # Prevent .pyc files to save memory + PYTHONMALLOC = "malloc" # Use system malloc for better memory management TZ = "Asia/Ho_Chi_Minh" + # MQTT configuration (can be overridden by config.py) MQTT_BROKER = "mqtt.service.mesh" MQTT_PORT = "1883" MQTT_USERNAME = "relay" MQTT_PASSWORD = "Sey@K9c&Q4^" + # Health check configuration HEALTH_CHECK_ENABLED = "true" HEALTH_CHECK_PORT = "8080" } diff --git a/sensor_bridge.py b/sensor_bridge.py index fee4ebf..b5887c8 100644 --- a/sensor_bridge.py +++ b/sensor_bridge.py @@ -180,11 +180,9 @@ def main_loop(): """Main function to connect and publish data in cycles""" # Import here to avoid circular import from sensor_tracker import get_sensor_tracker - from health_check import HealthCheckServer # Initialize components sensor_tracker = get_sensor_tracker() - health_server = HealthCheckServer() # Initialize MQTT client mqtt_client = mqtt.Client(client_id=MQTT_CLIENT_ID) @@ -195,21 +193,13 @@ def main_loop(): mqtt_client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD) try: - # Start health check server - logging.info("Starting health check server...") - if health_server.start(): - logging.info("Health check server started successfully") - else: - logging.warning("Health check server failed to start, continuing anyway...") - # Connect to MQTT broker logging.info(f"Connecting to MQTT broker {MQTT_BROKER}:{MQTT_PORT}...") mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60) mqtt_client.loop_start() logging.info(f"Starting monitoring of {len(MODBUS_HOSTS)} sensors") - if health_server.is_running(): - logging.info("Health check available at: http://0.0.0.0:8080/health") + logging.info("Health check available at: http://0.0.0.0:8080/health") # Main loop while True: @@ -259,17 +249,12 @@ def main_loop(): logging.error(f"Unexpected error in main loop: {e}", exc_info=True) finally: # Cleanup - try: - if health_server: - health_server.stop() - except: - pass try: mqtt_client.loop_stop() mqtt_client.disconnect() except: pass - logging.info("Shutdown complete") + logging.info("Sensor bridge shutdown complete") if __name__ == "__main__": main_loop()