# -*- coding: utf-8 -*- """ Created on Mon May 27 14:31:38 2024 @author: WANGXIBAO """ import sys import serial,csv import serial.tools.list_ports import time,datetime from PyQt5 import QtWidgets from PyQt5.Qt import QPainter from PyQt5.QtWidgets import QMessageBox ,QFileDialog from PyQt5.QtCore import QTimer from PyUartUi import Ui_UartAssistant from UartDataPolt import QChartViewPlot,UpdateDataThread,GetDataQX from PyQt5.QtChart import QChartView from PyQt5.QtGui import QIcon class PyQt5Serial(QtWidgets.QWidget,Ui_UartAssistant): # %%初始化程序 def __init__(self): super(PyQt5Serial, self).__init__() self.setupUi(self) self.update_data_thread = UpdateDataThread() # 创建更新波形数据线程 self.get_data_qx = GetDataQX() self.get_data_qx.IndOfReturn(0) #根据数据特点给一个初始值 self.init() self.ser = serial.Serial() #创建一个空对象 self.port_check() # 设置Logo和标题 self.setWindowIcon(QIcon('D:/workspace/py/PyUartAssistant/favicon.png')) self.setWindowTitle("调试助手") # 设置禁止拉伸窗口大小 self.setFixedSize(self.width(), self.height()) # 发送数据和接收数据数目置零 self.data_num_sended = 0 self.lineEditSendNum.setText(str(self.data_num_sended)) self.data_num_received = 0 self.lineEditReceiveNum.setText(str(self.data_num_received)) # 串口关闭按钮使能关闭 self.pushButtonCloseSerial.setEnabled(False) # 发送框、文本框清除 self.textEditReceive.setText("") self.textEditSend.setText("") # %%第二个TAB 初始化画图页面表格 self.pushButtonStopPlot.setEnabled(False) self.pushButtonStartPlot.setEnabled(True) self.radioButtonCH4QX.setEnabled(True) self.radioButtonCH4TF.setEnabled(True) self.checkBoxAutoSaveCsv.setEnabled(False) # 加载Qchart波形界面 self.plot_qchart = QChartViewPlot() self.plot_view.setChart(self.plot_qchart) self.plot_view.setRenderHint(QPainter.Antialiasing) # 抗锯齿 self.plot_view.setRubberBand(QChartView.RectangleRubberBand) # 用于暂存接收的串口数据 self.buffer = b'' # ============================================================================= # def wheelEvent(self, event): # if self.plot_view.underMouse: # # 鼠标滚轮:缩放Qchart波形 # if event.angleDelta().y() >= 0: # # 鼠标滚轮向上 # if event.x() < ( # self.plot_view.width() + self.plot_view.x()) and event.x() > self.plot_view.x(): # if event.y() < ( # self.plot_view.height() + self.plot_view.y()) and event.y() > self.plot_view.y(): # self.plot_qchart.zoomIn() # else: # # 鼠标滚轮向下 # if event.x() < ( # self.plot_view.width() + self.plot_view.x()) and event.x() > self.plot_view.x(): # if event.y() < ( # self.plot_view.height() + self.plot_view.y()) and event.y() > self.plot_view.y(): # self.plot_qchart.zoomOut() # ============================================================================= #%% 重写关闭按钮 def closeEvent(self, event): self.update_data_thread.stop() # 可选:加入超时判断,防止线程未能正常退出导致程序无法关闭 if not self.update_data_thread.wait(1000): # 等待5秒 print("线程未在指定时间内退出,可能需要进一步处理") # 如果串口已经打开,则关闭串口 if self.ser.is_open: self.port_close() # 调用父类的关闭事件处理函数 super().closeEvent(event) # %%建立控件信号与槽关系 def init(self): # 串口检测按钮 self.pushButtonTestSerial.clicked.connect(self.port_check) # 串口打开按钮 self.pushButtonOpenSerial.clicked.connect(self.port_open) # 串口关闭按钮 self.pushButtonCloseSerial.clicked.connect(self.port_close) # 定时发送数据 self.timer_send = QTimer() self.timer_send.timeout.connect(self.data_send) self.checkBoxReapitSend.stateChanged.connect(self.data_send_timer) # 发送数据按钮 self.pushButtonSend.clicked.connect(self.data_send) # 保存日志 self.pushButtonLogSave.clicked.connect(self.savefiles) # 加载日志 self.pushButtonLogLoad.clicked.connect(self.openfiles) # 跳转链接 #self.commandLinkButton1.clicked.connect(self.link) # 清除发送按钮 self.pushButtonClearSend.clicked.connect(self.send_data_clear) # 清除接收按钮 self.pushButtonClearReceive.clicked.connect(self.receive_data_clear) # 开始绘图 self.pushButtonStartPlot.clicked.connect(self.btn_start_clicked) # 关闭绘图 self.pushButtonStopPlot.clicked.connect(self.btn_stop_clicked) #线程信号发射 self.update_data_thread._signal_update.connect(self.update_data_thread_slot) self.comboBoxPlot.currentIndexChanged.connect(self.plot_item_changed) # %% 串口检测 def port_check(self): # 检测所有存在的串口,将信息存储在字典中 self.Com_Dict = {} port_list = list(serial.tools.list_ports.comports()) self.comboBoxSerial.clear() for port in port_list: self.Com_Dict["%s" % port[0]] = "%s" % port[1] self.comboBoxSerial.addItem(port[0]) # 无串口判断 if len(self.Com_Dict) == 0: self.comboBoxSerial.addItem("无串口") # %%打开串口 def port_open(self): self.ser.port = self.comboBoxSerial.currentText() # 串口号 self.ser.baudrate = int(self.comboBoxBaudrate.currentText()) # 波特率 self.ser.timeout = 0.01 flag_data = int(self.comboBoxDataBits.currentText()) # 数据位 if flag_data == 5: self.ser.bytesize = serial.FIVEBITS elif flag_data == 6: self.ser.bytesize = serial.SIXBITS elif flag_data == 7: self.ser.bytesize = serial.SEVENBITS else: self.ser.bytesize = serial.EIGHTBITS flag_data = self.comboBoxCheckBit.currentText() # 校验位 if flag_data == "None": self.ser.parity = serial.PARITY_NONE elif flag_data == "Odd": self.ser.parity = serial.PARITY_ODD else: self.ser.parity = serial.PARITY_EVEN flag_data = int(self.comboBoxStopBit.currentText()) # 停止位 if flag_data == 1: self.ser.stopbits = serial.STOPBITS_ONE else: self.ser.stopbits = serial.STOPBITS_TWO flag_data = self.comboBoxFlow.currentText() # 流控 if flag_data == "No Ctrl Flow": self.ser.xonxoff = False #软件流控 self.ser.dsrdtr = False #硬件流控 DTR self.ser.rtscts = False #硬件流控 RTS elif flag_data == "SW Ctrl Flow": self.ser.xonxoff = True #软件流控 else: if self.checkBoxDTR.isChecked(): self.ser.dsrdtr = True #硬件流控 DTR if self.checkBoxRTS.isChecked(): self.ser.rtscts = True #硬件流控 RTS try: time.sleep(0.1) if self.ser.is_open: self.ser.close() self.ser.open() except: QMessageBox.critical(self, "串口异常", "此串口不能被打开!") return None # 串口打开后,切换开关串口按钮使能状态,防止失误操作 if self.ser.isOpen(): self.pushButtonOpenSerial.setEnabled(False) self.pushButtonCloseSerial.setEnabled(True) self.comboBoxBaudrate.setEnabled(False) self.comboBoxSerial.setEnabled(False) #self.formGroupBox1.setTitle("串口状态(开启)") #日志保存 # 格式化日期时间字符串,用于文件名 # 例如:2024-05-28_12-34-56.txt self.file =time.strftime("%Y%m%d%H%M%S", time.localtime()) self.filename = self.file + ".txt" # 定时器接收数据 self.timer = QTimer() self.timer.timeout.connect(self.data_receive) # 打开串口接收定时器,周期为1ms self.timer.start(20) # %%接收数据 def data_receive(self): try: num = self.ser.inWaiting() if num>0 : #print("接收数据",datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")) data = self.ser.read(num) self.buffer+=data #print("接收完成",datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")) if self.checkBoxAddDate.isChecked(): nowTime = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") nowTime = nowTime[:-3] self.textEditReceive.insertPlainText(nowTime + " ") # HEX显示数据 if self.checkBoxHexReceive.checkState(): out_s = '' for i in range(0, len(data)): out_s = out_s + '{:02X}'.format(data[i]) + ' ' self.textEditReceive.insertPlainText(out_s) # ASCII显示数据 else: #print("解码前",datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")) self.textEditReceive.insertPlainText(data.decode('utf-8',errors='replace')) #print("解码数据",datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")) # 接收换行 if self.checkBoxCRLF.isChecked(): self.textEditReceive.insertPlainText('\r\n') # 获取到text光标 textCursor = self.textEditReceive.textCursor() # 滚动到底部 textCursor.movePosition(textCursor.End) # 设置光标到text中去 self.textEditReceive.setTextCursor(textCursor) # 统计接收字符的数量 self.data_num_received += num self.lineEditReceiveNum.setText(str(self.data_num_received)) try: lines = self.buffer.split(b'\n') # 或者使用 b'\r\n' 根据你的需要 # 最后一个元素可能不包含完整的行,所以将其保留作为新的缓存 self.buffer = lines.pop() # 处理每一行数据 for line in lines: # 注意:每行数据可能不包含结尾的换行符,所以在处理前检查一下 if line.endswith(b'\r'): line = line[:-1] # 移除回车 # 写入当前日期的文件,打开文件,如果文件不存在则创建,如果存在则追加内容 if self.checkBoxAutoSaveLog.isChecked(): with open(self.filename, 'a', encoding='utf-8',newline='') as file: #print (data.decode('utf-8')) lineUtf8 = line.decode('utf-8') saveData = (time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) + " " + lineUtf8 + "\r\n" #print("saveData:",saveData) file.write(saveData) # 写入内容 if lineUtf8[:2]=="A+": print(lineUtf8) dataSplit = self.get_data_qx.Transdata(lineUtf8) self.filenameCsv= self.file + ".csv" if self.checkBoxAutoSaveCsv.isChecked(): #写入CSV文件 try: self.get_data_qx.SaveCsv(self.filenameCsv) except: print("写入CSV失败") pass self.update_data_thread.SetFlag(1) self.update_data_thread.SetReceiveData(dataSplit) #print(dataSplit) #print("接收发数",datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")) except: print("预处理数据有问题") pass except: QMessageBox.critical(self, '串口异常', '串口接收数据异常,请重新连接设备!') self.port_close() return None # %%定时发送数据 def data_send_timer(self): try: if 1<= int(self.lineEditTime.text()) <= 30000: # 定时时间1ms~30s内 if self.checkBoxReapitSend.isChecked(): self.timer_send.start(int(self.lineEditTime.text())) self.lineEditTime.setEnabled(False) else: self.timer_send.stop() self.lineEditTime.setEnabled(True) else: QMessageBox.critical(self, '定时发送数据异常', '定时发送数据周期仅可设置在30秒内!') except: QMessageBox.critical(self, '定时发送数据异常', '请设置正确的数值类型!') # %%发送数据 def data_send(self): if self.ser.isOpen(): input_s = self.textEditSend.toPlainText() # 判断是否为非空字符串 if input_s != "": # 时间显示 if self.checkBoxAddDate.isChecked(): self.textEditReceive.insertPlainText((time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) + " ") # HEX发送 if self.checkBoxHexSend.isChecked(): input_s = input_s.strip() send_list = [] while input_s != '': try: num = int(input_s[0:2], 16) except ValueError: QMessageBox.critical(self, '串口异常', '请输入规范十六进制数据,以空格分开!') return None input_s = input_s[2:].strip() send_list.append(num) input_s = bytes(send_list) # ASCII发送 else: input_s = (input_s).encode('utf-8') # HEX接收显示 if self.checkBoxHexReceive.isChecked(): out_s = '' for i in range(0, len(input_s)): out_s = out_s + '{:02X}'.format(input_s[i]) + ' ' self.textEditReceive.insertPlainText(out_s) # ASCII接收显示 else: self.textEditReceive.insertPlainText(input_s.decode('utf-8')) # 接收换行 if self.checkBoxCRLF.isChecked(): self.textEditReceive.insertPlainText('\r\n') # 获取到Text光标 textCursor = self.textEditReceive.textCursor() # 滚动到底部 textCursor.movePosition(textCursor.End) # 设置光标到Text中去 self.textEditReceive.setTextCursor(textCursor) # 统计发送字符数量 num = self.ser.write(input_s) self.data_num_sended += num self.lineEditSendNum.setText(str(self.data_num_sended)) else: pass # %%保存日志 def savefiles(self): dlg = QFileDialog() filenames = dlg.getSaveFileName(None, "保存日志文件", None, "Txt files(*.txt)") try: with open(file = filenames[0], mode='w', encoding='utf-8') as file: file.write(self.textEditReceive.toPlainText()) except: QMessageBox.critical(self, '日志异常', '保存日志文件失败!') #%% 加载日志 def openfiles(self): dlg = QFileDialog() filenames = dlg.getOpenFileName(None, "加载日志文件", None, "Txt files(*.txt)") try: with open(file = filenames[0], mode='r', encoding='utf-8') as file: self.textEditSend.setPlainText(file.read()) except: QMessageBox.critical(self, '日志异常', '加载日志文件失败!') # %%打开博客链接和公众号二维码 #webbrowser.open('https://blog.csdn.net/m0_38106923') # %%清除发送数据显示 def send_data_clear(self): self.textEditSend.setText("") self.data_num_sended = 0 self.lineEditSendNum.setText(str(self.data_num_sended)) # 清除接收数据显示 def receive_data_clear(self): self.textEditReceive.setText("") self.data_num_received = 0 self.lineEditSendNum.setText(str(self.data_num_received)) # 关闭串口 def port_close(self): try: self.timer.stop() self.timer_send.stop() self.btn_stop_clicked() #执行停止绘图操作 self.ser.close() except: QMessageBox.critical(self, '串口异常', '关闭串口失败,请重启程序!') return None # 切换开关串口按钮使能状态和定时发送使能状态 self.pushButtonOpenSerial.setEnabled(True) self.pushButtonCloseSerial.setEnabled(False) self.lineEditTime.setEnabled(True) self.comboBoxBaudrate.setEnabled(True) self.comboBoxSerial.setEnabled(True) # 发送数据和接收数据数目置零 self.data_num_sended = 0 self.lineEditSendNum.setText(str(self.data_num_sended)) self.data_num_received = 0 self.lineEditReceiveNum.setText(str(self.data_num_received)) #self.formGroupBox1.setTitle("串口状态(关闭)") def btn_start_clicked(self): #开启按钮 self.update_data_thread.start() self.update_data_thread.restart() self.pushButtonStartPlot.setEnabled(False) self.pushButtonStopPlot.setEnabled(True) self.radioButtonCH4QX.setEnabled(False) self.radioButtonCH4TF.setEnabled(False) self.checkBoxAutoSaveCsv.setEnabled(True) self.comboBoxPlot.addItems(["甲烷浓度","环境温度","激光器温度","激光强度"]) self.comboBoxPlot.setCurrentText("甲烷浓度") #self.update_data_thread.SetPlotItem(0) self.get_data_qx.IndOfReturn(0) self.update_data_thread.restart() def btn_stop_clicked(self): self.update_data_thread.stop() self.pushButtonStartPlot.setEnabled(True) self.pushButtonStopPlot.setEnabled(False) self.radioButtonCH4QX.setEnabled(True) self.radioButtonCH4TF.setEnabled(True) self.checkBoxAutoSaveCsv.setEnabled(False) self.comboBoxPlot.clear() def update_data_thread_slot(self, data): # 线程回调函数 #data = json.loads(data) self.plot_qchart.handle_update(float(data)) #print("thread ",data) def plot_item_changed(self,index): print(index) #self.update_data_thread.SetPlotItem(index) self.get_data_qx.IndOfReturn(index) #self.plot_qchart.series.replace([]) #执行 if __name__ == '__main__': app = QtWidgets.QApplication(sys.argv) myshow = PyQt5Serial() myshow.show() sys.exit(app.exec_())