2025-01-16 14:03:14 +08:00

506 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
Created on Sun Sep 29 09:01:22 2024
@author: WANGXIBAO
"""
import sys,os
import re,socket
import time,datetime
from PyQt5 import QtWidgets
from PyQt5.QtWidgets import QMessageBox ,QFileDialog,QInputDialog
from PyQt5.QtCore import QTimer
from PyNetUi import Ui_UartAssistant
from configparser import ConfigParser
class Pyqt5Net(QtWidgets.QWidget,Ui_UartAssistant):
# %%初始化程序
def __init__(self):
super(Pyqt5Net, self).__init__()
self.setupUi(self)
self.quick_num = 99
self.init() #信号和槽
self.IniPath = "PyNet.ini"
self.log_time = 10
self.CheckCfgIniData() #更改按钮的文字
self.log_count = 0
self.log_buffer = {} #定义成一个字典,键--文件名,值--对于收到的字符串列表
# 设置Logo和标题
#self.setWindowIcon(QIcon('favicon.ico'))
self.setWindowTitle("网口调试助手")
# 设置禁止拉伸窗口大小
#self.setFixedSize(self.width(), self.height())
# 发送数据和接收数据数目置零
self.data_num_sended = 0
self.lineEditSendNum.setText(str(self.data_num_sended))
self.data_num_received = 0
self.lineEditReceiveNum.setText(str(self.data_num_received))
# 串口关闭按钮使能关闭
self.pushButton_closeNet.setEnabled(False)
# 发送框、文本框清除
self.textEditReceive.setText("")
self.textEditSend.setText("")
self.get_all_local_ips()
# 定时器接收数据
self.timer = QTimer()
self.timer.timeout.connect(self.data_receive)
#加载快捷指令
self.widget_6.hide()
#加载快捷指令的按键值
#建立信号与槽
def init(self):
self.pushButton_openNet.clicked.connect(self.open_net)
self.pushButton_closeNet.clicked.connect(self.close_net)
self.pushButtonSend.clicked.connect(lambda:self.data_send(text_quick= None))
# 清除发送按钮
self.pushButtonClearSend.clicked.connect(self.send_data_clear)
# 清除接收按钮
self.pushButtonClearReceive.clicked.connect(self.receive_data_clear)
# 定时发送数据
self.timer_send = QTimer()
self.timer_send.timeout.connect(self.data_send)
self.checkBoxReapitSend.stateChanged.connect(self.data_send_timer)
# 快捷指令扩展区域
self.pushButton_expend.clicked.connect(self.adjust_sidebar)
# 清空网络通道列表
self.pushButtonClearChannel.clicked.connect(self.channel_clear)
# 动态创建控件并存储引用
for i in range(self.quick_num): # 从0到20
index_str = f"{i:02}" # 确保编号为两位数字形式
horizontalLayoutName = f"horizontalLayoutQuick_{index_str}"
horizontalLayout = QtWidgets.QHBoxLayout()
horizontalLayout.setObjectName(horizontalLayoutName)
# 创建 QLineEdit 并设置动态属性
lineEditName = f"lineEditQuick_{index_str}"
setattr(self, lineEditName, QtWidgets.QLineEdit(self.layoutWidget1))
getattr(self, lineEditName).setObjectName(lineEditName)
horizontalLayout.addWidget(getattr(self, lineEditName))
# 创建 QPushButton 并设置动态属性
buttonName = f"pushButtonQuick_{index_str}"
setattr(self, buttonName, QtWidgets.QPushButton(self.layoutWidget1))
getattr(self, buttonName).setObjectName(buttonName)
horizontalLayout.addWidget(getattr(self, buttonName))
self.verticalLayout_8.addLayout(horizontalLayout)
# 连接按钮点击事件到槽函数
button = getattr(self, buttonName)
if button:
button.clicked.connect(lambda checked, idx=i: self.onButtonClick(idx))
self.double_click_timers = {} # 存储双击定时器
#%% 重写关闭按钮
def closeEvent(self, event):
#关闭界面前保存快捷区域的命令和名称
for i in range(self.quick_num): # 假设有20个按钮
index_str = f"{i:02}" # 确保编号为两位数字形式
lineEditName = f"lineEditQuick_{index_str}"
buttonName = f"pushButtonQuick_{index_str}" # 格式化按钮名称,确保两位数
button_name = f"button{index_str}"
set_text = getattr(self,lineEditName).text()+ "|" + getattr(self,buttonName).text()
self.SetCfgIniData(button_name, set_text)
self.close_net()
# 调用父类的关闭事件处理函数
super().closeEvent(event)
#加载本地网络地址
def get_all_local_ips(self):
ip_addresses = []
for interface in socket.getaddrinfo(socket.gethostname(), None):
address = interface[4][0]
if ':' not in address: # IPv6地址包含冒号
ip_addresses.append(address)
print(ip_addresses)
for ip in ip_addresses:
self.comboBox_localAddr.insertItem(0,ip)
self.comboBox_localAddr.setCurrentIndex(0)
# 打开网络连接
def open_net(self):
IP = self.comboBox_localAddr.currentText()
port = self.lineEdit_port.text()
udp_addr = (IP, int(port))
self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #创建接收空socket
print("绑定地址", udp_addr)
try:
self.udp_socket.bind(udp_addr)
print("开始监听。。。")
# 设置套接字为非阻塞模式
self.udp_socket.setblocking(False)
except Exception as e:
print(f"Error reading configuration: {e}")
return None
#使能相关按钮
self.pushButton_closeNet.setEnabled(True)
self.pushButton_openNet.setEnabled(False)
# 打开串口接收定时器周期为1ms
self.timer.start(20)
def close_net(self):
if hasattr(self, 'udp_socket'):
self.timer.stop()
self.udp_socket.close()
#使能相关按钮
self.pushButton_closeNet.setEnabled(False)
self.pushButton_openNet.setEnabled(True)
print("关闭网络连接")
#接收数据
def data_receive(self):
try:
recv_data = self.udp_socket.recvfrom(1024) # 1024表示本次接收的最大字节数
out_s = ''
# 打印接收到的数据
if self.checkBoxHexReceive.checkState():
line_utf8 = recv_data[0].hex()
else:
line_utf8 = recv_data[0].decode('utf-8',errors='replace')
print("[From %s:%d]:%s" % (recv_data[1][0], recv_data[1][1], line_utf8))
num = len(line_utf8)
# 接收窗口中增加来源地址显示
recv_addr = recv_data[1][0]+':'+str(recv_data[1][1])
if not self.is_in_comboBox_channel(recv_addr):
self.comboBox_channel.addItem(recv_addr)
if recv_addr==self.comboBox_channel.currentText() or self.comboBox_channel.currentText()=="ALL":
#self.textEditReceive.insertPlainText('<' + recv_addr+ '> ' )
if self.checkBoxAddDate.isChecked():
nowTime = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
nowTime = nowTime[:-3]
#self.textEditReceive.insertPlainText(nowTime + " ")
self.add_line_to_textedit(nowTime + " ")
# HEX显示数据
if self.checkBoxHexReceive.checkState():
# line_utf8_bytes = line_utf8.hex() # 编码为 UTF-8 字节串
# for byte in line_utf8_bytes:
# out_s += '{:02X} '.format(byte) # 使用大写字母 X
out_s = ' '.join(['{:02X}'.format(b) for b in bytes.fromhex(line_utf8)])
# self.textEditReceive.insertPlainText(out_s)
# self.textEditReceive.insertPlainText('\r\n')
self.add_line_to_textedit('<' + recv_addr+ '> ' + out_s)
# ASCII显示数据
else:
#print("解码前",datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"))
#self.textEditReceive.insertPlainText(line_utf8)
self.add_line_to_textedit('<' + recv_addr+ '> ' + line_utf8)
#print("解码数据",datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"))
# 接收换行
if self.checkBoxCRLF.isChecked():
self.textEditReceive.insertPlainText('\r\n')
# 获取到text光标
textCursor = self.textEditReceive.textCursor()
# 滚动到底部
textCursor.movePosition(textCursor.End)
# 设置光标到text中去
self.textEditReceive.setTextCursor(textCursor)
# 统计接收字符的数量
self.data_num_received += num
self.lineEditReceiveNum.setText(str(self.data_num_received))
# 自动保存日志
if self.checkBoxAutoSaveLog.isChecked():
line = line_utf8.split(',')
if len(line) ==2:
sav_name = line[0] +'_'+ datetime.datetime.now().strftime("%Y-%m-%d")+'.txt'
sav_str = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")[:23] +" " +line[1]
else:
sav_name = datetime.datetime.now().strftime("%Y-%m-%d")+'.log'
if self.checkBoxHexReceive.checkState():
sav_str = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")[:23] +" " +out_s +"\r\n"
else:
sav_str = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")[:23] +" " +line_utf8
# 将日志信息添加到缓冲区
if sav_name not in self.log_buffer:
self.log_buffer[sav_name] = []
self.log_buffer[sav_name].append(sav_str)
# 当缓冲区中的记录达到10条时写入文件
if len(self.log_buffer[sav_name]) >= self.log_time:
with open(sav_name, mode='a', newline='',encoding='utf-8', errors='replace') as file:
file.writelines(self.log_buffer[sav_name])
self.log_buffer[sav_name].clear()
except socket.error as e:
#print(f"Socket error: {e}")
pass
def data_send(self,text_quick = None):
if text_quick== None:
input_s = self.textEditSend.toPlainText()
else:
input_s = text_quick
try:
if input_s != "":
# 时间显示
if self.checkBoxAddDate.isChecked():
self.textEditReceive.insertPlainText((time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) + " ")
# HEX发送
if self.checkBoxHexSend.isChecked():
input_s = input_s.strip()
send_list = []
while input_s != '':
try:
num = int(input_s[0:2], 16)
except ValueError:
QMessageBox.critical(self, '串口异常', '请输入规范十六进制数据,以空格分开!')
return None
input_s = input_s[2:].strip()
send_list.append(num)
input_s = bytes(send_list)
# ASCII发送
else:
input_s = (input_s).encode('utf-8')
input_s = re.sub(b'(?<!\r)\n', b'\r\n', input_s)
# HEX接收显示
if self.checkBoxHexReceive.isChecked():
out_s = ''
for i in range(0, len(input_s)):
out_s = out_s + '{:02X}'.format(input_s[i]) + ' '
print(out_s)
self.textEditReceive.insertPlainText(out_s)
# ASCII接收显示
else:
self.textEditReceive.insertPlainText(input_s.decode('utf-8'))
# 接收换行
if self.checkBoxCRLF.isChecked():
self.textEditReceive.insertPlainText('\r\n')
# 获取到Text光标
textCursor = self.textEditReceive.textCursor()
# 滚动到底部
textCursor.movePosition(textCursor.End)
# 设置光标到Text中去
self.textEditReceive.setTextCursor(textCursor)
#定义服务器的IP地址和端口号
sever = self.comboBox_channel.currentText().split(':')
server_ip = sever[0]
server_port = int(sever[1])
server_address = (server_ip, server_port)
# 将消息发送到服务器
num = self.udp_socket.sendto(input_s, server_address)
# 统计发送字符数量
self.data_num_sended += num
self.lineEditSendNum.setText(str(self.data_num_sended))
except:
pass
# %%定时发送数据
def data_send_timer(self):
try:
if 1<= int(self.lineEditTime.text()) <= 30000: # 定时时间1ms~30s内
if self.checkBoxReapitSend.isChecked():
self.timer_send.start(int(self.lineEditTime.text()))
self.lineEditTime.setEnabled(False)
else:
self.timer_send.stop()
self.lineEditTime.setEnabled(True)
else:
QMessageBox.critical(self, '定时发送数据异常', '定时发送数据周期仅可设置在30秒内')
except:
QMessageBox.critical(self, '定时发送数据异常', '请设置正确的数值类型!')
# %%保存日志
def savefiles(self):
dlg = QFileDialog()
filenames = dlg.getSaveFileName(None, "保存日志文件", None, "Txt files(*.txt)")
try:
with open(file = filenames[0], mode='w', encoding='utf-8') as file:
file.write(self.textEditReceive.toPlainText())
except:
QMessageBox.critical(self, '日志异常', '保存日志文件失败!')
#%% 加载日志
def openfiles(self):
dlg = QFileDialog()
filenames = dlg.getOpenFileName(None, "加载日志文件", None, "Txt files(*.txt)")
try:
with open(file = filenames[0], mode='r', encoding='utf-8') as file:
self.textEditSend.setPlainText(file.read())
except:
QMessageBox.critical(self, '日志异常', '加载日志文件失败!')
#%% 清除计数
def send_data_clear(self):
self.textEditSend.setText("")
self.data_num_sended = 0
self.lineEditSendNum.setText(str(self.data_num_sended))
# 清除接收数据显示
def receive_data_clear(self):
self.textEditReceive.setText("")
self.data_num_received = 0
self.lineEditSendNum.setText(str(self.data_num_received))
#设置接受区域显示条数
def add_line_to_textedit(self, new_line):
# 获取当前文本并按行分割
current_lines = self.textEditReceive.toPlainText().splitlines()
# 添加新的行到列表中
current_lines.append(new_line)
# 如果行数超过100则删除最早的行直到剩下100行
if len(current_lines) > 1000:
current_lines = current_lines[-1000:] # 只保留最新的100行
# 将更新后的行列表转换回字符串并设置为textEdit的内容
self.textEditReceive.setPlainText('\n'.join(current_lines))
#检查是否在comboBox_channel中
def is_in_comboBox_channel(self,addr_rece):
for i in range(self.comboBox_channel.count()):
if addr_rece == self.comboBox_channel.itemText(i):
return True
return False
def channel_clear(self):
self.comboBox_channel.lineEdit().clear()
for i in range(self.comboBox_channel.count()-1):
self.comboBox_channel.removeItem(1)
self.comboBox_channel.setCurrentIndex(0)
#开关快捷指令栏
def adjust_sidebar(self):
if self.widget_6.isHidden():
self.widget_6.show()
else:
self.widget_6.hide()
def onPushButtonQuickClicked(self, line_edit):
text = getattr(self, line_edit).text()
#print(f"Button clicked: {text}")
if self.checkBox_return.isChecked():
text = text + "\r\n"
self.data_send(text)
def CheckCfgIniData(self):
if not os.path.exists(self.IniPath):
config = ConfigParser()
config.add_section('Quick_config')
config.set('Quick_config', 'log_time', '10')
for i in range(self.quick_num):
idx = f'{i:02}'
button_name = f'Button{idx}' # 格式化按钮名称,确保两位数
config.set('Quick_config', button_name, '')
with open(self.IniPath, 'w' ,encoding='utf-8') as f:
config.write(f)
config = ConfigParser()
config.read(self.IniPath, encoding='utf-8')
try:
# 创建一个空字典来存储按钮名称和对应的配置
self.log_time = int(config.get('Quick_config', 'log_time'))
# 循环遍历按钮编号从0到19
for i in range(self.quick_num):
idx = f'{i:02}'
button_name = f'Button{idx}' # 格式化按钮名称,确保两位数
# 使用 get 方法安全地获取配置,如果不存在则返回空字符串
config_value = config.get('Quick_config', button_name, fallback='')
config_value_split = config_value.split('|')
# 将按钮名称和配置信息存储在字典中
if len(config_value_split) == 2:
# 打印字典查看结果,并赋值
print(button_name,config_value_split[0],config_value_split[1])
getattr(self, f'pushButtonQuick_{idx}').setText(config_value_split[1])
getattr(self, f'lineEditQuick_{idx}').setText(config_value_split[0])
except Exception as e:
print(f"Error reading configuration: {e}")
def SetCfgIniData(self,button_name,set_text):
config = ConfigParser()
config.read(self.IniPath, encoding='utf-8')
config.set('Quick_config', button_name, set_text)
with open(self.IniPath, 'w' ,encoding='utf-8') as f:
config.write(f)
def onButtonClick(self, idx):
# 槽函数处理按钮点击事件
if idx not in self.double_click_timers:
self.double_click_timers[idx] = None
if self.double_click_timers[idx] is None:
self.double_click_timers[idx] = True
QTimer.singleShot(200, lambda: self.onButtonSigleClick(idx))
else:
self.onButtonDoubleClick(idx)
def onButtonSigleClick(self, idx):
index_str = f"{idx:02}"
if self.double_click_timers[idx]:
self.double_click_timers[idx] = None
lineEdit = getattr(self, f"lineEditQuick_{index_str}")
print(f"Text in lineEdit_{idx}: {lineEdit.text()}")
text = lineEdit.text()
if self.checkBox_return.isChecked():
text = text + "\r\n"
self.data_send(text)
def onButtonDoubleClick(self, idx):
# 槽函数处理按钮双击事件
self.double_click_timers[idx] = None
index_str = f"{idx:02}"
print(f"Double click detected on Button {index_str}.")
button = getattr(self, f"pushButtonQuick_{index_str}")
new_name, ok = QInputDialog.getText(self, 'Button Rename', 'Enter new button name:')
if ok and new_name:
button.setText(new_name)
#执行
if __name__ == '__main__':
app = QtWidgets.QApplication(sys.argv)
myshow = Pyqt5Net()
myshow.show()
sys.exit(app.exec_())