324 lines
14 KiB
Python
Raw Permalink Normal View History

2025-03-14 15:46:15 +08:00
import logging
import time
from PyQt5.QtCore import QTimer, pyqtSignal, QObject
from PyQt5.QtGui import QColor
# Builds Modbus-style register read/write requests for calibration and polls
# the selected channels' receive buffers for the replies.
class CalibrationManager(QObject):
# Emitted with (port name, validated response frame); connected to
# process_query_value in __init__.
query_finished = pyqtSignal(str, bytes)
# Constants
WRITE_SINGLE_REGISTER = 0x06
READ_HOLDING_REGISTERS = 0x03
QUERY_INTERVAL = 1000 # Query interval, in milliseconds
def __init__(self, serial_manager, data_processor, ui):
    """Store the collaborators and wire up the polling timer and result signal.

    Args:
        serial_manager: Object whose ``buffers`` mapping is read by
            ``reset_port_colors``.
        data_processor: Provides ``crc_rtu`` and
            ``format_bytes_to_hexstr_space``.
        ui: Main window object; also used as the parent of the query timer.
    """
    super().__init__()

    # Collaborators.
    self.serial_manager = serial_manager
    self.data_processor = data_processor
    self.ui = ui

    # Request state: poll counter, scratch frame buffer, last read request.
    self.num = 0
    self.modbus_data = [0] * 20
    self.query_modbus_bytes = None

    # Each timer tick polls for a reply; a validated reply is handed to
    # process_query_value through the query_finished signal.
    self.query_timer = QTimer(self.ui)
    self.query_timer.timeout.connect(self.check_query_value)
    self.query_finished.connect(self.process_query_value)
def write_plain_text_edit(self, text, log_level='info'):
    """Append *text* to the calibration status box and mirror it to the log.

    Args:
        text: Message to show and log.
        log_level: ``'info'``, ``'error'`` or ``'debug'``. Any other value
            updates the status box only and logs nothing.
    """
    self.ui.plainTextEditStastusCalib.appendPlainText(text)
    level_to_logger = (
        ('info', logging.info),
        ('error', logging.error),
        ('debug', logging.debug),
    )
    for level_name, log_call in level_to_logger:
        if log_level == level_name:
            log_call(text)
            break
def calibrate_air_tempera(self, temp_str):
    """Start the ambient-temperature calibration.

    The value is multiplied by 100, written as an unsigned big-endian 16-bit
    word to register 0x012C, and the calibration status is queried one second
    later.

    Args:
        temp_str: Temperature as text. Decimals are accepted; the scaled
            value must fit in 0..65535 (i.e. 0 to 655.35).

    Returns:
        None. Unparsable or out-of-range input is reported in the status box
        and nothing is sent.
    """
    self.write_plain_text_edit('################\r\n环境温度,开始标定')
    try:
        # round() rather than int(): float scaling is inexact
        # (0.29 * 100 == 28.999999999999996 would truncate to 28).
        temp_int = round(float(temp_str) * 100)
        # Raises OverflowError for negative values and values above 0xFFFF.
        temp_hex = temp_int.to_bytes(2, 'big')
    except (TypeError, ValueError, OverflowError):
        self.write_plain_text_edit('环境温度输入错误', log_level='error')
        return
    self.write_register(0x01, 0x2C, temp_hex)
    # NOTE(review): this blocks the calling thread for one second before the
    # status query is sent.
    time.sleep(1)
    self.check_air_tempera()
def check_air_tempera(self):
    """Query the ambient-temperature calibration status.

    Resets the channel colours, announces the query in the status box and
    reads one holding register at 0x012C.
    """
    self.reset_port_colors()
    self.write_plain_text_edit('################\r\n空气温度标定状态:')
    register_hi, register_lo = 0x01, 0x2C
    self.read_register(register_hi, register_lo, 0x00, 0x01)
def calibrate_laser_tempra(self):
    """Switch the laser-temperature calibration on, then query its status.

    Writes 0x0001 to register 0x012D, waits one second and calls
    ``check_laser_tempra``.
    """
    for banner in ('################', 'laser温度标定'):
        self.write_plain_text_edit(banner)
    enable_word = b'\x00\x01'
    self.write_register(0x01, 0x2D, enable_word)
    time.sleep(1)
    self.check_laser_tempra()
def stop_calibrate_laser_tempra(self):
    """Switch the laser-temperature calibration off.

    Writes 0x0000 to register 0x012D and reports the end of the calibration
    in the status box.
    """
    self.write_plain_text_edit('################')
    disable_word = b'\x00\x00'
    self.write_register(0x01, 0x2D, disable_word)
    self.write_plain_text_edit('laser温度标定结束')
def check_laser_tempra(self):
    """Query the laser-temperature calibration status.

    Resets the channel colours, announces the query and reads one holding
    register at 0x012D.
    """
    self.reset_port_colors()
    self.write_plain_text_edit('查询中...')
    register_hi, register_lo = 0x01, 0x2D
    self.read_register(register_hi, register_lo, 0x00, 0x01)
def check_work_tempra(self):
    """Query the current laser working point.

    Resets the channel colours, announces the query and reads one holding
    register at 0x0001.
    """
    self.reset_port_colors()
    self.write_plain_text_edit('查询当前激光工作点')
    register_hi, register_lo = 0x00, 0x01
    self.read_register(register_hi, register_lo, 0x00, 0x01)
def calibrate_laser_tempera_start(self, temp_str):
    """Change the start point of the laser-temperature calibration.

    The value is multiplied by 100, written as an unsigned big-endian 16-bit
    word to register 0x0016, and read back one second later.

    Args:
        temp_str: Start point as text. Decimals are accepted; the scaled
            value must fit in 0..65535 (i.e. 0 to 655.35).

    Returns:
        None. Unparsable or out-of-range input is reported in the status box
        (logged at error level) and nothing is sent.
    """
    self.write_plain_text_edit('################')
    self.write_plain_text_edit('laser温度标定起始点修改')
    try:
        # round() rather than int(): float scaling is inexact
        # (0.29 * 100 == 28.999999999999996 would truncate to 28).
        temp_int = round(float(temp_str) * 100)
        # Raises OverflowError for negative values and values above 0xFFFF.
        temp_hex = temp_int.to_bytes(2, 'big')
    except (TypeError, ValueError, OverflowError):
        self.write_plain_text_edit('环境温度输入错误', log_level='error')
        return
    self.write_register(0x00, 0x16, temp_hex)
    # NOTE(review): this blocks the calling thread for one second before the
    # read-back is sent.
    time.sleep(1)
    self.check_laser_tempera_start()
def check_laser_tempera_start(self):
    """Query the start point of the laser-temperature calibration.

    Resets the channel colours, announces the query and reads one holding
    register at 0x0016.
    """
    self.reset_port_colors()
    for banner in ('################', 'laser温度标定起始点查询'):
        self.write_plain_text_edit(banner)
    register_hi, register_lo = 0x00, 0x16
    self.read_register(register_hi, register_lo, 0x00, 0x01)
def calibrate_conc_ppm(self, ppm_str):
    """Start the concentration calibration with the given reference value.

    The value is written as an unsigned big-endian 16-bit word to register
    0x012E and the calibration status is queried one second later.

    Args:
        ppm_str: Concentration as a string of digits; must be in 0..65535.

    Returns:
        None. Non-numeric or out-of-range input is reported in the status box
        (logged at error level) and nothing is sent.
    """
    self.write_plain_text_edit('################')
    self.write_plain_text_edit('浓度标定')
    ppm_hex = None
    if ppm_str.isdigit():
        try:
            # int() rejects digit-like characters such as superscripts
            # (ValueError); to_bytes rejects values above 0xFFFF
            # (OverflowError).
            ppm_hex = int(ppm_str).to_bytes(2, 'big')
        except (ValueError, OverflowError):
            ppm_hex = None
    if ppm_hex is None:
        self.write_plain_text_edit('浓度输入错误', log_level='error')
        return
    self.write_register(0x01, 0x2E, ppm_hex)
    # NOTE(review): this blocks the calling thread for one second before the
    # status query is sent.
    time.sleep(1)
    self.check_conc_ppm()
def check_conc_ppm(self):
    """Query the concentration calibration status.

    Resets the channel colours, announces the query and reads one holding
    register at 0x012E.
    """
    self.reset_port_colors()
    for banner in ('################', '浓度标定,查询'):
        self.write_plain_text_edit(banner)
    register_hi, register_lo = 0x01, 0x2E
    self.read_register(register_hi, register_lo, 0x00, 0x01)
def calibrate_reset_conc_ppm(self):
    """Reset the concentration calibration, then query the result.

    Writes 0x0000 to register 0x012F, waits one second and calls
    ``check_reset_conc_ppm``.
    """
    for banner in ('################', '浓度标定,重置'):
        self.write_plain_text_edit(banner)
    reset_word = b'\x00\x00'
    self.write_register(0x01, 0x2F, reset_word)
    time.sleep(1)
    self.check_reset_conc_ppm()
def check_reset_conc_ppm(self):
    """Query the state after a concentration-calibration reset.

    Resets the channel colours, announces the query and reads one holding
    register at 0x0064.
    """
    self.reset_port_colors()
    for banner in ('################', '浓度标定,重置'):
        self.write_plain_text_edit(banner)
    register_hi, register_lo = 0x00, 0x64
    self.read_register(register_hi, register_lo, 0x00, 0x01)
def set_out_hex_length(self, length_str):
    """Set the length of the hex output.

    The value is written as an unsigned big-endian 16-bit word to register
    0x0017 and read back one second later.

    Args:
        length_str: Length as a string of digits; must be in 0..65535.

    Returns:
        None. Non-numeric or out-of-range input is reported in the status box
        (logged at error level) and nothing is sent.
    """
    self.write_plain_text_edit('设置输出的hex长度')
    length_hex = None
    if length_str.isdigit():
        try:
            # int() rejects digit-like characters such as superscripts
            # (ValueError); to_bytes rejects values above 0xFFFF
            # (OverflowError).
            length_hex = int(length_str).to_bytes(2, 'big')
        except (ValueError, OverflowError):
            length_hex = None
    if length_hex is None:
        self.write_plain_text_edit('参数输入错误', log_level='error')
        return
    self.write_register(0x00, 0x17, length_hex)
    # NOTE(review): this blocks the calling thread for one second before the
    # read-back is sent.
    time.sleep(1)
    self.get_out_hex_length()
def get_out_hex_length(self):
    """Query the length of the hex output.

    Resets the channel colours, announces the query and reads one holding
    register at 0x0017.
    """
    self.reset_port_colors()
    self.write_plain_text_edit('查询输出的hex长度')
    register_hi, register_lo = 0x00, 0x17
    self.read_register(register_hi, register_lo, 0x00, 0x01)
def write_register(self, address_hi, address_lo, value):
    """Send a write-single-register request (function 0x06) to unit 0x01.

    Args:
        address_hi: High byte of the register address.
        address_lo: Low byte of the register address.
        value: Indexable pair; ``value[0]`` and ``value[1]`` are the high and
            low byte of the word to write.
    """
    request = [
        0x01,
        self.WRITE_SINGLE_REGISTER,
        address_hi,
        address_lo,
        value[0],
        value[1],
    ]
    # Keep the scratch buffer in step with the frame that goes out; the CRC
    # is appended by send_bytes.
    self.modbus_data[0:6] = request
    self.send_bytes(bytes(request))
def read_register(self, address_hi, address_lo, add_num_hi, add_num_lo):
    """Send a read-holding-registers request (function 0x03) to unit 0x01 and start polling.

    The request (without CRC) is remembered in ``query_modbus_bytes`` so the
    reply can be interpreted later; the poll counter is reset and the query
    timer is (re)started.

    Args:
        address_hi: High byte of the first register address.
        address_lo: Low byte of the first register address.
        add_num_hi: High byte of the register count.
        add_num_lo: Low byte of the register count.
    """
    request = [
        0x01,
        self.READ_HOLDING_REGISTERS,
        address_hi,
        address_lo,
        add_num_hi,
        add_num_lo,
    ]
    self.modbus_data[0:6] = request
    self.query_modbus_bytes = bytes(request)
    self.send_bytes(self.query_modbus_bytes)
    self.write_plain_text_edit('查询中...')
    self.num = 0
    self.start_query_timer()
def send_bytes(self, modbus_bytes):
    """Append the CRC to *modbus_bytes* and hand the frame to the UI for sending.

    The CRC comes from ``data_processor.crc_rtu``; the complete frame is
    converted with ``data_processor.format_bytes_to_hexstr_space`` and passed
    to ``ui.data_send``.
    """
    processor = self.data_processor
    framed = modbus_bytes + processor.crc_rtu(modbus_bytes)
    self.ui.data_send(processor.format_bytes_to_hexstr_space(framed))
def validate_response(self, buffer):
    """Find and CRC-check a read-holding-registers reply inside *buffer*.

    A candidate frame starts at ``01 03``, is followed by a byte count, that
    many data bytes and a 2-byte CRC. Every ``01 03`` occurrence is tried in
    order, so stray bytes that merely look like a header in front of the real
    reply do not hide it.

    Args:
        buffer: Raw received bytes for one channel.

    Returns:
        The validated frame (header through CRC) or ``False`` when no
        candidate passes the check.
    """
    header = b'\x01\x03'
    self.write_plain_text_edit(f'开始捕获:{buffer.hex().upper()}')
    start_index = buffer.find(header)
    # A candidate needs at least header + byte count + CRC (5 bytes). Later
    # candidates start even closer to the end, so the scan can stop as soon
    # as one is too short.
    while start_index != -1 and len(buffer) >= start_index + 5:
        self.write_plain_text_edit(f'捕获成功,开始验证:{buffer.hex().upper()}')
        byte_count = buffer[start_index + 2]
        frame_end = start_index + 5 + byte_count
        if len(buffer) >= frame_end:
            crc = self.data_processor.crc_rtu(buffer[start_index:frame_end - 2])
            if buffer[frame_end - 2:frame_end] == crc:
                self.write_plain_text_edit('验证成功')
                return buffer[start_index:frame_end]
            self.write_plain_text_edit('验证失败')
        # Incomplete or corrupt candidate: move on to the next header match.
        start_index = buffer.find(header, start_index + 1)
    self.write_plain_text_edit('捕获失败')
    return False
def check_query_value(self):
# Timer callback (connected to query_timer.timeout in __init__): looks for a
# valid reply to the pending query in the buffers of the selected channels.
# NOTE(review): this copy of the method has lost its indentation, so the
# nesting described below is the presumed one -- confirm against the real file.
# Count this poll; more than 10 polls is treated as a timeout.
self.num += 1
self.write_plain_text_edit(f'执行查询次数:{self.num}', log_level='debug')
if self.num > 10:
self.num = 0
# The returned status text is not used afterwards.
timer_status = self.stop_query()
self.write_plain_text_edit('查询超时')
self.set_cursor_to_end()
return False
channel_ports = self.ui.find_selected_ser_ports()
channel_nets = self.ui.find_selected_net_ports()
# Both lookups may yield None; treat that as "nothing selected".
if channel_ports is None:
channel_ports = []
if channel_nets is None:
channel_nets = []
# No channel selected any more: stop polling and report that the query ended.
if len(channel_ports) == 0 and len(channel_nets) == 0:
self.stop_query()
self.write_plain_text_edit('查询结束,在通道处查看结果')
# If there is only one channel, select it again
channel_all_nets = self.ui.find_all_net_ports()
channel_all_ports = self.ui.find_all_ser_ports()
if len(channel_all_nets) == 1:
self.ui.select_net_channel_single(channel_all_nets[0], 2)
if len(channel_all_ports) == 1:
self.ui.select_ser_channel_single(channel_all_ports[0], 2)
return False
# Check the receive buffer of every selected serial or network channel.
for port_name, buffer in self.ui.buffers.items():
if port_name in channel_ports or port_name in channel_nets:
# validate_response returns the checked frame, or False when none was found.
buffer_valid = self.validate_response(buffer)
if buffer_valid:
self.write_plain_text_edit('发射信号')
self.query_finished.emit(port_name, buffer_valid)
# NOTE(review): with the indentation lost it is unclear whether the buffer is
# cleared only after a valid frame or for every selected channel -- confirm.
self.ui.buffers[port_name] = b''
def process_query_value(self, port_name, buffer):
"""Handle a validated reply: parse it and show the result for its channel.

Connected to the ``query_finished`` signal in ``__init__``.
"""
self.write_plain_text_edit(f'数据通道:{port_name},数据: {buffer.hex().upper()}', log_level='debug')
# Only frames longer than 6 bytes are parsed.
if len(buffer) > 6:
# parse_query_value returns a (message, table text, colour) tuple or None.
status = self.parse_query_value(self.query_modbus_bytes, buffer)
# Row lookups in the serial and the network table; None is treated as
# "this port is not in that table".
ind = self.ui.find_port_ser_row(port_name)
ind_net = self.ui.find_port_net_row(port_name)
if status:
if ind is not None:
self.ui.update_ser_table_status(ind, 2, status[1], status[2])
self.write_plain_text_edit(f'{port_name}{status[0]}')
# presumably state 0 unchecks the answered channel so it is not polled
# again -- TODO confirm against select_ser_channel_single
self.ui.select_ser_channel_single(port_name, 0)
if ind_net is not None:
self.ui.update_net_table_status(ind_net, 2, status[1], status[2])
self.write_plain_text_edit(f'{port_name}{status[0]}')
self.ui.select_net_channel_single(port_name, 0)
# NOTE(review): the indentation was lost in this copy, so it is unclear whether
# this else pairs with `if status:` or with `if len(buffer) > 6:` -- confirm
# against the original file.
else:
self.write_plain_text_edit(f'{port_name}:查询失败')
def parse_query_value(self, modbus_bytes, buffer):
    """Interpret a reply frame according to the request that produced it.

    Args:
        modbus_bytes: The 6-byte read request (without CRC) that was sent.
        buffer: The validated reply frame, starting with ``01 03``.

    Returns:
        A ``(status message, table text, QColor)`` tuple, or ``None`` when
        the request is not recognised or a calibration reply carries an
        unknown status code.
    """
    calibration_queries = (
        b'\x01\x03\x01\x2C\x00\x01',  # ambient-temperature calibration status
        b'\x01\x03\x01\x2D\x00\x01',  # laser-temperature calibration status
        b'\x01\x03\x01\x2E\x00\x01',  # concentration calibration status
    )
    # Use the request passed in by the caller for every branch (the original
    # consulted self.query_modbus_bytes here and the parameter below).
    if modbus_bytes in calibration_queries:
        status_map = {
            b'\x01\x03\x02\x00\x00': ('未标定', '未标定', QColor(255, 255, 255)),
            b'\x01\x03\x02\x00\x01': ('标定中', '标定中', QColor(255, 255, 0)),
            b'\x01\x03\x02\x00\x02': ('标定成功', '标定成功', QColor(0, 255, 0)),
            b'\x01\x03\x02\x00\x03': ('标定失败', '标定失败', QColor(255, 0, 0)),
            b'\x01\x03\x02\x00\x04': ('标定失败,浓度太低', '浓度太低', QColor(255, 0, 0)),
            b'\x01\x03\x02\x00\x05': ('标定失败,间距太小', '标定间隔太小', QColor(255, 0, 0)),
        }
        for key, value in status_map.items():
            if key in buffer:
                self.write_plain_text_edit(f'标定识别,查询指令:{key.hex().upper()} ,查询结果{buffer.hex().upper()}')
                return value
        return None

    # (request, label, divisor); divisor None keeps the raw integer.
    value_queries = (
        # NOTE(review): 0x0016 is written by calibrate_laser_tempera_start but
        # labelled as ambient temperature here -- confirm the intended label.
        (b'\x01\x03\x00\x16\x00\x01', '环境温度:查询结果', 100),
        (b'\x01\x03\x01\x2F\x00\x01', '浓度标定:重置结果', None),
        (b'\x01\x03\x00\x64\x00\x01', '浓度标定:查询结果', None),
        (b'\x01\x03\x00\x01\x00\x01', '激光温度:查询结果', 100),
        # 0x0017 is the output hex length (see get_out_hex_length); it used
        # to be reported under the laser-temperature label.
        (b'\x01\x03\x00\x17\x00\x01', '输出hex长度:查询结果', None),
    )
    for query, label, divisor in value_queries:
        if modbus_bytes == query:
            # buffer[2] is the byte count; the data bytes follow it.
            bytes_count = buffer[2]
            value = int.from_bytes(buffer[3:3 + bytes_count], 'big')
            if divisor is not None:
                value = value / divisor
            return f'{label}:{value}', f'{value}', QColor(0, 255, 0)
    return None
def start_query_timer(self):
    """Start the query timer, stopping it first if it is already running."""
    timer = self.query_timer
    if timer.isActive():
        timer.stop()
    timer.start(self.QUERY_INTERVAL)
    self.write_plain_text_edit('timer启动', log_level='debug')
def stop_query(self):
"""Stop the query timer and return a status text."""
# Only an existing, currently running timer is stopped.
if self.query_timer and self.query_timer.isActive():
self.query_timer.stop()
# NOTE(review): the indentation was lost in this copy, so it is unclear whether
# this message is written always or only when the timer was actually stopped
# -- confirm against the original file.
self.write_plain_text_edit('timer已经关闭')
# The same text is handed back to the caller.
return 'timer已经关闭'
def reset_port_colors(self):
    """Reset the row colour of the serial channels and of the selected network channels.

    Serial channels are taken from ``serial_manager.buffers``; network
    channels from ``ui.find_selected_net_ports()``. Channels without a table
    row (row lookup returns ``None``) are skipped.
    """
    for port_name in self.serial_manager.buffers:
        ind = self.ui.find_port_ser_row(port_name)
        if ind is not None:
            self.ui.reset_ser_port_color(ind)
    # find_selected_net_ports() can return None (check_query_value guards
    # against it as well); treat that as "no network channel selected".
    selected_channels = self.ui.find_selected_net_ports() or []
    for channel_name in selected_channels:
        ind = self.ui.find_port_net_row(channel_name)
        if ind is not None:
            self.ui.reset_net_port_color(ind)
def set_cursor_to_end(self):
    """Move the text cursor of the calibration status box to the end of its content."""
    status_box = self.ui.plainTextEditStastusCalib
    cursor = status_box.textCursor()
    cursor.movePosition(cursor.End)
    status_box.setTextCursor(cursor)