"""Poll Modbus TCP sensors and publish their readings to an MQTT broker."""
import logging
|
|
import json
|
|
import time
|
|
import gc
|
|
from datetime import datetime, timezone
|
|
from pymodbus.client import ModbusTcpClient
|
|
from pymodbus.exceptions import ModbusException
|
|
import paho.mqtt.client as mqtt
|
|
|
|
from config import (
|
|
MODBUS_HOSTS, MODBUS_PORT, UNIT_ID,
|
|
MQTT_BROKER, MQTT_PORT, MQTT_TOPIC, MQTT_CLIENT_ID,
|
|
MQTT_USERNAME, MQTT_PASSWORD, PUBLISH_INTERVAL, LOCAL_TIMEZONE
|
|
)
|
|
|
|
def on_connect(client, userdata, flags, rc, properties=None):
    """Log the outcome of an MQTT connection attempt.

    Args:
        client: The MQTT client instance that invoked the callback (unused).
        userdata: User data attached to the client (unused).
        flags: Broker response flags (unused).
        rc: Connection result; a value equal to ``0`` means the connection
            was accepted, anything else is logged as an error.
        properties: Optional extra argument. paho-mqtt passes a fifth
            argument for MQTT v5 sessions and for callback API v2; it is
            accepted so the callback works in those modes and is ignored.
    """
    if rc == 0:
        logging.info("Connected to MQTT Broker!")
    else:
        logging.error("Cannot connect to MQTT Broker. Return code: %s", rc)
|
|
|
|
def on_publish(client, userdata, mid, reason_code=None, properties=None):
    """Log (at DEBUG level) that a queued MQTT message was sent.

    Args:
        client: The MQTT client instance that invoked the callback (unused).
        userdata: User data attached to the client (unused).
        mid: Message id of the published message.
        reason_code: Optional; accepted and ignored so the callback also
            works when paho-mqtt invokes it with the callback API v2
            argument list.
        properties: Optional; accepted and ignored for the same reason.
    """
    logging.debug("Successfully sent message with ID: %s", mid)
|
|
|
|
def read_and_publish_data(mqtt_client, modbus_client, host_info):
    """Read one sensor over Modbus and publish its values to MQTT.

    Opens the Modbus connection if it is not already open, dispatches to the
    reader matching the sensor type, and always closes the connection before
    returning.

    Args:
        mqtt_client: MQTT client used to publish the readings.
        modbus_client: Modbus client for this host; connected on demand.
        host_info: Mapping describing the sensor. ``"ip"`` is required.
            ``"type"`` selects the reader: ``"cwt_co2"`` uses the CO2
            reader, any other value (or a missing key) uses the
            temperature/humidity reader.

    Returns:
        True if the reader reported success, False on a connection failure,
        a Modbus error, or any other exception.
    """
    host_ip = host_info["ip"]
    try:
        # Connect lazily so an already-open client is reused as-is.
        if not modbus_client.is_socket_open():
            logging.info(f"Attempting to connect to {host_ip}:{MODBUS_PORT}")
            if not modbus_client.connect():
                logging.error(f"Failed to connect to Modbus server {host_ip}")
                return False
            logging.info(f"Successfully connected to {host_ip}:{MODBUS_PORT}")

        # .get() instead of indexing: every non-CO2 type already falls back
        # to the temperature/humidity reader, so an entry without a "type"
        # key should take the same default rather than fail with KeyError.
        if host_info.get("type") == "cwt_co2":
            return read_and_publish_cwt_co2(mqtt_client, modbus_client, host_info)
        return read_and_publish_temperature_humidity(mqtt_client, modbus_client, host_info)

    except ModbusException as e:
        logging.error(f"Modbus error from {host_ip}: {e}")
        return False
    except Exception as e:
        # Keep the traceback: this branch only fires for unanticipated bugs.
        logging.error(f"Unexpected error from {host_ip}: {e}", exc_info=True)
        return False
    finally:
        # Ensure the connection is closed; a failure here must not mask the
        # result computed above, so it is only logged.
        try:
            if modbus_client and modbus_client.is_socket_open():
                modbus_client.close()
        except Exception as e:
            logging.warning(f"Error closing modbus connection for {host_ip}: {e}")
|
|
|
|
# Upper bound (seconds) on waiting for a single message to be delivered.
# Without a bound, a stalled delivery would block the polling loop forever.
_PUBLISH_TIMEOUT_SECONDS = 10


def _publish_topics(mqtt_client, topics_data):
    """Publish each ``(topic, value)`` pair; return True only if all succeed.

    Every pair is attempted even after a failure, so one bad topic does not
    prevent the remaining values from being sent. Values are published as
    ``str(value)``.
    """
    all_published = True
    for topic, value in topics_data:
        payload = str(value)
        try:
            result = mqtt_client.publish(topic, payload)
            result.wait_for_publish(timeout=_PUBLISH_TIMEOUT_SECONDS)

            if result.is_published():
                logging.debug(f"Published to '{topic}': {payload}")
            else:
                logging.error(f"Failed to publish to '{topic}': {payload}")
                all_published = False
        except Exception as e:
            logging.error(f"Error publishing to '{topic}': {e}")
            all_published = False
    return all_published


def read_and_publish_temperature_humidity(mqtt_client, modbus_client, host_info):
    """Read a temperature/humidity sensor and publish its values.

    Holding register 0 is read as the raw temperature and register 1 as the
    raw humidity. Values are published under
    ``Location/<location>/temperature-humidity/{Time,Status,Temperature,Humidity}``.

    Args:
        mqtt_client: MQTT client used to publish the readings.
        modbus_client: Connected Modbus client for the sensor.
        host_info: Mapping with at least ``"ip"`` and ``"location"``.

    Returns:
        True if both registers were read and every topic was published,
        otherwise False. Exceptions are logged and reported as False.
    """
    try:
        # Read temperature (register 0)
        result_temp = modbus_client.read_holding_registers(address=0, count=1, slave=UNIT_ID)
        temp_registers = getattr(result_temp, "registers", None)
        if not temp_registers:
            logging.error(f"Error reading temperature from {host_info['ip']}: {result_temp}")
            return False
        raw_temp = temp_registers[0]
        # Linear scaling: raw 0 maps to -40 and raw 1650 maps to 125.
        temperature = (125 - (-40)) * raw_temp / 1650 - 40
        logging.info(f"Temperature from {host_info['ip']}: {temperature:.1f}°C")

        # Read humidity (register 1)
        result_hum = modbus_client.read_holding_registers(address=1, count=1, slave=UNIT_ID)
        hum_registers = getattr(result_hum, "registers", None)
        if not hum_registers:
            logging.error(f"Error reading humidity from {host_info['ip']}: {result_hum}")
            return False
        raw_hum = hum_registers[0]
        # Linear scaling: raw 1000 maps to 100.
        humidity = raw_hum * 100 / 1000
        logging.info(f"Humidity from {host_info['ip']}: {humidity:.1f}%RH")

        # Publish data
        location = host_info["location"]
        sensor_type = "temperature-humidity"
        current_time = datetime.now(LOCAL_TIMEZONE).strftime("%Y-%m-%d %H:%M:%S")
        base_topic = f"Location/{location}/{sensor_type}"

        topics_data = [
            (f"{base_topic}/Time", current_time),
            (f"{base_topic}/Status", "online"),
            (f"{base_topic}/Temperature", round(temperature, 1)),
            (f"{base_topic}/Humidity", round(humidity, 1)),
        ]
        return _publish_topics(mqtt_client, topics_data)

    except Exception as e:
        logging.error(f"Error in temperature_humidity reading: {e}")
        return False


def read_and_publish_cwt_co2(mqtt_client, modbus_client, host_info):
    """Read a CWT CO2 sensor and publish its values.

    Holding registers 0-2 are read in one request and interpreted as raw
    humidity, raw temperature and CO2 (ppm) respectively. Values are
    published under
    ``Location/<location>/CO2-gas/{Time,Status,Temperature,Humidity,CO2}``.

    Args:
        mqtt_client: MQTT client used to publish the readings.
        modbus_client: Connected Modbus client for the sensor.
        host_info: Mapping with at least ``"ip"`` and ``"location"``.

    Returns:
        True if exactly three registers were read and every topic was
        published, otherwise False. Exceptions are logged and reported as
        False.
    """
    try:
        # Read all 3 registers
        result = modbus_client.read_holding_registers(address=0, count=3, slave=UNIT_ID)

        if not hasattr(result, 'registers') or len(result.registers) != 3:
            logging.error(f"Error reading CWT registers from {host_info['ip']}: {result}")
            return False

        raw_humidity, raw_temperature, raw_co2 = result.registers

        # Process values: humidity and temperature are scaled by 1/10.
        humidity = raw_humidity / 10.0

        # Values above 32767 are treated as negative 16-bit two's-complement
        # numbers.
        if raw_temperature > 32767:
            temperature = (raw_temperature - 65536) / 10.0
        else:
            temperature = raw_temperature / 10.0

        co2_ppm = raw_co2

        logging.info(f"CWT from {host_info['ip']} - Temp: {temperature:.1f}°C, Humidity: {humidity:.1f}%RH, CO2: {co2_ppm}ppm")

        # Publish data
        location = host_info["location"]
        sensor_type = "CO2-gas"
        current_time = datetime.now(LOCAL_TIMEZONE).strftime("%Y-%m-%d %H:%M:%S")
        base_topic = f"Location/{location}/{sensor_type}"

        topics_data = [
            (f"{base_topic}/Time", current_time),
            (f"{base_topic}/Status", "online"),
            (f"{base_topic}/Temperature", round(temperature, 1)),
            (f"{base_topic}/Humidity", round(humidity, 1)),
            (f"{base_topic}/CO2", co2_ppm),
        ]
        return _publish_topics(mqtt_client, topics_data)

    except Exception as e:
        logging.error(f"Error in CWT CO2 reading: {e}")
        return False
|
|
|
|
def _create_mqtt_client():
    """Build the MQTT client with callbacks and optional credentials set."""
    # paho-mqtt 2.x exposes CallbackAPIVersion and (in 2.0) refuses to build
    # a client without one; 1.x has no such attribute. VERSION1 keeps the
    # callback argument lists used by on_connect/on_publish in this file.
    callback_api = getattr(mqtt, "CallbackAPIVersion", None)
    if callback_api is not None:
        client = mqtt.Client(callback_api.VERSION1, client_id=MQTT_CLIENT_ID)
    else:
        client = mqtt.Client(client_id=MQTT_CLIENT_ID)

    client.on_connect = on_connect
    client.on_publish = on_publish

    if MQTT_USERNAME and MQTT_PASSWORD:
        client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
    return client


def _process_host(mqtt_client, sensor_tracker, host_info):
    """Poll one sensor host and record the outcome in the sensor tracker."""
    modbus_client = None
    try:
        modbus_client = ModbusTcpClient(
            host=host_info["ip"],
            port=MODBUS_PORT,
            timeout=10,
        )
        logging.info(f"Processing {host_info['location']} at {host_info['ip']}")
        success = read_and_publish_data(mqtt_client, modbus_client, host_info)

        if success:
            sensor_tracker.record_success(host_info, mqtt_client)
            logging.info(f"Successfully processed {host_info['location']}")
        else:
            error_message = f"Failed to read/publish data from {host_info['ip']}"
            sensor_tracker.record_failure(host_info, error_message, mqtt_client)
            logging.warning(f"Failed to process {host_info['location']}")

    except Exception as e:
        error_message = f"Exception processing {host_info['ip']}: {str(e)}"
        sensor_tracker.record_failure(host_info, error_message, mqtt_client)
        logging.error(f"Error processing {host_info['location']}: {e}")
    finally:
        if modbus_client is not None:
            try:
                modbus_client.close()
            except Exception as close_error:
                logging.warning(f"Error closing connection: {close_error}")


def main_loop():
    """Connect to the MQTT broker and poll every configured sensor forever.

    Each cycle processes all entries of ``MODBUS_HOSTS`` in order (pausing
    one second after each), logs an online/total summary obtained from the
    sensor tracker, and then sleeps for ``PUBLISH_INTERVAL`` seconds.

    The loop ends on ``KeyboardInterrupt`` or on an unexpected exception
    (which is logged with its traceback); in both cases the MQTT network
    loop is stopped and the client disconnected before returning.
    """
    # Import here to avoid circular import
    from sensor_tracker import get_sensor_tracker

    # Initialize components
    sensor_tracker = get_sensor_tracker()
    mqtt_client = _create_mqtt_client()

    try:
        # Connect to MQTT broker
        logging.info(f"Connecting to MQTT broker {MQTT_BROKER}:{MQTT_PORT}...")
        mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60)
        mqtt_client.loop_start()

        logging.info(f"Starting monitoring of {len(MODBUS_HOSTS)} sensors")
        logging.info("Health check available at: http://0.0.0.0:8080/health")

        # Main loop
        while True:
            for host_info in MODBUS_HOSTS:
                _process_host(mqtt_client, sensor_tracker, host_info)
                time.sleep(1)

            # Log summary
            summary = sensor_tracker.get_summary()
            logging.info(f"Cycle completed - Online: {summary['online_sensors']}/{summary['total_sensors']} sensors")

            gc.collect()
            logging.info(f"Waiting {PUBLISH_INTERVAL} seconds...")
            time.sleep(PUBLISH_INTERVAL)

    except KeyboardInterrupt:
        logging.info("Received stop signal, shutting down...")
    except Exception as e:
        logging.error(f"Unexpected error in main loop: {e}", exc_info=True)
    finally:
        # Cleanup is best-effort, but failures are logged rather than hidden,
        # and KeyboardInterrupt/SystemExit are no longer swallowed here.
        try:
            mqtt_client.loop_stop()
            mqtt_client.disconnect()
        except Exception as e:
            logging.warning(f"Error during MQTT cleanup: {e}")
        logging.info("Sensor bridge shutdown complete")
|
|
|
|
if __name__ == "__main__":
    # Without configuration the root logger only emits WARNING and above, so
    # every INFO status line in this file would be discarded when the module
    # is run directly. basicConfig() does nothing if the root logger already
    # has handlers, so logging set up elsewhere is left untouched.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s",
    )
    main_loop()
|