Enhance POE project with health check server, sensor tracking, and dynamic MQTT topic structure. Updated configuration for multiple Modbus hosts and added alerting system for sensor failures and recoveries. Improved logging and error handling throughout the application.

This commit is contained in:
Naab2k3
2025-06-23 09:14:40 +07:00
parent ad87c01f34
commit 81ab6191da
15 changed files with 752 additions and 222 deletions

View File

@ -46,6 +46,10 @@ python main.py
- **Data Processing**: Converts raw sensor values to calibrated readings
- **Error Handling**: Robust error handling and retry mechanisms
- **Logging**: Comprehensive logging for monitoring and debugging
- **Health Check API**: HTTP endpoints for system health monitoring
- **Sensor Status Tracking**: Real-time tracking of individual sensor health
- **Alerting System**: Automatic alerts for sensor failures and recovery
- **Recovery Detection**: Detects when failed sensors come back online
## 📈 Data Format
@ -77,6 +81,51 @@ All configuration is centralized in `config.py`:
- **Register 0**: Temperature (raw value × 0.1 - 40)
- **Register 1**: Humidity (raw value × 0.1)
## 🏥 Health Check & Monitoring
### Health Check Endpoints
The service provides HTTP endpoints for monitoring:
- **http://localhost:8080/health** - Basic service health check
- **http://localhost:8080/sensors** - Detailed sensor status information
### Alerting System
The system automatically sends MQTT alerts for:
- **Sensor Failures**: When a sensor fails 3 consecutive times
- **Sensor Recovery**: When a failed sensor comes back online
Alert topics:
- `Location/{location}/{sensor_type}/alerts` - Failure and recovery alerts
- `Location/{location}/{sensor_type}/data` - Sensor readings, including a `status` field (`online`/`offline`)
### Demo Monitoring Script
Use the demo script to test monitoring features:
```bash
# Check health endpoint
python demo_monitoring.py health
# Check sensors status
python demo_monitoring.py sensors
# Monitor system for 5 minutes
python demo_monitoring.py monitor 300
```
### Configuration Options
| Parameter | Description | Default |
|-----------|-------------|---------|
| `HEALTH_CHECK_ENABLED` | Enable/disable health check server | True |
| `HEALTH_CHECK_PORT` | HTTP server port | 8080 |
| `ALERTING_ENABLED` | Enable/disable MQTT alerts | True |
| `SENSOR_TIMEOUT_THRESHOLD` | Failures before alert | 3 |
| `RECOVERY_CONFIRMATION_COUNT` | Successes to confirm recovery | 2 |
## 🛠️ Maintenance
### Running as a Service

Binary file not shown.

View File

@ -1,20 +1,78 @@
import time
# Modbus configuration
MODBUS_HOST = "10.84.48.153"
MODBUS_HOSTS = [
{"ip": "10.84.60.31", "location": "Warehouse-B", "type": "temperature_humidity"},
{"ip": "10.84.60.32", "location": "Warehouse-C", "type": "temperature_humidity"},
{"ip": "10.84.60.33", "location": "Dyeing-Kitchen", "type": "temperature_humidity"},
{"ip": "10.84.60.34", "location": "Second-Floor", "type": "temperature_humidity"},
{"ip": "10.84.60.35", "location": "Warehouse-A", "type": "temperature_humidity"},
{"ip": "10.84.60.36", "location": "Lab-Room", "type": "temperature_humidity"},
{"ip": "10.84.60.37", "location": "Office", "type": "cwt_co2"},
]
MODBUS_PORT = 505
UNIT_ID = 1
# MQTT configuration
MQTT_BROKER = "mqtt.service.mesh"
MQTT_PORT = 1883
MQTT_TOPIC = "Temperature_Humidity"
# Legacy topic - superseded by the dynamic topic structure: Location/{location}/{sensor_type}/data
MQTT_TOPIC = "Temperature_Humidity" # Keep for backward compatibility
MQTT_CLIENT_ID = f"modbus-mqtt-client-{int(time.time())}"
MQTT_USERNAME = "relay"
MQTT_PASSWORD = "Sey@K9c&Q4^"
# Location information
LOCATION = "Office"
# Read and publish cycle configuration (seconds)
PUBLISH_INTERVAL = 10
# Health check and alerting configuration
HEALTH_CHECK_PORT = 8080
HEALTH_CHECK_ENABLED = True
# Alerting configuration
ALERTING_ENABLED = True
SENSOR_TIMEOUT_THRESHOLD = 3 # Number of consecutive failures before alert
RECOVERY_CONFIRMATION_COUNT = 2 # Number of consecutive successes to confirm recovery
# Sensor status tracking - now integrated into data payload
# New topic structure: Location/{location_name}/{sensor_type}/{data_type}
# Where sensor_type is: "temperature-humidity" or "CO2-gas"
# Where data_type is: "data" or "alerts"
# Example topics:
# - Location/Warehouse-B/temperature-humidity/data (contains temp, humidity, status)
# - Location/Office/CO2-gas/data (contains co2, temp, humidity, status)
# - Location/Warehouse-B/temperature-humidity/alerts (contains failure/recovery alerts)
# New MQTT Topic Structure:
# Base: Location/{location_name}/{sensor_type}/{data_type}
#
# Data topics: Location/{location_name}/{sensor_type}/data
# Alert topics: Location/{location_name}/{sensor_type}/alerts
#
# Payload format for data (JSON):
# {
# "timestamp": "2024-01-01 12:00:00",
# "location": "Warehouse-B",
# "sensor_type": "temperature-humidity",
# "ip": "10.84.60.31",
# "status": "online|offline",
# "data": {
# "temperature": 25.5,
# "humidity": 60.2
# }
# }
#
# Payload format for alerts (JSON):
# {
# "alert_type": "sensor_failure|sensor_recovery",
# "timestamp": "2024-01-01T12:00:00.000000+00:00",
# "sensor_id": "10.84.60.31_Warehouse-B",
# "sensor_ip": "10.84.60.31",
# "sensor_location": "Warehouse-B",
# "sensor_type": "temperature_humidity",
# "consecutive_failures": 3,
# "last_error": "Failed to read data",
# "severity": "critical|info"
# }

77
health_check.py Normal file
View File

@ -0,0 +1,77 @@
import json
import threading
from http.server import HTTPServer, BaseHTTPRequestHandler
from datetime import datetime, timezone
import logging
from config import HEALTH_CHECK_PORT, HEALTH_CHECK_ENABLED
class HealthCheckHandler(BaseHTTPRequestHandler):
    """HTTP request handler for the monitoring endpoints.

    Routes (GET only):
        /health   -- static liveness payload
        /sensors  -- per-sensor status obtained from ``sensor_tracker``

    Any other path is answered with a 404. All responses carry a JSON body.
    """

    def do_GET(self):
        """Route a GET request by path, ignoring query string and trailing slash."""
        # Health probes commonly append a query string (cache busters) or a
        # trailing slash; neither should turn a valid route into a 404.
        route = self.path.split('?', 1)[0].rstrip('/')
        if route == '/health':
            self.send_health_response()
        elif route == '/sensors':
            self.send_sensors_status()
        else:
            self._send_json(404, {"error": "not found", "path": self.path})

    def send_health_response(self):
        """Send basic health check response"""
        health_data = {
            "status": "healthy",
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "service": "modbus-mqtt-bridge",
            "version": "1.0.0"
        }
        self._send_json(200, health_data)

    def send_sensors_status(self):
        """Send detailed sensor status, or a 500 if it cannot be collected."""
        try:
            # Imported lazily, as in the original code, so this module can be
            # loaded without sensor_tracker being importable.
            from sensor_tracker import get_all_sensor_status
            sensors_status = get_all_sensor_status()
            # Serialise before any header is written so a failure here can
            # still be reported with a proper status code.
            body = json.dumps(sensors_status, indent=2).encode()
        except Exception:
            logging.exception("Health Check - failed to collect sensor status")
            self._send_json(500, {"error": "failed to collect sensor status"})
            return
        self._send_body(200, body)

    def _send_json(self, status_code, payload):
        """Serialise ``payload`` as indented JSON and send it with ``status_code``."""
        self._send_body(status_code, json.dumps(payload, indent=2).encode())

    def _send_body(self, status_code, body):
        """Write status line, JSON headers and the already-encoded ``body``."""
        self.send_response(status_code)
        self.send_header('Content-type', 'application/json')
        # Explicit length lets clients detect the end of the body reliably.
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        """Override to use our logging system"""
        # Parameter name ``format`` mirrors the base-class signature.
        logging.info(f"Health Check - {format % args}")
class HealthCheckServer:
    """Runs the health-check HTTP server on a background daemon thread."""

    def __init__(self, port=HEALTH_CHECK_PORT):
        """Store the listening port; nothing is bound until ``start()``."""
        self.port = port
        self.server = None
        self.thread = None

    def start(self):
        """Start the health check server in a separate thread.

        Does nothing when the server is disabled in the configuration or is
        already running. Start-up failures are logged, not raised.
        """
        if not HEALTH_CHECK_ENABLED:
            logging.info("Health check server is disabled")
            return
        if self.server is not None:
            logging.warning("Health check server is already running")
            return

        server = None
        try:
            server = HTTPServer(('', self.port), HealthCheckHandler)
            thread = threading.Thread(target=server.serve_forever, daemon=True)
            thread.start()
        except Exception as e:
            logging.error(f"Failed to start health check server: {e}")
            # Release the socket if it was bound before the failure. The
            # instance attributes are deliberately left unset so that stop()
            # never calls shutdown() on a server whose serve_forever() was
            # never entered -- that call would block forever.
            if server is not None:
                server.server_close()
            return

        # Publish the references only once the serving thread is running.
        self.server = server
        self.thread = thread
        logging.info(f"Health check server started on port {self.port}")
        logging.info("Health check endpoints:")
        logging.info(f"  - http://localhost:{self.port}/health")
        logging.info(f"  - http://localhost:{self.port}/sensors")

    def stop(self):
        """Stop the health check server; safe to call repeatedly."""
        server, thread = self.server, self.thread
        # Clear the references first so a second stop() is a no-op and a
        # later start() can bind again.
        self.server = None
        self.thread = None
        if server is None:
            return
        server.shutdown()
        server.server_close()
        if thread is not None:
            # serve_forever() has already returned after shutdown(); the
            # timeout is only a safeguard against hanging the caller.
            thread.join(timeout=5)
        logging.info("Health check server stopped")

View File

@ -1,17 +0,0 @@
# Active Context
## Current Work Focus
- The script is stable and functional, continuously reading temperature and humidity from a Modbus TCP device and publishing the data to an MQTT broker in pretty-printed JSON format.
- The main focus is on reliability, error handling, and clear logging.
## Recent Changes
- Improved JSON formatting for MQTT payloads (pretty print with indent).
- Refined temperature calibration logic (subtracting 40 from raw value after scaling).
- Enhanced logging for all major events and error conditions.
- All configuration is now at the top of the script for easier modification.
## Next Steps
- Optional: Parameterize configuration via environment variables or a config file for easier deployment.
- Optional: Add support for additional Modbus registers or sensors.
- Optional: Implement more advanced error recovery or alerting (e.g., email/SMS on repeated failures).
- Optional: Containerize the application for easier deployment in production environments.

View File

@ -1,20 +0,0 @@
# Product Context
## Why This Project Exists
Many industrial and environmental sensors use the Modbus protocol for data communication, while modern IoT and monitoring systems often rely on MQTT for data ingestion and distribution. This project bridges the gap between legacy Modbus devices and MQTT-based platforms, enabling seamless integration and real-time data flow.
## Problems Solved
- Eliminates the need for manual data collection from Modbus sensors.
- Automates the process of converting and forwarding sensor data to cloud or local MQTT brokers.
- Provides a reliable, scriptable, and extensible solution for integrating Modbus sensors into IoT ecosystems.
## How It Should Work
- The script runs continuously, connecting to a Modbus TCP device to read temperature and humidity data at regular intervals.
- Data is processed and formatted as JSON, then published to a specified MQTT topic.
- The system handles connection issues gracefully, with logging for troubleshooting.
## User Experience Goals
- Simple configuration via code variables (host, port, credentials, topic, etc.).
- Clear, timestamped logging for all major events and errors.
- Data is published in a structured, readable JSON format for easy consumption by MQTT subscribers.
- Minimal setup required; designed for headless/server environments.

View File

@ -1,23 +0,0 @@
# Progress
## What Works
- The script successfully connects to both the Modbus TCP device and the MQTT broker.
- Temperature and humidity data are read from the correct Modbus registers and calibrated as per device documentation.
- Data is published to the MQTT topic in a pretty-printed JSON format.
- Logging provides clear feedback on all major events and errors.
- The script handles connection failures and retries in the main loop.
## What's Left to Build
- (Optional) Configuration via environment variables or external config file.
- (Optional) Support for additional sensors or Modbus registers.
- (Optional) Advanced error notification (e.g., email/SMS alerts).
- (Optional) Dockerfile or deployment scripts for production use.
## Current Status
- The project is functional and meets its core requirements for a Modbus-to-MQTT bridge.
- Ready for deployment in environments matching the current configuration.
## Known Issues
- All configuration is hardcoded; not ideal for dynamic or multi-environment deployments.
- No persistent storage or buffering if MQTT broker is temporarily unavailable.
- No web UI or REST API for monitoring or control.

View File

@ -1,25 +0,0 @@
# Project Brief
## Project Name
Modbus-to-MQTT Bridge for Environmental Sensor
## Overview
This project implements a Python-based bridge that reads temperature and humidity data from a Modbus TCP device and publishes the readings to an MQTT broker in JSON format. The system is designed for continuous, automated data acquisition and integration with IoT or monitoring platforms.
## Core Requirements
- Connect to a Modbus TCP device (sensor gateway) to read holding registers for temperature and humidity.
- Process and calibrate the raw sensor data as per device documentation.
- Publish the processed data to a specified MQTT topic at a configurable interval.
- Support MQTT authentication (username/password).
- Provide clear logging for connection, data acquisition, and publishing events.
## Goals
- Enable seamless integration of Modbus-based sensors with MQTT-based IoT systems.
- Ensure reliable, periodic data transfer with error handling and reconnection logic.
- Maintain code clarity and extensibility for future enhancements (e.g., more sensors, additional data fields).
## Scope
- Single Python script (poe.py) as the main application.
- No web interface or GUI; headless operation via command line.
- Focus on environmental data (temperature, humidity) but extensible for other Modbus registers.
- No persistent storage; data is transient and only sent to MQTT.

View File

@ -1,21 +0,0 @@
# System Patterns
## System Architecture
- **Single-process, event-driven script**: The application runs as a single Python process, using a main loop to periodically read from Modbus and publish to MQTT.
- **Polling pattern**: Data is acquired from the Modbus device at a fixed interval (configurable via `PUBLISH_INTERVAL`).
- **Bridge pattern**: The script acts as a bridge between two protocols (Modbus TCP and MQTT), translating and forwarding data.
## Key Technical Decisions
- **Direct variable configuration**: All connection and operational parameters are set as variables at the top of the script for simplicity.
- **Error handling and reconnection**: The script checks and re-establishes connections to both Modbus and MQTT as needed, with logging for all failures.
- **JSON formatting**: Data is published in pretty-printed JSON for readability and ease of integration.
- **Separation of concerns**: Reading/publishing logic is encapsulated in a dedicated function (`read_and_publish_data`), while the main loop handles orchestration and lifecycle.
## Design Patterns
- **Callback pattern**: Used for MQTT events (on_connect, on_publish).
- **Try/except/finally**: Used for robust error handling and resource cleanup.
- **Headless operation**: No UI; all feedback is via logs and MQTT messages.
## Component Relationships
- The Modbus client and MQTT client are instantiated and managed independently, but coordinated within the main loop.
- Data flows from Modbus -> Python processing -> MQTT publish.

View File

@ -1,21 +0,0 @@
# Technical Context
## Technologies Used
- **Python 3.x**: Main programming language.
- **pymodbus**: For Modbus TCP client communication.
- **paho-mqtt**: For MQTT client functionality.
- **Logging**: Python's built-in logging module for event and error tracking.
## Dependencies
- `pymodbus` (>=2.5, <4.0 recommended for current code)
- `paho-mqtt`
## Development Setup
- No external configuration files; all settings are in `poe.py` as variables.
- Script is intended to run on any system with Python 3.x and the above dependencies installed.
## Technical Constraints
- Assumes the Modbus device is accessible via TCP/IP and supports reading holding registers for temperature and humidity.
- MQTT broker must be reachable from the host running the script.
- No persistent storage or database integration; data is transient.
- No web UI or REST API; all interaction is via logs and MQTT.

109
poe-sensor.nomad Normal file
View File

@ -0,0 +1,109 @@
job "poe-sensor" {
region = "global"
datacenters = ["hs"]
type = "service"
namespace = "production"
meta {
version = "20250617"
}
group "sensor-bridge" {
count = 1
# Network configuration - using host mode for Modbus access
network {
mode = "host"
port "health" {
static = 8080
}
}
# Restart policy
restart {
attempts = 3
interval = "30m"
delay = "15s"
mode = "fail"
}
# Update strategy
update {
max_parallel = 1
min_healthy_time = "30s"
healthy_deadline = "3m"
progress_deadline = "10m"
auto_revert = true
canary = 0
}
service {
name = "${NOMAD_JOB_NAME}"
port = "health"
tags = [
"sensor",
"modbus",
"mqtt",
"iot",
"health-check"
]
check {
type = "http"
path = "/health"
interval = "30s"
timeout = "10s"
check_restart {
limit = 3
grace = "10s"
}
}
}
task "poe-sensor" {
driver = "docker"
config {
image = "registry.dev.meisheng.group/ms_qc_db:20250409"
command = "/bin/bash"
args = [
"-c",
"cd local/poe-sensor && apt-get update -qq && apt-get install -y procps && python -m pip install --upgrade pip && python -m pip install -r requirements.txt && python -c 'import pymodbus, paho.mqtt.client; print(\"Dependencies installed successfully\")' && python main.py"
]
}
# Git artifact - using SSH similar to qc-scanner
artifact {
source = "git::ssh://git@gitea.service.mesh:2222/Mei_Sheng_Textiles/POE-sensor.git"
destination = "local/poe-sensor"
options {
ref = "main"
sshkey = "LS0tLS1CRUdJTiBPUEVOU1NIIFBSSVZBVEUgS0VZLS0tLS0KYjNCbGJuTnphQzFyWlhrdGRqRUFBQUFBQkc1dmJtVUFBQUFFYm05dVpRQUFBQUFBQUFBQkFBQUFNd0FBQUF0emMyZ3RaVwpReU5UVXhPUUFBQUNEbWF6M0ZWdlE1YTRaalY4dUdobENleEFjN0VxbmVVN0FETnFBSXg0cUI4d0FBQUpqNGlkSVcrSW5TCkZnQUFBQXR6YzJndFpXUXlOVFV4T1FBQUFDRG1hejNGVnZRNWE0WmpWOHVHaGxDZXhBYzdFcW5lVTdBRE5xQUl4NHFCOHcKQUFBRURpRXM1ejJRb2dTempvVzdDUnZ3U2RONUpVMTNmZm14cnFIQjNOS3hXUmp1WnJQY1ZXOURscmhtTlh5NGFHVUo3RQpCenNTcWQ1VHNBTTJvQWpIaW9IekFBQUFFbUpoYmk1dVpFQnRjM1JsZUhadUxtTnZiUUVDQXc9PQotLS0tLUVORCBPUkVOU1NIIFBSSVZBVEUgS0VZLS0tLS0K"
}
}
# Environment variables
env {
LOG_LEVEL = "INFO"
PYTHONUNBUFFERED = "1"
TZ = "Asia/Ho_Chi_Minh"
# MQTT configuration (informational only: config.py hardcodes these values and does not read the environment)
MQTT_BROKER = "mqtt.service.mesh"
MQTT_PORT = "1883"
MQTT_USERNAME = "relay"
MQTT_PASSWORD = "Sey@K9c&Q4^"
}
# Resource allocation
resources {
cpu = 256
memory = 256
}
# Logs configuration
logs {
max_files = 10
max_file_size = 20
}
}
}
}

View File

@ -1,2 +1,3 @@
pymodbus>=3.5.0
paho-mqtt>=1.6.0
requests>=2.28.0

View File

@ -1,151 +1,282 @@
import logging
import json
import time
from datetime import datetime
from datetime import datetime, timezone
from pymodbus.client import ModbusTcpClient
from pymodbus.exceptions import ModbusException
# Add MQTT client
import paho.mqtt.client as mqtt
# Import configuration
from config import (
MODBUS_HOST, MODBUS_PORT, UNIT_ID,
MODBUS_HOSTS, MODBUS_PORT, UNIT_ID,
MQTT_BROKER, MQTT_PORT, MQTT_TOPIC, MQTT_CLIENT_ID,
MQTT_USERNAME, MQTT_PASSWORD, LOCATION, PUBLISH_INTERVAL
MQTT_USERNAME, MQTT_PASSWORD, PUBLISH_INTERVAL
)
from sensor_tracker import get_sensor_tracker
from health_check import HealthCheckServer
# Setting logging basic to see output
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# MQTT callbacks
def on_connect(client, userdata, flags, rc):
    """Log the outcome of an MQTT connection attempt.

    A return code of 0 is logged as a successful connection; any other
    value is logged as an error together with the code itself.
    """
    if rc != 0:
        logging.error(f"Cannot connect to MQTT Broker. Return code: {rc}")
        return
    logging.info("Connected to MQTT Broker!")
def on_publish(client, userdata, mid):
    """Log the message ID passed to the MQTT publish callback."""
    confirmation = f"Successfully sent message with ID: {mid}"
    logging.info(confirmation)
def read_and_publish_data(mqtt_client, modbus_client):
def read_and_publish_data(mqtt_client, modbus_client, host_info):
"""Read data from Modbus and publish to MQTT"""
try:
# Check connection to Modbus server
# Check and establish Modbus connection
if not modbus_client.is_socket_open():
if not modbus_client.connect():
logging.error("Cannot connect to Modbus server.")
logging.info(f"Attempting to connect to {host_info['ip']}:{MODBUS_PORT}")
connection_result = modbus_client.connect()
if not connection_result:
logging.error(f"Failed to connect to Modbus server {host_info['ip']}. Connection returned: {connection_result}")
return False
logging.info(f"Successfully connected to {host_info['ip']}:{MODBUS_PORT}")
# Handle different sensor types
if host_info["type"] == "cwt_co2":
return read_and_publish_cwt_co2(mqtt_client, modbus_client, host_info)
else:
return read_and_publish_temperature_humidity(mqtt_client, modbus_client, host_info)
except ModbusException as e:
logging.error(f"Modbus error from {host_info['ip']}: {e}", exc_info=True)
return False
except Exception as e:
logging.error(f"Unexpected error from {host_info['ip']}: {e}", exc_info=True)
return False
def read_and_publish_temperature_humidity(mqtt_client, modbus_client, host_info):
"""Read temperature and humidity sensors"""
try:
# Read temperature (register 0)
result_temp = modbus_client.read_holding_registers(address=0, count=1, slave=UNIT_ID)
if not hasattr(result_temp, 'registers') or not result_temp.registers:
logging.error(f"Error reading temperature from {host_info['ip']}: {result_temp}")
return False
raw_temp = result_temp.registers[0]
temperature = (125 - (-40)) * raw_temp / 1650 - 40 # Correct formula: -40°C to 125°C
logging.info(f"Raw temperature from {host_info['ip']}: {raw_temp}, Corrected: {temperature:.1f}°C")
# Read humidity (register 1)
result_hum = modbus_client.read_holding_registers(address=1, count=1, slave=UNIT_ID)
if not hasattr(result_hum, 'registers') or not result_hum.registers:
logging.error(f"Error reading humidity from {host_info['ip']}: {result_hum}")
return False
raw_hum = result_hum.registers[0]
humidity = raw_hum * 100 / 1000 # Correct formula: 0% to 100% RH
logging.info(f"Raw humidity from {host_info['ip']}: {raw_hum}, Corrected: {humidity:.1f}%RH")
# Initialize data to publish
data = {
"time": datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"),
"location": LOCATION
# Prepare new topic structure: Location/{location_name}/{sensor_type}/data
location = host_info["location"]
sensor_type = "temperature-humidity"
current_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
# Create topic for combined data
data_topic = f"Location/{location}/{sensor_type}/data"
# Create JSON payload with all data and status
payload = {
"timestamp": current_time,
"location": location,
"sensor_type": sensor_type,
"ip": host_info["ip"],
"status": "online",
"data": {
"temperature": round(temperature, 1),
"humidity": round(humidity, 1)
}
}
# Process temperature
if hasattr(result_temp, 'registers') and result_temp.registers:
raw_temp = result_temp.registers[0]
raw_temperature = raw_temp * 0.1
temperature = raw_temperature - 40
logging.info(f"Raw temperature: {raw_temperature:.1f}°C (raw: {raw_temp})")
logging.info(f"Corrected temperature: {temperature:.1f}°C")
data["temperature"] = round(temperature, 1)
else:
logging.error(f"Error reading temperature: {result_temp}")
return False
# Xử lý độ ẩm
if hasattr(result_hum, 'registers') and result_hum.registers:
raw_hum = result_hum.registers[0]
humidity = raw_hum * 0.1
logging.info(f"Humidity: {humidity:.1f}%RH (raw: {raw_hum})")
data["humidity"] = round(humidity, 1)
else:
logging.error(f"Error reading humidity: {result_hum}")
return False
# Convert data to JSON with a better format
# indent=2 creates whitespace and newlines for JSON
payload = json.dumps(data, indent=2)
logging.info(f"Publishing data: {payload}")
result = mqtt_client.publish(MQTT_TOPIC, payload)
# Ensure data is sent
# Publish combined data as JSON
result = mqtt_client.publish(data_topic, json.dumps(payload))
result.wait_for_publish()
logging.info(f"Published to '{data_topic}': {json.dumps(payload, indent=2)}")
# Check if published successfully
if result.is_published():
logging.info(f"Successfully published data to topic '{MQTT_TOPIC}'")
logging.info(f"Successfully published temperature-humidity data for {location}")
return True
else:
logging.error("Cannot publish data")
logging.error(f"Failed to publish temperature-humidity data from {host_info['ip']}")
return False
except ModbusException as e:
logging.error(f"Modbus error from {host_info['ip']}: {e}", exc_info=True)
return False
except Exception as e:
logging.error(f"Error in reading/writing data: {e}", exc_info=True)
logging.error(f"Unexpected error from {host_info['ip']}: {e}", exc_info=True)
return False
def read_and_publish_cwt_co2(mqtt_client, modbus_client, host_info, publish_timeout=10.0):
    """Read CWT CO2 sensor (humidity, temperature, CO2) and publish it as JSON.

    Args:
        mqtt_client: MQTT client used to publish the reading.
        modbus_client: Modbus client used to read the holding registers.
        host_info: Dict with at least the "ip" and "location" keys.
        publish_timeout: Maximum seconds to wait for the broker to confirm
            the publish. Without a bound, one stalled broker connection
            would block polling of every other sensor.

    Returns:
        True when the reading was read and confirmed published, else False.
        Errors are logged rather than raised.
    """
    try:
        # Read all 3 registers at once (registers 0, 1, 2)
        # According to CWT manual: register 0=humidity, 1=temperature, 2=CO2
        response = modbus_client.read_holding_registers(address=0, count=3, slave=UNIT_ID)
        registers = getattr(response, 'registers', None)
        if not registers or len(registers) != 3:
            logging.error(f"Error reading CWT registers from {host_info['ip']}: {response}")
            return False

        raw_humidity, raw_temperature, raw_co2 = registers
        logging.info(f"Raw CWT values from {host_info['ip']} - Humidity: {raw_humidity}, Temperature: {raw_temperature}, CO2: {raw_co2}")

        # Humidity: 0.1%RH resolution
        humidity = raw_humidity / 10.0

        # Temperature: 0.1°C resolution; values above 32767 are negative
        # readings encoded as 16-bit two's complement.
        if raw_temperature > 32767:
            temperature = (raw_temperature - 65536) / 10.0
        else:
            temperature = raw_temperature / 10.0

        # CO2: 1ppm resolution for standard sensor
        co2_ppm = raw_co2
        logging.info(f"Processed CWT values from {host_info['ip']} - Humidity: {humidity:.1f}%RH, Temperature: {temperature:.1f}°C, CO2: {co2_ppm}ppm")

        # Topic structure: Location/{location_name}/{sensor_type}/data
        location = host_info["location"]
        sensor_type = "CO2-gas"
        data_topic = f"Location/{location}/{sensor_type}/data"

        # JSON payload with all data and status
        payload = {
            "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
            "location": location,
            "sensor_type": sensor_type,
            "ip": host_info["ip"],
            "status": "online",
            "data": {
                "co2": co2_ppm,
                "temperature": round(temperature, 1),
                "humidity": round(humidity, 1)
            }
        }

        # Publish combined data as JSON and wait (bounded) for confirmation
        publish_info = mqtt_client.publish(data_topic, json.dumps(payload))
        publish_info.wait_for_publish(timeout=publish_timeout)

        if not publish_info.is_published():
            logging.error(f"Failed to publish CO2-gas data from {host_info['ip']}")
            return False

        # Only report the payload as published once the broker confirmed it.
        logging.info(f"Published to '{data_topic}': {json.dumps(payload, indent=2)}")
        logging.info(f"Successfully published CO2-gas data for {location}")
        return True

    except ModbusException as e:
        logging.error(f"Modbus error from CWT sensor {host_info['ip']}: {e}", exc_info=True)
        return False
    except Exception as e:
        logging.error(f"Unexpected error from CWT sensor {host_info['ip']}: {e}", exc_info=True)
        return False
def main_loop():
"""Main function to connect and publish data in cycles"""
# Initialize sensor tracker and health check server
sensor_tracker = get_sensor_tracker()
health_server = HealthCheckServer()
# Initialize MQTT client
mqtt_client = mqtt.Client(client_id=MQTT_CLIENT_ID)
mqtt_client.on_connect = on_connect
mqtt_client.on_publish = on_publish
# Set username and password if needed
if MQTT_USERNAME and MQTT_PASSWORD:
mqtt_client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
# Initialize Modbus TCP client
modbus_client = ModbusTcpClient(
host=MODBUS_HOST,
port=MODBUS_PORT,
timeout=30
)
try:
# Start health check server
health_server.start()
# Connect to MQTT broker
logging.info(f"Connecting to MQTT broker {MQTT_BROKER}:{MQTT_PORT}...")
mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60)
mqtt_client.loop_start()
logging.info(f"Attempting to connect to Modbus TCP server: {MODBUS_HOST}:{MODBUS_PORT}...")
# Connect to Modbus server
if not modbus_client.connect():
logging.error("Cannot connect to Modbus server initially.")
return
logging.info(f"Starting monitoring of {len(MODBUS_HOSTS)} sensors")
logging.info("System status can be monitored at:")
logging.info(f" - Health: http://localhost:8080/health")
logging.info(f" - Sensors: http://localhost:8080/sensors")
logging.info("Successfully connected to Modbus server.")
logging.info(f"Starting data reading cycle every {PUBLISH_INTERVAL} seconds...")
# Main loop to read and publish data from all hosts
while True:
for host_info in MODBUS_HOSTS:
error_message = None
try:
modbus_client = ModbusTcpClient(
host=host_info["ip"],
port=MODBUS_PORT,
timeout=10 # Reduced timeout from 30 to 10 seconds
)
logging.info(f"Processing channel {host_info['location']} at {host_info['ip']}:{MODBUS_PORT}")
success = read_and_publish_data(mqtt_client, modbus_client, host_info)
try:
# Main loop to read and publish data
while True:
success = read_and_publish_data(mqtt_client, modbus_client)
if success:
# Record successful reading
sensor_tracker.record_success(host_info, mqtt_client)
logging.info(f"Successfully processed {host_info['location']} ({host_info['ip']})")
else:
error_message = f"Failed to read/publish data from {host_info['ip']}"
sensor_tracker.record_failure(host_info, error_message, mqtt_client)
logging.warning(f"Failed to process {host_info['location']} ({host_info['ip']}), will retry next cycle.")
if not success:
logging.warning("Error occurred in current cycle, will retry in next cycle.")
except Exception as e:
error_message = f"Exception processing {host_info['ip']}: {str(e)}"
sensor_tracker.record_failure(host_info, error_message, mqtt_client)
logging.error(f"Error processing {host_info['location']} ({host_info['ip']}): {e}", exc_info=True)
finally:
try:
modbus_client.close()
logging.debug(f"Closed connection to {host_info['ip']}")
except:
pass
# Wait for next publish cycle
logging.info(f"Waiting {PUBLISH_INTERVAL} seconds until next publish...")
time.sleep(PUBLISH_INTERVAL)
# Add small delay between processing each sensor
time.sleep(1)
except KeyboardInterrupt:
logging.info("Received stop signal from user.")
# Log system summary every cycle
summary = sensor_tracker.get_summary()
logging.info(f"Cycle completed - Online: {summary['online_sensors']}/{summary['total_sensors']} sensors "
f"({summary['health_percentage']:.1f}% health), "
f"Alerts: {summary['alerted_sensors']}")
logging.info(f"Waiting {PUBLISH_INTERVAL} seconds until next cycle...")
time.sleep(PUBLISH_INTERVAL)
except KeyboardInterrupt:
logging.info("Received stop signal from user, shutting down...")
except Exception as e:
logging.error(f"Unexpected error in main loop: {e}", exc_info=True)
finally:
# Close Modbus connection
modbus_client.close()
logging.info("Successfully closed Modbus connection.")
# Cleanup
try:
health_server.stop()
except:
pass
mqtt_client.loop_stop()
mqtt_client.disconnect()
logging.info("Successfully closed MQTT connection.")
logging.info("Successfully closed all connections.")
if __name__ == "__main__":
main_loop()

232
sensor_tracker.py Normal file
View File

@ -0,0 +1,232 @@
import json
import threading
from datetime import datetime, timezone
from collections import defaultdict
import logging
from config import (
MODBUS_HOSTS, SENSOR_TIMEOUT_THRESHOLD, RECOVERY_CONFIRMATION_COUNT,
ALERTING_ENABLED
)
class SensorTracker:
    """Track per-sensor health for every host listed in ``MODBUS_HOSTS``.

    Each sensor is keyed by ``"{ip}_{location}"`` and carries counters,
    timestamps of the last success/failure and a running uptime percentage.
    All reads and writes of that state happen under ``self.lock``. MQTT
    publishing is done only after the lock has been released, so a slow or
    disconnected broker cannot stall other threads that query sensor status.
    """

    # Host "type" value that selects the CO2 topic segment; every other type
    # is published under the temperature/humidity segment.
    _CO2_SENSOR_TYPE = "cwt_co2"

    def __init__(self):
        """Create one entry with status ``"unknown"`` per configured host."""
        self.sensor_status = {}
        # Not used by any method of this class; kept so existing attribute
        # access from outside keeps working.
        self.failure_counts = defaultdict(int)
        self.success_counts = defaultdict(int)
        # Ids of sensors for which a failure alert is currently outstanding.
        self.alerted_sensors = set()
        self.lock = threading.Lock()

        for host in MODBUS_HOSTS:
            self.sensor_status[self._sensor_id(host)] = self._new_entry(host)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _sensor_id(host_info):
        """Build the ``"{ip}_{location}"`` key that identifies a sensor."""
        return f"{host_info['ip']}_{host_info['location']}"

    @staticmethod
    def _new_entry(host_info):
        """Return a fresh status record for *host_info* (status ``"unknown"``)."""
        return {
            "ip": host_info["ip"],
            "location": host_info["location"],
            "type": host_info["type"],
            "status": "unknown",
            "last_success": None,
            "last_failure": None,
            "consecutive_failures": 0,
            "consecutive_successes": 0,
            "total_failures": 0,
            "total_successes": 0,
            "uptime_percentage": 0.0,
        }

    @classmethod
    def _topic_sensor_type(cls, raw_type):
        """Map a configured sensor type to the segment used in MQTT topics."""
        if raw_type == cls._CO2_SENSOR_TYPE:
            return "CO2-gas"
        return "temperature-humidity"

    @staticmethod
    def _refresh_uptime(sensor):
        """Recompute ``uptime_percentage`` from the success/failure totals."""
        attempts = sensor["total_successes"] + sensor["total_failures"]
        if attempts > 0:
            sensor["uptime_percentage"] = (sensor["total_successes"] / attempts) * 100

    def _entry_for(self, host_info, sensor_id):
        """Return the record for *sensor_id*, registering it if it is new.

        The caller must already hold ``self.lock``. Registering on demand
        avoids a ``KeyError`` for a host that was not in ``MODBUS_HOSTS``
        when the tracker was constructed.
        """
        entry = self.sensor_status.get(sensor_id)
        if entry is None:
            entry = self._new_entry(host_info)
            self.sensor_status[sensor_id] = entry
        return entry

    # ------------------------------------------------------------------
    # Recording results
    # ------------------------------------------------------------------
    def record_success(self, host_info, mqtt_client=None):
        """Record a successful sensor reading.

        Args:
            host_info: Mapping with at least ``"ip"``, ``"location"`` and
                ``"type"`` keys describing the sensor.
            mqtt_client: Optional client used to publish a recovery alert.

        A recovery alert is sent once a sensor that has an outstanding
        failure alert reaches ``RECOVERY_CONFIRMATION_COUNT`` consecutive
        successes.
        """
        sensor_id = self._sensor_id(host_info)
        now = datetime.now(timezone.utc)
        recovered = None

        with self.lock:
            sensor = self._entry_for(host_info, sensor_id)
            sensor["status"] = "online"
            sensor["last_success"] = now.isoformat()
            sensor["consecutive_failures"] = 0
            sensor["consecutive_successes"] += 1
            sensor["total_successes"] += 1
            self._refresh_uptime(sensor)

            if (sensor["consecutive_successes"] >= RECOVERY_CONFIRMATION_COUNT
                    and sensor_id in self.alerted_sensors):
                self.alerted_sensors.remove(sensor_id)
                # Snapshot so the alert can be built after the lock is released.
                recovered = dict(sensor)

        # Network I/O happens outside the lock so status readers never wait on MQTT.
        if recovered is not None:
            self._send_recovery_alert(sensor_id, recovered, mqtt_client)
            logging.info(f"Sensor {recovered['location']} ({recovered['ip']}) has recovered")
        # Note: online status is published as part of sensor data, not separately.

    def record_failure(self, host_info, error_message, mqtt_client=None):
        """Record a failed sensor reading.

        Args:
            host_info: Mapping with at least ``"ip"``, ``"location"`` and
                ``"type"`` keys describing the sensor.
            error_message: Description of the failure, stored as ``last_error``.
            mqtt_client: Optional client used to publish the failure alert
                and the offline status.

        A failure alert is sent the first time the sensor reaches
        ``SENSOR_TIMEOUT_THRESHOLD`` consecutive failures. An offline status
        message is published on every failure when a client is supplied and
        alerting is enabled.
        """
        sensor_id = self._sensor_id(host_info)
        now = datetime.now(timezone.utc)
        needs_alert = False

        with self.lock:
            sensor = self._entry_for(host_info, sensor_id)
            sensor["status"] = "offline"
            sensor["last_failure"] = now.isoformat()
            sensor["last_error"] = error_message
            sensor["consecutive_successes"] = 0
            sensor["consecutive_failures"] += 1
            sensor["total_failures"] += 1
            self._refresh_uptime(sensor)

            if (sensor["consecutive_failures"] >= SENSOR_TIMEOUT_THRESHOLD
                    and sensor_id not in self.alerted_sensors):
                self.alerted_sensors.add(sensor_id)
                needs_alert = True

            # Snapshot so the messages can be built after the lock is released.
            snapshot = dict(sensor)

        # Network I/O happens outside the lock so status readers never wait on MQTT.
        if needs_alert:
            self._send_failure_alert(sensor_id, snapshot, mqtt_client)
            logging.warning(
                f"Sensor {snapshot['location']} ({snapshot['ip']}) is now considered offline"
            )

        if mqtt_client and ALERTING_ENABLED:
            self._publish_offline_status(host_info, snapshot, mqtt_client)

    # ------------------------------------------------------------------
    # MQTT publishing
    # ------------------------------------------------------------------
    def _send_failure_alert(self, sensor_id, sensor, mqtt_client):
        """Publish a ``sensor_failure`` alert; errors are logged, not raised.

        Does nothing when no client is given or alerting is disabled. The
        topic is ``Location/{location}/{sensor_type}/alerts``.
        """
        if not mqtt_client or not ALERTING_ENABLED:
            return

        sensor_type = self._topic_sensor_type(sensor["type"])
        alert_topic = f"Location/{sensor['location']}/{sensor_type}/alerts"
        alert_message = {
            "alert_type": "sensor_failure",
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "sensor_id": sensor_id,
            "sensor_ip": sensor["ip"],
            "sensor_location": sensor["location"],
            "sensor_type": sensor["type"],
            "consecutive_failures": sensor["consecutive_failures"],
            "last_error": sensor.get("last_error", "Unknown error"),
            "severity": "critical",
        }

        try:
            result = mqtt_client.publish(alert_topic, json.dumps(alert_message))
            result.wait_for_publish()
            logging.info(f"Sent failure alert for sensor {sensor['location']} to '{alert_topic}'")
        except Exception as e:
            # Best effort: an alerting problem must not break sensor polling.
            logging.error(f"Failed to send failure alert: {e}")

    def _send_recovery_alert(self, sensor_id, sensor, mqtt_client):
        """Publish a ``sensor_recovery`` alert; errors are logged, not raised.

        Does nothing when no client is given or alerting is disabled. The
        topic is ``Location/{location}/{sensor_type}/alerts``.
        """
        if not mqtt_client or not ALERTING_ENABLED:
            return

        sensor_type = self._topic_sensor_type(sensor["type"])
        alert_topic = f"Location/{sensor['location']}/{sensor_type}/alerts"
        alert_message = {
            "alert_type": "sensor_recovery",
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "sensor_id": sensor_id,
            "sensor_ip": sensor["ip"],
            "sensor_location": sensor["location"],
            "sensor_type": sensor["type"],
            "consecutive_successes": sensor["consecutive_successes"],
            "severity": "info",
        }

        try:
            result = mqtt_client.publish(alert_topic, json.dumps(alert_message))
            result.wait_for_publish()
            logging.info(f"Sent recovery alert for sensor {sensor['location']} to '{alert_topic}'")
        except Exception as e:
            # Best effort: an alerting problem must not break sensor polling.
            logging.error(f"Failed to send recovery alert: {e}")

    def _publish_offline_status(self, host_info, sensor, mqtt_client):
        """Publish an offline record to ``Location/{location}/{sensor_type}/data``.

        Any exception (including a missing key in *host_info*) is logged and
        swallowed so that a publishing problem never interrupts polling.
        """
        try:
            location = host_info["location"]
            sensor_type = self._topic_sensor_type(host_info["type"])
            data_topic = f"Location/{location}/{sensor_type}/data"

            payload = {
                "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
                "location": location,
                "sensor_type": sensor_type,
                "ip": host_info["ip"],
                "status": "offline",
                "error": sensor.get("last_error", "Unknown error"),
                "consecutive_failures": sensor["consecutive_failures"],
                "uptime_percentage": round(sensor["uptime_percentage"], 2),
            }

            result = mqtt_client.publish(data_topic, json.dumps(payload))
            result.wait_for_publish()
            logging.info(f"Published offline status to '{data_topic}'")
        except Exception as e:
            logging.error(f"Failed to publish offline status: {e}")

    # ------------------------------------------------------------------
    # Read access
    # ------------------------------------------------------------------
    def get_sensor_status(self, sensor_id):
        """Return a copy of one sensor's record, or ``{}`` if it is unknown."""
        with self.lock:
            return self.sensor_status.get(sensor_id, {}).copy()

    def get_all_sensor_status(self):
        """Return a timestamped snapshot of every sensor record.

        Each per-sensor dict is copied, so the caller can read or serialize
        the result without holding the lock while pollers keep updating the
        live records.
        """
        with self.lock:
            records = self.sensor_status.values()
            return {
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "total_sensors": len(self.sensor_status),
                "online_sensors": sum(1 for s in records if s["status"] == "online"),
                "offline_sensors": sum(1 for s in records if s["status"] == "offline"),
                "sensors": {
                    sensor_id: dict(record)
                    for sensor_id, record in self.sensor_status.items()
                },
            }

    def get_summary(self):
        """Return aggregate counts by status plus an overall health percentage."""
        with self.lock:
            total = len(self.sensor_status)
            tally = {"online": 0, "offline": 0, "unknown": 0}
            for record in self.sensor_status.values():
                if record["status"] in tally:
                    tally[record["status"]] += 1

            return {
                "total_sensors": total,
                "online_sensors": tally["online"],
                "offline_sensors": tally["offline"],
                "unknown_sensors": tally["unknown"],
                "health_percentage": (tally["online"] / total * 100) if total > 0 else 0,
                "alerted_sensors": len(self.alerted_sensors),
            }
# Module-level singleton: constructed once when this module is imported (which
# reads MODBUS_HOSTS at that moment) and shared by the accessor functions of
# this module.
_sensor_tracker = SensorTracker()
def get_sensor_tracker():
    """Return the module-wide SensorTracker instance shared by all callers."""
    tracker = _sensor_tracker
    return tracker
def get_all_sensor_status():
    """Return the shared tracker's full status report.

    Convenience wrapper so the health check can fetch the report without
    first obtaining the tracker instance.
    """
    report = _sensor_tracker.get_all_sensor_status()
    return report