import logging import sys import time from datetime import datetime from PyQt5.QtCore import QTimer from PyQt5.QtWidgets import QApplication, QMessageBox, QInputDialog from calibration_manager import CalibrationManager from config_manager import ConfigManager from data_processor import DataProcessor from log_config import setup_logging from net_manager import NetManager from serial_manager import SerialManager from ui_mian import UiMain from ui_net import UiNet from ui_serial import UiSerial setup_logging() class LaserMethanePT(UiMain): def __init__(self): super().__init__() logging.info("LaserMethanePT 初始化开始") self.buffers = {} self.timer = QTimer() self.timer.timeout.connect(self.data_receive) self.timer_net = QTimer() self.timer_net.timeout.connect(self.data_receive_net) self.timer_check_channel = QTimer() self.timer_check_channel.timeout.connect(self.check_channel_status) # 初始化各个模块 self.serial_window = UiSerial() self.net_window = UiNet() self.serial_manager = SerialManager(self.buffers) self.net_manager = NetManager(self.buffers) self.data_processor = DataProcessor() self.config_manager = ConfigManager() self.calibration_manager = CalibrationManager(self.serial_manager, self.data_processor, self) self.init_ui() self.check_cfg_ini() def init_ui(self): # 绑定按钮事件 # 从配置菜单,打开串口设置窗口 self.actionSerial.triggered.connect(self.open_serial_window) self.serial_window.pushButtonOpenSerial.clicked.connect(self.port_open) #打开串口 self.serial_window.pushButtonCloseSerial.clicked.connect(self.port_close) #关闭串口,检测串口在内部绑定 # 从配置菜单,打开网络设置窗口 self.actionNet.triggered.connect(self.open_net_window) self.net_window.pushButton_openNet.clicked.connect(self.net_open) self.net_window.pushButton_closeNet.clicked.connect(self.net_close) # 串口通信表格按钮时间,选择的信号和槽 self.pushButtonSerAll.clicked.connect(self.select_ser_channel_all) self.pushButtonSerNo.clicked.connect(self.select_ser_channel_none) self.pushButtonSerClear.clicked.connect(self.clear_ser_channel_all) # 网口数据通道表格按钮,信号和槽 self.pushButtonNetAll.clicked.connect(self.select_net_channel_all) self.pushButtonNetNo.clicked.connect(self.select_net_channel_none) self.pushButtonNetClear.clicked.connect(self.clear_net_channel_all) self.pushButtonSend.clicked.connect(self.data_send_form_textEdit) # 快捷指令扩展区域 self.pushButton_expend.clicked.connect(self.adjust_sidebar) # 清除发送按钮 self.pushButtonClearSend.clicked.connect(self.send_data_clear) # 清除接收按钮 self.pushButtonClearReceive.clicked.connect(self.receive_data_clear) # 连接按钮点击事件到槽函数 for i in range(self.quick_num): # 从0到20 index_str = f"{i:02}" # 确保编号为两位数字形式 buttonName = f"pushButtonQuick_{index_str}" button = getattr(self, buttonName) if button: button.clicked.connect(lambda checked, idx=i: self.onButtonClick(idx)) self.double_click_timers = {} # 存储双击定时器 # 标定功能 # 当点击温度标定按钮时,连接到tempra_calibrate方法 self.pushButtonCalibTempra.clicked.connect(self.air_tempera_calibration) # 当点击检查温度标定结果按钮时,连接到calib_tempra_result方法 self.pushButtonCheckTempra.clicked.connect(self.air_tempera_calib_result) # 当点击激光温度标定按钮时,连接到laser_calibrate方法 self.pushButtonCalibLaserTemp.clicked.connect(self.laser_tempera_calibration) # 当点击检查激光温度标定结果按钮时,连接到calib_laser_result方法 self.pushButtonCheckLaserTempra.clicked.connect(self.laser_tempera_calib_result) # 当点击停止激光标定按钮时,连接到stop_laser_calib方法 self.pushButtonCalibLaserStop.clicked.connect(self.stop_laser_tempera_calibration) # 当点击检查工作温度标定结果按钮时,连接到calib_work_tempra_result方法 self.pushButtonCheckWorkTempra.clicked.connect(self.laser_work_tempra_result) # 当点击开始激光温度标定按钮时,连接到相应的方法 self.pushButtonCalibTempraLaserStart.clicked.connect(self.laser_tempera_start_calibration) # 当点击检查开始激光温度标定结果按钮时,连接到相应的方法 self.pushButtonCheckTempraLaserStart.clicked.connect(self.laser_tempera_start_result) # 当点击浓度标定按钮时,连接到conc_calibrate方法 self.pushButtonCalibConcen.clicked.connect(self.conc_ppm_calibration) # 当点击检查浓度标定结果按钮时,连接到calib_concen_result方法 self.pushButtonCheckConcen.clicked.connect(self.conc_ppm_calib_result) # 当点击重置浓度标定按钮时,连接到reset_conc_calibrate方法 self.pushButtonCalibConcenReset.clicked.connect(self.reset_conc_ppm_calibration) # 当点击检查重置浓度标定结果按钮时,连接到reset_concen_result方法 self.pushButtonCheckConcenReset.clicked.connect(self.reset_conc_ppm_result) self.pushButtonCalibHexLength.clicked.connect(self.set_out_hex_length) self.pushButtonCheckHexLength.clicked.connect(self.get_out_hex_length) def closeEvent(self, event): self.save_cfg_ini() self.serial_manager.port_all_close() # 调用父类的关闭事件处理函数 logging.info("LaserMethanePT 关闭") super().closeEvent(event) def open_serial_window(self): self.serial_window.show() def port_open(self): # 打开串口 port_name = self.serial_window.get_port() baudrate = self.serial_window.get_baudrate() bytesize = self.serial_window.get_bytesize() parity = self.serial_window.get_parity() stopbits = self.serial_window.get_stopbit() flowcontrol = self.serial_window.get_flow() self.stackedWidget.setCurrentIndex(1) self.actionNet.setEnabled(False) if self.serial_manager.open_port(port_name, baudrate, bytesize, parity, stopbits, flowcontrol): self.add_item_ser_channel(port_name) # 定时器接收数据 # 打开串口接收定时器,周期为20ms self.timer.start(20) def port_close(self): # 关闭串口 port_name = self.serial_window.get_port() if self.serial_manager.close_port(port_name): self.delete_item_ser_channel(port_name) def clear_ser_channel_all(self): self.calibration_manager.reset_port_colors() self.clear_ser_channel() self.serial_manager.port_all_close() def clear_net_channel_all(self): self.calibration_manager.reset_port_colors() self.clear_net_channel() # self.net_manager.net_all_close() def open_net_window(self): self.net_window.show() def net_open(self): ip_str = self.net_window.get_local_ip() port_str = self.net_window.get_port() net_name = ip_str + ':' + port_str self.net_manager.open_net(net_name) self.stackedWidget.setCurrentIndex(0) self.actionSerial.setEnabled(False) # 打开串口接收定时器,周期为20ms self.timer_net.start(20) self.timer_check_channel.start(10000) # 添加定时器,每10秒检查一次通道状态 def net_close(self): ip_str = self.net_window.get_local_ip() port_str = self.net_window.get_port() net_name = ip_str + ':' + port_str self.net_manager.close_net(net_name) def data_receive(self): # 处理接收到的数据 for port_name in self.serial_manager.serials: data = self.serial_manager.read_data(port_name) if data is False: time.sleep(1) ind = self.find_port_ser_row(port_name) self.plainTextEditReceive.insertPlainText(f"[{port_name}] 链接断开\r\n") self.set_background_color_ser_channel(ind, 0, 'red') status = self.serial_manager.reopen_port(port_name) if status: self.plainTextEditReceive.insertPlainText(f"[{port_name}] 重连成功\r\n") self.set_background_color_ser_channel(ind, 0, 'green') if data: # hex 格式 if self.checkBoxHexReceive.isChecked(): data_bytes = data decoded_data = self.data_processor.format_bytes_to_hexstr_space(data_bytes) data_hex_str = self.data_processor.format_bytes_to_hexstr(data_bytes) # 保存日志 if self.checkBoxAutoSaveLog.isChecked(): self.data_processor.auto_save_log_hex(data_bytes, port_name) else: decoded_data = self.data_processor.decode_data(data) # 保存日志 if self.checkBoxAutoSaveLog.isChecked(): self.data_processor.auto_save_log_asc(decoded_data, port_name) # 更新UI显示接收的数据 ind = self.find_port_ser_row(port_name) if ind is None: return # 找不到对应的行,跳过处理 if self.tableWidgetSerChannel.item(ind, 0).checkState() == 2: if 'data_hex_str' in locals(): self.disp_hex_receive(data_hex_str) # 显示在表格中 self.set_cursor_to_end() # 光标移动到末尾 if self.checkBoxAddDate.isChecked(): nowTime = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] self.plainTextEditReceive.insertPlainText(f" {nowTime} ") self.plainTextEditReceive.insertPlainText(f"[{port_name}] ") if self.checkBoxCRLF.isChecked(): decoded_data += '\r\n' self.plainTextEditReceive.insertPlainText(decoded_data) self.set_cursor_to_end() # 光标移动到末尾 def data_send(self, text=None): # 发送数据 selected_name = self.find_selected_ser_ports() # 检查是否有打开的串口 selected_ports = [port_name for port_name, ser in self.serial_manager.serials.items() if ser.is_open and port_name in selected_name] selected_channels = self.find_selected_net_ports() if not selected_ports and self.stackedWidget.currentIndex() == 1: # QMessageBox.critical(self, '串口异常', '没有打开的串口!') self.plainTextEditReceive.insertPlainText('没有打开的串口!\r\n') logging.debug('没有打开的串口!') else: for port_name in selected_ports: ser = self.serial_manager.serials[port_name] if ser.isOpen(): if text is None: input_s = self.plainTextEditSend.toPlainText() else: input_s = text # 判断是否为非空字符串 if input_s != "": # HEX发送 if self.checkBoxHexSend.isChecked(): send_data = self.data_processor.format_hexstr_space_to_bytes(input_s) if send_data is None: return None if self.checkBoxAddCRC.isChecked(): crc = self.data_processor.crc_rtu(send_data) send_data = send_data + crc # ASCII发送 else: send_data = self.data_processor.encode_data(input_s) # 以下显示设置 # 时间显示 if self.checkBoxAddDate.isChecked(): nowTime = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] self.plainTextEditReceive.insertPlainText(f" {nowTime} ") # HEX接收显示 if self.checkBoxHexReceive.isChecked(): self.plainTextEditReceive.insertPlainText( self.data_processor.format_bytes_to_hexstr_space(send_data)) # ASCII接收显示 else: self.plainTextEditReceive.insertPlainText(self.data_processor.decode_data(send_data)) # 接收换行 if self.checkBoxCRLF.isChecked(): self.plainTextEditReceive.insertPlainText('\r\n') self.set_cursor_to_end() self.serial_manager.write_data(port_name, send_data) self.set_cursor_to_end() if not selected_channels and self.stackedWidget.currentIndex() == 0: # QMessageBox.critical(self, '网络异常', '没有打开的网络!') logging.debug('没有打开的网络!') self.plainTextEditReceive.insertPlainText('没有打开的网络!') return else: for channal_name in selected_channels: if text is None: input_s = self.plainTextEditSend.toPlainText() else: input_s = text # 判断是否为非空字符串 if input_s != "": # HEX发送 if self.checkBoxHexSend.isChecked(): send_data = self.data_processor.format_hexstr_space_to_bytes(input_s) if send_data is None: return None if self.checkBoxAddCRC.isChecked(): crc = self.data_processor.crc_rtu(send_data) send_data = send_data + crc # ASCII发送 else: send_data = self.data_processor.encode_data(input_s) # 以下显示设置 # 时间显示 if self.checkBoxAddDate.isChecked(): nowTime = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] self.plainTextEditReceive.insertPlainText(f" {nowTime} ") # HEX接收显示 if self.checkBoxHexReceive.isChecked(): self.plainTextEditReceive.insertPlainText( self.data_processor.format_bytes_to_hexstr_space(send_data)) # ASCII接收显示 else: self.plainTextEditReceive.insertPlainText(self.data_processor.decode_data(send_data)) # 接收换行 if self.checkBoxCRLF.isChecked(): self.plainTextEditReceive.insertPlainText('\r\n') self.set_cursor_to_end() self.net_manager.write_data(send_data, channal_name) def data_receive_net(self): for net_name in self.net_manager.net_connections: data = self.net_manager.read_data(net_name) if data: # logging.debug(f"Received data from <{data[1][0]}:{data[1][1]} > {data[0]}") channel_name = data[1][0] + ':' + str(data[1][1]) # hex 格式 if self.checkBoxHexReceive.isChecked(): data_bytes = data[0] decoded_data = self.data_processor.format_bytes_to_hexstr_space(data_bytes) data_hex_str = self.data_processor.format_bytes_to_hexstr(data_bytes) # 保存日志 if self.checkBoxAutoSaveLog.isChecked(): self.data_processor.auto_save_log_hex(data_bytes) if self.checkBoxAutoSaveDB.isChecked(): self.data_processor.auto_save_hex_to_db(data_bytes) else: decoded_data = self.data_processor.decode_data(data[0]) logging.debug(f"Received data from {decoded_data} ") # 保存日志 if self.checkBoxAutoSaveLog.isChecked(): self.data_processor.auto_save_log_asc(decoded_data) # 更新UI显示接收的数据 if not self.find_port_net_row(channel_name): self.add_item_net_channel(channel_name) ind = self.find_port_net_row(channel_name) if ind is None: return # 找不到对应的行,跳过处理 else: self.set_background_color_net_channel(ind, 0, 'green') if 'data_hex_str' in locals(): try: self.update_net_table_status(ind, 0, data_hex_str[0:8], 'green') except Exception as e: logging.error(f"Error updating net table status: {e}") if self.tableWidgetNetChannel.item(ind, 0).checkState() == 2: if 'data_hex_str' in locals(): self.disp_hex_receive(data_hex_str) # 显示在表格中 self.set_cursor_to_end() # 光标移动到末尾 if self.checkBoxAddDate.isChecked(): nowTime = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] self.plainTextEditReceive.insertPlainText(f" {nowTime} ") self.plainTextEditReceive.insertPlainText(f"<{data[1][0]}:{data[1][1]} > ") if self.checkBoxCRLF.isChecked(): decoded_data += '\r\n' self.plainTextEditReceive.insertPlainText(decoded_data) self.set_cursor_to_end() # 光标移动到末尾 def check_channel_status(self): all_channel_name = self.find_all_net_ports() for channel_name in all_channel_name: self.update_net_table_status(self.find_port_net_row(channel_name), 0, '', 'white') def data_send_channel_warning(self): selected_name = self.find_selected_ser_ports() selected_num = len(selected_name) selceted_channel = self.find_selected_net_ports() selccted_net_num = len(selceted_channel) if selected_num == 0 and selccted_net_num == 0: QMessageBox.critical(self, '串口异常', '请先选择串口或网口通道!') self.plainTextEditReceive.insertPlainText('请先选择串口或网口通道!\r\n') logging.debug('请先选择串口或网口通道!') return None if selected_num + selccted_net_num > 1: reply = QMessageBox.question(None, "警告", "发送所有选中通道,是否继续执行?", QMessageBox.Yes | QMessageBox.No, QMessageBox.No) if reply != QMessageBox.Yes: return None return QMessageBox.Yes def data_send_form_textEdit(self): reply = self.data_send_channel_warning() if reply == QMessageBox.Yes: self.data_send() def air_tempera_calibration(self): # 开始温度标定 reply = self.data_send_channel_warning() if reply == QMessageBox.Yes: temp_str = self.lineEditTempra.text() self.calibration_manager.calibrate_air_tempera(temp_str) def air_tempera_calib_result(self): reply = self.data_send_channel_warning() if reply == QMessageBox.Yes: self.calibration_manager.check_air_tempera() def laser_tempera_calibration(self): reply = self.data_send_channel_warning() if reply == QMessageBox.Yes: self.calibration_manager.calibrate_laser_tempra() def stop_laser_tempera_calibration(self): self.calibration_manager.stop_calibrate_laser_tempra() def laser_tempera_calib_result(self): reply = self.data_send_channel_warning() if reply == QMessageBox.Yes: self.calibration_manager.check_laser_tempra() def laser_work_tempra_result(self): reply = self.data_send_channel_warning() if reply == QMessageBox.Yes: self.calibration_manager.check_work_tempra() def laser_tempera_start_calibration(self): reply = self.data_send_channel_warning() if reply == QMessageBox.Yes: text_str = self.lineEditTempraLaserStart.text() self.calibration_manager.calibrate_laser_tempera_start(text_str) def laser_tempera_start_result(self): reply = self.data_send_channel_warning() if reply == QMessageBox.Yes: self.calibration_manager.check_laser_tempera_start() def conc_ppm_calibration(self): reply = self.data_send_channel_warning() if reply == QMessageBox.Yes: ppm_str = self.lineEditConcen.text() self.calibration_manager.calibrate_conc_ppm(ppm_str) def conc_ppm_calib_result(self): reply = self.data_send_channel_warning() if reply == QMessageBox.Yes: self.calibration_manager.check_conc_ppm() def reset_conc_ppm_calibration(self): reply = self.data_send_channel_warning() if reply == QMessageBox.Yes: self.calibration_manager.calibrate_reset_conc_ppm() def reset_conc_ppm_result(self): reply = self.data_send_channel_warning() if reply == QMessageBox.Yes: self.calibration_manager.check_reset_conc_ppm() def set_out_hex_length(self): reply = self.data_send_channel_warning() if reply == QMessageBox.Yes: length_str = self.lineEditHexLenth.text() self.calibration_manager.set_out_hex_length(length_str) def get_out_hex_length(self): reply = self.data_send_channel_warning() if reply == QMessageBox.Yes: self.calibration_manager.get_out_hex_length() def adjust_sidebar(self): if self.widget_2.isHidden(): self.widget_2.show() else: self.widget_2.hide() def send_data_clear(self): self.plainTextEditSend.clear() def receive_data_clear(self): self.plainTextEditReceive.clear() def onButtonClick(self, idx): # 槽函数处理按钮点击事件 if idx not in self.double_click_timers: self.double_click_timers[idx] = None if self.double_click_timers[idx] is None: self.double_click_timers[idx] = True QTimer.singleShot(200, lambda: self.onButtonSigleClick(idx)) else: self.onButtonDoubleClick(idx) def onButtonSigleClick(self, idx): index_str = f"{idx:02}" if self.double_click_timers[idx]: self.double_click_timers[idx] = None lineEdit = getattr(self, f"lineEditQuick_{index_str}") print(f"Text in lineEdit_{idx}: {lineEdit.text()}") text = lineEdit.text() if self.checkBox_return.isChecked(): text = text + "\r\n" reply = self.data_send_channel_warning() if reply == QMessageBox.Yes: self.data_send(text) def onButtonDoubleClick(self, idx): # 槽函数处理按钮双击事件 self.double_click_timers[idx] = None index_str = f"{idx:02}" print(f"Double click detected on Button {index_str}.") button = getattr(self, f"pushButtonQuick_{index_str}") new_name, ok = QInputDialog.getText(self, 'Button Rename', 'Enter new button name:') if ok and new_name: button.setText(new_name) def check_cfg_ini(self): row_count = self.tableWidget.rowCount() if not self.config_manager.read_config(): self.config_manager.create_default_config(row_count) else: self.config_manager.read_config() # 更主界面首发配置 self.set_checkBoxHexReceive(int(self.config_manager.get_hex_receive())) self.set_checkBoxHexSend(int(self.config_manager.get_hex_send())) self.set_checkBoxCRLF(int(self.config_manager.get_cr_lf())) self.set_checkBoxAddDate(int(self.config_manager.get_add_date())) self.set_checkBoxAutoSaveDB(int(self.config_manager.get_auto_sav_db())) self.set_checkBoxAutoSaveLog(int(self.config_manager.get_auto_sav_log())) self.set_checkBoxAddCRC(int(self.config_manager.get_add_crc_rtu())) # 更新串口配置 self.serial_window.set_port(self.config_manager.get_port()) self.serial_window.set_baudrate(self.config_manager.get_baudrate()) self.serial_window.set_parity(self.config_manager.get_parity()) # 更新网口配置 self.net_window.set_port(self.config_manager.get_net_port()) # 更新modbus 配种 self.data_processor.update_funcode(self.config_manager.get_funcode()) self.data_processor.update_position(self.config_manager.get_position()) self.data_processor.update_log_size(self.config_manager.get_log_time()) # 更新显示界面配置 for i in range(row_count): row_str = self.config_manager.get_dishex_row(i) row_str_split = row_str.split('|') len_split = len(row_str_split) for j in range(len_split - 1): self.set_disp_hex(i, j, row_str_split[j]) self.set_disp_select(i, int(row_str_split[len_split - 1])) # 更新快捷键配置 for i in range(self.quick_num): idx = f"{i:02}" quick_str = self.config_manager.get_quick_row(i) quick_str_split = quick_str.split('|') if len(quick_str_split) == 2: # 打印字典查看结果,并赋值 getattr(self, f'pushButtonQuick_{idx}').setText(quick_str_split[1]) getattr(self, f'lineEditQuick_{idx}').setText(quick_str_split[0]) def save_cfg_ini(self): # 主界面保存配置 self.config_manager.set_hex_receive(str(self.get_checkBoxHexReceive())) self.config_manager.set_hex_send(str(self.get_checkBoxHexSend())) self.config_manager.set_cr_lf(str(self.get_checkBoxCRLF())) self.config_manager.set_add_date(str(self.get_checkBoxAddDate())) self.config_manager.set_auto_sav_db(str(self.get_checkBoxAutoSaveDB())) self.config_manager.set_auto_sav_log(str(self.get_checkBoxAutoSaveLog())) self.config_manager.set_add_crc_rtu(str(self.get_checkBoxAddCRC())) # 保存当串口 self.config_manager.set_port(self.serial_window.get_port()) self.config_manager.set_baudrate(str(self.serial_window.get_baudrate())) self.config_manager.set_parity(self.serial_window.get_parity()) # 保存网口 self.config_manager.set_net_port(self.net_window.get_port()) # 保存数据显示页面配置 row_count = self.tableWidget.rowCount() for i in range(row_count): rowstr = [] for col in range(self.tableWidget.columnCount() - 2): item = self.tableWidget.item(i, col) if item is not None: rowstr.append(item.text()) else: rowstr.append('') # 如果单元格为空,则添加空字符串 selectStr = str(self.tableWidget.item(i, 0).checkState()) row_string = '|'.join(rowstr) + f'|{selectStr}' self.config_manager.set_dishex_row(i, row_string) # 保存快捷键配置 for i in range(self.quick_num): # 假设有20个按钮 index_str = f"{i:02}" # 确保编号为两位数字形式 lineEditName = f"lineEditQuick_{index_str}" buttonName = f"pushButtonQuick_{index_str}" # 格式化按钮名称,确保两位数 set_text = getattr(self, lineEditName).text() + "|" + getattr(self, buttonName).text() self.config_manager.set_quick_buttion(i, set_text) if __name__ == "__main__": app = QApplication(sys.argv) window = LaserMethanePT() window.show() sys.exit(app.exec_())