python/LaserMehtanePT_RB/pythonProject/calibration_manager.py
2025-03-14 16:15:51 +08:00

227 lines
9.6 KiB
Python

import logging
import time
from PyQt5.QtCore import QTimer, pyqtSignal, QObject
from PyQt5.QtGui import QColor
class CalibrationManager(QObject):
query_finished = pyqtSignal(str, bytes)
# 定义常量
WRITE_SINGLE_REGISTER = 0x06
READ_HOLDING_REGISTERS = 0x03
QUERY_INTERVAL = 100 # 查询间隔时间,单位为毫秒
def __init__(self, serial_manager, data_processor, ui):
super().__init__()
self.serial_manager = serial_manager
self.data_processor = data_processor
self.ui = ui
self.query_timer = None
self.num = 0
self.modbus_data = [0] * 20
self.query_modbus_bytes = None
self.query_finished.connect(self.process_query_value)
def calibrate_air_tempera(self, temp_str):
"""开始温度标定"""
self.ui.plainTextEditStastusCalib.appendPlainText('################')
self.ui.plainTextEditStastusCalib.appendPlainText('环境温度,开始标定')
if not temp_str.isdigit():
self.ui.plainTextEditStastusCalib.appendPlainText('环境温度输入错误')
return
temp_int = int(float(temp_str) * 100)
temp_hex = temp_int.to_bytes(2, 'big')
self.write_register(0x01, 0x2C, temp_hex)
time.sleep(1)
self.check_air_tempera()
def check_air_tempera(self):
self.reset_port_colors()
self.ui.plainTextEditStastusCalib.appendPlainText('查询中...')
self.read_register(0x01, 0x2C, 0x00, 0x01)
def calibrate_laser_tempra(self):
self.ui.plainTextEditStastusCalib.appendPlainText('################')
self.ui.plainTextEditStastusCalib.appendPlainText('laser温度标定')
self.write_register(0x01, 0x2D, b'\x00\x01')
time.sleep(1)
self.check_laser_tempra()
def stop_calibrate_laser_tempra(self):
self.ui.plainTextEditStastusCalib.appendPlainText('################')
self.write_register(0x01, 0x2D, b'\x00\x00')
self.ui.plainTextEditStastusCalib.appendPlainText('laser温度标定结束')
def check_laser_tempra(self):
self.reset_port_colors()
self.ui.plainTextEditStastusCalib.appendPlainText('查询中...')
self.read_register(0x01, 0x2D, 0x00, 0x01)
def calibrate_laser_tempera_start(self, temp_str):
self.ui.plainTextEditStastusCalib.appendPlainText('################')
self.ui.plainTextEditStastusCalib.appendPlainText('laser温度标定起始点修改')
if not temp_str.isdigit():
self.ui.plainTextEditStastusCalib.appendPlainText('环境温度输入错误')
return
temp_int = int(float(temp_str) * 100)
temp_hex = temp_int.to_bytes(2, 'big')
self.write_register(0x00, 0x16, temp_hex)
time.sleep(1)
self.check_laser_tempera_start()
def check_laser_tempera_start(self):
self.reset_port_colors()
self.ui.plainTextEditStastusCalib.appendPlainText('################')
self.ui.plainTextEditStastusCalib.appendPlainText('laser温度标定起始点查询')
self.read_register(0x00, 0x16, 0x00, 0x01)
def calibrate_conc_ppm(self, ppm_str):
self.ui.plainTextEditStastusCalib.appendPlainText('################')
self.ui.plainTextEditStastusCalib.appendPlainText('浓度标定')
if not ppm_str.isdigit():
self.ui.plainTextEditStastusCalib.appendPlainText('浓度输入错误')
return
ppm_int = int(float(ppm_str))
ppm_hex = ppm_int.to_bytes(2, 'big')
self.write_register(0x01, 0x2E, ppm_hex)
time.sleep(1)
self.check_conc_ppm()
def check_conc_ppm(self):
self.reset_port_colors()
self.ui.plainTextEditStastusCalib.appendPlainText('################')
self.ui.plainTextEditStastusCalib.appendPlainText('浓度标定,查询')
self.read_register(0x01, 0x2E, 0x00, 0x01)
def calibrate_reset_conc_ppm(self):
self.ui.plainTextEditStastusCalib.appendPlainText('################')
self.ui.plainTextEditStastusCalib.appendPlainText('浓度标定,重置')
self.write_register(0x01, 0x2F, b'\x00\x00')
time.sleep(1)
self.check_reset_conc_ppm()
def check_reset_conc_ppm(self):
self.reset_port_colors()
self.ui.plainTextEditStastusCalib.appendPlainText('################')
self.ui.plainTextEditStastusCalib.appendPlainText('浓度标定,重置')
self.read_register(0x00, 0x64, 0x00, 0x01)
def write_register(self, address_hi, address_lo, value):
"""写入寄存器"""
self.modbus_data[0:6] = [0x01, self.WRITE_SINGLE_REGISTER, address_hi, address_lo, value[0],
value[1]]
self.send_bytes(bytes(self.modbus_data[:6]))
def read_register(self, address_hi, address_lo, add_num_hi, add_num_lo):
"""读取寄存器"""
self.modbus_data[0:6] = [0x01, self.READ_HOLDING_REGISTERS, address_hi, address_lo, add_num_hi, add_num_lo]
self.query_modbus_bytes = bytes(self.modbus_data[:6])
self.send_bytes(self.query_modbus_bytes)
self.ui.plainTextEditStastusCalib.appendPlainText('查询中...')
self.num = 0
self.start_query_timer()
def send_bytes(self, modbus_bytes):
crc = self.data_processor.crc_rtu(modbus_bytes)
modbus_bytes = modbus_bytes + crc
modbus_send_str = self.data_processor.format_bytes_to_hexstr_space(modbus_bytes)
self.ui.data_send(modbus_send_str)
def check_query_value(self):
self.num += 1
if self.num > 5:
self.stop_query()
self.ui.plainTextEditStastusCalib.appendPlainText('查询结束,在通道处查看结果')
return False
for port_name, buffer in self.serial_manager.buffers.items():
if self.validate_response(buffer):
self.query_finished.emit(port_name, buffer)
def validate_response(self, buffer):
"""验证响应"""
start_index = buffer.find(b'\x01\x03')
if start_index != -1 and len(buffer) >= start_index + 5:
byte_count = buffer[start_index + 2]
frame_length = start_index + 5 + byte_count
if len(buffer) >= frame_length:
crc = self.data_processor.crc_rtu(buffer[start_index:start_index + 5])
if buffer[start_index + 5:start_index + 7] == crc:
return True
return False
def process_query_value(self, port_name, buffer):
"""处理查询数据"""
logging.debug(f'{port_name} {buffer}')
if len(buffer) > 6:
ind = self.ui.find_port_row(port_name)
status = self.parse_query_value(self.query_modbus_bytes, buffer)
if status:
self.ui.plainTextEditStastusCalib.appendPlainText(f'{port_name}{status[0]}')
self.ui.update_table_status(ind, status[1], status[2])
def parse_query_value(self, modbus_bytes, buffer):
"""解析查询结果"""
if self.query_modbus_bytes in [
b'\x01\x03\x01\x2C\x00\x01', # 查询环境温度标定
b'\x01\x03\x01\x2D\x00\x01', # 查询激光温度标定
b'\x01\x03\x01\x2E\x00\x01' # 查询浓度标定
]:
status_map = {
b'\x01\x03\x02\x00\x00': ('未标定', '未标定', QColor(255, 255, 255)),
b'\x01\x03\x02\x00\x01': ('标定中', '标定中', QColor(255, 255, 0)),
b'\x01\x03\x02\x00\x02': ('标定成功', '标定成功', QColor(0, 255, 0)),
b'\x01\x03\x02\x00\x03': ('标定失败', '标定失败', QColor(255, 0, 0)),
b'\x01\x03\x02\x00\x04': ('标定失败,浓度太低', '浓度太低', QColor(255, 0, 0)),
b'\x01\x03\x02\x00\x05': ('标定失败,间距太小', '标定间隔太小', QColor(255, 0, 0))
}
for key, value in status_map.items():
if key in buffer:
return value
return None
elif modbus_bytes == b'\x01\x03\x00\x16\x00\x01':
bytes_count = buffer[2]
query_data = buffer[3:3 + bytes_count]
air_tempera = int.from_bytes(query_data, 'big') / 100
return f'环境温度:查询结果:{air_tempera}', f'{air_tempera}', QColor(0, 255, 0)
elif modbus_bytes == b'\x01\x03\x01\x2F\x00\x01':
bytes_count = buffer[2]
query_data = buffer[3:3 + bytes_count]
conc_flag = int.from_bytes(query_data, 'big')
return f'浓度标定:重置结果:{conc_flag}', f'{conc_flag}', QColor(0, 255, 0)
elif modbus_bytes == b'\x01\x03\x00\x64\x00\x01':
bytes_count = buffer[2]
query_data = buffer[3:3 + bytes_count]
conc_calib_flag = int.from_bytes(query_data, 'big')
return f'浓度标定:查询结果:{conc_calib_flag}', f'{conc_calib_flag}', QColor(0, 255, 0)
else:
return '查询失败', '查询失败', QColor(255, 0, 0)
def start_query_timer(self):
"""启动查询定时器"""
self.query_timer = QTimer(self.ui)
self.query_timer.timeout.connect(self.check_query_value)
self.query_timer.start(self.QUERY_INTERVAL)
def stop_query(self):
"""停止查询定时器"""
if self.query_timer and self.query_timer.isActive():
self.query_timer.stop()
def reset_port_colors(self):
"""重置串口通道颜色"""
for port_name in self.serial_manager.buffers.keys():
ind = self.ui.find_port_row(port_name)
if ind is not None:
self.ui.reset_port_color(ind)