# -*- coding: utf-8 -*- """ Created on Sun Sep 29 09:01:22 2024 @author: WANGXIBAO """ import datetime import os import re import socket import sys import time from configparser import ConfigParser from PyQt5 import QtWidgets from PyQt5.QtCore import QTimer,Qt from PyQt5.QtWidgets import QMessageBox, QFileDialog, QInputDialog, QTableWidgetItem from PyNetUi import Ui_UartAssistant class Pyqt5Net(QtWidgets.QWidget, Ui_UartAssistant): # %%初始化程序 def __init__(self): super(Pyqt5Net, self).__init__() self.setupUi(self) self.quick_num = 99 self.init() #信号和槽 self.IniPath = "PyNet.ini" self.log_time = 10 self.funcode = "F4" self.position = 5 self.CheckCfgIniData() #更改按钮的文字 self.log_count = 0 self.log_buffer = {} #定义成一个字典,键--文件名,值--对于收到的字符串列表 # 设置Logo和标题 #self.setWindowIcon(QIcon('favicon.ico')) self.setWindowTitle("网口调试助手") # 设置禁止拉伸窗口大小 #self.setFixedSize(self.width(), self.height()) # 发送数据和接收数据数目置零 self.data_num_sended = 0 self.lineEditSendNum.setText(str(self.data_num_sended)) self.data_num_received = 0 self.lineEditReceiveNum.setText(str(self.data_num_received)) # 串口关闭按钮使能关闭 self.pushButton_closeNet.setEnabled(False) # 发送框、文本框清除 self.textEditReceive.setPlainText("") self.textEditReceive.setMaximumBlockCount(5000) self.textEditSend.setText("") self.get_all_local_ips() # 定时器接收数据 #self.timer = QTimer() self.timer: QTimer = QTimer() #self.timer.timeout.connect(self.data_receive) timeout_signal = self.timer.timeout timeout_signal.connect(self.data_receive) #加载快捷指令 self.widget_6.hide() #加载快捷指令的按键值 #建立信号与槽 def init(self): self.pushButton_openNet.clicked.connect(self.open_net) self.pushButton_closeNet.clicked.connect(self.close_net) self.pushButtonSend.clicked.connect(lambda: self.data_send(text_quick=None)) # 清除发送按钮 self.pushButtonClearSend.clicked.connect(self.send_data_clear) # 清除接收按钮 self.pushButtonClearReceive.clicked.connect(self.receive_data_clear) # # 定时发送数据 self.timer_send = QTimer() self.timer_send.timeout.connect(self.data_send) self.checkBoxReapitSend.stateChanged.connect(self.data_send_timer) # 快捷指令扩展区域 self.pushButton_expend.clicked.connect(self.adjust_sidebar) # 清空网络通道列表 self.pushButtonClearChannel.clicked.connect(self.channel_clear) # 动态创建控件并存储引用 for i in range(self.quick_num): # 从0到20 index_str = f"{i:02}" # 确保编号为两位数字形式 horizontalLayoutName = f"horizontalLayoutQuick_{index_str}" horizontalLayout = QtWidgets.QHBoxLayout() horizontalLayout.setObjectName(horizontalLayoutName) # 创建 QLineEdit 并设置动态属性 lineEditName = f"lineEditQuick_{index_str}" setattr(self, lineEditName, QtWidgets.QLineEdit(self.layoutWidget1)) getattr(self, lineEditName).setObjectName(lineEditName) horizontalLayout.addWidget(getattr(self, lineEditName)) # 创建 QPushButton 并设置动态属性 buttonName = f"pushButtonQuick_{index_str}" setattr(self, buttonName, QtWidgets.QPushButton(self.layoutWidget1)) getattr(self, buttonName).setObjectName(buttonName) horizontalLayout.addWidget(getattr(self, buttonName)) self.verticalLayout_8.addLayout(horizontalLayout) # 连接按钮点击事件到槽函数 button = getattr(self, buttonName) if button: button.clicked.connect(lambda checked, idx=i: self.onButtonClick(idx)) # 建立表格解析并显示hex self.tableWidget.setRowCount(self.quick_num) for i in range(self.quick_num): # 从0到20 item = QtWidgets.QTableWidgetItem() item.setCheckState(Qt.Unchecked) self.tableWidget.setItem(i, 0, item) self.double_click_timers = {} # 存储双击定时器 #%% 重写关闭按钮 def closeEvent(self, event): self.SavConfig() self.close_net() # 调用父类的关闭事件处理函数 super().closeEvent(event) #加载本地网络地址 def get_all_local_ips(self): ip_addresses = [] for interface in socket.getaddrinfo(socket.gethostname(), None): address = interface[4][0] if ':' not in address: # IPv6地址包含冒号 ip_addresses.append(address) print(ip_addresses) for ip in ip_addresses: self.comboBox_localAddr.insertItem(0, ip) self.comboBox_localAddr.setCurrentIndex(0) # 打开网络连接 def open_net(self): IP = self.comboBox_localAddr.currentText() port = self.lineEdit_port.text() udp_addr = (IP, int(port)) self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #创建接收空socket print("绑定地址", udp_addr) try: self.udp_socket.bind(udp_addr) print("开始监听。。。") # 设置套接字为非阻塞模式 self.udp_socket.setblocking(False) except Exception as e: print(f"Error reading configuration: {e}") return None #使能相关按钮 self.pushButton_closeNet.setEnabled(True) self.pushButton_openNet.setEnabled(False) # 打开串口接收定时器,周期为1ms self.timer.start(20) def close_net(self): if hasattr(self, 'udp_socket'): self.timer.stop() self.udp_socket.close() #使能相关按钮 self.pushButton_closeNet.setEnabled(False) self.pushButton_openNet.setEnabled(True) print("关闭网络连接") #接收数据 def data_receive(self): try: recv_data = self.udp_socket.recvfrom(1024) # 1024表示本次接收的最大字节数 out_s = '' # 打印接收到的数据 if self.checkBoxHexReceive.checkState(): line_utf8 = recv_data[0].hex().upper() out_s = ' '.join(['{:02X}'.format(b) for b in bytes.fromhex(line_utf8)]) else: line_utf8 = recv_data[0].decode('utf-8', errors='replace') print("[From %s:%d]:%s" % (recv_data[1][0], recv_data[1][1], line_utf8)) num = len(line_utf8) # 接收窗口中增加来源地址显示 recv_addr = recv_data[1][0] + ':' + str(recv_data[1][1]) if not self.is_in_comboBox_channel(recv_addr): self.comboBox_channel.addItem(recv_addr) # 在接收窗口显示 # 获取到text光标 textCursor = self.textEditReceive.textCursor() # 滚动到底部 textCursor.movePosition(textCursor.End) # 设置光标到text中去 self.textEditReceive.setTextCursor(textCursor) #选择要显示的通道 if recv_addr == self.comboBox_channel.currentText() or self.comboBox_channel.currentText() == "ALL": #self.textEditReceive.insertPlainText('<' + recv_addr+ '> ' ) if self.checkBoxAddDate.isChecked(): nowTime = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") nowTime = nowTime[:-3] self.textEditReceive.insertPlainText(nowTime + " ") #self.add_line_to_textedit(nowTime + " ") # HEX显示数据 if self.checkBoxHexReceive.checkState(): # line_utf8_bytes = line_utf8.hex() # 编码为 UTF-8 字节串 # for byte in line_utf8_bytes: # out_s += '{:02X} '.format(byte) # 使用大写字母 X # self.textEditReceive.insertPlainText(out_s) self.textEditReceive.insertPlainText('<' + recv_addr + '> ' + out_s) #self.add_line_to_textedit('<' + recv_addr + '> ' + out_s) # 解析数据 self.Disp_hex_receive(line_utf8) # ASCII显示数据 else: #print("解码前",datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")) self.textEditReceive.insertPlainText('<' + recv_addr + '> ' + line_utf8) #self.add_line_to_textedit('<' + recv_addr + '> ' + line_utf8) #print("解码数据",datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")) # 接收换行 if self.checkBoxCRLF.isChecked(): self.textEditReceive.insertPlainText('\r\n') # 统计接收字符的数量 self.data_num_received += num self.lineEditReceiveNum.setText(str(self.data_num_received)) # 自动保存日志 if self.checkBoxAutoSaveLog.isChecked(): if self.checkBoxHexReceive.checkState(): #根据功能码保存数据 if line_utf8[2*self.position:2*self.position+2] == self.funcode: sav_name = line_utf8[0:8] + '_' + datetime.datetime.now().strftime("%Y-%m-%d") + '.txt' sav_str = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")[:23] + " " + out_s[12:] + "\r\n" else: sav_name = datetime.datetime.now().strftime("%Y-%m-%d") + '.log' sav_str = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")[:23] + " " + out_s + "\r\n" else: line = line_utf8.split(',') if len(line) == 2: sav_name = line[0] + '_' + datetime.datetime.now().strftime("%Y-%m-%d") + '.txt' sav_str = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")[:23] + " " + line[1] else: sav_name = datetime.datetime.now().strftime("%Y-%m-%d") + '.log' sav_str = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")[:23] + " " + line_utf8 # 将日志信息添加到缓冲区 if sav_name not in self.log_buffer: self.log_buffer[sav_name] = [] self.log_buffer[sav_name].append(sav_str) # 当缓冲区中的记录达到10条时,写入文件 if len(self.log_buffer[sav_name]) >= self.log_time: with open(sav_name, mode='a', newline='', encoding='utf-8', errors='replace') as file: file.writelines(self.log_buffer[sav_name]) self.log_buffer[sav_name].clear() except socket.error as e: #print(f"Socket error: {e}") pass def data_send(self, text_quick=None): if text_quick == None: input_s = self.textEditSend.toPlainText() else: input_s = text_quick try: if input_s != "": # 时间显示 if self.checkBoxAddDate.isChecked(): self.textEditReceive.insertPlainText((time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) + " ") # HEX发送 if self.checkBoxHexSend.isChecked(): input_s = input_s.strip() send_list = [] while input_s != '': try: num = int(input_s[0:2], 16) except ValueError: QMessageBox.critical(self, '串口异常', '请输入规范十六进制数据,以空格分开!') return None input_s = input_s[2:].strip() send_list.append(num) input_s = bytes(send_list) # ASCII发送 else: input_s = (input_s).encode('utf-8') input_s = re.sub(b'(? 1000: current_lines = current_lines[-1000:] # 只保留最新的100行 # 将更新后的行列表转换回字符串并设置为textEdit的内容 self.textEditReceive.setPlainText('\n'.join(current_lines)) #检查是否在comboBox_channel中 def is_in_comboBox_channel(self, addr_rece): for i in range(self.comboBox_channel.count()): if addr_rece == self.comboBox_channel.itemText(i): return True return False def channel_clear(self): self.comboBox_channel.lineEdit().clear() for i in range(self.comboBox_channel.count() - 1): self.comboBox_channel.removeItem(1) self.comboBox_channel.setCurrentIndex(0) #开关快捷指令栏 def adjust_sidebar(self): if self.widget_6.isHidden(): self.widget_6.show() else: self.widget_6.hide() def onPushButtonQuickClicked(self, line_edit): text = getattr(self, line_edit).text() #print(f"Button clicked: {text}") if self.checkBox_return.isChecked(): text = text + "\r\n" self.data_send(text) def CheckCfgIniData(self): if not os.path.exists(self.IniPath): config = ConfigParser() #UI_config config.add_section('UI_config') config.set('UI_config', 'port', '9000') config.set('UI_config', 'hex_send', '0') config.set('UI_config', 'hex_receive', '0') config.set('UI_config', 'add_date', '0') config.set('UI_config', 'cr_lf', '0') config.set('UI_config', 'auto_sav_log', '2') config.set('UI_config', 'modbus_csv', '0') # Modbus_config config.add_section('Modbus_config') config.set('Modbus_config', 'funcode', 'F4') #识别码 config.set('Modbus_config', 'position', '5') #识别码所在位置 #DisHex_config config.add_section('DisHex_config') for i in range(self.quick_num): idx = f'{i:02}' rowname = f'row{idx}' config.set('DisHex_config', rowname, '|||||0') #Quick_config config.add_section('Quick_config') config.set('Quick_config', 'log_time', '10') for i in range(self.quick_num): idx = f'{i:02}' button_name = f'Button{idx}' # 格式化按钮名称,确保两位数 config.set('Quick_config', button_name, '') with open(self.IniPath, 'w', encoding='utf-8') as f: config.write(f) #读取数据 config = ConfigParser() config.read(self.IniPath, encoding='utf-8') try: # UI_config port = config.get('UI_config', 'port') hex_send = int(config.get('UI_config', 'hex_send')) hex_receive = int(config.get('UI_config', 'hex_receive')) add_date = int(config.get('UI_config', 'add_date')) cr_lf = int(config.get('UI_config', 'cr_lf')) auto_sav_log = int(config.get('UI_config', 'auto_sav_log')) modbus_csv = int(config.get('UI_config', 'modbus_csv')) self.lineEdit_port.setText(port) self.checkBoxHexSend.setChecked(hex_send) self.checkBoxHexReceive.setChecked(hex_receive) self.checkBoxAddDate.setChecked(add_date) self.checkBoxCRLF.setChecked(cr_lf) self.checkBoxAutoSaveLog.setChecked(auto_sav_log) self.checkBoxAutoSaveCsv.setChecked(modbus_csv) # Modbus_config self.funcode = config.get('Modbus_config', 'funcode') self.position = int(config.get('Modbus_config', 'position')) # DisHex_config for i in range(self.quick_num): idx = f'{i:02}' rowname = f'row{idx}' rowstr = config.get('DisHex_config', rowname) rowstr_split = rowstr.split('|') for col in range(len(rowstr_split)-1): item = QtWidgets.QTableWidgetItem() item.setText(rowstr_split[col]) self.tableWidget.setItem(i, col, item) select = int(rowstr_split[col+1]) item = self.tableWidget.item(i, 0) # 先反读回来,再设置,不然text丢失 item.setCheckState(select) self.tableWidget.setItem(i, 0, item) # Quick_config # 创建一个空字典来存储按钮名称和对应的配置 self.log_time = int(config.get('Quick_config', 'log_time')) # 循环遍历按钮编号,从0到19 for i in range(self.quick_num): idx = f'{i:02}' button_name = f'Button{idx}' # 格式化按钮名称,确保两位数 # 使用 get 方法安全地获取配置,如果不存在则返回空字符串 config_value = config.get('Quick_config', button_name, fallback='') config_value_split = config_value.split('|') # 将按钮名称和配置信息存储在字典中 if len(config_value_split) == 2: # 打印字典查看结果,并赋值 print(button_name, config_value_split[0], config_value_split[1]) getattr(self, f'pushButtonQuick_{idx}').setText(config_value_split[1]) getattr(self, f'lineEditQuick_{idx}').setText(config_value_split[0]) except Exception as e: print(f"Error reading configuration: {e}") def SavConfig(self): #关闭界面前保存快捷区域的命令和名称 config = ConfigParser() config.read(self.IniPath, encoding='utf-8') # 保存UI_config config.set('UI_config', 'port', str(self.lineEdit_port.text())) config.set('UI_config', 'hex_send', str(self.checkBoxHexSend.checkState())) config.set('UI_config', 'hex_receive', str(self.checkBoxHexReceive.checkState())) config.set('UI_config', 'add_date', str(self.checkBoxAddDate.checkState())) config.set('UI_config','cr_lf',str(self.checkBoxCRLF.checkState())) config.set('UI_config', 'auto_sav_log', str(self.checkBoxAutoSaveLog.checkState())) config.set('UI_config', 'modbus_csv', str(self.checkBoxAutoSaveCsv.checkState())) # 保存DisHex_config for i in range(self.quick_num): rowstr = [] idx = f'{i:02}' rowname = f'row{idx}' for col in range(self.tableWidget.columnCount()-1): item = self.tableWidget.item(i, col) if item is not None: rowstr.append(item.text()) else: rowstr.append('') # 如果单元格为空,则添加空字符串 selectStr = str(self.tableWidget.item(i,0).checkState()) row_string = '|'.join(rowstr) + f'|{selectStr}' config.set('DisHex_config', rowname, row_string) # 保存快捷按钮 for i in range(self.quick_num): # 假设有20个按钮 index_str = f"{i:02}" # 确保编号为两位数字形式 lineEditName = f"lineEditQuick_{index_str}" buttonName = f"pushButtonQuick_{index_str}" # 格式化按钮名称,确保两位数 button_name = f"button{index_str}" set_text = getattr(self, lineEditName).text() + "|" + getattr(self, buttonName).text() config.set('Quick_config', button_name, set_text) with open(self.IniPath, 'w', encoding='utf-8') as f: config.write(f) def onButtonClick(self, idx): # 槽函数处理按钮点击事件 if idx not in self.double_click_timers: self.double_click_timers[idx] = None if self.double_click_timers[idx] is None: self.double_click_timers[idx] = True QTimer.singleShot(200, lambda: self.onButtonSigleClick(idx)) else: self.onButtonDoubleClick(idx) def onButtonSigleClick(self, idx): index_str = f"{idx:02}" if self.double_click_timers[idx]: self.double_click_timers[idx] = None lineEdit = getattr(self, f"lineEditQuick_{index_str}") print(f"Text in lineEdit_{idx}: {lineEdit.text()}") text = lineEdit.text() if self.checkBox_return.isChecked(): text = text + "\r\n" self.data_send(text) def onButtonDoubleClick(self, idx): # 槽函数处理按钮双击事件 self.double_click_timers[idx] = None index_str = f"{idx:02}" print(f"Double click detected on Button {index_str}.") button = getattr(self, f"pushButtonQuick_{index_str}") new_name, ok = QInputDialog.getText(self, 'Button Rename', 'Enter new button name:') if ok and new_name: button.setText(new_name) #执行 if __name__ == '__main__': app = QtWidgets.QApplication(sys.argv) myshow = Pyqt5Net() myshow.show() sys.exit(app.exec_())