# File: POE-sensor/sensor_bridge.py (Python, 311 lines, 14 KiB)

import logging
import json
import time
import gc # Add garbage collection
from datetime import datetime, timezone
from pymodbus.client import ModbusTcpClient
from pymodbus.exceptions import ModbusException
import paho.mqtt.client as mqtt
from config import (
MODBUS_HOSTS, MODBUS_PORT, UNIT_ID,
MQTT_BROKER, MQTT_PORT, MQTT_TOPIC, MQTT_CLIENT_ID,
MQTT_USERNAME, MQTT_PASSWORD, PUBLISH_INTERVAL
)
from sensor_tracker import get_sensor_tracker
from health_check import HealthCheckServer
# Set up root logging once at import time: INFO and above, with each record
# rendered as "<timestamp> - <level> - <message>".
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
def on_connect(client, userdata, flags, rc):
    """Log the outcome of an MQTT connection attempt.

    Assigned to ``mqtt_client.on_connect`` in ``main_loop``. Only ``rc`` is
    inspected: ``0`` is logged as a successful connection at INFO level, any
    other value is logged at ERROR level together with the code.
    """
    # Guard clause for the failure case keeps the success path unindented.
    if rc != 0:
        logging.error(f"Cannot connect to MQTT Broker. Return code: {rc}")
        return
    logging.info("Connected to MQTT Broker!")
def on_publish(client, userdata, mid):
    """Log the message id reported by the MQTT client's publish callback.

    Assigned to ``mqtt_client.on_publish`` in ``main_loop``. ``client`` and
    ``userdata`` are accepted to satisfy the callback signature but unused.
    """
    confirmation = f"Successfully sent message with ID: {mid}"
    logging.info(confirmation)
def read_and_publish_data(mqtt_client, modbus_client, host_info):
    """Read one Modbus host and publish its measurements to MQTT.

    Opens the Modbus connection when the socket is not already open, then
    delegates to the reader selected by ``host_info["type"]``: the value
    ``"cwt_co2"`` goes to ``read_and_publish_cwt_co2``; every other value goes
    to ``read_and_publish_temperature_humidity``. The Modbus connection is
    closed again before returning, whatever the outcome.

    Args:
        mqtt_client: MQTT client handed through to the reader.
        modbus_client: Modbus client for this host.
        host_info: Mapping describing the host; ``"ip"`` and ``"type"`` are
            read here.

    Returns:
        The selected reader's result, or False when the connection cannot be
        established or an exception is caught here.
    """
    try:
        # Connect on demand; a falsy connect() result is treated as a failure.
        if not modbus_client.is_socket_open():
            logging.info(f"Attempting to connect to {host_info['ip']}:{MODBUS_PORT}")
            connection_result = modbus_client.connect()
            if not connection_result:
                logging.error(f"Failed to connect to Modbus server {host_info['ip']}. Connection returned: {connection_result}")
                return False
            logging.info(f"Successfully connected to {host_info['ip']}:{MODBUS_PORT}")

        # Pick the reader for this sensor type, then run it.
        reader = (
            read_and_publish_cwt_co2
            if host_info["type"] == "cwt_co2"
            else read_and_publish_temperature_humidity
        )
        return reader(mqtt_client, modbus_client, host_info)
    except ModbusException as e:
        logging.error(f"Modbus error from {host_info['ip']}: {e}", exc_info=True)
        return False
    except Exception as e:
        logging.error(f"Unexpected error from {host_info['ip']}: {e}", exc_info=True)
        return False
    finally:
        # Always release the socket after the operation; a failure while
        # closing is only logged so it cannot replace the real result.
        try:
            if modbus_client and modbus_client.is_socket_open():
                modbus_client.close()
        except Exception as e:
            logging.warning(f"Error closing modbus connection for {host_info['ip']}: {e}")
def read_and_publish_temperature_humidity(mqtt_client, modbus_client, host_info, publish_timeout=10):
    """Read a temperature/humidity sensor over Modbus and publish it to MQTT.

    Holding register 0 (temperature) and holding register 1 (humidity) are
    read one at a time from unit ``UNIT_ID``, scaled, and published as
    separate messages under ``Location/{location}/temperature-humidity/``
    (``Time``, ``Status``, ``Data/temperature``, ``Data/humidity``).

    Args:
        mqtt_client: MQTT client used for publishing.
        modbus_client: Modbus client used for the register reads; expected to
            be connected by the caller.
        host_info: Mapping describing the sensor; ``"ip"`` and ``"location"``
            are read here.
        publish_timeout: Maximum number of seconds to wait for each message
            to be published. Without a bound, a stalled broker connection
            would block the polling loop indefinitely. ``None`` restores the
            unbounded wait.

    Returns:
        True when both reads succeed and every message is confirmed
        published; False otherwise. Modbus and other exceptions raised while
        reading or publishing are caught and logged.
    """
    try:
        # Read temperature (register 0)
        result_temp = modbus_client.read_holding_registers(address=0, count=1, slave=UNIT_ID)
        if not hasattr(result_temp, 'registers') or not result_temp.registers:
            logging.error(f"Error reading temperature from {host_info['ip']}: {result_temp}")
            return False
        raw_temp = result_temp.registers[0]
        # Linear scaling: raw 0..1650 maps onto -40 °C..125 °C.
        temperature = (125 - (-40)) * raw_temp / 1650 - 40
        logging.info(f"Raw temperature from {host_info['ip']}: {raw_temp}, Corrected: {temperature:.1f}°C")

        # Read humidity (register 1)
        result_hum = modbus_client.read_holding_registers(address=1, count=1, slave=UNIT_ID)
        if not hasattr(result_hum, 'registers') or not result_hum.registers:
            logging.error(f"Error reading humidity from {host_info['ip']}: {result_hum}")
            return False
        raw_hum = result_hum.registers[0]
        # Linear scaling: raw 0..1000 maps onto 0..100 %RH.
        humidity = raw_hum * 100 / 1000
        logging.info(f"Raw humidity from {host_info['ip']}: {raw_hum}, Corrected: {humidity:.1f}%RH")

        # Topic layout: Location/{location_name}/{type}/...
        location = host_info["location"]
        sensor_type = "temperature-humidity"
        current_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
        base_topic = f"Location/{location}/{sensor_type}"

        # One MQTT message per value.
        topics_data = [
            (f"{base_topic}/Time", current_time),
            (f"{base_topic}/Status", "online"),
            (f"{base_topic}/Data/temperature", round(temperature, 1)),
            (f"{base_topic}/Data/humidity", round(humidity, 1)),
        ]

        # Publish everything; one failed message does not stop the rest.
        all_published = True
        for topic, value in topics_data:
            try:
                # MQTT payloads are sent as strings.
                payload = str(value)
                publish_info = mqtt_client.publish(topic, payload)
                # Bounded wait: on timeout is_published() stays False and the
                # message is reported as failed instead of hanging forever.
                publish_info.wait_for_publish(timeout=publish_timeout)
                if publish_info.is_published():
                    logging.info(f"Published to '{topic}': {payload}")
                else:
                    logging.error(f"Failed to publish to '{topic}': {payload}")
                    all_published = False
            except Exception as e:
                logging.error(f"Error publishing to '{topic}': {e}")
                all_published = False

        if all_published:
            logging.info(f"Successfully published all temperature-humidity data for {location}")
            return True
        logging.error(f"Failed to publish some temperature-humidity data from {host_info['ip']}")
        return False
    except ModbusException as e:
        logging.error(f"Modbus error from {host_info['ip']}: {e}", exc_info=True)
        return False
    except Exception as e:
        logging.error(f"Unexpected error from {host_info['ip']}: {e}", exc_info=True)
        return False
def read_and_publish_cwt_co2(mqtt_client, modbus_client, host_info, publish_timeout=10):
    """Read a CWT CO2 sensor over Modbus and publish its values to MQTT.

    Holding registers 0-2 are read in a single request from unit ``UNIT_ID``
    (0 = humidity, 1 = temperature, 2 = CO2), scaled, and published as
    separate messages under ``Location/{location}/CO2-gas/`` (``Time``,
    ``Status``, ``Data/co2``, ``Data/temperature``, ``Data/humidity``).

    Args:
        mqtt_client: MQTT client used for publishing.
        modbus_client: Modbus client used for the register read; expected to
            be connected by the caller.
        host_info: Mapping describing the sensor; ``"ip"`` and ``"location"``
            are read here.
        publish_timeout: Maximum number of seconds to wait for each message
            to be published. Without a bound, a stalled broker connection
            would block the polling loop indefinitely. ``None`` restores the
            unbounded wait.

    Returns:
        True when the read succeeds and every message is confirmed published;
        False otherwise. Modbus and other exceptions raised while reading or
        publishing are caught and logged.
    """
    try:
        # Read all 3 registers at once (registers 0, 1, 2).
        # According to CWT manual: register 0=humidity, 1=temperature, 2=CO2
        result = modbus_client.read_holding_registers(address=0, count=3, slave=UNIT_ID)
        if not hasattr(result, 'registers') or len(result.registers) != 3:
            logging.error(f"Error reading CWT registers from {host_info['ip']}: {result}")
            return False

        raw_humidity = result.registers[0]     # Register 0: Humidity (0.1%RH)
        raw_temperature = result.registers[1]  # Register 1: Temperature (0.1°C)
        raw_co2 = result.registers[2]          # Register 2: CO2 (1ppm)
        logging.info(f"Raw CWT values from {host_info['ip']} - Humidity: {raw_humidity}, Temperature: {raw_temperature}, CO2: {raw_co2}")

        # Humidity: 0.1%RH resolution
        humidity = raw_humidity / 10.0

        # Temperature: 0.1°C resolution. Values above 32767 are the unsigned
        # view of a negative 16-bit two's-complement number.
        if raw_temperature > 32767:
            temperature = (raw_temperature - 65536) / 10.0
        else:
            temperature = raw_temperature / 10.0

        # CO2: 1ppm resolution for standard sensor
        co2_ppm = raw_co2
        logging.info(f"Processed CWT values from {host_info['ip']} - Humidity: {humidity:.1f}%RH, Temperature: {temperature:.1f}°C, CO2: {co2_ppm}ppm")

        # Topic layout: Location/{location_name}/{type}/...
        location = host_info["location"]
        sensor_type = "CO2-gas"
        current_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
        base_topic = f"Location/{location}/{sensor_type}"

        # One MQTT message per value.
        topics_data = [
            (f"{base_topic}/Time", current_time),
            (f"{base_topic}/Status", "online"),
            (f"{base_topic}/Data/co2", co2_ppm),
            (f"{base_topic}/Data/temperature", round(temperature, 1)),
            (f"{base_topic}/Data/humidity", round(humidity, 1)),
        ]

        # Publish everything; one failed message does not stop the rest.
        all_published = True
        for topic, value in topics_data:
            try:
                # MQTT payloads are sent as strings.
                payload = str(value)
                # Separate name so the Modbus response above is not shadowed.
                publish_info = mqtt_client.publish(topic, payload)
                # Bounded wait: on timeout is_published() stays False and the
                # message is reported as failed instead of hanging forever.
                publish_info.wait_for_publish(timeout=publish_timeout)
                if publish_info.is_published():
                    logging.info(f"Published to '{topic}': {payload}")
                else:
                    logging.error(f"Failed to publish to '{topic}': {payload}")
                    all_published = False
            except Exception as e:
                logging.error(f"Error publishing to '{topic}': {e}")
                all_published = False

        if all_published:
            logging.info(f"Successfully published all CO2-gas data for {location}")
            return True
        logging.error(f"Failed to publish some CO2-gas data from {host_info['ip']}")
        return False
    except ModbusException as e:
        logging.error(f"Modbus error from CWT sensor {host_info['ip']}: {e}", exc_info=True)
        return False
    except Exception as e:
        logging.error(f"Unexpected error from CWT sensor {host_info['ip']}: {e}", exc_info=True)
        return False
def main_loop():
    """Run the bridge: poll every configured Modbus host and publish to MQTT.

    Starts the health check server, connects the MQTT client and starts its
    network loop, then repeats forever: process each entry of
    ``MODBUS_HOSTS`` with a fresh ``ModbusTcpClient``, record the outcome in
    the sensor tracker, log a cycle summary and sleep ``PUBLISH_INTERVAL``
    seconds. A failure on one host is recorded and logged without stopping
    the cycle.

    The loop ends on ``KeyboardInterrupt`` or on an unexpected exception
    outside the per-host handling; in both cases the health server and the
    MQTT client are shut down before returning.
    """
    # Initialize sensor tracker and health check server
    sensor_tracker = get_sensor_tracker()
    health_server = HealthCheckServer()

    # Initialize MQTT client
    mqtt_client = mqtt.Client(client_id=MQTT_CLIENT_ID)
    mqtt_client.on_connect = on_connect
    mqtt_client.on_publish = on_publish
    if MQTT_USERNAME and MQTT_PASSWORD:
        mqtt_client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)

    try:
        # Start health check server
        health_server.start()

        # Connect to MQTT broker and run its network loop in the background
        logging.info(f"Connecting to MQTT broker {MQTT_BROKER}:{MQTT_PORT}...")
        mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60)
        mqtt_client.loop_start()

        logging.info(f"Starting monitoring of {len(MODBUS_HOSTS)} sensors")
        logging.info("System status can be monitored at:")
        logging.info(" - Health: http://0.0.0.0:8080/health")
        logging.info(" - Sensors: http://0.0.0.0:8080/sensors")

        # Main loop to read and publish data from all hosts
        while True:
            for host_info in MODBUS_HOSTS:
                modbus_client = None
                error_message = None
                try:
                    modbus_client = ModbusTcpClient(
                        host=host_info["ip"],
                        port=MODBUS_PORT,
                        timeout=10  # Reduced timeout from 30 to 10 seconds
                    )
                    logging.info(f"Processing channel {host_info['location']} at {host_info['ip']}:{MODBUS_PORT}")
                    success = read_and_publish_data(mqtt_client, modbus_client, host_info)
                    if success:
                        # Record successful reading
                        sensor_tracker.record_success(host_info, mqtt_client)
                        logging.info(f"Successfully processed {host_info['location']} ({host_info['ip']})")
                    else:
                        error_message = f"Failed to read/publish data from {host_info['ip']}"
                        sensor_tracker.record_failure(host_info, error_message, mqtt_client)
                        logging.warning(f"Failed to process {host_info['location']} ({host_info['ip']}), will retry next cycle.")
                except Exception as e:
                    # One misbehaving host must not abort the whole cycle.
                    error_message = f"Exception processing {host_info['ip']}: {str(e)}"
                    sensor_tracker.record_failure(host_info, error_message, mqtt_client)
                    logging.error(f"Error processing {host_info['location']} ({host_info['ip']}): {e}", exc_info=True)
                finally:
                    # Ensure modbus connection is always closed
                    if modbus_client:
                        try:
                            modbus_client.close()
                            logging.debug(f"Closed connection to {host_info['ip']}")
                        except Exception as close_error:
                            logging.warning(f"Error closing connection to {host_info['ip']}: {close_error}")
                        finally:
                            modbus_client = None  # Explicit cleanup

                # Add small delay between processing each sensor
                time.sleep(1)

            # Log system summary every cycle
            summary = sensor_tracker.get_summary()
            logging.info(f"Cycle completed - Online: {summary['online_sensors']}/{summary['total_sensors']} sensors "
                         f"({summary['health_percentage']:.1f}% health), "
                         f"Alerts: {summary['alerted_sensors']}")

            # Force garbage collection every cycle to prevent memory buildup
            gc.collect()

            logging.info(f"Waiting {PUBLISH_INTERVAL} seconds until next cycle...")
            time.sleep(PUBLISH_INTERVAL)
    except KeyboardInterrupt:
        logging.info("Received stop signal from user, shutting down...")
    except Exception as e:
        logging.error(f"Unexpected error in main loop: {e}", exc_info=True)
    finally:
        # Best-effort cleanup. Stopping the health server must not prevent the
        # MQTT shutdown below, but the failure is logged rather than hidden,
        # and SystemExit/KeyboardInterrupt are no longer swallowed.
        try:
            health_server.stop()
        except Exception as stop_error:
            logging.warning(f"Error stopping health check server: {stop_error}")
        mqtt_client.loop_stop()
        mqtt_client.disconnect()
        logging.info("Successfully closed all connections.")
# Start the bridge only when this file is executed as a script; importing the
# module does not start polling.
if __name__ == "__main__":
    main_loop()