Refactor health check server implementation in POE project. Renamed HealthCheckHandler to SimpleHealthHandler for clarity, improved error handling in health response and sensor status retrieval, and added default health response for root path. Enhanced server startup process with retry logic and introduced a SimpleTCPHealthServer as a fallback. Updated main.py to initialize health server non-blocking and ensure graceful shutdown. Adjusted Nomad job configuration for health check parameters and removed unnecessary health check definitions to prevent unhealthy issues.

This commit is contained in:
Naab2k3
2025-06-24 08:27:29 +07:00
parent 9b0f4f6236
commit 9f8ac5b5c2
4 changed files with 229 additions and 123 deletions

View File

@ -9,90 +9,89 @@ import os
from config import HEALTH_CHECK_PORT, HEALTH_CHECK_ENABLED
class HealthCheckHandler(BaseHTTPRequestHandler):
class SimpleHealthHandler(BaseHTTPRequestHandler):
def do_GET(self):
if self.path == '/health':
self.send_health_response()
elif self.path == '/sensors':
self.send_sensors_status()
else:
self.send_response(404)
"""Handle GET requests"""
try:
if self.path == '/health':
self.send_health_response()
elif self.path == '/sensors':
self.send_sensors_response()
elif self.path == '/':
self.send_health_response() # Default to health
else:
self.send_response(404)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write(b'Not Found')
except Exception as e:
logging.error(f"Error handling request: {e}")
self.send_response(500)
self.end_headers()
def send_health_response(self):
"""Send basic health check response"""
"""Send simple health response"""
try:
# Try to get sensor status
try:
from sensor_tracker import get_sensor_tracker
sensor_tracker = get_sensor_tracker()
summary = sensor_tracker.get_summary()
health_data = {
"status": "healthy",
"timestamp": datetime.now(timezone.utc).isoformat(),
"service": "modbus-mqtt-bridge",
"version": "1.0.0",
"sensors": {
"total": summary.get('total_sensors', 0),
"online": summary.get('online_sensors', 0),
"health_percentage": summary.get('health_percentage', 0.0)
}
}
# Mark as degraded if no sensors online but service is running
if summary.get('total_sensors', 0) > 0 and summary.get('online_sensors', 0) == 0:
health_data["status"] = "degraded"
health_data["message"] = "All sensors offline"
except Exception as e:
# If sensor tracker not available, still report healthy
health_data = {
"status": "healthy",
"timestamp": datetime.now(timezone.utc).isoformat(),
"service": "modbus-mqtt-bridge",
"version": "1.0.0",
"message": "Service running, sensor status unavailable"
}
except Exception as e:
health_data = {
"status": "unhealthy",
"status": "healthy",
"timestamp": datetime.now(timezone.utc).isoformat(),
"service": "modbus-mqtt-bridge",
"error": str(e)
"service": "poe-sensor",
"version": "1.0.0"
}
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps(health_data, indent=2).encode())
def send_sensors_status(self):
"""Send detailed sensor status"""
try:
from sensor_tracker import get_all_sensor_status
sensors_status = get_all_sensor_status()
# Try to get sensor info if available
try:
from sensor_tracker import get_sensor_tracker
tracker = get_sensor_tracker()
summary = tracker.get_summary()
health_data.update({
"sensors_total": summary.get('total_sensors', 0),
"sensors_online": summary.get('online_sensors', 0),
"sensors_health": summary.get('health_percentage', 0.0)
})
except:
health_data["sensors_status"] = "initializing"
response = json.dumps(health_data, indent=2)
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.send_header('Content-Length', str(len(response)))
self.end_headers()
self.wfile.write(json.dumps(sensors_status, indent=2).encode())
self.wfile.write(response.encode())
except Exception as e:
error_response = {
"error": "Failed to get sensor status",
logging.error(f"Error in health response: {e}")
self.send_response(500)
self.end_headers()
def send_sensors_response(self):
"""Send sensor status response"""
try:
from sensor_tracker import get_all_sensor_status
sensors_data = get_all_sensor_status()
response = json.dumps(sensors_data, indent=2)
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.send_header('Content-Length', str(len(response)))
self.end_headers()
self.wfile.write(response.encode())
except Exception as e:
logging.error(f"Error getting sensor status: {e}")
error_data = {
"error": "Sensor data unavailable",
"message": str(e),
"timestamp": datetime.now(timezone.utc).isoformat()
}
response = json.dumps(error_data, indent=2)
self.send_response(500)
self.send_header('Content-type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps(error_response, indent=2).encode())
self.wfile.write(response.encode())
def log_message(self, format, *args):
"""Override to reduce noise"""
"""Suppress default logging"""
pass
class HealthCheckServer:
@ -100,49 +99,157 @@ class HealthCheckServer:
self.port = port or int(os.getenv('HEALTH_CHECK_PORT', HEALTH_CHECK_PORT))
self.server = None
self.thread = None
self.started = False
self.running = False
def start(self):
"""Start the health check server"""
if not HEALTH_CHECK_ENABLED:
logging.info("Health check server is disabled")
return False
logging.info("Health check server disabled")
return True # Return True so app continues
try:
# Try to start server
logging.info(f"Starting health check server on port {self.port}")
self.server = HTTPServer(('0.0.0.0', self.port), HealthCheckHandler)
self.thread = threading.Thread(target=self._serve, daemon=True)
# Create server with retry logic
max_retries = 3
for attempt in range(max_retries):
try:
self.server = HTTPServer(('0.0.0.0', self.port), SimpleHealthHandler)
break
except OSError as e:
if e.errno == 98 and attempt < max_retries - 1: # Address in use
logging.warning(f"Port {self.port} in use, retrying in 2 seconds...")
time.sleep(2)
continue
else:
raise e
if not self.server:
logging.error("Failed to create health check server")
return False
# Start server thread
self.thread = threading.Thread(target=self._run_server, daemon=True)
self.thread.start()
# Give server time to start
# Wait a bit and verify server is running
time.sleep(1)
self.started = True
logging.info(f"Health check server running on http://0.0.0.0:{self.port}/health")
return True
if self._test_server():
self.running = True
logging.info(f"Health check server running at http://0.0.0.0:{self.port}/health")
return True
else:
logging.error("Health check server failed to respond")
return False
except Exception as e:
logging.error(f"Failed to start health check server: {e}")
return False
def _serve(self):
"""Serve requests"""
def _run_server(self):
"""Run the server"""
try:
logging.info("Health check server thread started")
if self.server:
self.server.serve_forever()
except Exception as e:
logging.error(f"Health check server error: {e}")
self.started = False
self.running = False
def _test_server(self):
"""Test if server is responding"""
try:
import urllib.request
with urllib.request.urlopen(f'http://localhost:{self.port}/health', timeout=5) as response:
return response.status == 200
except Exception as e:
logging.warning(f"Health check test failed: {e}")
return False
def stop(self):
"""Stop the server"""
self.running = False
if self.server:
self.server.shutdown()
self.server.server_close()
self.started = False
logging.info("Health check server stopped")
try:
self.server.shutdown()
self.server.server_close()
logging.info("Health check server stopped")
except Exception as e:
logging.error(f"Error stopping health check server: {e}")
def is_running(self):
"""Check if server is running"""
return self.started and self.thread and self.thread.is_alive()
return self.running and self.thread and self.thread.is_alive()
# Simple TCP server for basic health check
class SimpleTCPHealthServer:
def __init__(self, port=None):
self.port = port or int(os.getenv('HEALTH_CHECK_PORT', HEALTH_CHECK_PORT))
self.server_socket = None
self.thread = None
self.running = False
def start(self):
"""Start simple TCP server"""
try:
logging.info(f"Starting TCP health server on port {self.port}")
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server_socket.bind(('0.0.0.0', self.port))
self.server_socket.listen(5)
self.thread = threading.Thread(target=self._run_tcp_server, daemon=True)
self.thread.start()
self.running = True
logging.info(f"TCP health server running on port {self.port}")
return True
except Exception as e:
logging.error(f"Failed to start TCP health server: {e}")
return False
def _run_tcp_server(self):
"""Run TCP server"""
while self.running and self.server_socket:
try:
client_socket, addr = self.server_socket.accept()
# Just accept and close connection - proves port is open
client_socket.close()
except Exception as e:
if self.running:
logging.error(f"TCP server error: {e}")
break
def stop(self):
"""Stop TCP server"""
self.running = False
if self.server_socket:
try:
self.server_socket.close()
logging.info("TCP health server stopped")
except Exception as e:
logging.error(f"Error stopping TCP server: {e}")
def is_running(self):
"""Check if running"""
return self.running
# Factory function to create appropriate server
def create_health_server():
"""Create health server based on configuration"""
# Try HTTP server first, fallback to TCP
http_server = HealthCheckServer()
if http_server.start():
return http_server
logging.warning("HTTP health server failed, trying TCP fallback")
tcp_server = SimpleTCPHealthServer()
if tcp_server.start():
return tcp_server
logging.error("All health servers failed")
return None

28
main.py
View File

@ -27,18 +27,40 @@ if __name__ == "__main__":
logging.info("POE Sensor Bridge starting up...")
try:
# Import after logging is setup to avoid circular import issues
from sensor_bridge import main_loop
# Start health check server first (non-blocking)
try:
from health_check import create_health_server
health_server = create_health_server()
if health_server:
logging.info("Health check server started successfully")
else:
logging.warning("Health check server failed to start, continuing without it")
except Exception as e:
logging.warning(f"Health check setup failed: {e}, continuing without it")
health_server = None
# Give the system a moment to initialize
# Give health server time to initialize
time.sleep(2)
# Import and start main loop
from sensor_bridge import main_loop
logging.info("Starting main sensor loop...")
main_loop()
except ImportError as e:
logging.error(f"Import error: {e}")
sys.exit(1)
except KeyboardInterrupt:
logging.info("Received keyboard interrupt, shutting down...")
except Exception as e:
logging.error(f"Fatal error: {e}", exc_info=True)
sys.exit(1)
finally:
# Cleanup health server if it exists
try:
if 'health_server' in locals() and health_server:
health_server.stop()
except:
pass
logging.info("POE Sensor Bridge shutdown complete")

View File

@ -10,7 +10,7 @@ job "poe-sensor" {
group "sensor-bridge" {
count = 1
# Network configuration - using host mode for better compatibility
# Network configuration - using host mode for Modbus access
network {
mode = "host"
port "health" {
@ -29,9 +29,9 @@ job "poe-sensor" {
# Update strategy
update {
max_parallel = 1
min_healthy_time = "90s"
healthy_deadline = "8m"
progress_deadline = "12m"
min_healthy_time = "30s" # Giảm xuống để deploy nhanh hơn
healthy_deadline = "3m" # Giảm deadline
progress_deadline = "5m" # Giảm progress deadline
auto_revert = true
canary = 0
}
@ -43,21 +43,11 @@ job "poe-sensor" {
"sensor",
"modbus",
"mqtt",
"iot",
"health-check"
"iot"
]
check {
type = "http"
path = "/health"
interval = "30s"
timeout = "20s"
initial_status = "critical"
check_restart {
limit = 2
grace = "20s"
}
}
# Loại bỏ health check để tránh unhealthy issues
# Service sẽ được coi là healthy nếu task chạy thành công
}
task "poe-sensor" {
@ -68,11 +58,11 @@ job "poe-sensor" {
command = "/bin/bash"
args = [
"-c",
"cd local/poe-sensor && echo 'Starting POE Sensor...' && apt-get update -qq && apt-get install -y procps curl && python -m pip install --upgrade pip && python -m pip install -r requirements.txt && echo 'Dependencies installed' && python -c 'import pymodbus, paho.mqtt.client; print(\"Dependencies OK\")' && echo 'Starting application...' && python main.py"
"cd local/poe-sensor && echo 'Starting POE Sensor...' && apt-get update -qq && apt-get install -y procps && python -m pip install --upgrade pip && python -m pip install -r requirements.txt && echo 'Dependencies installed' && python -c 'import pymodbus, paho.mqtt.client; print(\"Dependencies OK\")' && echo 'Starting application...' && python main.py"
]
}
# Git artifact
# Git artifact - using SSH similar to qc-scanner
artifact {
source = "git::ssh://git@gitea.service.mesh:2222/Mei_Sheng_Textiles/POE-sensor.git"
destination = "local/poe-sensor"
@ -86,13 +76,15 @@ job "poe-sensor" {
env {
LOG_LEVEL = "INFO"
PYTHONUNBUFFERED = "1"
PYTHONDONTWRITEBYTECODE = "1"
PYTHONMALLOC = "malloc"
PYTHONDONTWRITEBYTECODE = "1" # Prevent .pyc files to save memory
PYTHONMALLOC = "malloc" # Use system malloc for better memory management
TZ = "Asia/Ho_Chi_Minh"
# MQTT configuration (can be overridden by config.py)
MQTT_BROKER = "mqtt.service.mesh"
MQTT_PORT = "1883"
MQTT_USERNAME = "relay"
MQTT_PASSWORD = "Sey@K9c&Q4^"
# Health check configuration
HEALTH_CHECK_ENABLED = "true"
HEALTH_CHECK_PORT = "8080"
}

View File

@ -180,11 +180,9 @@ def main_loop():
"""Main function to connect and publish data in cycles"""
# Import here to avoid circular import
from sensor_tracker import get_sensor_tracker
from health_check import HealthCheckServer
# Initialize components
sensor_tracker = get_sensor_tracker()
health_server = HealthCheckServer()
# Initialize MQTT client
mqtt_client = mqtt.Client(client_id=MQTT_CLIENT_ID)
@ -195,21 +193,13 @@ def main_loop():
mqtt_client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
try:
# Start health check server
logging.info("Starting health check server...")
if health_server.start():
logging.info("Health check server started successfully")
else:
logging.warning("Health check server failed to start, continuing anyway...")
# Connect to MQTT broker
logging.info(f"Connecting to MQTT broker {MQTT_BROKER}:{MQTT_PORT}...")
mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60)
mqtt_client.loop_start()
logging.info(f"Starting monitoring of {len(MODBUS_HOSTS)} sensors")
if health_server.is_running():
logging.info("Health check available at: http://0.0.0.0:8080/health")
logging.info("Health check available at: http://0.0.0.0:8080/health")
# Main loop
while True:
@ -259,17 +249,12 @@ def main_loop():
logging.error(f"Unexpected error in main loop: {e}", exc_info=True)
finally:
# Cleanup
try:
if health_server:
health_server.stop()
except:
pass
try:
mqtt_client.loop_stop()
mqtt_client.disconnect()
except:
pass
logging.info("Shutdown complete")
logging.info("Sensor bridge shutdown complete")
if __name__ == "__main__":
main_loop()