Refactor sensor data publishing and health check server configuration. Updated MQTT topic structure for individual data points, improved error handling, and added garbage collection to manage memory. Enhanced logging for health check server accessibility and sensor status updates.
This commit is contained in:
43
config.py
43
config.py
@ -33,46 +33,3 @@ HEALTH_CHECK_ENABLED = True
|
|||||||
ALERTING_ENABLED = True
|
ALERTING_ENABLED = True
|
||||||
SENSOR_TIMEOUT_THRESHOLD = 3 # Number of consecutive failures before alert
|
SENSOR_TIMEOUT_THRESHOLD = 3 # Number of consecutive failures before alert
|
||||||
RECOVERY_CONFIRMATION_COUNT = 2 # Number of consecutive successes to confirm recovery
|
RECOVERY_CONFIRMATION_COUNT = 2 # Number of consecutive successes to confirm recovery
|
||||||
|
|
||||||
# Sensor status tracking - now integrated into data payload
|
|
||||||
# New topic structure: Location/{location_name}/{sensor_type}/{data_type}
|
|
||||||
# Where sensor_type is: "temperature-humidity" or "CO2-gas"
|
|
||||||
# Where data_type is: "data" or "alerts"
|
|
||||||
# Example topics:
|
|
||||||
# - Location/Warehouse-B/temperature-humidity/data (contains temp, humidity, status)
|
|
||||||
# - Location/Office/CO2-gas/data (contains co2, temp, humidity, status)
|
|
||||||
# - Location/Warehouse-B/temperature-humidity/alerts (contains failure/recovery alerts)
|
|
||||||
|
|
||||||
# New MQTT Topic Structure:
|
|
||||||
# Base: Location/{location_name}/{sensor_type}/{data_type}
|
|
||||||
#
|
|
||||||
# Data topics: Location/{location_name}/{sensor_type}/data
|
|
||||||
# Alert topics: Location/{location_name}/{sensor_type}/alerts
|
|
||||||
#
|
|
||||||
# Payload format for data (JSON):
|
|
||||||
# {
|
|
||||||
# "timestamp": "2024-01-01 12:00:00",
|
|
||||||
# "location": "Warehouse-B",
|
|
||||||
# "sensor_type": "temperature-humidity",
|
|
||||||
# "ip": "10.84.60.31",
|
|
||||||
# "status": "online|offline",
|
|
||||||
# "data": {
|
|
||||||
# "temperature": 25.5,
|
|
||||||
# "humidity": 60.2
|
|
||||||
# }
|
|
||||||
# }
|
|
||||||
#
|
|
||||||
# Payload format for alerts (JSON):
|
|
||||||
# {
|
|
||||||
# "alert_type": "sensor_failure|sensor_recovery",
|
|
||||||
# "timestamp": "2024-01-01T12:00:00.000000+00:00",
|
|
||||||
# "sensor_id": "10.84.60.31_Warehouse-B",
|
|
||||||
# "sensor_ip": "10.84.60.31",
|
|
||||||
# "sensor_location": "Warehouse-B",
|
|
||||||
# "sensor_type": "temperature_humidity",
|
|
||||||
# "consecutive_failures": 3,
|
|
||||||
# "last_error": "Failed to read data",
|
|
||||||
# "severity": "critical|info"
|
|
||||||
# }
|
|
||||||
|
|
||||||
|
|
||||||
|
@ -59,15 +59,17 @@ class HealthCheckServer:
|
|||||||
return
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.server = HTTPServer(('', self.port), HealthCheckHandler)
|
# Bind to all interfaces to make it accessible from outside container
|
||||||
|
self.server = HTTPServer(('0.0.0.0', self.port), HealthCheckHandler)
|
||||||
self.thread = threading.Thread(target=self.server.serve_forever, daemon=True)
|
self.thread = threading.Thread(target=self.server.serve_forever, daemon=True)
|
||||||
self.thread.start()
|
self.thread.start()
|
||||||
logging.info(f"Health check server started on port {self.port}")
|
logging.info(f"Health check server started on 0.0.0.0:{self.port}")
|
||||||
logging.info(f"Health check endpoints:")
|
logging.info(f"Health check endpoints:")
|
||||||
logging.info(f" - http://localhost:{self.port}/health")
|
logging.info(f" - http://0.0.0.0:{self.port}/health")
|
||||||
logging.info(f" - http://localhost:{self.port}/sensors")
|
logging.info(f" - http://0.0.0.0:{self.port}/sensors")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error(f"Failed to start health check server: {e}")
|
logging.error(f"Failed to start health check server: {e}")
|
||||||
|
raise e # Re-raise to make the issue visible
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
"""Stop the health check server"""
|
"""Stop the health check server"""
|
||||||
|
@ -85,6 +85,8 @@ job "poe-sensor" {
|
|||||||
env {
|
env {
|
||||||
LOG_LEVEL = "INFO"
|
LOG_LEVEL = "INFO"
|
||||||
PYTHONUNBUFFERED = "1"
|
PYTHONUNBUFFERED = "1"
|
||||||
|
PYTHONDONTWRITEBYTECODE = "1" # Prevent .pyc files to save memory
|
||||||
|
PYTHONMALLOC = "malloc" # Use system malloc for better memory management
|
||||||
TZ = "Asia/Ho_Chi_Minh"
|
TZ = "Asia/Ho_Chi_Minh"
|
||||||
# MQTT configuration (can be overridden by config.py)
|
# MQTT configuration (can be overridden by config.py)
|
||||||
MQTT_BROKER = "mqtt.service.mesh"
|
MQTT_BROKER = "mqtt.service.mesh"
|
||||||
@ -96,7 +98,7 @@ job "poe-sensor" {
|
|||||||
# Resource allocation
|
# Resource allocation
|
||||||
resources {
|
resources {
|
||||||
cpu = 256
|
cpu = 256
|
||||||
memory = 256
|
memory = 512
|
||||||
}
|
}
|
||||||
|
|
||||||
# Logs configuration
|
# Logs configuration
|
||||||
|
124
sensor_bridge.py
124
sensor_bridge.py
@ -1,6 +1,7 @@
|
|||||||
import logging
|
import logging
|
||||||
import json
|
import json
|
||||||
import time
|
import time
|
||||||
|
import gc # Add garbage collection
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
from pymodbus.client import ModbusTcpClient
|
from pymodbus.client import ModbusTcpClient
|
||||||
from pymodbus.exceptions import ModbusException
|
from pymodbus.exceptions import ModbusException
|
||||||
@ -31,7 +32,7 @@ def on_publish(client, userdata, mid):
|
|||||||
def read_and_publish_data(mqtt_client, modbus_client, host_info):
|
def read_and_publish_data(mqtt_client, modbus_client, host_info):
|
||||||
"""Read data from Modbus and publish to MQTT"""
|
"""Read data from Modbus and publish to MQTT"""
|
||||||
try:
|
try:
|
||||||
# Check and establish Modbus connection
|
# Check and establish Modbus connection with explicit timeout
|
||||||
if not modbus_client.is_socket_open():
|
if not modbus_client.is_socket_open():
|
||||||
logging.info(f"Attempting to connect to {host_info['ip']}:{MODBUS_PORT}")
|
logging.info(f"Attempting to connect to {host_info['ip']}:{MODBUS_PORT}")
|
||||||
connection_result = modbus_client.connect()
|
connection_result = modbus_client.connect()
|
||||||
@ -52,6 +53,13 @@ def read_and_publish_data(mqtt_client, modbus_client, host_info):
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error(f"Unexpected error from {host_info['ip']}: {e}", exc_info=True)
|
logging.error(f"Unexpected error from {host_info['ip']}: {e}", exc_info=True)
|
||||||
return False
|
return False
|
||||||
|
finally:
|
||||||
|
# Ensure connection is closed after each operation to prevent resource leaks
|
||||||
|
try:
|
||||||
|
if modbus_client and modbus_client.is_socket_open():
|
||||||
|
modbus_client.close()
|
||||||
|
except Exception as e:
|
||||||
|
logging.warning(f"Error closing modbus connection for {host_info['ip']}: {e}")
|
||||||
|
|
||||||
def read_and_publish_temperature_humidity(mqtt_client, modbus_client, host_info):
|
def read_and_publish_temperature_humidity(mqtt_client, modbus_client, host_info):
|
||||||
"""Read temperature and humidity sensors"""
|
"""Read temperature and humidity sensors"""
|
||||||
@ -74,39 +82,45 @@ def read_and_publish_temperature_humidity(mqtt_client, modbus_client, host_info)
|
|||||||
humidity = raw_hum * 100 / 1000 # Correct formula: 0% to 100% RH
|
humidity = raw_hum * 100 / 1000 # Correct formula: 0% to 100% RH
|
||||||
logging.info(f"Raw humidity from {host_info['ip']}: {raw_hum}, Corrected: {humidity:.1f}%RH")
|
logging.info(f"Raw humidity from {host_info['ip']}: {raw_hum}, Corrected: {humidity:.1f}%RH")
|
||||||
|
|
||||||
# Prepare new topic structure: Location/{location_name}/{sensor_type}/data
|
# Prepare new topic structure: Location/{location_name}/{type}/...
|
||||||
location = host_info["location"]
|
location = host_info["location"]
|
||||||
sensor_type = "temperature-humidity"
|
sensor_type = "temperature-humidity"
|
||||||
current_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
|
current_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
|
||||||
# Create topic for combined data
|
# Create base topic path
|
||||||
data_topic = f"Location/{location}/{sensor_type}/data"
|
base_topic = f"Location/{location}/{sensor_type}"
|
||||||
|
|
||||||
# Create JSON payload with all data and status
|
# Publish to individual topics
|
||||||
payload = {
|
topics_data = [
|
||||||
"timestamp": current_time,
|
(f"{base_topic}/Time", current_time),
|
||||||
"location": location,
|
(f"{base_topic}/Status", "online"),
|
||||||
"sensor_type": sensor_type,
|
(f"{base_topic}/Data/temperature", round(temperature, 1)),
|
||||||
"ip": host_info["ip"],
|
(f"{base_topic}/Data/humidity", round(humidity, 1))
|
||||||
"status": "online",
|
]
|
||||||
"data": {
|
|
||||||
"temperature": round(temperature, 1),
|
|
||||||
"humidity": round(humidity, 1)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
# Publish combined data as JSON
|
# Publish all data
|
||||||
result = mqtt_client.publish(data_topic, json.dumps(payload))
|
all_published = True
|
||||||
|
for topic, value in topics_data:
|
||||||
|
try:
|
||||||
|
# Convert value to string for MQTT
|
||||||
|
payload = str(value)
|
||||||
|
result = mqtt_client.publish(topic, payload)
|
||||||
result.wait_for_publish()
|
result.wait_for_publish()
|
||||||
|
|
||||||
logging.info(f"Published to '{data_topic}': {json.dumps(payload, indent=2)}")
|
|
||||||
|
|
||||||
# Check if published successfully
|
|
||||||
if result.is_published():
|
if result.is_published():
|
||||||
logging.info(f"Successfully published temperature-humidity data for {location}")
|
logging.info(f"Published to '{topic}': {payload}")
|
||||||
|
else:
|
||||||
|
logging.error(f"Failed to publish to '{topic}': {payload}")
|
||||||
|
all_published = False
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"Error publishing to '{topic}': {e}")
|
||||||
|
all_published = False
|
||||||
|
|
||||||
|
if all_published:
|
||||||
|
logging.info(f"Successfully published all temperature-humidity data for {location}")
|
||||||
return True
|
return True
|
||||||
else:
|
else:
|
||||||
logging.error(f"Failed to publish temperature-humidity data from {host_info['ip']}")
|
logging.error(f"Failed to publish some temperature-humidity data from {host_info['ip']}")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
except ModbusException as e:
|
except ModbusException as e:
|
||||||
@ -148,40 +162,46 @@ def read_and_publish_cwt_co2(mqtt_client, modbus_client, host_info):
|
|||||||
|
|
||||||
logging.info(f"Processed CWT values from {host_info['ip']} - Humidity: {humidity:.1f}%RH, Temperature: {temperature:.1f}°C, CO2: {co2_ppm}ppm")
|
logging.info(f"Processed CWT values from {host_info['ip']} - Humidity: {humidity:.1f}%RH, Temperature: {temperature:.1f}°C, CO2: {co2_ppm}ppm")
|
||||||
|
|
||||||
# Prepare new topic structure: Location/{location_name}/{sensor_type}/data
|
# Prepare new topic structure: Location/{location_name}/{type}/...
|
||||||
location = host_info["location"]
|
location = host_info["location"]
|
||||||
sensor_type = "CO2-gas"
|
sensor_type = "CO2-gas"
|
||||||
current_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
|
current_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
|
||||||
# Create topic for combined data
|
# Create base topic path
|
||||||
data_topic = f"Location/{location}/{sensor_type}/data"
|
base_topic = f"Location/{location}/{sensor_type}"
|
||||||
|
|
||||||
# Create JSON payload with all data and status
|
# Publish to individual topics
|
||||||
payload = {
|
topics_data = [
|
||||||
"timestamp": current_time,
|
(f"{base_topic}/Time", current_time),
|
||||||
"location": location,
|
(f"{base_topic}/Status", "online"),
|
||||||
"sensor_type": sensor_type,
|
(f"{base_topic}/Data/co2", co2_ppm),
|
||||||
"ip": host_info["ip"],
|
(f"{base_topic}/Data/temperature", round(temperature, 1)),
|
||||||
"status": "online",
|
(f"{base_topic}/Data/humidity", round(humidity, 1))
|
||||||
"data": {
|
]
|
||||||
"co2": co2_ppm,
|
|
||||||
"temperature": round(temperature, 1),
|
|
||||||
"humidity": round(humidity, 1)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
# Publish combined data as JSON
|
# Publish all data
|
||||||
result = mqtt_client.publish(data_topic, json.dumps(payload))
|
all_published = True
|
||||||
|
for topic, value in topics_data:
|
||||||
|
try:
|
||||||
|
# Convert value to string for MQTT
|
||||||
|
payload = str(value)
|
||||||
|
result = mqtt_client.publish(topic, payload)
|
||||||
result.wait_for_publish()
|
result.wait_for_publish()
|
||||||
|
|
||||||
logging.info(f"Published to '{data_topic}': {json.dumps(payload, indent=2)}")
|
|
||||||
|
|
||||||
# Check if published successfully
|
|
||||||
if result.is_published():
|
if result.is_published():
|
||||||
logging.info(f"Successfully published CO2-gas data for {location}")
|
logging.info(f"Published to '{topic}': {payload}")
|
||||||
|
else:
|
||||||
|
logging.error(f"Failed to publish to '{topic}': {payload}")
|
||||||
|
all_published = False
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"Error publishing to '{topic}': {e}")
|
||||||
|
all_published = False
|
||||||
|
|
||||||
|
if all_published:
|
||||||
|
logging.info(f"Successfully published all CO2-gas data for {location}")
|
||||||
return True
|
return True
|
||||||
else:
|
else:
|
||||||
logging.error(f"Failed to publish CO2-gas data from {host_info['ip']}")
|
logging.error(f"Failed to publish some CO2-gas data from {host_info['ip']}")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
except ModbusException as e:
|
except ModbusException as e:
|
||||||
@ -222,6 +242,7 @@ def main_loop():
|
|||||||
# Main loop to read and publish data from all hosts
|
# Main loop to read and publish data from all hosts
|
||||||
while True:
|
while True:
|
||||||
for host_info in MODBUS_HOSTS:
|
for host_info in MODBUS_HOSTS:
|
||||||
|
modbus_client = None
|
||||||
error_message = None
|
error_message = None
|
||||||
try:
|
try:
|
||||||
modbus_client = ModbusTcpClient(
|
modbus_client = ModbusTcpClient(
|
||||||
@ -246,11 +267,15 @@ def main_loop():
|
|||||||
sensor_tracker.record_failure(host_info, error_message, mqtt_client)
|
sensor_tracker.record_failure(host_info, error_message, mqtt_client)
|
||||||
logging.error(f"Error processing {host_info['location']} ({host_info['ip']}): {e}", exc_info=True)
|
logging.error(f"Error processing {host_info['location']} ({host_info['ip']}): {e}", exc_info=True)
|
||||||
finally:
|
finally:
|
||||||
|
# Ensure modbus connection is always closed
|
||||||
|
if modbus_client:
|
||||||
try:
|
try:
|
||||||
modbus_client.close()
|
modbus_client.close()
|
||||||
logging.debug(f"Closed connection to {host_info['ip']}")
|
logging.debug(f"Closed connection to {host_info['ip']}")
|
||||||
except:
|
except Exception as close_error:
|
||||||
pass
|
logging.warning(f"Error closing connection to {host_info['ip']}: {close_error}")
|
||||||
|
finally:
|
||||||
|
modbus_client = None # Explicit cleanup
|
||||||
|
|
||||||
# Add small delay between processing each sensor
|
# Add small delay between processing each sensor
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
@ -261,6 +286,9 @@ def main_loop():
|
|||||||
f"({summary['health_percentage']:.1f}% health), "
|
f"({summary['health_percentage']:.1f}% health), "
|
||||||
f"Alerts: {summary['alerted_sensors']}")
|
f"Alerts: {summary['alerted_sensors']}")
|
||||||
|
|
||||||
|
# Force garbage collection every cycle to prevent memory buildup
|
||||||
|
gc.collect()
|
||||||
|
|
||||||
logging.info(f"Waiting {PUBLISH_INTERVAL} seconds until next cycle...")
|
logging.info(f"Waiting {PUBLISH_INTERVAL} seconds until next cycle...")
|
||||||
time.sleep(PUBLISH_INTERVAL)
|
time.sleep(PUBLISH_INTERVAL)
|
||||||
|
|
||||||
|
@ -166,24 +166,26 @@ class SensorTracker:
|
|||||||
else:
|
else:
|
||||||
sensor_type = "temperature-humidity"
|
sensor_type = "temperature-humidity"
|
||||||
|
|
||||||
# Create topic for offline status using new structure: Location/{location_name}/{sensor_type}/data
|
# Create base topic path
|
||||||
data_topic = f"Location/{location}/{sensor_type}/data"
|
base_topic = f"Location/{location}/{sensor_type}"
|
||||||
|
current_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
|
||||||
# Create JSON payload with offline status
|
# Publish offline status to individual topics
|
||||||
payload = {
|
topics_data = [
|
||||||
"timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
|
(f"{base_topic}/Time", current_time),
|
||||||
"location": location,
|
(f"{base_topic}/Status", "offline")
|
||||||
"sensor_type": sensor_type,
|
]
|
||||||
"ip": host_info["ip"],
|
|
||||||
"status": "offline",
|
|
||||||
"error": sensor.get("last_error", "Unknown error"),
|
|
||||||
"consecutive_failures": sensor["consecutive_failures"],
|
|
||||||
"uptime_percentage": round(sensor["uptime_percentage"], 2)
|
|
||||||
}
|
|
||||||
|
|
||||||
result = mqtt_client.publish(data_topic, json.dumps(payload))
|
# Publish offline status
|
||||||
|
for topic, value in topics_data:
|
||||||
|
try:
|
||||||
|
payload = str(value)
|
||||||
|
result = mqtt_client.publish(topic, payload)
|
||||||
result.wait_for_publish()
|
result.wait_for_publish()
|
||||||
logging.info(f"Published offline status to '{data_topic}'")
|
logging.info(f"Published offline status to '{topic}': {payload}")
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"Error publishing offline status to '{topic}': {e}")
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error(f"Failed to publish offline status: {e}")
|
logging.error(f"Failed to publish offline status: {e}")
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user