Refactor health check functionality in the POE project. Remove the readiness (`/ready`) endpoint from health_check.py and simplify error handling for sensor status retrieval. Reduce logging noise, and remove health check server startup from main.py: the server is now started only inside main_loop(), which continues running even if the server fails to start. Switch the Nomad job to host network mode and lower its resource allocation (CPU 512 → 256, memory 1024 → 512), with the aim of improving overall system performance and stability.

This commit is contained in:
Naab2k3
2025-06-23 13:46:59 +07:00
parent 8991a02bf5
commit 9b0f4f6236
4 changed files with 148 additions and 339 deletions

View File

@ -15,8 +15,6 @@ class HealthCheckHandler(BaseHTTPRequestHandler):
self.send_health_response() self.send_health_response()
elif self.path == '/sensors': elif self.path == '/sensors':
self.send_sensors_status() self.send_sensors_status()
elif self.path == '/ready':
self.send_readiness_response()
else: else:
self.send_response(404) self.send_response(404)
self.end_headers() self.end_headers()
@ -24,110 +22,64 @@ class HealthCheckHandler(BaseHTTPRequestHandler):
def send_health_response(self): def send_health_response(self):
"""Send basic health check response""" """Send basic health check response"""
try: try:
# Get basic application status # Try to get sensor status
from sensor_tracker import get_sensor_tracker try:
sensor_tracker = get_sensor_tracker() from sensor_tracker import get_sensor_tracker
summary = sensor_tracker.get_summary() sensor_tracker = get_sensor_tracker()
summary = sensor_tracker.get_summary()
health_data = { health_data = {
"status": "healthy", "status": "healthy",
"timestamp": datetime.now(timezone.utc).isoformat(), "timestamp": datetime.now(timezone.utc).isoformat(),
"service": "modbus-mqtt-bridge", "service": "modbus-mqtt-bridge",
"version": "1.0.0", "version": "1.0.0",
"sensors": { "sensors": {
"total": summary.get('total_sensors', 0), "total": summary.get('total_sensors', 0),
"online": summary.get('online_sensors', 0), "online": summary.get('online_sensors', 0),
"offline": summary.get('offline_sensors', 0), "health_percentage": summary.get('health_percentage', 0.0)
"unknown": summary.get('unknown_sensors', 0), }
"health_percentage": summary.get('health_percentage', 0.0)
},
"uptime": self._get_uptime(),
"host": socket.gethostname()
}
# Service is healthy if the health check server is running
# Don't mark as unhealthy just because sensors are offline
# That's what the degraded status is for
if summary.get('total_sensors', 0) > 0:
if summary.get('online_sensors', 0) == 0:
health_data["status"] = "degraded"
health_data["message"] = "All sensors offline but service is running"
elif summary.get('health_percentage', 0) < 50:
health_data["status"] = "degraded"
health_data["message"] = f"Low sensor health: {summary.get('health_percentage', 0):.1f}%"
except Exception as e:
# If we can't get sensor status, still report as healthy since the service is running
logging.warning(f"Could not get sensor status for health check: {e}")
health_data = {
"status": "healthy",
"timestamp": datetime.now(timezone.utc).isoformat(),
"service": "modbus-mqtt-bridge",
"version": "1.0.0",
"message": "Service running, sensor status unavailable",
"uptime": self._get_uptime(),
"host": socket.gethostname()
}
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.send_header('Cache-Control', 'no-cache')
self.end_headers()
self.wfile.write(json.dumps(health_data, indent=2).encode())
def send_readiness_response(self):
"""Send readiness probe response - checks if service is ready to serve"""
try:
# Check if main components are initialized
from sensor_tracker import get_sensor_tracker
sensor_tracker = get_sensor_tracker()
ready_data = {
"ready": True,
"timestamp": datetime.now(timezone.utc).isoformat(),
"service": "modbus-mqtt-bridge",
"checks": {
"sensor_tracker": True,
"health_server": True
} }
}
self.send_response(200) # Mark as degraded if no sensors online but service is running
self.send_header('Content-type', 'application/json') if summary.get('total_sensors', 0) > 0 and summary.get('online_sensors', 0) == 0:
self.send_header('Cache-Control', 'no-cache') health_data["status"] = "degraded"
self.end_headers() health_data["message"] = "All sensors offline"
self.wfile.write(json.dumps(ready_data, indent=2).encode())
except Exception as e:
# If sensor tracker not available, still report healthy
health_data = {
"status": "healthy",
"timestamp": datetime.now(timezone.utc).isoformat(),
"service": "modbus-mqtt-bridge",
"version": "1.0.0",
"message": "Service running, sensor status unavailable"
}
except Exception as e: except Exception as e:
logging.error(f"Readiness check failed: {e}") health_data = {
ready_data = { "status": "unhealthy",
"ready": False,
"timestamp": datetime.now(timezone.utc).isoformat(), "timestamp": datetime.now(timezone.utc).isoformat(),
"service": "modbus-mqtt-bridge", "service": "modbus-mqtt-bridge",
"error": str(e) "error": str(e)
} }
self.send_response(503) self.send_response(200)
self.send_header('Content-type', 'application/json') self.send_header('Content-type', 'application/json')
self.send_header('Cache-Control', 'no-cache') self.end_headers()
self.end_headers() self.wfile.write(json.dumps(health_data, indent=2).encode())
self.wfile.write(json.dumps(ready_data, indent=2).encode())
def send_sensors_status(self): def send_sensors_status(self):
"""Send detailed sensor status""" """Send detailed sensor status"""
try: try:
# Get sensor status from the global sensor tracker
from sensor_tracker import get_all_sensor_status from sensor_tracker import get_all_sensor_status
sensors_status = get_all_sensor_status() sensors_status = get_all_sensor_status()
self.send_response(200) self.send_response(200)
self.send_header('Content-type', 'application/json') self.send_header('Content-type', 'application/json')
self.send_header('Cache-Control', 'no-cache')
self.end_headers() self.end_headers()
self.wfile.write(json.dumps(sensors_status, indent=2).encode()) self.wfile.write(json.dumps(sensors_status, indent=2).encode())
except Exception as e: except Exception as e:
logging.error(f"Error getting sensor status: {e}")
error_response = { error_response = {
"error": "Failed to get sensor status", "error": "Failed to get sensor status",
"message": str(e), "message": str(e),
@ -139,18 +91,9 @@ class HealthCheckHandler(BaseHTTPRequestHandler):
self.end_headers() self.end_headers()
self.wfile.write(json.dumps(error_response, indent=2).encode()) self.wfile.write(json.dumps(error_response, indent=2).encode())
def _get_uptime(self):
"""Get service uptime"""
try:
with open('/proc/uptime', 'r') as f:
uptime_seconds = float(f.readline().split()[0])
return f"{uptime_seconds:.1f}s"
except:
return "unknown"
def log_message(self, format, *args): def log_message(self, format, *args):
"""Override to use our logging system""" """Override to reduce noise"""
logging.info(f"Health Check - {format % args}") pass
class HealthCheckServer: class HealthCheckServer:
def __init__(self, port=None): def __init__(self, port=None):
@ -160,87 +103,40 @@ class HealthCheckServer:
self.started = False self.started = False
def start(self): def start(self):
"""Start the health check server in a separate thread""" """Start the health check server"""
if not HEALTH_CHECK_ENABLED: if not HEALTH_CHECK_ENABLED:
logging.info("Health check server is disabled") logging.info("Health check server is disabled")
return False return False
try: try:
logging.info(f"Attempting to start health check server on port {self.port}") logging.info(f"Starting health check server on port {self.port}")
# Check if port is available
if not self._is_port_available(self.port):
logging.error(f"Port {self.port} is already in use. Cannot start health check server.")
return False
# Bind to all interfaces to make it accessible from outside container
self.server = HTTPServer(('0.0.0.0', self.port), HealthCheckHandler) self.server = HTTPServer(('0.0.0.0', self.port), HealthCheckHandler)
# Test if the server can actually bind to the port self.thread = threading.Thread(target=self._serve, daemon=True)
logging.info(f"Successfully bound to 0.0.0.0:{self.port}")
self.thread = threading.Thread(target=self._serve_with_error_handling, daemon=True)
self.thread.start() self.thread.start()
# Give the server a moment to start and verify it's working # Give server time to start
time.sleep(1) time.sleep(1)
if self._test_health_endpoint(): self.started = True
self.started = True logging.info(f"Health check server running on http://0.0.0.0:{self.port}/health")
logging.info(f"Health check server started successfully on 0.0.0.0:{self.port}") return True
logging.info(f"Health check endpoints:")
logging.info(f" - http://0.0.0.0:{self.port}/health")
logging.info(f" - http://0.0.0.0:{self.port}/sensors")
logging.info(f" - http://0.0.0.0:{self.port}/ready")
logging.info("Health check server is ready for external health checks")
return True
else:
logging.error("Health check server started but endpoints are not responding")
return False
except OSError as e:
if e.errno == 98: # Address already in use
logging.error(f"Port {self.port} is already in use. Cannot start health check server.")
else:
logging.error(f"OS error starting health check server: {e}")
return False
except Exception as e: except Exception as e:
logging.error(f"Failed to start health check server: {e}") logging.error(f"Failed to start health check server: {e}")
return False return False
def _is_port_available(self, port): def _serve(self):
"""Check if port is available""" """Serve requests"""
try: try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(('0.0.0.0', port))
return True
except OSError:
return False
def _test_health_endpoint(self):
"""Test if health endpoint is responding"""
try:
import urllib.request
with urllib.request.urlopen(f'http://localhost:{self.port}/health', timeout=5) as response:
return response.status == 200
except Exception as e:
logging.warning(f"Health endpoint test failed: {e}")
return False
def _serve_with_error_handling(self):
"""Serve forever with error handling"""
try:
logging.info("Health check server thread started, beginning to serve requests")
if self.server: if self.server:
self.server.serve_forever() self.server.serve_forever()
else:
logging.error("Health check server is None, cannot serve requests")
except Exception as e: except Exception as e:
logging.error(f"Health check server error: {e}", exc_info=True) logging.error(f"Health check server error: {e}")
self.started = False self.started = False
def stop(self): def stop(self):
"""Stop the health check server""" """Stop the server"""
if self.server: if self.server:
self.server.shutdown() self.server.shutdown()
self.server.server_close() self.server.server_close()

59
main.py
View File

@ -14,66 +14,31 @@ Author: POE Project
import time import time
import logging import logging
import sys import sys
import signal import os
from health_check import HealthCheckServer
from sensor_bridge import main_loop
# Global health server for cleanup
health_server = None
def signal_handler(signum, frame):
"""Handle shutdown signals gracefully"""
global health_server
logging.info(f"Received signal {signum}, shutting down gracefully...")
if health_server:
health_server.stop()
sys.exit(0)
if __name__ == "__main__": if __name__ == "__main__":
# Setup logging # Setup logging first
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s', format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[ handlers=[logging.StreamHandler(sys.stdout)]
logging.StreamHandler(sys.stdout),
]
) )
# Setup signal handlers
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
logging.info("POE Sensor Bridge starting up...") logging.info("POE Sensor Bridge starting up...")
try: try:
# Start health check server first # Import after logging is setup to avoid circular import issues
logging.info("Starting health check server...") from sensor_bridge import main_loop
health_server = HealthCheckServer()
if health_server.start(): # Give the system a moment to initialize
logging.info("Health check server started successfully") time.sleep(2)
# Give health check server time to be ready logging.info("Starting main sensor loop...")
time.sleep(3) main_loop()
# Verify health check is working except ImportError as e:
if health_server.is_running(): logging.error(f"Import error: {e}")
logging.info("Health check server verified as running") sys.exit(1)
logging.info("Starting main sensor loop...")
main_loop()
else:
logging.error("Health check server failed to start properly")
sys.exit(1)
else:
logging.error("Failed to start health check server")
sys.exit(1)
except KeyboardInterrupt:
logging.info("Received keyboard interrupt, shutting down...")
except Exception as e: except Exception as e:
logging.error(f"Fatal error: {e}", exc_info=True) logging.error(f"Fatal error: {e}", exc_info=True)
sys.exit(1) sys.exit(1)
finally:
if health_server:
health_server.stop()
logging.info("POE Sensor Bridge shutdown complete")

View File

@ -10,29 +10,28 @@ job "poe-sensor" {
group "sensor-bridge" { group "sensor-bridge" {
count = 1 count = 1
# Network configuration - using bridge mode with port mapping for better isolation # Network configuration - using host mode for better compatibility
network { network {
mode = "bridge" mode = "host"
port "health" { port "health" {
static = 8080 static = 8080
to = 8080
} }
} }
# Restart policy - more lenient for startup issues # Restart policy
restart { restart {
attempts = 5 attempts = 3
interval = "30m" interval = "30m"
delay = "30s" delay = "15s"
mode = "fail" mode = "fail"
} }
# Update strategy # Update strategy
update { update {
max_parallel = 1 max_parallel = 1
min_healthy_time = "120s" # Increased from 60s min_healthy_time = "90s"
healthy_deadline = "10m" # Increased from 5m healthy_deadline = "8m"
progress_deadline = "15m" # Increased from 10m progress_deadline = "12m"
auto_revert = true auto_revert = true
canary = 0 canary = 0
} }
@ -51,12 +50,12 @@ job "poe-sensor" {
check { check {
type = "http" type = "http"
path = "/health" path = "/health"
interval = "30s" # Reduced frequency interval = "30s"
timeout = "30s" # Increased timeout timeout = "20s"
initial_status = "critical" # Start as critical until proven healthy initial_status = "critical"
check_restart { check_restart {
limit = 3 limit = 2
grace = "30s" # More time for graceful shutdown grace = "20s"
} }
} }
} }
@ -69,25 +68,11 @@ job "poe-sensor" {
command = "/bin/bash" command = "/bin/bash"
args = [ args = [
"-c", "-c",
<<EOF "cd local/poe-sensor && echo 'Starting POE Sensor...' && apt-get update -qq && apt-get install -y procps curl && python -m pip install --upgrade pip && python -m pip install -r requirements.txt && echo 'Dependencies installed' && python -c 'import pymodbus, paho.mqtt.client; print(\"Dependencies OK\")' && echo 'Starting application...' && python main.py"
cd local/poe-sensor &&
echo "Starting POE Sensor installation..." &&
apt-get update -qq &&
apt-get install -y procps curl &&
python -m pip install --upgrade pip &&
echo "Installing Python dependencies..." &&
python -m pip install -r requirements.txt &&
echo "Testing dependencies..." &&
python -c 'import pymodbus, paho.mqtt.client; print("Dependencies installed successfully")' &&
echo "Starting health check server..." &&
python -c 'from health_check import HealthCheckServer; import time; server = HealthCheckServer(); server.start(); time.sleep(2); print("Health check server started")' &
echo "Starting main application..." &&
python main.py
EOF
] ]
} }
# Git artifact - using SSH similar to qc-scanner # Git artifact
artifact { artifact {
source = "git::ssh://git@gitea.service.mesh:2222/Mei_Sheng_Textiles/POE-sensor.git" source = "git::ssh://git@gitea.service.mesh:2222/Mei_Sheng_Textiles/POE-sensor.git"
destination = "local/poe-sensor" destination = "local/poe-sensor"
@ -101,23 +86,21 @@ EOF
env { env {
LOG_LEVEL = "INFO" LOG_LEVEL = "INFO"
PYTHONUNBUFFERED = "1" PYTHONUNBUFFERED = "1"
PYTHONDONTWRITEBYTECODE = "1" # Prevent .pyc files to save memory PYTHONDONTWRITEBYTECODE = "1"
PYTHONMALLOC = "malloc" # Use system malloc for better memory management PYTHONMALLOC = "malloc"
TZ = "Asia/Ho_Chi_Minh" TZ = "Asia/Ho_Chi_Minh"
# MQTT configuration (can be overridden by config.py)
MQTT_BROKER = "mqtt.service.mesh" MQTT_BROKER = "mqtt.service.mesh"
MQTT_PORT = "1883" MQTT_PORT = "1883"
MQTT_USERNAME = "relay" MQTT_USERNAME = "relay"
MQTT_PASSWORD = "Sey@K9c&Q4^" MQTT_PASSWORD = "Sey@K9c&Q4^"
# Health check configuration
HEALTH_CHECK_ENABLED = "true" HEALTH_CHECK_ENABLED = "true"
HEALTH_CHECK_PORT = "8080" HEALTH_CHECK_PORT = "8080"
} }
# Resource allocation - increased for stability # Resource allocation
resources { resources {
cpu = 512 # Increased from 256 cpu = 256
memory = 1024 # Increased from 512 memory = 512
} }
# Logs configuration # Logs configuration

View File

@ -1,7 +1,7 @@
import logging import logging
import json import json
import time import time
import gc # Add garbage collection import gc
from datetime import datetime, timezone from datetime import datetime, timezone
from pymodbus.client import ModbusTcpClient from pymodbus.client import ModbusTcpClient
from pymodbus.exceptions import ModbusException from pymodbus.exceptions import ModbusException
@ -12,11 +12,6 @@ from config import (
MQTT_BROKER, MQTT_PORT, MQTT_TOPIC, MQTT_CLIENT_ID, MQTT_BROKER, MQTT_PORT, MQTT_TOPIC, MQTT_CLIENT_ID,
MQTT_USERNAME, MQTT_PASSWORD, PUBLISH_INTERVAL MQTT_USERNAME, MQTT_PASSWORD, PUBLISH_INTERVAL
) )
from sensor_tracker import get_sensor_tracker
from health_check import HealthCheckServer
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def on_connect(client, userdata, flags, rc): def on_connect(client, userdata, flags, rc):
"""Callback when MQTT client connects to broker""" """Callback when MQTT client connects to broker"""
@ -27,17 +22,17 @@ def on_connect(client, userdata, flags, rc):
def on_publish(client, userdata, mid): def on_publish(client, userdata, mid):
"""Callback when MQTT message is published""" """Callback when MQTT message is published"""
logging.info(f"Successfully sent message with ID: {mid}") logging.debug(f"Successfully sent message with ID: {mid}")
def read_and_publish_data(mqtt_client, modbus_client, host_info): def read_and_publish_data(mqtt_client, modbus_client, host_info):
"""Read data from Modbus and publish to MQTT""" """Read data from Modbus and publish to MQTT"""
try: try:
# Check and establish Modbus connection with explicit timeout # Check and establish Modbus connection
if not modbus_client.is_socket_open(): if not modbus_client.is_socket_open():
logging.info(f"Attempting to connect to {host_info['ip']}:{MODBUS_PORT}") logging.info(f"Attempting to connect to {host_info['ip']}:{MODBUS_PORT}")
connection_result = modbus_client.connect() connection_result = modbus_client.connect()
if not connection_result: if not connection_result:
logging.error(f"Failed to connect to Modbus server {host_info['ip']}. Connection returned: {connection_result}") logging.error(f"Failed to connect to Modbus server {host_info['ip']}")
return False return False
logging.info(f"Successfully connected to {host_info['ip']}:{MODBUS_PORT}") logging.info(f"Successfully connected to {host_info['ip']}:{MODBUS_PORT}")
@ -48,13 +43,13 @@ def read_and_publish_data(mqtt_client, modbus_client, host_info):
return read_and_publish_temperature_humidity(mqtt_client, modbus_client, host_info) return read_and_publish_temperature_humidity(mqtt_client, modbus_client, host_info)
except ModbusException as e: except ModbusException as e:
logging.error(f"Modbus error from {host_info['ip']}: {e}", exc_info=True) logging.error(f"Modbus error from {host_info['ip']}: {e}")
return False return False
except Exception as e: except Exception as e:
logging.error(f"Unexpected error from {host_info['ip']}: {e}", exc_info=True) logging.error(f"Unexpected error from {host_info['ip']}: {e}")
return False return False
finally: finally:
# Ensure connection is closed after each operation to prevent resource leaks # Ensure connection is closed
try: try:
if modbus_client and modbus_client.is_socket_open(): if modbus_client and modbus_client.is_socket_open():
modbus_client.close() modbus_client.close()
@ -70,8 +65,8 @@ def read_and_publish_temperature_humidity(mqtt_client, modbus_client, host_info)
logging.error(f"Error reading temperature from {host_info['ip']}: {result_temp}") logging.error(f"Error reading temperature from {host_info['ip']}: {result_temp}")
return False return False
raw_temp = result_temp.registers[0] raw_temp = result_temp.registers[0]
temperature = (125 - (-40)) * raw_temp / 1650 - 40 # Correct formula: -40°C to 125°C temperature = (125 - (-40)) * raw_temp / 1650 - 40
logging.info(f"Raw temperature from {host_info['ip']}: {raw_temp}, Corrected: {temperature:.1f}°C") logging.info(f"Temperature from {host_info['ip']}: {temperature:.1f}°C")
# Read humidity (register 1) # Read humidity (register 1)
result_hum = modbus_client.read_holding_registers(address=1, count=1, slave=UNIT_ID) result_hum = modbus_client.read_holding_registers(address=1, count=1, slave=UNIT_ID)
@ -79,18 +74,16 @@ def read_and_publish_temperature_humidity(mqtt_client, modbus_client, host_info)
logging.error(f"Error reading humidity from {host_info['ip']}: {result_hum}") logging.error(f"Error reading humidity from {host_info['ip']}: {result_hum}")
return False return False
raw_hum = result_hum.registers[0] raw_hum = result_hum.registers[0]
humidity = raw_hum * 100 / 1000 # Correct formula: 0% to 100% RH humidity = raw_hum * 100 / 1000
logging.info(f"Raw humidity from {host_info['ip']}: {raw_hum}, Corrected: {humidity:.1f}%RH") logging.info(f"Humidity from {host_info['ip']}: {humidity:.1f}%RH")
# Prepare new topic structure: Location/{location_name}/{type}/... # Publish data
location = host_info["location"] location = host_info["location"]
sensor_type = "temperature-humidity" sensor_type = "temperature-humidity"
current_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") current_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
# Create base topic path
base_topic = f"Location/{location}/{sensor_type}" base_topic = f"Location/{location}/{sensor_type}"
# Publish to individual topics
topics_data = [ topics_data = [
(f"{base_topic}/Time", current_time), (f"{base_topic}/Time", current_time),
(f"{base_topic}/Status", "online"), (f"{base_topic}/Status", "online"),
@ -98,17 +91,15 @@ def read_and_publish_temperature_humidity(mqtt_client, modbus_client, host_info)
(f"{base_topic}/Data/humidity", round(humidity, 1)) (f"{base_topic}/Data/humidity", round(humidity, 1))
] ]
# Publish all data
all_published = True all_published = True
for topic, value in topics_data: for topic, value in topics_data:
try: try:
# Convert value to string for MQTT
payload = str(value) payload = str(value)
result = mqtt_client.publish(topic, payload) result = mqtt_client.publish(topic, payload)
result.wait_for_publish() result.wait_for_publish()
if result.is_published(): if result.is_published():
logging.info(f"Published to '{topic}': {payload}") logging.debug(f"Published to '{topic}': {payload}")
else: else:
logging.error(f"Failed to publish to '{topic}': {payload}") logging.error(f"Failed to publish to '{topic}': {payload}")
all_published = False all_published = False
@ -116,80 +107,62 @@ def read_and_publish_temperature_humidity(mqtt_client, modbus_client, host_info)
logging.error(f"Error publishing to '{topic}': {e}") logging.error(f"Error publishing to '{topic}': {e}")
all_published = False all_published = False
if all_published: return all_published
logging.info(f"Successfully published all temperature-humidity data for {location}")
return True
else:
logging.error(f"Failed to publish some temperature-humidity data from {host_info['ip']}")
return False
except ModbusException as e:
logging.error(f"Modbus error from {host_info['ip']}: {e}", exc_info=True)
return False
except Exception as e: except Exception as e:
logging.error(f"Unexpected error from {host_info['ip']}: {e}", exc_info=True) logging.error(f"Error in temperature_humidity reading: {e}")
return False return False
def read_and_publish_cwt_co2(mqtt_client, modbus_client, host_info): def read_and_publish_cwt_co2(mqtt_client, modbus_client, host_info):
"""Read CWT CO2 sensor (humidity, temperature, CO2)""" """Read CWT CO2 sensor"""
try: try:
# Read all 3 registers at once (registers 0, 1, 2) # Read all 3 registers
# According to CWT manual: register 0=humidity, 1=temperature, 2=CO2
result = modbus_client.read_holding_registers(address=0, count=3, slave=UNIT_ID) result = modbus_client.read_holding_registers(address=0, count=3, slave=UNIT_ID)
if not hasattr(result, 'registers') or len(result.registers) != 3: if not hasattr(result, 'registers') or len(result.registers) != 3:
logging.error(f"Error reading CWT registers from {host_info['ip']}: {result}") logging.error(f"Error reading CWT registers from {host_info['ip']}: {result}")
return False return False
raw_humidity = result.registers[0] # Register 0: Humidity (0.1%RH) raw_humidity = result.registers[0]
raw_temperature = result.registers[1] # Register 1: Temperature (0.1°C) raw_temperature = result.registers[1]
raw_co2 = result.registers[2] # Register 2: CO2 (1ppm) raw_co2 = result.registers[2]
logging.info(f"Raw CWT values from {host_info['ip']} - Humidity: {raw_humidity}, Temperature: {raw_temperature}, CO2: {raw_co2}") # Process values
# Process values according to CWT manual
# Humidity: 0.1%RH resolution
humidity = raw_humidity / 10.0 humidity = raw_humidity / 10.0
# Temperature: 0.1°C resolution, handle negative values (2's complement) if raw_temperature > 32767:
if raw_temperature > 32767: # Negative temperature in 2's complement
temperature = (raw_temperature - 65536) / 10.0 temperature = (raw_temperature - 65536) / 10.0
else: else:
temperature = raw_temperature / 10.0 temperature = raw_temperature / 10.0
# CO2: 1ppm resolution for standard sensor
co2_ppm = raw_co2 co2_ppm = raw_co2
logging.info(f"Processed CWT values from {host_info['ip']} - Humidity: {humidity:.1f}%RH, Temperature: {temperature:.1f}°C, CO2: {co2_ppm}ppm") logging.info(f"CWT from {host_info['ip']} - Temp: {temperature:.1f}°C, Humidity: {humidity:.1f}%RH, CO2: {co2_ppm}ppm")
# Prepare new topic structure: Location/{location_name}/{type}/... # Publish data
location = host_info["location"] location = host_info["location"]
sensor_type = "CO2-gas" sensor_type = "CO2-gas"
current_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") current_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
# Create base topic path
base_topic = f"Location/{location}/{sensor_type}" base_topic = f"Location/{location}/{sensor_type}"
# Publish to individual topics
topics_data = [ topics_data = [
(f"{base_topic}/Time", current_time), (f"{base_topic}/Time", current_time),
(f"{base_topic}/Status", "online"), (f"{base_topic}/Status", "online"),
(f"{base_topic}/Data/co2", co2_ppm),
(f"{base_topic}/Data/temperature", round(temperature, 1)), (f"{base_topic}/Data/temperature", round(temperature, 1)),
(f"{base_topic}/Data/humidity", round(humidity, 1)) (f"{base_topic}/Data/humidity", round(humidity, 1)),
(f"{base_topic}/Data/co2", co2_ppm)
] ]
# Publish all data
all_published = True all_published = True
for topic, value in topics_data: for topic, value in topics_data:
try: try:
# Convert value to string for MQTT
payload = str(value) payload = str(value)
result = mqtt_client.publish(topic, payload) result = mqtt_client.publish(topic, payload)
result.wait_for_publish() result.wait_for_publish()
if result.is_published(): if result.is_published():
logging.info(f"Published to '{topic}': {payload}") logging.debug(f"Published to '{topic}': {payload}")
else: else:
logging.error(f"Failed to publish to '{topic}': {payload}") logging.error(f"Failed to publish to '{topic}': {payload}")
all_published = False all_published = False
@ -197,23 +170,19 @@ def read_and_publish_cwt_co2(mqtt_client, modbus_client, host_info):
logging.error(f"Error publishing to '{topic}': {e}") logging.error(f"Error publishing to '{topic}': {e}")
all_published = False all_published = False
if all_published: return all_published
logging.info(f"Successfully published all CO2-gas data for {location}")
return True
else:
logging.error(f"Failed to publish some CO2-gas data from {host_info['ip']}")
return False
except ModbusException as e:
logging.error(f"Modbus error from CWT sensor {host_info['ip']}: {e}", exc_info=True)
return False
except Exception as e: except Exception as e:
logging.error(f"Unexpected error from CWT sensor {host_info['ip']}: {e}", exc_info=True) logging.error(f"Error in CWT CO2 reading: {e}")
return False return False
def main_loop(): def main_loop():
"""Main function to connect and publish data in cycles""" """Main function to connect and publish data in cycles"""
# Initialize sensor tracker and health check server # Import here to avoid circular import
from sensor_tracker import get_sensor_tracker
from health_check import HealthCheckServer
# Initialize components
sensor_tracker = get_sensor_tracker() sensor_tracker = get_sensor_tracker()
health_server = HealthCheckServer() health_server = HealthCheckServer()
@ -227,7 +196,11 @@ def main_loop():
try: try:
# Start health check server # Start health check server
health_server.start() logging.info("Starting health check server...")
if health_server.start():
logging.info("Health check server started successfully")
else:
logging.warning("Health check server failed to start, continuing anyway...")
# Connect to MQTT broker # Connect to MQTT broker
logging.info(f"Connecting to MQTT broker {MQTT_BROKER}:{MQTT_PORT}...") logging.info(f"Connecting to MQTT broker {MQTT_BROKER}:{MQTT_PORT}...")
@ -235,76 +208,68 @@ def main_loop():
mqtt_client.loop_start() mqtt_client.loop_start()
logging.info(f"Starting monitoring of {len(MODBUS_HOSTS)} sensors") logging.info(f"Starting monitoring of {len(MODBUS_HOSTS)} sensors")
logging.info("System status can be monitored at:") if health_server.is_running():
logging.info(f" - Health: http://0.0.0.0:8080/health") logging.info("Health check available at: http://0.0.0.0:8080/health")
logging.info(f" - Sensors: http://0.0.0.0:8080/sensors")
# Main loop to read and publish data from all hosts # Main loop
while True: while True:
for host_info in MODBUS_HOSTS: for host_info in MODBUS_HOSTS:
modbus_client = None modbus_client = None
error_message = None
try: try:
modbus_client = ModbusTcpClient( modbus_client = ModbusTcpClient(
host=host_info["ip"], host=host_info["ip"],
port=MODBUS_PORT, port=MODBUS_PORT,
timeout=10 # Reduced timeout from 30 to 10 seconds timeout=10
) )
logging.info(f"Processing channel {host_info['location']} at {host_info['ip']}:{MODBUS_PORT}") logging.info(f"Processing {host_info['location']} at {host_info['ip']}")
success = read_and_publish_data(mqtt_client, modbus_client, host_info) success = read_and_publish_data(mqtt_client, modbus_client, host_info)
if success: if success:
# Record successful reading
sensor_tracker.record_success(host_info, mqtt_client) sensor_tracker.record_success(host_info, mqtt_client)
logging.info(f"Successfully processed {host_info['location']} ({host_info['ip']})") logging.info(f"Successfully processed {host_info['location']}")
else: else:
error_message = f"Failed to read/publish data from {host_info['ip']}" error_message = f"Failed to read/publish data from {host_info['ip']}"
sensor_tracker.record_failure(host_info, error_message, mqtt_client) sensor_tracker.record_failure(host_info, error_message, mqtt_client)
logging.warning(f"Failed to process {host_info['location']} ({host_info['ip']}), will retry next cycle.") logging.warning(f"Failed to process {host_info['location']}")
except Exception as e: except Exception as e:
error_message = f"Exception processing {host_info['ip']}: {str(e)}" error_message = f"Exception processing {host_info['ip']}: {str(e)}"
sensor_tracker.record_failure(host_info, error_message, mqtt_client) sensor_tracker.record_failure(host_info, error_message, mqtt_client)
logging.error(f"Error processing {host_info['location']} ({host_info['ip']}): {e}", exc_info=True) logging.error(f"Error processing {host_info['location']}: {e}")
finally: finally:
# Ensure modbus connection is always closed
if modbus_client: if modbus_client:
try: try:
modbus_client.close() modbus_client.close()
logging.debug(f"Closed connection to {host_info['ip']}")
except Exception as close_error: except Exception as close_error:
logging.warning(f"Error closing connection to {host_info['ip']}: {close_error}") logging.warning(f"Error closing connection: {close_error}")
finally:
modbus_client = None # Explicit cleanup
# Add small delay between processing each sensor
time.sleep(1) time.sleep(1)
# Log system summary every cycle # Log summary
summary = sensor_tracker.get_summary() summary = sensor_tracker.get_summary()
logging.info(f"Cycle completed - Online: {summary['online_sensors']}/{summary['total_sensors']} sensors " logging.info(f"Cycle completed - Online: {summary['online_sensors']}/{summary['total_sensors']} sensors")
f"({summary['health_percentage']:.1f}% health), "
f"Alerts: {summary['alerted_sensors']}")
# Force garbage collection every cycle to prevent memory buildup
gc.collect() gc.collect()
logging.info(f"Waiting {PUBLISH_INTERVAL} seconds...")
logging.info(f"Waiting {PUBLISH_INTERVAL} seconds until next cycle...")
time.sleep(PUBLISH_INTERVAL) time.sleep(PUBLISH_INTERVAL)
except KeyboardInterrupt: except KeyboardInterrupt:
logging.info("Received stop signal from user, shutting down...") logging.info("Received stop signal, shutting down...")
except Exception as e: except Exception as e:
logging.error(f"Unexpected error in main loop: {e}", exc_info=True) logging.error(f"Unexpected error in main loop: {e}", exc_info=True)
finally: finally:
# Cleanup # Cleanup
try: try:
health_server.stop() if health_server:
health_server.stop()
except: except:
pass pass
mqtt_client.loop_stop() try:
mqtt_client.disconnect() mqtt_client.loop_stop()
logging.info("Successfully closed all connections.") mqtt_client.disconnect()
except:
pass
logging.info("Shutdown complete")
if __name__ == "__main__": if __name__ == "__main__":
main_loop() main_loop()