# 401 lines
# 14 KiB
# Python
# Raw Permalink Normal View History
#
# 2025-03-14 15:46:15 +08:00
import logging
import os
import sqlite3
import struct
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
import re
from threading import Lock
from PyQt5.QtWidgets import QMessageBox
class DataProcessor:
    """Helpers for serial-port data: text/hex conversion, CRC, frame parsing,
    buffered text logging and buffered persistence to monthly SQLite files.

    Frame layout used by ``parse_modbus_data`` (byte offsets):
    ``[0:4]`` attachment code, ``[4]`` device address, ``[5]`` function code,
    ``[6]`` payload length, ``[7:7+length]`` payload, last two bytes CRC.
    """

    # struct formats keyed by byte length (multi-byte values are big-endian).
    _UINT_FORMATS = {1: 'B', 2: '>H', 4: '>I', 8: '>Q'}
    _INT_FORMATS = {1: 'b', 2: '>h', 4: '>i', 8: '>q'}

    def __init__(self):
        self.log_buffer = {}  # log file name -> list of pending lines
        self.db_lock = Lock()  # serializes database writes
        self.buffer_lock = Lock()  # guards self.buffer and self.last_insert_time
        self.log_size = 10  # pending lines per file before writing to disk
        self.position = 5  # byte offset of the function code inside a frame
        self.funcode = 'F4'  # function code (hex text) of frames to record
        self.buffer = []  # parsed records waiting for the database
        self.buffer_size = 1000  # buffer size
        self.buffer_interval = 10  # buffer time interval (seconds)
        self.executor = ThreadPoolExecutor(max_workers=2)
        self.last_insert_time = datetime.now()
        # Payload description, also used as table columns.
        # Each entry is [sign, scale, column]: sign 0 decodes the value as
        # unsigned, anything else as signed; the raw integer is divided by
        # scale; column is the database column name.
        self.sign_scale_arry = [
            [0, 1, 'methane'],  # concentration
            [0, 1, 'laser'],  # light intensity
            [1, 100, 'air_temp'],  # air temperature
            [1, 100, 'laser_temp'],  # laser temperature
            [0, 1, 'radio'],  # amplification factor
            [1, 100, 'snr'],  # snr
            [1, 100, 'NL'],  # NL
            [1, 100, 'Ns'],  # Ns
            [1, 100, 'Signal'],  # Signal
            [0, 1, 'peak'],  # peak
            [0, 1000, 'confidence'],  # confidence
            [0, 100, 'methane_stability'],  # concentration stability
            [0, 100, 'peak_stability'],  # peak stability
        ]

    # ------------------------------------------------------------------
    # Conversions
    # ------------------------------------------------------------------
    @staticmethod
    def encode_data(data):
        """Encode ``data`` as UTF-8 and turn every bare LF into CRLF.

        Returns the encoded bytes, or an error-message string if encoding
        fails.
        """
        try:
            encoded_data = data.encode('utf-8')
            # Only LFs that are not already preceded by CR are replaced.
            return re.sub(b'(?<!\r)\n', b'\r\n', encoded_data)
        except Exception as e:
            return f"编码失败: {e}"

    @staticmethod
    def decode_data(data):
        """Decode UTF-8 bytes, replacing undecodable sequences.

        Returns the decoded text, or an error-message string on failure.
        """
        try:
            return data.decode('utf-8', errors='replace')
        except Exception as e:
            return f"解码失败: {e}"

    @staticmethod
    def format_bytes_to_hexstr_space(data_bytes):
        """Format bytes as upper-case hex pairs separated by single spaces."""
        return ' '.join(f'{byte:02X}' for byte in data_bytes)

    @staticmethod
    def format_bytes_to_hexstr(data_bytes):
        """Format bytes as one contiguous upper-case hex string."""
        return ''.join(f'{byte:02X}' for byte in data_bytes)

    @staticmethod
    def format_hexstr_space_to_bytes(hexstr_space):
        """Convert hex text (optionally space separated) to bytes.

        The text is consumed two characters at a time; surrounding whitespace
        is trimmed before each step. On malformed input a critical message
        box is shown and ``None`` is returned.
        """
        send_list = []
        remaining = hexstr_space.strip()
        while remaining != '':
            try:
                num = int(remaining[0:2], 16)
            except ValueError:
                msg_box = QMessageBox()
                msg_box.setIcon(QMessageBox.Critical)
                msg_box.setWindowTitle('串口异常')
                msg_box.setText('请输入规范十六进制数据,以空格分开!')
                msg_box.exec_()
                return None
            remaining = remaining[2:].strip()
            send_list.append(num)
        return bytes(send_list)

    @staticmethod
    def bytes_to_uint(data_bytes):
        """Decode 1, 2, 4 or 8 bytes as an unsigned (big-endian) integer.

        Raises:
            ValueError: for any other length.
        """
        fmt = DataProcessor._UINT_FORMATS.get(len(data_bytes))
        if fmt is None:
            raise ValueError("不支持的字节长度")
        try:
            return struct.unpack(fmt, data_bytes)[0]
        except struct.error as e:
            return f"转换失败: {e}"

    @staticmethod
    def bytes_to_int(data_bytes):
        """Decode 1, 2, 4 or 8 bytes as a signed (big-endian) integer.

        Raises:
            ValueError: for any other length.
        """
        fmt = DataProcessor._INT_FORMATS.get(len(data_bytes))
        if fmt is None:
            raise ValueError("不支持的字节长度")
        try:
            return struct.unpack(fmt, data_bytes)[0]
        except struct.error as e:
            return f"转换失败: {e}"

    @staticmethod
    def crc_rtu(data):
        """Compute the CRC-16 (init 0xFFFF, reflected polynomial 0xA001).

        Returns the two CRC bytes, low byte first.
        """
        crc = 0xFFFF
        for pos in data:
            crc ^= pos
            for _ in range(8):
                if (crc & 0x0001) != 0:
                    crc >>= 1
                    crc ^= 0xA001
                else:
                    crc >>= 1
        return crc.to_bytes(2, byteorder='little')  # low byte first

    def format_bytes_to_float(self, data_bytes, sign=0, scale=1):
        """Decode ``data_bytes`` as an integer and divide it by ``scale``.

        ``sign == 0`` decodes unsigned, any other value decodes signed.
        Returns the float, or an error-message string if decoding or the
        division fails.
        """
        try:
            if sign == 0:
                data_int = self.bytes_to_uint(data_bytes)
            else:
                data_int = self.bytes_to_int(data_bytes)
            return data_int / scale
        except Exception as e:
            return f"转换失败: {e}"

    # ------------------------------------------------------------------
    # Text logging
    # ------------------------------------------------------------------
    def auto_save_log_asc(self, decoded_data, port_name=None):
        """Queue one decoded text message for the daily log.

        A message with exactly one comma is treated as ``name,payload`` and
        goes to ``<name>_<date>.txt``; anything else goes to
        ``[<port_name>_]<date>.log``.
        """
        now = datetime.now()
        day = now.strftime("%Y-%m-%d")
        stamp = now.strftime("%Y-%m-%dT%H:%M:%S.%f")[:23]  # millisecond precision
        line = decoded_data.split(',')
        if len(line) == 2:
            sav_name = line[0] + '_' + day + '.txt'
            sav_str = stamp + " " + line[1]
        else:
            prefix = port_name + '_' if port_name else ''
            sav_name = prefix + day + '.log'
            sav_str = stamp + " " + decoded_data
        self.auto_save_Log(sav_name, sav_str)

    def auto_save_log_hex(self, data_bytes, port_name=None):
        """Queue one binary frame, rendered as hex, for the daily log.

        Frames whose byte at ``self.position`` equals ``self.funcode`` go to
        ``<first 4 bytes as hex>_<date>.txt`` with those 4 bytes stripped from
        the line; if bytes 7-8 are non-zero the same line is also appended
        immediately to ``alarm_<...>.txt``. Other frames go to
        ``[<port_name>_]<date>.log``.
        """
        data_hex_str = self.format_bytes_to_hexstr(data_bytes)
        out_str = self.format_bytes_to_hexstr_space(data_bytes)
        now = datetime.now()
        day = now.strftime("%Y-%m-%d")
        stamp = now.strftime("%Y-%m-%dT%H:%M:%S.%f")[:23]  # millisecond precision
        start = 2 * self.position
        # The rendered hex is upper-case, so compare case-insensitively.
        if data_hex_str[start:start + 2] == self.funcode.upper():
            sav_name = data_hex_str[0:8] + '_' + day + '.txt'
            # 12 characters == the first 4 bytes in "XX " form.
            sav_str = stamp + " " + out_str[12:] + "\r\n"
            if data_hex_str[14:18] != '0000':
                sav_name_out = 'alarm_' + data_hex_str[0:8] + '_' + day + '.txt'
                with open(sav_name_out, mode='a', newline='', encoding='utf-8', errors='replace') as file:
                    file.write(sav_str)
        else:
            prefix = port_name + '_' if port_name else ''
            sav_name = prefix + day + '.log'
            sav_str = stamp + " " + out_str + "\r\n"
        self.auto_save_Log(sav_name, sav_str)

    def auto_save_Log(self, sav_name, sav_str):
        """Queue ``sav_str`` for file ``sav_name``; write the queue to disk
        once it holds ``self.log_size`` lines."""
        pending = self.log_buffer.setdefault(sav_name, [])
        pending.append(sav_str)
        if len(pending) >= self.log_size:
            self._write_log_lines(sav_name, pending)

    def flush_logs(self):
        """Write every queued log line to disk regardless of ``log_size``."""
        for sav_name, pending in self.log_buffer.items():
            if pending:
                self._write_log_lines(sav_name, pending)

    @staticmethod
    def _write_log_lines(sav_name, pending):
        """Append ``pending`` to ``sav_name`` and empty the list."""
        with open(sav_name, mode='a', newline='', encoding='utf-8', errors='replace') as file:
            file.writelines(pending)
        pending.clear()

    def update_log_size(self, log_size):
        """Set how many lines are queued per file before writing."""
        self.log_size = log_size

    def update_position(self, position):
        """Set the byte offset at which the function code is expected."""
        self.position = position

    def update_funcode(self, funcode):
        """Set the function code (hex text) that marks frames to record."""
        self.funcode = funcode

    # ------------------------------------------------------------------
    # The following is used for storing to the database
    # ------------------------------------------------------------------
    def parse_modbus_data(self, hex_data_bytes):
        """Parse one frame into a record dict.

        Returns ``None`` when the frame is shorter than 9 bytes, does not
        carry ``self.funcode`` at ``self.position``, fails the CRC check
        (computed over everything after the 4-byte attachment code and before
        the 2 CRC bytes), or carries more values than ``sign_scale_arry``
        describes.
        """
        total_bytes = len(hex_data_bytes)
        if total_bytes < 4 + 1 + 1 + 1 + 2:
            return None
        if hex_data_bytes[self.position:self.position + 1].hex().upper() != self.funcode.upper():
            return None
        attachment_code = hex_data_bytes[:4].hex().upper()
        device_address = hex_data_bytes[4:5].hex().upper()
        function_code = hex_data_bytes[5:6].hex().upper()
        data_length = int.from_bytes(hex_data_bytes[6:7], byteorder='big')
        checksum = hex_data_bytes[-2:]
        if self.crc_rtu(hex_data_bytes[4:-2]) != checksum:
            logging.debug("保存数据库CRC校验失败")
            return None
        valid_data_start = 7
        valid_data = hex_data_bytes[valid_data_start:valid_data_start + data_length]
        pair_count = (len(valid_data) + 1) // 2
        if pair_count > len(self.sign_scale_arry):
            # No sign/scale/column is defined for the extra values; indexing
            # the table would raise IndexError, so the frame is rejected.
            logging.debug(
                "frame carries %d values but only %d are defined; frame ignored",
                pair_count, len(self.sign_scale_arry),
            )
            return None
        valid_data_pairs = []
        for index in range(pair_count):
            sign, scale = self.sign_scale_arry[index][0], self.sign_scale_arry[index][1]
            pair_bytes = valid_data[2 * index:2 * index + 2]
            valid_data_pairs.append(self.format_bytes_to_float(pair_bytes, sign, scale))
        return {
            "timestamp": datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")[:23],
            "attachment_code": attachment_code,
            "device_address": device_address,
            "function_code": function_code,
            "data_length": data_length,
            "valid_data_pairs": valid_data_pairs,
            "checksum": checksum.hex().upper(),
        }

    def init_database(self, attachment_code, db_file):
        """Open ``db_file`` (autocommit, WAL journal) and make sure the table
        ``modbus_logs_<attachment_code>`` exists.

        Returns ``(connection, table_name)``; the caller closes the
        connection.

        NOTE: ``attachment_code`` is interpolated into SQL as part of an
        identifier, so it must come from a trusted source
        (``parse_modbus_data`` only produces hex digits).
        """
        table_name = f"modbus_logs_{attachment_code}"
        conn = sqlite3.connect(db_file, isolation_level=None)
        try:
            cursor = conn.cursor()
            cursor.execute("PRAGMA journal_mode=WAL;")
            cursor.execute(f"""
                CREATE TABLE IF NOT EXISTS {table_name} (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT UNIQUE,
                    attachment_code TEXT,
                    device_address TEXT,
                    function_code TEXT,
                    data_length INTEGER,
                    methane REAL,
                    laser REAL,
                    air_temp REAL,
                    laser_temp REAL,
                    radio REAL,
                    snr REAL,
                    NL REAL,
                    Ns REAL,
                    Signal REAL,
                    peak REAL,
                    confidence REAL,
                    methane_stability REAL,
                    peak_stability REAL,
                    checksum TEXT
                )
            """)
            conn.commit()
        except Exception:
            # Do not leak the connection when the setup statements fail.
            conn.close()
            raise
        return conn, table_name

    def _build_insert(self, data, table_name):
        """Return ``(sql, values)`` for inserting one parsed record."""
        columns = ["timestamp", "attachment_code", "device_address", "function_code", "data_length", "checksum"]
        values = [data[column] for column in columns]
        pairs = data["valid_data_pairs"]
        # The n-th payload value is stored in the n-th described column.
        columns.extend(self.sign_scale_arry[i][2] for i in range(len(pairs)))
        values.extend(pairs)
        columns_str = ', '.join(columns)
        placeholders_str = ', '.join("?" for _ in columns)
        sql = f"INSERT INTO {table_name} ({columns_str}) VALUES ({placeholders_str})"
        return sql, tuple(values)

    def insert_data(self, conn, data, table_name):
        """Insert one parsed record into ``table_name`` and commit."""
        sql, values = self._build_insert(data, table_name)
        cursor = conn.cursor()
        cursor.execute(sql, values)
        conn.commit()

    def add_to_buffer(self, parsed_data):
        """Queue a parsed record; trigger a flush when the buffer holds
        ``buffer_size`` records or ``buffer_interval`` seconds have passed
        since the last flush."""
        with self.buffer_lock:
            self.buffer.append(parsed_data)
            elapsed = (datetime.now() - self.last_insert_time).total_seconds()
            due = len(self.buffer) >= self.buffer_size or elapsed >= self.buffer_interval
        if due:
            # Called outside the lock: flush_buffer takes it again.
            self.flush_buffer()

    def flush_buffer(self):
        """Hand the currently buffered records to a worker thread for saving."""
        with self.buffer_lock:
            if not self.buffer:
                return
            # Detach the batch here so records added while the worker runs
            # are neither lost nor written twice.
            pending, self.buffer = self.buffer, []
            self.last_insert_time = datetime.now()
        try:
            self.executor.submit(self._flush_buffer_thread, pending)
        except RuntimeError:
            # The executor was already shut down: save synchronously instead.
            self._flush_buffer_thread(pending)

    def _flush_buffer_thread(self, pending=None):
        """Write a batch of records to this month's database file.

        ``pending`` is the batch to write; when omitted, the current buffer
        content is taken. Records are grouped by attachment code (one table
        each) and every group is written in a single transaction. Groups that
        fail are logged and put back at the front of the buffer so a later
        flush retries them.
        """
        if pending is None:
            with self.buffer_lock:
                pending, self.buffer = self.buffer, []
                self.last_insert_time = datetime.now()
        if not pending:
            return
        grouped_data = {}
        for data in pending:
            grouped_data.setdefault(data["attachment_code"], []).append(data)
        current_month = datetime.now().strftime("%Y_%m")
        db_file = f"modbus_data_{current_month}.db"
        unsaved = []
        for attachment_code, data_list in grouped_data.items():
            try:
                self._write_group(attachment_code, data_list, db_file)
            except Exception as e:
                logging.error(f"插入数据失败: {e}")
                unsaved.extend(data_list)
        if unsaved:
            with self.buffer_lock:
                self.buffer[:0] = unsaved
        else:
            logging.debug("数据已保存到数据库")

    def _write_group(self, attachment_code, data_list, db_file):
        """Insert all records of one attachment code atomically."""
        with self.db_lock:  # use the lock to synchronize database operations
            conn, table_name = self.init_database(attachment_code, db_file)
            try:
                # The connection is in autocommit mode, so the transaction is
                # opened explicitly; a failure leaves no partial batch behind.
                conn.execute("BEGIN")
                for data in data_list:
                    sql, values = self._build_insert(data, table_name)
                    conn.execute(sql, values)
                conn.commit()
                logging.debug(f"保存数据库,数据已插入{table_name}")
            except Exception:
                conn.rollback()
                raise
            finally:
                conn.close()

    def auto_save_hex_to_db(self, data_bytes):
        """Parse a raw frame and queue it for the database if it is valid."""
        parsed_data = self.parse_modbus_data(data_bytes)
        if parsed_data:
            self.add_to_buffer(parsed_data)

    def close(self):
        """Write out everything still buffered and stop the worker pool."""
        self.flush_logs()
        self.flush_buffer()
        self.executor.shutdown(wait=True)

    def cleanup_old_databases(self, months_to_keep=3):
        """Delete monthly database files older than ``months_to_keep``
        calendar months before the current month (in the working directory).

        Errors are logged, never raised; a file that cannot be removed does
        not stop the remaining files from being processed.
        """
        try:
            now = datetime.now()
            # Count months since year 0 so the subtraction is exact.
            month_index = now.year * 12 + (now.month - 1) - months_to_keep
            cutoff_month = f"{month_index // 12:04d}_{month_index % 12 + 1:02d}"
            for month in self.get_existing_months():
                # "YYYY_MM" strings sort chronologically.
                if month < cutoff_month:
                    db_file = f"modbus_data_{month}.db"
                    logging.info(f"删除旧数据库文件: {db_file}")
                    try:
                        os.remove(db_file)
                    except OSError as e:
                        logging.error(f"清理旧数据库文件失败: {e}")
        except Exception as e:
            logging.error(f"清理旧数据库文件失败: {e}")

    def get_existing_months(self):
        """Return the ``YYYY_MM`` part of every ``modbus_data_YYYY_MM.db``
        file in the working directory; other file names are ignored."""
        existing_months = []
        for filename in os.listdir('.'):
            match = re.fullmatch(r"modbus_data_(\d{4}_\d{2})\.db", filename)
            if match:
                existing_months.append(match.group(1))  # extract the month part
        return existing_months
# Example usage
if __name__ == "__main__":
    # Simulate parsing and saving one frame: 32 bytes counting up from 0x00
    # to 0x1F.
    sample_frame = bytes(range(0x20))
    demo = DataProcessor()
    demo.auto_save_hex_to_db(sample_frame)
    # Periodically clean up old database files
    demo.cleanup_old_databases(months_to_keep=3)