import logging import time from PyQt5.QtCore import QTimer, pyqtSignal, QObject from PyQt5.QtGui import QColor class CalibrationManager(QObject): query_finished = pyqtSignal(str, bytes) # 定义常量 WRITE_SINGLE_REGISTER = 0x06 READ_HOLDING_REGISTERS = 0x03 QUERY_INTERVAL = 1000 # 查询间隔时间,单位为毫秒 def __init__(self, serial_manager, data_processor, ui): super().__init__() self.serial_manager = serial_manager self.data_processor = data_processor self.ui = ui self.num = 0 self.modbus_data = [0] * 20 self.query_modbus_bytes = None self.query_timer = QTimer(self.ui) self.query_timer.timeout.connect(self.check_query_value) # 定时查询 self.query_finished.connect(self.process_query_value) # 处理数据 def write_plain_text_edit(self, text, log_level='info'): """统一处理 plainTextEditStastusCalib 的文本写入和日志记录""" self.ui.plainTextEditStastusCalib.appendPlainText(text) if log_level == 'info': logging.info(text) elif log_level == 'error': logging.error(text) elif log_level == 'debug': logging.debug(text) def calibrate_air_tempera(self, temp_str): """开始温度标定""" self.write_plain_text_edit('################\r\n环境温度,开始标定') if not temp_str.isdigit(): self.write_plain_text_edit('环境温度输入错误', log_level='error') return temp_int = int(float(temp_str) * 100) temp_hex = temp_int.to_bytes(2, 'big') self.write_register(0x01, 0x2C, temp_hex) time.sleep(1) self.check_air_tempera() def check_air_tempera(self): self.reset_port_colors() self.write_plain_text_edit('################\r\n空气温度标定状态:') self.read_register(0x01, 0x2C, 0x00, 0x01) def calibrate_laser_tempra(self): self.write_plain_text_edit('################') self.write_plain_text_edit('laser温度标定') self.write_register(0x01, 0x2D, b'\x00\x01') time.sleep(1) self.check_laser_tempra() def stop_calibrate_laser_tempra(self): self.write_plain_text_edit('################') self.write_register(0x01, 0x2D, b'\x00\x00') self.write_plain_text_edit('laser温度标定结束') def check_laser_tempra(self): self.reset_port_colors() self.write_plain_text_edit('查询中...') self.read_register(0x01, 0x2D, 0x00, 0x01) def check_work_tempra(self): self.reset_port_colors() self.write_plain_text_edit('查询当前激光工作点') self.read_register(0x00, 0x01, 0x00, 0x01) def calibrate_laser_tempera_start(self, temp_str): self.write_plain_text_edit('################') self.write_plain_text_edit('laser温度标定起始点修改') if not temp_str.isdigit(): self.write_plain_text_edit('环境温度输入错误') return temp_int = int(float(temp_str) * 100) temp_hex = temp_int.to_bytes(2, 'big') self.write_register(0x00, 0x16, temp_hex) time.sleep(1) self.check_laser_tempera_start() def check_laser_tempera_start(self): self.reset_port_colors() self.write_plain_text_edit('################') self.write_plain_text_edit('laser温度标定起始点查询') self.read_register(0x00, 0x16, 0x00, 0x01) def calibrate_conc_ppm(self, ppm_str): self.write_plain_text_edit('################') self.write_plain_text_edit('浓度标定') if not ppm_str.isdigit(): self.write_plain_text_edit('浓度输入错误') return ppm_int = int(float(ppm_str)) ppm_hex = ppm_int.to_bytes(2, 'big') self.write_register(0x01, 0x2E, ppm_hex) time.sleep(1) self.check_conc_ppm() def check_conc_ppm(self): self.reset_port_colors() self.write_plain_text_edit('################') self.write_plain_text_edit('浓度标定,查询') self.read_register(0x01, 0x2E, 0x00, 0x01) def calibrate_reset_conc_ppm(self): self.write_plain_text_edit('################') self.write_plain_text_edit('浓度标定,重置') self.write_register(0x01, 0x2F, b'\x00\x00') time.sleep(1) self.check_reset_conc_ppm() def check_reset_conc_ppm(self): self.reset_port_colors() self.write_plain_text_edit('################') self.write_plain_text_edit('浓度标定,重置') self.read_register(0x00, 0x64, 0x00, 0x01) def set_out_hex_length(self,length_str): """设置输出的hex长度""" self.write_plain_text_edit('设置输出的hex长度') if not length_str.isdigit(): self.write_plain_text_edit('参数输入错误') return length_int = int(float(length_str)) length_hex = length_int.to_bytes(2, 'big') self.write_register(0x00, 0x17, length_hex) time.sleep(1) self.get_out_hex_length() def get_out_hex_length(self): self.reset_port_colors() self.write_plain_text_edit('查询输出的hex长度') self.read_register(0x00, 0x17, 0x00, 0x01) def write_register(self, address_hi, address_lo, value): """写入寄存器""" self.modbus_data[0:6] = [0x01, self.WRITE_SINGLE_REGISTER, address_hi, address_lo, value[0], value[1]] self.send_bytes(bytes(self.modbus_data[:6])) def read_register(self, address_hi, address_lo, add_num_hi, add_num_lo): """读取寄存器""" self.modbus_data[0:6] = [0x01, self.READ_HOLDING_REGISTERS, address_hi, address_lo, add_num_hi, add_num_lo] self.query_modbus_bytes = bytes(self.modbus_data[:6]) self.send_bytes(self.query_modbus_bytes) self.write_plain_text_edit('查询中...') self.num = 0 self.start_query_timer() def send_bytes(self, modbus_bytes): crc = self.data_processor.crc_rtu(modbus_bytes) modbus_bytes = modbus_bytes + crc modbus_send_str = self.data_processor.format_bytes_to_hexstr_space(modbus_bytes) self.ui.data_send(modbus_send_str) def validate_response(self, buffer): """验证响应""" self.write_plain_text_edit(f'开始捕获:{buffer.hex().upper()}') start_index = buffer.find(b'\x01\x03') if start_index != -1 and len(buffer) >= start_index + 5: self.write_plain_text_edit(f'捕获成功,开始验证:{buffer.hex().upper()}') byte_count = buffer[start_index + 2] frame_length = start_index + 5 + byte_count if len(buffer) >= frame_length: crc = self.data_processor.crc_rtu(buffer[start_index:frame_length - 2]) if buffer[frame_length - 2:frame_length] == crc: self.write_plain_text_edit('验证成功') return buffer[start_index:frame_length] else: self.write_plain_text_edit('验证失败') self.write_plain_text_edit('捕获失败') return False def check_query_value(self): self.num += 1 self.write_plain_text_edit(f'执行查询次数:{self.num}', log_level='debug') if self.num > 10: self.num = 0 timer_status = self.stop_query() self.write_plain_text_edit('查询超时') self.set_cursor_to_end() return False channel_ports = self.ui.find_selected_ser_ports() channel_nets = self.ui.find_selected_net_ports() if channel_ports is None: channel_ports = [] if channel_nets is None: channel_nets = [] if len(channel_ports) == 0 and len(channel_nets) == 0: self.stop_query() self.write_plain_text_edit('查询结束,在通道处查看结果') # 如果只有一个通道,重新勾选 channel_all_nets = self.ui.find_all_net_ports() channel_all_ports = self.ui.find_all_ser_ports() if len(channel_all_nets) == 1: self.ui.select_net_channel_single(channel_all_nets[0], 2) if len(channel_all_ports) == 1: self.ui.select_ser_channel_single(channel_all_ports[0], 2) return False for port_name, buffer in self.ui.buffers.items(): if port_name in channel_ports or port_name in channel_nets: buffer_valid = self.validate_response(buffer) if buffer_valid: self.write_plain_text_edit('发射信号') self.query_finished.emit(port_name, buffer_valid) self.ui.buffers[port_name] = b'' def process_query_value(self, port_name, buffer): """处理查询数据""" self.write_plain_text_edit(f'数据通道:{port_name},数据: {buffer.hex().upper()}', log_level='debug') if len(buffer) > 6: status = self.parse_query_value(self.query_modbus_bytes, buffer) ind = self.ui.find_port_ser_row(port_name) ind_net = self.ui.find_port_net_row(port_name) if status: if ind is not None: self.ui.update_ser_table_status(ind, 2, status[1], status[2]) self.write_plain_text_edit(f'{port_name}:{status[0]}') self.ui.select_ser_channel_single(port_name, 0) if ind_net is not None: self.ui.update_net_table_status(ind_net, 2, status[1], status[2]) self.write_plain_text_edit(f'{port_name}:{status[0]}') self.ui.select_net_channel_single(port_name, 0) else: self.write_plain_text_edit(f'{port_name}:查询失败') def parse_query_value(self, modbus_bytes, buffer): """解析查询结果""" if self.query_modbus_bytes in [ b'\x01\x03\x01\x2C\x00\x01', # 查询环境温度标定 b'\x01\x03\x01\x2D\x00\x01', # 查询激光温度标定 b'\x01\x03\x01\x2E\x00\x01' # 查询浓度标定 ]: status_map = { b'\x01\x03\x02\x00\x00': ('未标定', '未标定', QColor(255, 255, 255)), b'\x01\x03\x02\x00\x01': ('标定中', '标定中', QColor(255, 255, 0)), b'\x01\x03\x02\x00\x02': ('标定成功', '标定成功', QColor(0, 255, 0)), b'\x01\x03\x02\x00\x03': ('标定失败', '标定失败', QColor(255, 0, 0)), b'\x01\x03\x02\x00\x04': ('标定失败,浓度太低', '浓度太低', QColor(255, 0, 0)), b'\x01\x03\x02\x00\x05': ('标定失败,间距太小', '标定间隔太小', QColor(255, 0, 0)) } for key, value in status_map.items(): if key in buffer: self.write_plain_text_edit(f'标定识别,查询指令:{key.hex().upper()} ,查询结果{buffer.hex().upper()}') return value return None elif modbus_bytes == b'\x01\x03\x00\x16\x00\x01': bytes_count = buffer[2] query_data = buffer[3:3 + bytes_count] air_tempera = int.from_bytes(query_data, 'big') / 100 return f'环境温度:查询结果:{air_tempera}', f'{air_tempera}', QColor(0, 255, 0) elif modbus_bytes == b'\x01\x03\x01\x2F\x00\x01': bytes_count = buffer[2] query_data = buffer[3:3 + bytes_count] conc_flag = int.from_bytes(query_data, 'big') return f'浓度标定:重置结果:{conc_flag}', f'{conc_flag}', QColor(0, 255, 0) elif modbus_bytes == b'\x01\x03\x00\x64\x00\x01': bytes_count = buffer[2] query_data = buffer[3:3 + bytes_count] conc_calib_flag = int.from_bytes(query_data, 'big') return f'浓度标定:查询结果:{conc_calib_flag}', f'{conc_calib_flag}', QColor(0, 255, 0) elif modbus_bytes == b'\x01\x03\x00\x01\x00\x01': bytes_count = buffer[2] query_data = buffer[3:3 + bytes_count] laser_work_tempera = int.from_bytes(query_data, 'big') / 100 return f'激光温度:查询结果:{laser_work_tempera}', f'{laser_work_tempera}', QColor(0, 255, 0) elif modbus_bytes == b'\x01\x03\x00\x17\x00\x01': bytes_count = buffer[2] query_data = buffer[3:3 + bytes_count] laser_work_tempera = int.from_bytes(query_data, 'big') return f'激光温度:查询结果:{laser_work_tempera}', f'{laser_work_tempera}', QColor(0, 255, 0) else: return None def start_query_timer(self): """启动查询定时器""" if self.query_timer.isActive(): self.query_timer.stop() self.query_timer.start(self.QUERY_INTERVAL) self.write_plain_text_edit('timer启动', log_level='debug') def stop_query(self): """停止查询定时器""" if self.query_timer and self.query_timer.isActive(): self.query_timer.stop() self.write_plain_text_edit('timer已经关闭') return 'timer已经关闭' def reset_port_colors(self): """重置串口通道颜色""" for port_name in self.serial_manager.buffers.keys(): ind = self.ui.find_port_ser_row(port_name) if ind is not None: self.ui.reset_ser_port_color(ind) selected_channels = self.ui.find_selected_net_ports() for channel_name in selected_channels: ind = self.ui.find_port_net_row(channel_name) if ind is not None: self.ui.reset_net_port_color(ind) def set_cursor_to_end(self): textCursor = self.ui.plainTextEditStastusCalib.textCursor() textCursor.movePosition(textCursor.End) self.ui.plainTextEditStastusCalib.setTextCursor(textCursor)