From 81ab6191dad10e74bed93cf11db48b2112e44f3d Mon Sep 17 00:00:00 2001 From: Naab2k3 Date: Mon, 23 Jun 2025 09:14:40 +0700 Subject: [PATCH] Enhance POE project with health check server, sensor tracking, and dynamic MQTT topic structure. Updated configuration for multiple Modbus hosts and added alerting system for sensor failures and recoveries. Improved logging and error handling throughout the application. --- README.md | 51 +++- __pycache__/config.cpython-312.pyc | Bin 593 -> 0 bytes __pycache__/sensor_bridge.cpython-312.pyc | Bin 7145 -> 0 bytes config.py | 70 ++++- health_check.py | 77 ++++++ memory-bank/activeContext.md | 17 -- memory-bank/productContext.md | 20 -- memory-bank/progress.md | 23 -- memory-bank/projectbrief.md | 25 -- memory-bank/systemPatterns.md | 21 -- memory-bank/techContext.md | 21 -- poe-sensor.nomad | 109 ++++++++ requirements.txt | 3 +- sensor_bridge.py | 305 ++++++++++++++++------ sensor_tracker.py | 232 ++++++++++++++++ 15 files changed, 752 insertions(+), 222 deletions(-) delete mode 100644 __pycache__/config.cpython-312.pyc delete mode 100644 __pycache__/sensor_bridge.cpython-312.pyc create mode 100644 health_check.py delete mode 100644 memory-bank/activeContext.md delete mode 100644 memory-bank/productContext.md delete mode 100644 memory-bank/progress.md delete mode 100644 memory-bank/projectbrief.md delete mode 100644 memory-bank/systemPatterns.md delete mode 100644 memory-bank/techContext.md create mode 100644 poe-sensor.nomad create mode 100644 sensor_tracker.py diff --git a/README.md b/README.md index 99b227f..d52129b 100644 --- a/README.md +++ b/README.md @@ -46,6 +46,10 @@ python main.py - **Data Processing**: Converts raw sensor values to calibrated readings - **Error Handling**: Robust error handling and retry mechanisms - **Logging**: Comprehensive logging for monitoring and debugging +- **Health Check API**: HTTP endpoints for system health monitoring +- **Sensor Status Tracking**: Real-time tracking of individual sensor health +- **Alerting System**: Automatic alerts for sensor failures and recovery +- **Recovery Detection**: Detects when failed sensors come back online ## 📈 Data Format @@ -77,6 +81,51 @@ All configuration is centralized in `config.py`: - **Register 0**: Temperature (raw value × 0.1 - 40) - **Register 1**: Humidity (raw value × 0.1) +## 🏥 Health Check & Monitoring + +### Health Check Endpoints + +The service provides HTTP endpoints for monitoring: + +- **http://localhost:8080/health** - Basic service health check +- **http://localhost:8080/sensors** - Detailed sensor status information + +### Alerting System + +The system automatically sends MQTT alerts for: + +- **Sensor Failures**: When a sensor fails 3 consecutive times +- **Sensor Recovery**: When a failed sensor comes back online + +Alert topics: +- `sensor-alerts` - Failure and recovery alerts +- `sensor-status/{location}/status` - Individual sensor status updates + +### Demo Monitoring Script + +Use the demo script to test monitoring features: + +```bash +# Check health endpoint +python demo_monitoring.py health + +# Check sensors status +python demo_monitoring.py sensors + +# Monitor system for 5 minutes +python demo_monitoring.py monitor 300 +``` + +### Configuration Options + +| Parameter | Description | Default | +|-----------|-------------|---------| +| `HEALTH_CHECK_ENABLED` | Enable/disable health check server | True | +| `HEALTH_CHECK_PORT` | HTTP server port | 8080 | +| `ALERTING_ENABLED` | Enable/disable MQTT alerts | True | +| `SENSOR_TIMEOUT_THRESHOLD` | Failures before alert | 3 | +| `RECOVERY_CONFIRMATION_COUNT` | Successes to confirm recovery | 2 | + ## 🛠️ Maintenance ### Running as a Service @@ -118,4 +167,4 @@ The service provides comprehensive logging. Monitor the logs for: ## 📝 License -This project is licensed under the MIT License. \ No newline at end of file +This project is licensed under the MIT License. diff --git a/__pycache__/config.cpython-312.pyc b/__pycache__/config.cpython-312.pyc deleted file mode 100644 index 48167fbc93de5bee3171f3fe931d21046df57e37..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 593 zcmYjNO>fgc5MAfXi5-VTjS52Q1*D)JVo^1y2_Qs1EX8VSyS786B{VA84Kc(?u^lAm znKz@IuY$k}+ z09(3Z?Uc{nqnD7d!8)0TylS zj;!Y_0HkPQ`xrY%-ps-M_RO}B<@xh9dx~Dp+;S)?55^Xrcx94w@c77|^9J^x?$!pS z{mLOpsly>1xC+m)DUgI`*_iNqYOB#T`fb%P{}D~qO~Q5++3d?Ln%y_eenVGxC7om< z5;brUG*wM*lDIYTNG?h=JF=uiLLuT^L(-M{o<#mqt!@|xs@@{$j@qo7vZ|20)@^iT zqurMkQ_`Q+JE!0xywK(C*2CVx)Opi0vFr6TRq73WBkOqMO(wW zxi6?Yk24JO6Y%FLaAPCjm$?FU93QY@;ts{xe2`fRcLQ#@R{G?g)J`g&Yp1ovqwn{2 j&Pvs@LM_NG^EU%wncoNmD#{jvG!4b|Zy@ii(+z(B=S-sr diff --git a/__pycache__/sensor_bridge.cpython-312.pyc b/__pycache__/sensor_bridge.cpython-312.pyc deleted file mode 100644 index fa564610d50269bb52dd2dd06ad5abadcac89dbf..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 7145 zcmdT}U2GfImA=E@AvqFBQ6%-Ru`Su6EYp^qttEf!b>SfKHfoK1uELmyN`4|-TEyudza7iq1cFQh%LgsB2#v3c=B0o8Glz35Dm#L7Og=yVdoM!&M=Dw}l?q=gP*_DA;-~}) z7jr61sdYox0J$MyOd7*Riu4&1rldJ+R?DV@C20*?)iRZ!leUkrrnZ<9jwBOi zl2zfVWOcYY=?puQuCOcV4!e^z;hLl;>`B&!Ym?rvH|Y!eC}co9S_o?D={dBX(Q8_X zv#eV`GZbqekNzhVuIFs-F*BnaJ<}v|gK;iS$iP zj0{dGHnpM>*wy09#N=?lQmvNzM~4T;)iG6S{rdFa)OhdMpi-<(_D)aVn3x(+=+TM( z-kITvam6)xy>E1QdMGkHJ~KG=gWl0>puHn1#?(28>?yXrBf-DVCsI+EQ?Z`pMKL-D zwSYk}tDTBvR=~-q6w?-*n1hs!@cViJKC38a-o z302Tm@-7YFj5*^c=+k{80;X|H*k^&9v{he^+!`FL1JS? zidXuB>=Z9$a0 zof8!Uj)_EH7ShWj1EE_t1pMx;8D5lbO->Bn68V%U;K*$p=jQledP%VhsYpo~+yvu% z@Dn@Xa~JJcZ7bJ)vbak-f69EwjgVch!Hv^j47#<*0@7euBPI`mB_T$DzN{D6jVWB(pYV!J?J) zoTDmsqXBr?^ggY`899@rUV@}$-TYZe-twQLH~krUqg18wT&UJcdaliB!L7GxCC;Mh zDbnsR9MJZVWF{{@jwgp9^78C5S^n-7P3`na*$m-Ih#7W+8q5IyXbSY z?VF?bubQLpJGY`iA_qxL3oTkn&%t{B^*YU#%l1Pwug}@{UllpDa75>(m87FuDQ7S9 zcH|rj?OMB@YkQkQ(0@X2*GgJHZaGA^23;yxwXt&NEHzb(J!o$<>$_~pa12S{t%|7) z;kISwkkKtUhFFDbT5r{6OQD;HW0#o^m?c9oU!)e%9pg>3NCl2$15Cxlp(KoZ4<`MJ0#@mQSu*N;j1&)1(3OJvB;vl(*r{m(`}+kvBtA@(H|$ol&Qj7zS!l0HLjMj%xj6>PGN zgT$S(7>ofv_7w^Cp$s^YxU>Y)?Cn!SB`FF$Uuw_R?H{hShuCMtZvg1B#8Vvjt*m>p z=y-?_Vszkme4lqK865zFx+Z`cBq1G-u_shyc7}Ad`huuUSD;f^!yOVYfXU1y6_U>D+bsSF7$-8bsGPKije9iN5YGq=lA+Xk0Xb7&1>@uF8j(#|rZ|qu&Jew7 z4Q++;*ME?&zacYkmHW?bTzDwv&-QPg%hz9(nW5dIC)OMr*Pa|bzcTPq^JAvRwogUZMLxmK`TI?AvDs9-kT5@v+;!&VsLVt-IhmwQ_aG z-}1n7Z+T^?B<+6a!Tg%|;GK=o!?}kyH>3IbA(?iK*KB$3S(!e!<8In^w-wxNKM!yC zpSXKgEMR?`bDM50(5*6k5B3FT$ptU?shoG317>xoqfO96-!z)Rr)@gOOC6ytj4ksmYMX8B1tQ*T9 zmjVcm76|5lB}rw_BQxz1TPi54$|4C^Mcv`1(>5+C!lMgl{Bpn zwT@kvwB$HMD@Lb1gtAKN)HG2cYXvN~K`17j(t2|cmT9{Q!XSg_t>ApX`wQp#t95Yi z)1jsk;7R1ns%hb@R?>5QoG$BM!T|F~c&sLXd3A?c)}xQUiBcvF&ZW)(r|(s8YFjo| z%=w_5Ra=DVoO9i)*-8bBj%DKq#uUw|xegd%AO9Why<;Rm|GU#$Th)Lsdu}WW=x#QX zf+sN{MF6?>YE+5SRlo>zv87mo7XftIiYO@(=bn*(3oyB9UJxb4loqh0SP5zqGSV{= z&tp9h_LJ>aO? zM3~h~HdI8^LM)bn7=nXYuxbe&o>b#@!Mjr;0h}Zl3!N$cPN^rDrEdV3CEz>TqVZ%Q z{D~)uR3%hPjMqBF$N2dBz+VK6&x-N6R5VeH!pK`55L^ew>0%7StMLiQN|LRR5QKDy z#pJy>(4?3MBvvfN7hX)+T+JNb8_dGBqRjsaA3HE%oH3eGm!d7QvQf_xjU$jsFpre^OfviXYKWx848 z1AmVkdNbcMvUMTvy&=zkN}QTt}WG%63r$y!D~qG8guY(DHJJxlly6lzEgo z0N!rXBai8kKLIiHzudF`Z6^43&+)f0cq!h1-SgkU-q)^nqt7p&h3_xBdmFDZ=n>O$ z^_1~ZrvaKCby1{z%1Fv*sNsv~QMYZV#rP=HFjQ~csx?E)R=pWowpxr({`JM)rja^h zpt*Q#dO(DF0E&ULGrd#{T7$*gSv`e{r`!sWdh!1=#IV z0ap*-OZp-xpGokS@htQcR4U$v4=gA}{SH Python processing -> MQTT publish. \ No newline at end of file diff --git a/memory-bank/techContext.md b/memory-bank/techContext.md deleted file mode 100644 index ed2520d..0000000 --- a/memory-bank/techContext.md +++ /dev/null @@ -1,21 +0,0 @@ -# Technical Context - -## Technologies Used -- **Python 3.x**: Main programming language. -- **pymodbus**: For Modbus TCP client communication. -- **paho-mqtt**: For MQTT client functionality. -- **Logging**: Python's built-in logging module for event and error tracking. - -## Dependencies -- `pymodbus` (>=2.5, <4.0 recommended for current code) -- `paho-mqtt` - -## Development Setup -- No external configuration files; all settings are in `poe.py` as variables. -- Script is intended to run on any system with Python 3.x and the above dependencies installed. - -## Technical Constraints -- Assumes the Modbus device is accessible via TCP/IP and supports reading holding registers for temperature and humidity. -- MQTT broker must be reachable from the host running the script. -- No persistent storage or database integration; data is transient. -- No web UI or REST API; all interaction is via logs and MQTT. \ No newline at end of file diff --git a/poe-sensor.nomad b/poe-sensor.nomad new file mode 100644 index 0000000..7c5bef8 --- /dev/null +++ b/poe-sensor.nomad @@ -0,0 +1,109 @@ +job "poe-sensor" { + region = "global" + datacenters = ["hs"] + type = "service" + namespace = "production" + meta { + version = "20250617" + } + + group "sensor-bridge" { + count = 1 + + # Network configuration - using host mode for Modbus access + network { + mode = "host" + port "health" { + static = 8080 + } + } + + # Restart policy + restart { + attempts = 3 + interval = "30m" + delay = "15s" + mode = "fail" + } + + # Update strategy + update { + max_parallel = 1 + min_healthy_time = "30s" + healthy_deadline = "3m" + progress_deadline = "10m" + auto_revert = true + canary = 0 + } + + service { + name = "${NOMAD_JOB_NAME}" + port = "health" + tags = [ + "sensor", + "modbus", + "mqtt", + "iot", + "health-check" + ] + + check { + type = "http" + path = "/health" + interval = "30s" + timeout = "10s" + check_restart { + limit = 3 + grace = "10s" + } + } + } + + task "poe-sensor" { + driver = "docker" + + config { + image = "registry.dev.meisheng.group/ms_qc_db:20250409" + command = "/bin/bash" + args = [ + "-c", + "cd local/poe-sensor && apt-get update -qq && apt-get install -y procps && python -m pip install --upgrade pip && python -m pip install -r requirements.txt && python -c 'import pymodbus, paho.mqtt.client; print(\"Dependencies installed successfully\")' && python main.py" + ] + } + + # Git artifact - using SSH similar to qc-scanner + artifact { + source = "git::ssh://git@gitea.service.mesh:2222/Mei_Sheng_Textiles/POE-sensor.git" + destination = "local/poe-sensor" + options { + ref = "main" + sshkey = "LS0tLS1CRUdJTiBPUEVOU1NIIFBSSVZBVEUgS0VZLS0tLS0KYjNCbGJuTnphQzFyWlhrdGRqRUFBQUFBQkc1dmJtVUFBQUFFYm05dVpRQUFBQUFBQUFBQkFBQUFNd0FBQUF0emMyZ3RaVwpReU5UVXhPUUFBQUNEbWF6M0ZWdlE1YTRaalY4dUdobENleEFjN0VxbmVVN0FETnFBSXg0cUI4d0FBQUpqNGlkSVcrSW5TCkZnQUFBQXR6YzJndFpXUXlOVFV4T1FBQUFDRG1hejNGVnZRNWE0WmpWOHVHaGxDZXhBYzdFcW5lVTdBRE5xQUl4NHFCOHcKQUFBRURpRXM1ejJRb2dTempvVzdDUnZ3U2RONUpVMTNmZm14cnFIQjNOS3hXUmp1WnJQY1ZXOURscmhtTlh5NGFHVUo3RQpCenNTcWQ1VHNBTTJvQWpIaW9IekFBQUFFbUpoYmk1dVpFQnRjM1JsZUhadUxtTnZiUUVDQXc9PQotLS0tLUVORCBPUkVOU1NIIFBSSVZBVEUgS0VZLS0tLS0K" + } + } + + # Environment variables + env { + LOG_LEVEL = "INFO" + PYTHONUNBUFFERED = "1" + TZ = "Asia/Ho_Chi_Minh" + # MQTT configuration (can be overridden by config.py) + MQTT_BROKER = "mqtt.service.mesh" + MQTT_PORT = "1883" + MQTT_USERNAME = "relay" + MQTT_PASSWORD = "Sey@K9c&Q4^" + } + + # Resource allocation + resources { + cpu = 256 + memory = 256 + } + + # Logs configuration + logs { + max_files = 10 + max_file_size = 20 + } + } + } +} diff --git a/requirements.txt b/requirements.txt index 0b055fb..08ed65a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ pymodbus>=3.5.0 -paho-mqtt>=1.6.0 \ No newline at end of file +paho-mqtt>=1.6.0 +requests>=2.28.0 diff --git a/sensor_bridge.py b/sensor_bridge.py index f5cfcb4..08e2f05 100644 --- a/sensor_bridge.py +++ b/sensor_bridge.py @@ -1,151 +1,282 @@ import logging import json import time -from datetime import datetime +from datetime import datetime, timezone from pymodbus.client import ModbusTcpClient from pymodbus.exceptions import ModbusException - -# Add MQTT client import paho.mqtt.client as mqtt -# Import configuration from config import ( - MODBUS_HOST, MODBUS_PORT, UNIT_ID, + MODBUS_HOSTS, MODBUS_PORT, UNIT_ID, MQTT_BROKER, MQTT_PORT, MQTT_TOPIC, MQTT_CLIENT_ID, - MQTT_USERNAME, MQTT_PASSWORD, LOCATION, PUBLISH_INTERVAL + MQTT_USERNAME, MQTT_PASSWORD, PUBLISH_INTERVAL ) +from sensor_tracker import get_sensor_tracker +from health_check import HealthCheckServer -# Setting logging basic to see output +# Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') -# MQTT callbacks def on_connect(client, userdata, flags, rc): + """Callback when MQTT client connects to broker""" if rc == 0: logging.info("Connected to MQTT Broker!") else: logging.error(f"Cannot connect to MQTT Broker. Return code: {rc}") def on_publish(client, userdata, mid): + """Callback when MQTT message is published""" logging.info(f"Successfully sent message with ID: {mid}") -def read_and_publish_data(mqtt_client, modbus_client): +def read_and_publish_data(mqtt_client, modbus_client, host_info): """Read data from Modbus and publish to MQTT""" try: - # Check connection to Modbus server + # Check and establish Modbus connection if not modbus_client.is_socket_open(): - if not modbus_client.connect(): - logging.error("Cannot connect to Modbus server.") + logging.info(f"Attempting to connect to {host_info['ip']}:{MODBUS_PORT}") + connection_result = modbus_client.connect() + if not connection_result: + logging.error(f"Failed to connect to Modbus server {host_info['ip']}. Connection returned: {connection_result}") return False + logging.info(f"Successfully connected to {host_info['ip']}:{MODBUS_PORT}") + # Handle different sensor types + if host_info["type"] == "cwt_co2": + return read_and_publish_cwt_co2(mqtt_client, modbus_client, host_info) + else: + return read_and_publish_temperature_humidity(mqtt_client, modbus_client, host_info) + + except ModbusException as e: + logging.error(f"Modbus error from {host_info['ip']}: {e}", exc_info=True) + return False + except Exception as e: + logging.error(f"Unexpected error from {host_info['ip']}: {e}", exc_info=True) + return False + +def read_and_publish_temperature_humidity(mqtt_client, modbus_client, host_info): + """Read temperature and humidity sensors""" + try: # Read temperature (register 0) result_temp = modbus_client.read_holding_registers(address=0, count=1, slave=UNIT_ID) + if not hasattr(result_temp, 'registers') or not result_temp.registers: + logging.error(f"Error reading temperature from {host_info['ip']}: {result_temp}") + return False + raw_temp = result_temp.registers[0] + temperature = (125 - (-40)) * raw_temp / 1650 - 40 # Correct formula: -40°C to 125°C + logging.info(f"Raw temperature from {host_info['ip']}: {raw_temp}, Corrected: {temperature:.1f}°C") # Read humidity (register 1) result_hum = modbus_client.read_holding_registers(address=1, count=1, slave=UNIT_ID) - - # Initialize data to publish - data = { - "time": datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"), - "location": LOCATION - } - - # Process temperature - if hasattr(result_temp, 'registers') and result_temp.registers: - raw_temp = result_temp.registers[0] - raw_temperature = raw_temp * 0.1 - temperature = raw_temperature - 40 - logging.info(f"Raw temperature: {raw_temperature:.1f}°C (raw: {raw_temp})") - logging.info(f"Corrected temperature: {temperature:.1f}°C") - data["temperature"] = round(temperature, 1) - else: - logging.error(f"Error reading temperature: {result_temp}") + if not hasattr(result_hum, 'registers') or not result_hum.registers: + logging.error(f"Error reading humidity from {host_info['ip']}: {result_hum}") return False + raw_hum = result_hum.registers[0] + humidity = raw_hum * 100 / 1000 # Correct formula: 0% to 100% RH + logging.info(f"Raw humidity from {host_info['ip']}: {raw_hum}, Corrected: {humidity:.1f}%RH") - # Xử lý độ ẩm - if hasattr(result_hum, 'registers') and result_hum.registers: - raw_hum = result_hum.registers[0] - humidity = raw_hum * 0.1 - logging.info(f"Humidity: {humidity:.1f}%RH (raw: {raw_hum})") - data["humidity"] = round(humidity, 1) - else: - logging.error(f"Error reading humidity: {result_hum}") - return False - - # Convert data to JSON with a better format - # indent=2 creates whitespace and newlines for JSON - payload = json.dumps(data, indent=2) - logging.info(f"Publishing data: {payload}") - result = mqtt_client.publish(MQTT_TOPIC, payload) + # Prepare new topic structure: Location/{location_name}/{sensor_type}/data + location = host_info["location"] + sensor_type = "temperature-humidity" + current_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") - # Ensure data is sent + # Create topic for combined data + data_topic = f"Location/{location}/{sensor_type}/data" + + # Create JSON payload with all data and status + payload = { + "timestamp": current_time, + "location": location, + "sensor_type": sensor_type, + "ip": host_info["ip"], + "status": "online", + "data": { + "temperature": round(temperature, 1), + "humidity": round(humidity, 1) + } + } + + # Publish combined data as JSON + result = mqtt_client.publish(data_topic, json.dumps(payload)) result.wait_for_publish() + logging.info(f"Published to '{data_topic}': {json.dumps(payload, indent=2)}") + + # Check if published successfully if result.is_published(): - logging.info(f"Successfully published data to topic '{MQTT_TOPIC}'") + logging.info(f"Successfully published temperature-humidity data for {location}") return True else: - logging.error("Cannot publish data") + logging.error(f"Failed to publish temperature-humidity data from {host_info['ip']}") return False + except ModbusException as e: + logging.error(f"Modbus error from {host_info['ip']}: {e}", exc_info=True) + return False except Exception as e: - logging.error(f"Error in reading/writing data: {e}", exc_info=True) + logging.error(f"Unexpected error from {host_info['ip']}: {e}", exc_info=True) + return False + +def read_and_publish_cwt_co2(mqtt_client, modbus_client, host_info): + """Read CWT CO2 sensor (humidity, temperature, CO2)""" + try: + # Read all 3 registers at once (registers 0, 1, 2) + # According to CWT manual: register 0=humidity, 1=temperature, 2=CO2 + result = modbus_client.read_holding_registers(address=0, count=3, slave=UNIT_ID) + + if not hasattr(result, 'registers') or len(result.registers) != 3: + logging.error(f"Error reading CWT registers from {host_info['ip']}: {result}") + return False + + raw_humidity = result.registers[0] # Register 0: Humidity (0.1%RH) + raw_temperature = result.registers[1] # Register 1: Temperature (0.1°C) + raw_co2 = result.registers[2] # Register 2: CO2 (1ppm) + + logging.info(f"Raw CWT values from {host_info['ip']} - Humidity: {raw_humidity}, Temperature: {raw_temperature}, CO2: {raw_co2}") + + # Process values according to CWT manual + # Humidity: 0.1%RH resolution + humidity = raw_humidity / 10.0 + + # Temperature: 0.1°C resolution, handle negative values (2's complement) + if raw_temperature > 32767: # Negative temperature in 2's complement + temperature = (raw_temperature - 65536) / 10.0 + else: + temperature = raw_temperature / 10.0 + + # CO2: 1ppm resolution for standard sensor + co2_ppm = raw_co2 + + logging.info(f"Processed CWT values from {host_info['ip']} - Humidity: {humidity:.1f}%RH, Temperature: {temperature:.1f}°C, CO2: {co2_ppm}ppm") + + # Prepare new topic structure: Location/{location_name}/{sensor_type}/data + location = host_info["location"] + sensor_type = "CO2-gas" + current_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") + + # Create topic for combined data + data_topic = f"Location/{location}/{sensor_type}/data" + + # Create JSON payload with all data and status + payload = { + "timestamp": current_time, + "location": location, + "sensor_type": sensor_type, + "ip": host_info["ip"], + "status": "online", + "data": { + "co2": co2_ppm, + "temperature": round(temperature, 1), + "humidity": round(humidity, 1) + } + } + + # Publish combined data as JSON + result = mqtt_client.publish(data_topic, json.dumps(payload)) + result.wait_for_publish() + + logging.info(f"Published to '{data_topic}': {json.dumps(payload, indent=2)}") + + # Check if published successfully + if result.is_published(): + logging.info(f"Successfully published CO2-gas data for {location}") + return True + else: + logging.error(f"Failed to publish CO2-gas data from {host_info['ip']}") + return False + + except ModbusException as e: + logging.error(f"Modbus error from CWT sensor {host_info['ip']}: {e}", exc_info=True) + return False + except Exception as e: + logging.error(f"Unexpected error from CWT sensor {host_info['ip']}: {e}", exc_info=True) return False def main_loop(): """Main function to connect and publish data in cycles""" + # Initialize sensor tracker and health check server + sensor_tracker = get_sensor_tracker() + health_server = HealthCheckServer() + # Initialize MQTT client mqtt_client = mqtt.Client(client_id=MQTT_CLIENT_ID) mqtt_client.on_connect = on_connect mqtt_client.on_publish = on_publish - - # Set username and password if needed + if MQTT_USERNAME and MQTT_PASSWORD: mqtt_client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD) - - # Initialize Modbus TCP client - modbus_client = ModbusTcpClient( - host=MODBUS_HOST, - port=MODBUS_PORT, - timeout=30 - ) - + try: + # Start health check server + health_server.start() + # Connect to MQTT broker + logging.info(f"Connecting to MQTT broker {MQTT_BROKER}:{MQTT_PORT}...") mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60) mqtt_client.loop_start() - logging.info(f"Attempting to connect to Modbus TCP server: {MODBUS_HOST}:{MODBUS_PORT}...") - - # Connect to Modbus server - if not modbus_client.connect(): - logging.error("Cannot connect to Modbus server initially.") - return - logging.info("Successfully connected to Modbus server.") - logging.info(f"Starting data reading cycle every {PUBLISH_INTERVAL} seconds...") - - try: - # Main loop to read and publish data - while True: - success = read_and_publish_data(mqtt_client, modbus_client) + logging.info(f"Starting monitoring of {len(MODBUS_HOSTS)} sensors") + logging.info("System status can be monitored at:") + logging.info(f" - Health: http://localhost:8080/health") + logging.info(f" - Sensors: http://localhost:8080/sensors") + + # Main loop to read and publish data from all hosts + while True: + for host_info in MODBUS_HOSTS: + error_message = None + try: + modbus_client = ModbusTcpClient( + host=host_info["ip"], + port=MODBUS_PORT, + timeout=10 # Reduced timeout from 30 to 10 seconds + ) + logging.info(f"Processing channel {host_info['location']} at {host_info['ip']}:{MODBUS_PORT}") + success = read_and_publish_data(mqtt_client, modbus_client, host_info) + + if success: + # Record successful reading + sensor_tracker.record_success(host_info, mqtt_client) + logging.info(f"Successfully processed {host_info['location']} ({host_info['ip']})") + else: + error_message = f"Failed to read/publish data from {host_info['ip']}" + sensor_tracker.record_failure(host_info, error_message, mqtt_client) + logging.warning(f"Failed to process {host_info['location']} ({host_info['ip']}), will retry next cycle.") + + except Exception as e: + error_message = f"Exception processing {host_info['ip']}: {str(e)}" + sensor_tracker.record_failure(host_info, error_message, mqtt_client) + logging.error(f"Error processing {host_info['location']} ({host_info['ip']}): {e}", exc_info=True) + finally: + try: + modbus_client.close() + logging.debug(f"Closed connection to {host_info['ip']}") + except: + pass + + # Add small delay between processing each sensor + time.sleep(1) + + # Log system summary every cycle + summary = sensor_tracker.get_summary() + logging.info(f"Cycle completed - Online: {summary['online_sensors']}/{summary['total_sensors']} sensors " + f"({summary['health_percentage']:.1f}% health), " + f"Alerts: {summary['alerted_sensors']}") - if not success: - logging.warning("Error occurred in current cycle, will retry in next cycle.") - - # Wait for next publish cycle - logging.info(f"Waiting {PUBLISH_INTERVAL} seconds until next publish...") - time.sleep(PUBLISH_INTERVAL) - - except KeyboardInterrupt: - logging.info("Received stop signal from user.") - + logging.info(f"Waiting {PUBLISH_INTERVAL} seconds until next cycle...") + time.sleep(PUBLISH_INTERVAL) + + except KeyboardInterrupt: + logging.info("Received stop signal from user, shutting down...") except Exception as e: logging.error(f"Unexpected error in main loop: {e}", exc_info=True) - finally: - # Close Modbus connection - modbus_client.close() - logging.info("Successfully closed Modbus connection.") - + # Cleanup + try: + health_server.stop() + except: + pass mqtt_client.loop_stop() mqtt_client.disconnect() - logging.info("Successfully closed MQTT connection.") \ No newline at end of file + logging.info("Successfully closed all connections.") + +if __name__ == "__main__": + main_loop() diff --git a/sensor_tracker.py b/sensor_tracker.py new file mode 100644 index 0000000..6a52f49 --- /dev/null +++ b/sensor_tracker.py @@ -0,0 +1,232 @@ +import json +import threading +from datetime import datetime, timezone +from collections import defaultdict +import logging + +from config import ( + MODBUS_HOSTS, SENSOR_TIMEOUT_THRESHOLD, RECOVERY_CONFIRMATION_COUNT, + ALERTING_ENABLED +) + +class SensorTracker: + def __init__(self): + self.sensor_status = {} + self.failure_counts = defaultdict(int) + self.success_counts = defaultdict(int) + self.alerted_sensors = set() + self.lock = threading.Lock() + + # Initialize sensor status + for host in MODBUS_HOSTS: + sensor_id = f"{host['ip']}_{host['location']}" + self.sensor_status[sensor_id] = { + "ip": host["ip"], + "location": host["location"], + "type": host["type"], + "status": "unknown", + "last_success": None, + "last_failure": None, + "consecutive_failures": 0, + "consecutive_successes": 0, + "total_failures": 0, + "total_successes": 0, + "uptime_percentage": 0.0 + } + + def record_success(self, host_info, mqtt_client=None): + """Record a successful sensor reading""" + sensor_id = f"{host_info['ip']}_{host_info['location']}" + current_time = datetime.now(timezone.utc) + + with self.lock: + sensor = self.sensor_status[sensor_id] + sensor["status"] = "online" + sensor["last_success"] = current_time.isoformat() + sensor["consecutive_failures"] = 0 + sensor["consecutive_successes"] += 1 + sensor["total_successes"] += 1 + + # Update uptime percentage + total_attempts = sensor["total_successes"] + sensor["total_failures"] + if total_attempts > 0: + sensor["uptime_percentage"] = (sensor["total_successes"] / total_attempts) * 100 + + # Check for recovery + if (sensor["consecutive_successes"] >= RECOVERY_CONFIRMATION_COUNT and + sensor_id in self.alerted_sensors): + self._send_recovery_alert(sensor_id, sensor, mqtt_client) + self.alerted_sensors.remove(sensor_id) + logging.info(f"Sensor {sensor['location']} ({sensor['ip']}) has recovered") + + # Note: Status is now published as part of sensor data, not separately + + def record_failure(self, host_info, error_message, mqtt_client=None): + """Record a failed sensor reading""" + sensor_id = f"{host_info['ip']}_{host_info['location']}" + current_time = datetime.now(timezone.utc) + + with self.lock: + sensor = self.sensor_status[sensor_id] + sensor["status"] = "offline" + sensor["last_failure"] = current_time.isoformat() + sensor["last_error"] = error_message + sensor["consecutive_successes"] = 0 + sensor["consecutive_failures"] += 1 + sensor["total_failures"] += 1 + + # Update uptime percentage + total_attempts = sensor["total_successes"] + sensor["total_failures"] + if total_attempts > 0: + sensor["uptime_percentage"] = (sensor["total_successes"] / total_attempts) * 100 + + # Check if we need to send an alert + if (sensor["consecutive_failures"] >= SENSOR_TIMEOUT_THRESHOLD and + sensor_id not in self.alerted_sensors): + self._send_failure_alert(sensor_id, sensor, mqtt_client) + self.alerted_sensors.add(sensor_id) + logging.warning(f"Sensor {sensor['location']} ({sensor['ip']}) is now considered offline") + + # Publish offline status using new topic structure + if mqtt_client and ALERTING_ENABLED: + self._publish_offline_status(host_info, sensor, mqtt_client) + + def _send_failure_alert(self, sensor_id, sensor, mqtt_client): + """Send failure alert to MQTT""" + if not mqtt_client or not ALERTING_ENABLED: + return + + # Determine sensor type for topic structure + if sensor["type"] == "cwt_co2": + sensor_type = "CO2-gas" + else: + sensor_type = "temperature-humidity" + + # Create alert topic using new structure: Location/{location_name}/{sensor_type}/alerts + alert_topic = f"Location/{sensor['location']}/{sensor_type}/alerts" + + alert_message = { + "alert_type": "sensor_failure", + "timestamp": datetime.now(timezone.utc).isoformat(), + "sensor_id": sensor_id, + "sensor_ip": sensor["ip"], + "sensor_location": sensor["location"], + "sensor_type": sensor["type"], + "consecutive_failures": sensor["consecutive_failures"], + "last_error": sensor.get("last_error", "Unknown error"), + "severity": "critical" + } + + try: + result = mqtt_client.publish(alert_topic, json.dumps(alert_message)) + result.wait_for_publish() + logging.info(f"Sent failure alert for sensor {sensor['location']} to '{alert_topic}'") + except Exception as e: + logging.error(f"Failed to send failure alert: {e}") + + def _send_recovery_alert(self, sensor_id, sensor, mqtt_client): + """Send recovery alert to MQTT""" + if not mqtt_client or not ALERTING_ENABLED: + return + + # Determine sensor type for topic structure + if sensor["type"] == "cwt_co2": + sensor_type = "CO2-gas" + else: + sensor_type = "temperature-humidity" + + # Create alert topic using new structure: Location/{location_name}/{sensor_type}/alerts + alert_topic = f"Location/{sensor['location']}/{sensor_type}/alerts" + + alert_message = { + "alert_type": "sensor_recovery", + "timestamp": datetime.now(timezone.utc).isoformat(), + "sensor_id": sensor_id, + "sensor_ip": sensor["ip"], + "sensor_location": sensor["location"], + "sensor_type": sensor["type"], + "consecutive_successes": sensor["consecutive_successes"], + "severity": "info" + } + + try: + result = mqtt_client.publish(alert_topic, json.dumps(alert_message)) + result.wait_for_publish() + logging.info(f"Sent recovery alert for sensor {sensor['location']} to '{alert_topic}'") + except Exception as e: + logging.error(f"Failed to send recovery alert: {e}") + + def _publish_offline_status(self, host_info, sensor, mqtt_client): + """Publish offline status using new topic structure""" + try: + location = host_info["location"] + # Determine sensor type based on host_info type + if host_info["type"] == "cwt_co2": + sensor_type = "CO2-gas" + else: + sensor_type = "temperature-humidity" + + # Create topic for offline status using new structure: Location/{location_name}/{sensor_type}/data + data_topic = f"Location/{location}/{sensor_type}/data" + + # Create JSON payload with offline status + payload = { + "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"), + "location": location, + "sensor_type": sensor_type, + "ip": host_info["ip"], + "status": "offline", + "error": sensor.get("last_error", "Unknown error"), + "consecutive_failures": sensor["consecutive_failures"], + "uptime_percentage": round(sensor["uptime_percentage"], 2) + } + + result = mqtt_client.publish(data_topic, json.dumps(payload)) + result.wait_for_publish() + logging.info(f"Published offline status to '{data_topic}'") + except Exception as e: + logging.error(f"Failed to publish offline status: {e}") + + def get_sensor_status(self, sensor_id): + """Get status of a specific sensor""" + with self.lock: + return self.sensor_status.get(sensor_id, {}).copy() + + def get_all_sensor_status(self): + """Get status of all sensors""" + with self.lock: + return { + "timestamp": datetime.now(timezone.utc).isoformat(), + "total_sensors": len(self.sensor_status), + "online_sensors": len([s for s in self.sensor_status.values() if s["status"] == "online"]), + "offline_sensors": len([s for s in self.sensor_status.values() if s["status"] == "offline"]), + "sensors": self.sensor_status.copy() + } + + def get_summary(self): + """Get a summary of sensor health""" + with self.lock: + total = len(self.sensor_status) + online = len([s for s in self.sensor_status.values() if s["status"] == "online"]) + offline = len([s for s in self.sensor_status.values() if s["status"] == "offline"]) + unknown = len([s for s in self.sensor_status.values() if s["status"] == "unknown"]) + + return { + "total_sensors": total, + "online_sensors": online, + "offline_sensors": offline, + "unknown_sensors": unknown, + "health_percentage": (online / total * 100) if total > 0 else 0, + "alerted_sensors": len(self.alerted_sensors) + } + +# Global sensor tracker instance +_sensor_tracker = SensorTracker() + +def get_sensor_tracker(): + """Get the global sensor tracker instance""" + return _sensor_tracker + +def get_all_sensor_status(): + """Get status of all sensors (convenience function for health check)""" + return _sensor_tracker.get_all_sensor_status()