Files
POE-sensor/sensor_bridge.py

261 lines
10 KiB
Python

import logging
import json
import time
import gc
from datetime import datetime, timezone
from pymodbus.client import ModbusTcpClient
from pymodbus.exceptions import ModbusException
import paho.mqtt.client as mqtt
from config import (
MODBUS_HOSTS, MODBUS_PORT, UNIT_ID,
MQTT_BROKER, MQTT_PORT, MQTT_TOPIC, MQTT_CLIENT_ID,
MQTT_USERNAME, MQTT_PASSWORD, PUBLISH_INTERVAL
)
def on_connect(client, userdata, flags, rc, properties=None):
    """Log the outcome of an MQTT connection attempt.

    Intended to be assigned to the MQTT client's ``on_connect`` attribute.

    Args:
        client: MQTT client instance that triggered the callback (unused).
        userdata: User data attached to the client (unused).
        flags: Connection flags reported by the broker (unused).
        rc: Connection result. A value equal to ``0`` is logged as a
            successful connection; anything else is logged as an error.
        properties: Optional fifth argument that paho-mqtt 2.x passes to
            version-2 style callbacks (unused). It defaults to ``None`` so
            the classic four-argument invocation keeps working.
    """
    if rc == 0:
        logging.info("Connected to MQTT Broker!")
        return
    logging.error(f"Cannot connect to MQTT Broker. Return code: {rc}")
def on_publish(client, userdata, mid, reason_code=None, properties=None):
    """Log, at debug level, that a message has been published.

    Intended to be assigned to the MQTT client's ``on_publish`` attribute.

    Args:
        client: MQTT client instance that triggered the callback (unused).
        userdata: User data attached to the client (unused).
        mid: Message ID of the published message; included in the log line.
        reason_code: Optional argument passed by paho-mqtt 2.x version-2
            style callbacks (unused).
        properties: Optional argument passed by paho-mqtt 2.x version-2
            style callbacks (unused).
    """
    logging.debug(f"Successfully sent message with ID: {mid}")
def read_and_publish_data(mqtt_client, modbus_client, host_info):
    """Connect to one Modbus sensor, read it, and publish the values to MQTT.

    The Modbus connection is opened on demand and always closed again before
    returning, whether or not the read succeeded.

    Args:
        mqtt_client: MQTT client handed through to the sensor-specific reader.
        modbus_client: Modbus client for the sensor; must provide
            ``is_socket_open()``, ``connect()`` and ``close()``.
        host_info: Sensor description. ``"ip"`` is used for log messages and
            ``"type"`` selects the reader: ``"cwt_co2"`` uses the CWT CO2
            reader, any other value (or a missing key) uses the
            temperature/humidity reader. Assumed to be a dict-like object
            from config.

    Returns:
        The reader's result (``True`` when everything was read and
        published), or ``False`` if the connection failed or any exception
        was raised.
    """
    try:
        # Check and establish Modbus connection
        if not modbus_client.is_socket_open():
            logging.info(f"Attempting to connect to {host_info['ip']}:{MODBUS_PORT}")
            if not modbus_client.connect():
                logging.error(f"Failed to connect to Modbus server {host_info['ip']}")
                return False
            logging.info(f"Successfully connected to {host_info['ip']}:{MODBUS_PORT}")

        # Temperature/humidity is the default reader, so an entry without a
        # "type" key is treated like any other non-CO2 sensor instead of
        # failing with a KeyError.
        if host_info.get("type") == "cwt_co2":
            return read_and_publish_cwt_co2(mqtt_client, modbus_client, host_info)
        return read_and_publish_temperature_humidity(mqtt_client, modbus_client, host_info)
    except ModbusException as e:
        logging.error(f"Modbus error from {host_info['ip']}: {e}")
        return False
    except Exception as e:
        # Keep the traceback: this branch only fires for unanticipated bugs.
        logging.error(f"Unexpected error from {host_info['ip']}: {e}", exc_info=True)
        return False
    finally:
        # Ensure connection is closed; a failure to close is only a warning.
        try:
            if modbus_client and modbus_client.is_socket_open():
                modbus_client.close()
        except Exception as e:
            logging.warning(f"Error closing modbus connection for {host_info['ip']}: {e}")
def read_and_publish_temperature_humidity(mqtt_client, modbus_client, host_info,
                                          publish_timeout=10.0):
    """Read a temperature/humidity sensor and publish its values to MQTT.

    Holding register 0 is read as the raw temperature and holding register 1
    as the raw humidity. The values are published, one topic each, under
    ``Location/<location>/temperature-humidity/`` together with a UTC
    timestamp and an ``online`` status.

    Args:
        mqtt_client: MQTT client used for publishing.
        modbus_client: Connected Modbus client for the sensor.
        host_info: Sensor description; ``"ip"`` is used in log messages and
            ``"location"`` in the topic path.
        publish_timeout: Maximum number of seconds to wait for each publish
            to complete. Without a bound, a stalled broker connection would
            block the polling loop indefinitely.

    Returns:
        ``True`` if both registers were read and every topic was confirmed
        published; ``False`` otherwise (including on any exception).
    """
    try:
        # Read temperature (register 0)
        result_temp = modbus_client.read_holding_registers(address=0, count=1, slave=UNIT_ID)
        if not getattr(result_temp, 'registers', None):
            logging.error(f"Error reading temperature from {host_info['ip']}: {result_temp}")
            return False
        raw_temp = result_temp.registers[0]
        # Linear scaling: raw 0 maps to -40 and raw 1650 maps to 125.
        temperature = (125 - (-40)) * raw_temp / 1650 - 40
        logging.info(f"Temperature from {host_info['ip']}: {temperature:.1f}°C")

        # Read humidity (register 1)
        result_hum = modbus_client.read_holding_registers(address=1, count=1, slave=UNIT_ID)
        if not getattr(result_hum, 'registers', None):
            logging.error(f"Error reading humidity from {host_info['ip']}: {result_hum}")
            return False
        raw_hum = result_hum.registers[0]
        # Linear scaling: raw 1000 maps to 100.
        humidity = raw_hum * 100 / 1000
        logging.info(f"Humidity from {host_info['ip']}: {humidity:.1f}%RH")

        # Publish data
        location = host_info["location"]
        sensor_type = "temperature-humidity"
        current_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
        base_topic = f"Location/{location}/{sensor_type}"
        topics_data = [
            (f"{base_topic}/Time", current_time),
            (f"{base_topic}/Status", "online"),
            (f"{base_topic}/Temperature", round(temperature, 1)),
            (f"{base_topic}/Humidity", round(humidity, 1)),
        ]

        all_published = True
        for topic, value in topics_data:
            payload = str(value)
            try:
                message = mqtt_client.publish(topic, payload)
                # Bounded wait so one stuck publish cannot hang the bridge.
                message.wait_for_publish(timeout=publish_timeout)
                delivered = message.is_published()
            except Exception as e:
                logging.error(f"Error publishing to '{topic}': {e}")
                all_published = False
                continue
            if delivered:
                logging.debug(f"Published to '{topic}': {payload}")
            else:
                logging.error(f"Failed to publish to '{topic}': {payload}")
                all_published = False
        return all_published
    except Exception as e:
        # Keep the traceback so unexpected failures can be diagnosed.
        logging.error(f"Error in temperature_humidity reading: {e}", exc_info=True)
        return False
def read_and_publish_cwt_co2(mqtt_client, modbus_client, host_info,
                             publish_timeout=10.0):
    """Read a CWT CO2 sensor and publish its values to MQTT.

    Holding registers 0-2 are read in one request and interpreted as raw
    humidity, raw temperature and CO2 respectively. The values are
    published, one topic each, under ``Location/<location>/CO2-gas/``
    together with a UTC timestamp and an ``online`` status.

    Args:
        mqtt_client: MQTT client used for publishing.
        modbus_client: Connected Modbus client for the sensor.
        host_info: Sensor description; ``"ip"`` is used in log messages and
            ``"location"`` in the topic path.
        publish_timeout: Maximum number of seconds to wait for each publish
            to complete. Without a bound, a stalled broker connection would
            block the polling loop indefinitely.

    Returns:
        ``True`` if exactly three registers were read and every topic was
        confirmed published; ``False`` otherwise (including on any
        exception).
    """
    try:
        # Read all 3 registers
        result = modbus_client.read_holding_registers(address=0, count=3, slave=UNIT_ID)
        if not hasattr(result, 'registers') or len(result.registers) != 3:
            logging.error(f"Error reading CWT registers from {host_info['ip']}: {result}")
            return False
        raw_humidity, raw_temperature, raw_co2 = result.registers

        # Process values: humidity and temperature are scaled by 1/10.
        humidity = raw_humidity / 10.0
        # Reinterpret the 16-bit register as a signed value so readings
        # above 32767 become negative temperatures.
        if raw_temperature > 32767:
            raw_temperature -= 65536
        temperature = raw_temperature / 10.0
        co2_ppm = raw_co2
        logging.info(f"CWT from {host_info['ip']} - Temp: {temperature:.1f}°C, Humidity: {humidity:.1f}%RH, CO2: {co2_ppm}ppm")

        # Publish data
        location = host_info["location"]
        sensor_type = "CO2-gas"
        current_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
        base_topic = f"Location/{location}/{sensor_type}"
        topics_data = [
            (f"{base_topic}/Time", current_time),
            (f"{base_topic}/Status", "online"),
            (f"{base_topic}/Temperature", round(temperature, 1)),
            (f"{base_topic}/Humidity", round(humidity, 1)),
            (f"{base_topic}/CO2", co2_ppm),
        ]

        all_published = True
        for topic, value in topics_data:
            payload = str(value)
            try:
                message = mqtt_client.publish(topic, payload)
                # Bounded wait so one stuck publish cannot hang the bridge.
                message.wait_for_publish(timeout=publish_timeout)
                delivered = message.is_published()
            except Exception as e:
                logging.error(f"Error publishing to '{topic}': {e}")
                all_published = False
                continue
            if delivered:
                logging.debug(f"Published to '{topic}': {payload}")
            else:
                logging.error(f"Failed to publish to '{topic}': {payload}")
                all_published = False
        return all_published
    except Exception as e:
        # Keep the traceback so unexpected failures can be diagnosed.
        logging.error(f"Error in CWT CO2 reading: {e}", exc_info=True)
        return False
def _process_host(mqtt_client, sensor_tracker, host_info):
    """Poll a single sensor once and record the outcome with the tracker."""
    modbus_client = None
    try:
        modbus_client = ModbusTcpClient(
            host=host_info["ip"],
            port=MODBUS_PORT,
            timeout=10,
        )
        logging.info(f"Processing {host_info['location']} at {host_info['ip']}")
        success = read_and_publish_data(mqtt_client, modbus_client, host_info)
        if success:
            sensor_tracker.record_success(host_info, mqtt_client)
            logging.info(f"Successfully processed {host_info['location']}")
        else:
            error_message = f"Failed to read/publish data from {host_info['ip']}"
            sensor_tracker.record_failure(host_info, error_message, mqtt_client)
            logging.warning(f"Failed to process {host_info['location']}")
    except Exception as e:
        error_message = f"Exception processing {host_info['ip']}: {str(e)}"
        sensor_tracker.record_failure(host_info, error_message, mqtt_client)
        logging.error(f"Error processing {host_info['location']}: {e}")
    finally:
        if modbus_client:
            try:
                modbus_client.close()
            except Exception as close_error:
                logging.warning(f"Error closing connection: {close_error}")


def main_loop():
    """Connect to the MQTT broker and poll every configured sensor forever.

    Each cycle visits every entry in ``MODBUS_HOSTS`` (pausing one second
    after each), logs the tracker summary, and then sleeps for
    ``PUBLISH_INTERVAL`` seconds. The loop ends on ``KeyboardInterrupt`` or
    on an exception that escapes the per-sensor handling; in both cases the
    MQTT client is stopped and disconnected before returning.
    """
    # Import here to avoid circular import
    from sensor_tracker import get_sensor_tracker

    # Initialize components
    sensor_tracker = get_sensor_tracker()

    # Initialize MQTT client. paho-mqtt 2.x expects an explicit callback API
    # version; VERSION1 matches the classic callback signatures. Older
    # releases have no such enum and take only the client ID.
    callback_api = getattr(mqtt, "CallbackAPIVersion", None)
    if callback_api is None:
        mqtt_client = mqtt.Client(client_id=MQTT_CLIENT_ID)
    else:
        mqtt_client = mqtt.Client(callback_api.VERSION1, client_id=MQTT_CLIENT_ID)
    mqtt_client.on_connect = on_connect
    mqtt_client.on_publish = on_publish
    if MQTT_USERNAME and MQTT_PASSWORD:
        mqtt_client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)

    try:
        # Connect to MQTT broker
        logging.info(f"Connecting to MQTT broker {MQTT_BROKER}:{MQTT_PORT}...")
        mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60)
        mqtt_client.loop_start()
        logging.info(f"Starting monitoring of {len(MODBUS_HOSTS)} sensors")
        logging.info("Health check available at: http://0.0.0.0:8080/health")

        # Main loop
        while True:
            for host_info in MODBUS_HOSTS:
                _process_host(mqtt_client, sensor_tracker, host_info)
                time.sleep(1)

            # Log summary
            summary = sensor_tracker.get_summary()
            logging.info(f"Cycle completed - Online: {summary['online_sensors']}/{summary['total_sensors']} sensors")
            gc.collect()
            logging.info(f"Waiting {PUBLISH_INTERVAL} seconds...")
            time.sleep(PUBLISH_INTERVAL)
    except KeyboardInterrupt:
        logging.info("Received stop signal, shutting down...")
    except Exception as e:
        logging.error(f"Unexpected error in main loop: {e}", exc_info=True)
    finally:
        # Cleanup: run each step independently so a failing loop_stop()
        # cannot skip disconnect(), and report problems instead of hiding
        # them.
        cleanup_steps = (
            ("loop_stop", mqtt_client.loop_stop),
            ("disconnect", mqtt_client.disconnect),
        )
        for step_name, step in cleanup_steps:
            try:
                step()
            except Exception as cleanup_error:
                logging.warning(f"Error during MQTT {step_name}: {cleanup_error}")
        logging.info("Sensor bridge shutdown complete")
if __name__ == "__main__":
    # Nothing visible in this file configures logging, and most of the
    # bridge's output is logged at INFO level, which the default root logger
    # would drop. basicConfig() does nothing if handlers are already
    # installed, so an existing configuration is left alone.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s",
    )
    main_loop()