Refactor configuration and health check server in POE project. Updated config.py to use environment variables for MQTT and health check settings, enhancing flexibility. Improved health check server in health_check.py with readiness endpoint and better error handling. Main application in main.py now includes graceful shutdown handling and health server verification. Adjusted Nomad job configuration for improved resource allocation and health check parameters.

This commit is contained in:
Naab2k3
2025-06-23 13:42:58 +07:00
parent c515b6d2e9
commit 8991a02bf5
4 changed files with 249 additions and 62 deletions

View File

@ -1,4 +1,5 @@
import time import time
import os
# Modbus configuration # Modbus configuration
MODBUS_HOSTS = [ MODBUS_HOSTS = [
@ -14,22 +15,22 @@ MODBUS_PORT = 505
UNIT_ID = 1 UNIT_ID = 1
# MQTT configuration # MQTT configuration
MQTT_BROKER = "mqtt.service.mesh" MQTT_BROKER = os.getenv("MQTT_BROKER", "mqtt.service.mesh")
MQTT_PORT = 1883 MQTT_PORT = int(os.getenv("MQTT_PORT", 1883))
# Legacy topic - now using dynamic topic structure: {location}/{sensor_type}/data # Legacy topic - now using dynamic topic structure: {location}/{sensor_type}/data
MQTT_TOPIC = "Temperature_Humidity" # Keep for backward compatibility MQTT_TOPIC = "Temperature_Humidity" # Keep for backward compatibility
MQTT_CLIENT_ID = f"modbus-mqtt-client-{int(time.time())}" MQTT_CLIENT_ID = f"modbus-mqtt-client-{int(time.time())}"
MQTT_USERNAME = "relay" MQTT_USERNAME = os.getenv("MQTT_USERNAME", "relay")
MQTT_PASSWORD = "Sey@K9c&Q4^" MQTT_PASSWORD = os.getenv("MQTT_PASSWORD", "Sey@K9c&Q4^")
# Read and publish cycle configuration (seconds) # Read and publish cycle configuration (seconds)
PUBLISH_INTERVAL = 10 PUBLISH_INTERVAL = int(os.getenv("PUBLISH_INTERVAL", 10))
# Health check and alerting configuration # Health check and alerting configuration
HEALTH_CHECK_PORT = 8080 HEALTH_CHECK_PORT = int(os.getenv("HEALTH_CHECK_PORT", 8080))
HEALTH_CHECK_ENABLED = True HEALTH_CHECK_ENABLED = os.getenv("HEALTH_CHECK_ENABLED", "true").lower() in ["true", "1", "yes"]
# Alerting configuration # Alerting configuration
ALERTING_ENABLED = True ALERTING_ENABLED = os.getenv("ALERTING_ENABLED", "true").lower() in ["true", "1", "yes"]
SENSOR_TIMEOUT_THRESHOLD = 3 # Number of consecutive failures before alert SENSOR_TIMEOUT_THRESHOLD = int(os.getenv("SENSOR_TIMEOUT_THRESHOLD", 3)) # Number of consecutive failures before alert
RECOVERY_CONFIRMATION_COUNT = 2 # Number of consecutive successes to confirm recovery RECOVERY_CONFIRMATION_COUNT = int(os.getenv("RECOVERY_CONFIRMATION_COUNT", 2)) # Number of consecutive successes to confirm recovery

View File

@ -4,6 +4,8 @@ from http.server import HTTPServer, BaseHTTPRequestHandler
from datetime import datetime, timezone from datetime import datetime, timezone
import logging import logging
import time import time
import socket
import os
from config import HEALTH_CHECK_PORT, HEALTH_CHECK_ENABLED from config import HEALTH_CHECK_PORT, HEALTH_CHECK_ENABLED
@ -13,6 +15,8 @@ class HealthCheckHandler(BaseHTTPRequestHandler):
self.send_health_response() self.send_health_response()
elif self.path == '/sensors': elif self.path == '/sensors':
self.send_sensors_status() self.send_sensors_status()
elif self.path == '/ready':
self.send_readiness_response()
else: else:
self.send_response(404) self.send_response(404)
self.end_headers() self.end_headers()
@ -33,16 +37,24 @@ class HealthCheckHandler(BaseHTTPRequestHandler):
"sensors": { "sensors": {
"total": summary.get('total_sensors', 0), "total": summary.get('total_sensors', 0),
"online": summary.get('online_sensors', 0), "online": summary.get('online_sensors', 0),
"offline": summary.get('offline_sensors', 0),
"unknown": summary.get('unknown_sensors', 0),
"health_percentage": summary.get('health_percentage', 0.0) "health_percentage": summary.get('health_percentage', 0.0)
} },
"uptime": self._get_uptime(),
"host": socket.gethostname()
} }
# Consider the service unhealthy if no sensors are working # Service is healthy if the health check server is running
if summary.get('total_sensors', 0) > 0 and summary.get('online_sensors', 0) == 0: # Don't mark as unhealthy just because sensors are offline
# If we have sensors configured but none are online, report as degraded but still healthy # That's what the degraded status is for
# (since the health check server itself is working) if summary.get('total_sensors', 0) > 0:
health_data["status"] = "degraded" if summary.get('online_sensors', 0) == 0:
health_data["message"] = "All sensors offline" health_data["status"] = "degraded"
health_data["message"] = "All sensors offline but service is running"
elif summary.get('health_percentage', 0) < 50:
health_data["status"] = "degraded"
health_data["message"] = f"Low sensor health: {summary.get('health_percentage', 0):.1f}%"
except Exception as e: except Exception as e:
# If we can't get sensor status, still report as healthy since the service is running # If we can't get sensor status, still report as healthy since the service is running
@ -52,44 +64,115 @@ class HealthCheckHandler(BaseHTTPRequestHandler):
"timestamp": datetime.now(timezone.utc).isoformat(), "timestamp": datetime.now(timezone.utc).isoformat(),
"service": "modbus-mqtt-bridge", "service": "modbus-mqtt-bridge",
"version": "1.0.0", "version": "1.0.0",
"message": "Service running, sensor status unavailable" "message": "Service running, sensor status unavailable",
"uptime": self._get_uptime(),
"host": socket.gethostname()
} }
self.send_response(200) self.send_response(200)
self.send_header('Content-type', 'application/json') self.send_header('Content-type', 'application/json')
self.send_header('Cache-Control', 'no-cache')
self.end_headers() self.end_headers()
self.wfile.write(json.dumps(health_data, indent=2).encode()) self.wfile.write(json.dumps(health_data, indent=2).encode())
def send_readiness_response(self):
"""Send readiness probe response - checks if service is ready to serve"""
try:
# Check if main components are initialized
from sensor_tracker import get_sensor_tracker
sensor_tracker = get_sensor_tracker()
ready_data = {
"ready": True,
"timestamp": datetime.now(timezone.utc).isoformat(),
"service": "modbus-mqtt-bridge",
"checks": {
"sensor_tracker": True,
"health_server": True
}
}
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.send_header('Cache-Control', 'no-cache')
self.end_headers()
self.wfile.write(json.dumps(ready_data, indent=2).encode())
except Exception as e:
logging.error(f"Readiness check failed: {e}")
ready_data = {
"ready": False,
"timestamp": datetime.now(timezone.utc).isoformat(),
"service": "modbus-mqtt-bridge",
"error": str(e)
}
self.send_response(503)
self.send_header('Content-type', 'application/json')
self.send_header('Cache-Control', 'no-cache')
self.end_headers()
self.wfile.write(json.dumps(ready_data, indent=2).encode())
def send_sensors_status(self): def send_sensors_status(self):
"""Send detailed sensor status""" """Send detailed sensor status"""
# Get sensor status from the global sensor tracker try:
from sensor_tracker import get_all_sensor_status # Get sensor status from the global sensor tracker
from sensor_tracker import get_all_sensor_status
sensors_status = get_all_sensor_status()
sensors_status = get_all_sensor_status() self.send_response(200)
self.send_header('Content-type', 'application/json')
self.send_header('Cache-Control', 'no-cache')
self.end_headers()
self.wfile.write(json.dumps(sensors_status, indent=2).encode())
self.send_response(200) except Exception as e:
self.send_header('Content-type', 'application/json') logging.error(f"Error getting sensor status: {e}")
self.end_headers() error_response = {
self.wfile.write(json.dumps(sensors_status, indent=2).encode()) "error": "Failed to get sensor status",
"message": str(e),
"timestamp": datetime.now(timezone.utc).isoformat()
}
self.send_response(500)
self.send_header('Content-type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps(error_response, indent=2).encode())
def _get_uptime(self):
"""Get service uptime"""
try:
with open('/proc/uptime', 'r') as f:
uptime_seconds = float(f.readline().split()[0])
return f"{uptime_seconds:.1f}s"
except:
return "unknown"
def log_message(self, format, *args): def log_message(self, format, *args):
"""Override to use our logging system""" """Override to use our logging system"""
logging.info(f"Health Check - {format % args}") logging.info(f"Health Check - {format % args}")
class HealthCheckServer: class HealthCheckServer:
def __init__(self, port=HEALTH_CHECK_PORT): def __init__(self, port=None):
self.port = port self.port = port or int(os.getenv('HEALTH_CHECK_PORT', HEALTH_CHECK_PORT))
self.server = None self.server = None
self.thread = None self.thread = None
self.started = False
def start(self): def start(self):
"""Start the health check server in a separate thread""" """Start the health check server in a separate thread"""
if not HEALTH_CHECK_ENABLED: if not HEALTH_CHECK_ENABLED:
logging.info("Health check server is disabled") logging.info("Health check server is disabled")
return return False
try: try:
logging.info(f"Attempting to start health check server on port {self.port}") logging.info(f"Attempting to start health check server on port {self.port}")
# Check if port is available
if not self._is_port_available(self.port):
logging.error(f"Port {self.port} is already in use. Cannot start health check server.")
return False
# Bind to all interfaces to make it accessible from outside container # Bind to all interfaces to make it accessible from outside container
self.server = HTTPServer(('0.0.0.0', self.port), HealthCheckHandler) self.server = HTTPServer(('0.0.0.0', self.port), HealthCheckHandler)
@ -99,24 +182,50 @@ class HealthCheckServer:
self.thread = threading.Thread(target=self._serve_with_error_handling, daemon=True) self.thread = threading.Thread(target=self._serve_with_error_handling, daemon=True)
self.thread.start() self.thread.start()
# Give the server a moment to start # Give the server a moment to start and verify it's working
time.sleep(0.5) time.sleep(1)
logging.info(f"Health check server started on 0.0.0.0:{self.port}") if self._test_health_endpoint():
logging.info(f"Health check endpoints:") self.started = True
logging.info(f" - http://0.0.0.0:{self.port}/health") logging.info(f"Health check server started successfully on 0.0.0.0:{self.port}")
logging.info(f" - http://0.0.0.0:{self.port}/sensors") logging.info(f"Health check endpoints:")
logging.info("Health check server is ready for external health checks") logging.info(f" - http://0.0.0.0:{self.port}/health")
logging.info(f" - http://0.0.0.0:{self.port}/sensors")
logging.info(f" - http://0.0.0.0:{self.port}/ready")
logging.info("Health check server is ready for external health checks")
return True
else:
logging.error("Health check server started but endpoints are not responding")
return False
except OSError as e: except OSError as e:
if e.errno == 98: # Address already in use if e.errno == 98: # Address already in use
logging.error(f"Port {self.port} is already in use. Cannot start health check server.") logging.error(f"Port {self.port} is already in use. Cannot start health check server.")
else: else:
logging.error(f"OS error starting health check server: {e}") logging.error(f"OS error starting health check server: {e}")
raise e return False
except Exception as e: except Exception as e:
logging.error(f"Failed to start health check server: {e}") logging.error(f"Failed to start health check server: {e}")
raise e # Re-raise to make the issue visible return False
def _is_port_available(self, port):
"""Check if port is available"""
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(('0.0.0.0', port))
return True
except OSError:
return False
def _test_health_endpoint(self):
"""Test if health endpoint is responding"""
try:
import urllib.request
with urllib.request.urlopen(f'http://localhost:{self.port}/health', timeout=5) as response:
return response.status == 200
except Exception as e:
logging.warning(f"Health endpoint test failed: {e}")
return False
def _serve_with_error_handling(self): def _serve_with_error_handling(self):
"""Serve forever with error handling""" """Serve forever with error handling"""
@ -128,10 +237,16 @@ class HealthCheckServer:
logging.error("Health check server is None, cannot serve requests") logging.error("Health check server is None, cannot serve requests")
except Exception as e: except Exception as e:
logging.error(f"Health check server error: {e}", exc_info=True) logging.error(f"Health check server error: {e}", exc_info=True)
self.started = False
def stop(self): def stop(self):
"""Stop the health check server""" """Stop the health check server"""
if self.server: if self.server:
self.server.shutdown() self.server.shutdown()
self.server.server_close() self.server.server_close()
self.started = False
logging.info("Health check server stopped") logging.info("Health check server stopped")
def is_running(self):
"""Check if server is running"""
return self.started and self.thread and self.thread.is_alive()

63
main.py
View File

@ -13,14 +13,67 @@ Author: POE Project
import time import time
import logging import logging
import sys
import signal
from health_check import HealthCheckServer
from sensor_bridge import main_loop from sensor_bridge import main_loop
# Global health server for cleanup
health_server = None
def signal_handler(signum, frame):
"""Handle shutdown signals gracefully"""
global health_server
logging.info(f"Received signal {signum}, shutting down gracefully...")
if health_server:
health_server.stop()
sys.exit(0)
if __name__ == "__main__": if __name__ == "__main__":
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout),
]
)
# Setup signal handlers
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
logging.info("POE Sensor Bridge starting up...") logging.info("POE Sensor Bridge starting up...")
# Give the system a moment to initialize try:
time.sleep(2) # Start health check server first
logging.info("Starting health check server...")
health_server = HealthCheckServer()
logging.info("Starting main sensor loop...") if health_server.start():
main_loop() logging.info("Health check server started successfully")
# Give health check server time to be ready
time.sleep(3)
# Verify health check is working
if health_server.is_running():
logging.info("Health check server verified as running")
logging.info("Starting main sensor loop...")
main_loop()
else:
logging.error("Health check server failed to start properly")
sys.exit(1)
else:
logging.error("Failed to start health check server")
sys.exit(1)
except KeyboardInterrupt:
logging.info("Received keyboard interrupt, shutting down...")
except Exception as e:
logging.error(f"Fatal error: {e}", exc_info=True)
sys.exit(1)
finally:
if health_server:
health_server.stop()
logging.info("POE Sensor Bridge shutdown complete")

View File

@ -10,28 +10,29 @@ job "poe-sensor" {
group "sensor-bridge" { group "sensor-bridge" {
count = 1 count = 1
# Network configuration - using host mode for Modbus access # Network configuration - using bridge mode with port mapping for better isolation
network { network {
mode = "host" mode = "bridge"
port "health" { port "health" {
static = 8080 static = 8080
to = 8080
} }
} }
# Restart policy # Restart policy - more lenient for startup issues
restart { restart {
attempts = 3 attempts = 5
interval = "30m" interval = "30m"
delay = "15s" delay = "30s"
mode = "fail" mode = "fail"
} }
# Update strategy # Update strategy
update { update {
max_parallel = 1 max_parallel = 1
min_healthy_time = "60s" min_healthy_time = "120s" # Increased from 60s
healthy_deadline = "5m" healthy_deadline = "10m" # Increased from 5m
progress_deadline = "10m" progress_deadline = "15m" # Increased from 10m
auto_revert = true auto_revert = true
canary = 0 canary = 0
} }
@ -50,12 +51,12 @@ job "poe-sensor" {
check { check {
type = "http" type = "http"
path = "/health" path = "/health"
interval = "60s" interval = "30s" # Reduced frequency
timeout = "15s" timeout = "30s" # Increased timeout
initial_status = "passing" initial_status = "critical" # Start as critical until proven healthy
check_restart { check_restart {
limit = 2 limit = 3
grace = "15s" grace = "30s" # More time for graceful shutdown
} }
} }
} }
@ -68,7 +69,21 @@ job "poe-sensor" {
command = "/bin/bash" command = "/bin/bash"
args = [ args = [
"-c", "-c",
"cd local/poe-sensor && apt-get update -qq && apt-get install -y procps && python -m pip install --upgrade pip && python -m pip install -r requirements.txt && python -c 'import pymodbus, paho.mqtt.client; print(\"Dependencies installed successfully\")' && python main.py" <<EOF
cd local/poe-sensor &&
echo "Starting POE Sensor installation..." &&
apt-get update -qq &&
apt-get install -y procps curl &&
python -m pip install --upgrade pip &&
echo "Installing Python dependencies..." &&
python -m pip install -r requirements.txt &&
echo "Testing dependencies..." &&
python -c 'import pymodbus, paho.mqtt.client; print("Dependencies installed successfully")' &&
echo "Starting health check server..." &&
python -c 'from health_check import HealthCheckServer; import time; server = HealthCheckServer(); server.start(); time.sleep(2); print("Health check server started")' &
echo "Starting main application..." &&
python main.py
EOF
] ]
} }
@ -94,12 +109,15 @@ job "poe-sensor" {
MQTT_PORT = "1883" MQTT_PORT = "1883"
MQTT_USERNAME = "relay" MQTT_USERNAME = "relay"
MQTT_PASSWORD = "Sey@K9c&Q4^" MQTT_PASSWORD = "Sey@K9c&Q4^"
# Health check configuration
HEALTH_CHECK_ENABLED = "true"
HEALTH_CHECK_PORT = "8080"
} }
# Resource allocation # Resource allocation - increased for stability
resources { resources {
cpu = 256 cpu = 512 # Increased from 256
memory = 512 memory = 1024 # Increased from 512
} }
# Logs configuration # Logs configuration