# -*- coding: utf-8 -*- """ Created on Mon May 27 14:31:38 2024 @author: WANGXIBAO """ import sys import serial,csv import serial.tools.list_ports import time,datetime from PyQt5 import QtWidgets from PyQt5.Qt import QPainter from PyQt5.QtWidgets import QMessageBox ,QFileDialog from PyQt5.QtCore import QTimer from PyUartUi import Ui_UartAssistant from UartDataPolt import QChartViewPlot,UpdateDataThread from PyQt5.QtChart import QChart, QValueAxis, QChartView, QSplineSeries from PyQt5.QtGui import QIcon class PyQt5Serial(QtWidgets.QWidget,Ui_UartAssistant): # %%初始化程序 def __init__(self): super(PyQt5Serial, self).__init__() self.setupUi(self) self.update_data_thread = UpdateDataThread() # 创建更新波形数据线程 self.init() self.ser = serial.Serial() #创建一个空对象 self.port_check() # 设置Logo和标题 self.setWindowIcon(QIcon('D:/workspace/py/PyUartAssistant/favicon.png')) self.setWindowTitle("调试助手") # 设置禁止拉伸窗口大小 self.setFixedSize(self.width(), self.height()) # 发送数据和接收数据数目置零 self.data_num_sended = 0 self.lineEditSendNum.setText(str(self.data_num_sended)) self.data_num_received = 0 self.lineEditReceiveNum.setText(str(self.data_num_received)) # 串口关闭按钮使能关闭 self.pushButtonCloseSerial.setEnabled(False) # 发送框、文本框清除 self.textEditReceive.setText("") self.textEditSend.setText("") # %%第二个TAB 初始化画图页面表格 self.pushButtonStopPlot.setEnabled(False) self.pushButtonStartPlot.setEnabled(True) self.radioButtonCH4QX.setEnabled(True) self.radioButtonCH4TF.setEnabled(True) # 加载Qchart波形界面 self.plot_qchart = QChartViewPlot() self.plot_view.setChart(self.plot_qchart) self.plot_view.setRenderHint(QPainter.Antialiasing) # 抗锯齿 self.plot_view.setRubberBand(QChartView.RectangleRubberBand) # ============================================================================= # def wheelEvent(self, event): # if self.plot_view.underMouse: # # 鼠标滚轮:缩放Qchart波形 # if event.angleDelta().y() >= 0: # # 鼠标滚轮向上 # if event.x() < ( # self.plot_view.width() + self.plot_view.x()) and event.x() > self.plot_view.x(): # if event.y() < ( # self.plot_view.height() + self.plot_view.y()) and event.y() > self.plot_view.y(): # self.plot_qchart.zoomIn() # else: # # 鼠标滚轮向下 # if event.x() < ( # self.plot_view.width() + self.plot_view.x()) and event.x() > self.plot_view.x(): # if event.y() < ( # self.plot_view.height() + self.plot_view.y()) and event.y() > self.plot_view.y(): # self.plot_qchart.zoomOut() # ============================================================================= #%% 重写关闭按钮 def closeEvent(self, event): self.update_data_thread.stop() # 可选:加入超时判断,防止线程未能正常退出导致程序无法关闭 if not self.update_data_thread.wait(1000): # 等待5秒 print("线程未在指定时间内退出,可能需要进一步处理") # 如果串口已经打开,则关闭串口 if self.ser.is_open: self.port_close() # 调用父类的关闭事件处理函数 super().closeEvent(event) # %%建立控件信号与槽关系 def init(self): # 串口检测按钮 self.pushButtonTestSerial.clicked.connect(self.port_check) # 串口打开按钮 self.pushButtonOpenSerial.clicked.connect(self.port_open) # 串口关闭按钮 self.pushButtonCloseSerial.clicked.connect(self.port_close) # 定时发送数据 self.timer_send = QTimer() self.timer_send.timeout.connect(self.data_send) self.checkBoxReapitSend.stateChanged.connect(self.data_send_timer) # 发送数据按钮 self.pushButtonSend.clicked.connect(self.data_send) # 保存日志 self.pushButtonLogSave.clicked.connect(self.savefiles) # 加载日志 self.pushButtonLogLoad.clicked.connect(self.openfiles) # 跳转链接 #self.commandLinkButton1.clicked.connect(self.link) # 清除发送按钮 self.pushButtonClearSend.clicked.connect(self.send_data_clear) # 清除接收按钮 self.pushButtonClearReceive.clicked.connect(self.receive_data_clear) # 开始绘图 self.pushButtonStartPlot.clicked.connect(self.btn_start_clicked) # 关闭绘图 self.pushButtonStopPlot.clicked.connect(self.btn_stop_clicked) #线程信号发射 self.update_data_thread._signal_update.connect(self.update_data_thread_slot) self.comboBoxPlot.currentIndexChanged.connect(self.plot_item_changed) # %% 串口检测 def port_check(self): # 检测所有存在的串口,将信息存储在字典中 self.Com_Dict = {} port_list = list(serial.tools.list_ports.comports()) self.comboBoxSerial.clear() for port in port_list: self.Com_Dict["%s" % port[0]] = "%s" % port[1] self.comboBoxSerial.addItem(port[0]) # 无串口判断 if len(self.Com_Dict) == 0: self.comboBoxSerial.addItem("无串口") # %%打开串口 def port_open(self): self.ser.port = self.comboBoxSerial.currentText() # 串口号 self.ser.baudrate = int(self.comboBoxBaudrate.currentText()) # 波特率 self.ser.timeout = 0.1 flag_data = int(self.comboBoxDataBits.currentText()) # 数据位 if flag_data == 5: self.ser.bytesize = serial.FIVEBITS elif flag_data == 6: self.ser.bytesize = serial.SIXBITS elif flag_data == 7: self.ser.bytesize = serial.SEVENBITS else: self.ser.bytesize = serial.EIGHTBITS flag_data = self.comboBoxCheckBit.currentText() # 校验位 if flag_data == "None": self.ser.parity = serial.PARITY_NONE elif flag_data == "Odd": self.ser.parity = serial.PARITY_ODD else: self.ser.parity = serial.PARITY_EVEN flag_data = int(self.comboBoxStopBit.currentText()) # 停止位 if flag_data == 1: self.ser.stopbits = serial.STOPBITS_ONE else: self.ser.stopbits = serial.STOPBITS_TWO flag_data = self.comboBoxFlow.currentText() # 流控 if flag_data == "No Ctrl Flow": self.ser.xonxoff = False #软件流控 self.ser.dsrdtr = False #硬件流控 DTR self.ser.rtscts = False #硬件流控 RTS elif flag_data == "SW Ctrl Flow": self.ser.xonxoff = True #软件流控 else: if self.checkBoxDTR.isChecked(): self.ser.dsrdtr = True #硬件流控 DTR if self.checkBoxRTS.isChecked(): self.ser.rtscts = True #硬件流控 RTS try: time.sleep(0.1) if self.ser.is_open: self.ser.close() self.ser.open() except: QMessageBox.critical(self, "串口异常", "此串口不能被打开!") return None # 串口打开后,切换开关串口按钮使能状态,防止失误操作 if self.ser.isOpen(): self.pushButtonOpenSerial.setEnabled(False) self.pushButtonCloseSerial.setEnabled(True) self.comboBoxBaudrate.setEnabled(False) self.comboBoxSerial.setEnabled(False) #self.formGroupBox1.setTitle("串口状态(开启)") #日志保存 # 格式化日期时间字符串,用于文件名 # 例如:2024-05-28_12-34-56.txt self.file =time.strftime("%Y%m%d%H%M%S", time.localtime()) self.filename = self.file + ".txt" # 定时器接收数据 self.timer = QTimer() self.timer.timeout.connect(self.data_receive) # 打开串口接收定时器,周期为1ms self.timer.start(1) # %%接收数据 def data_receive(self): try: num = self.ser.inWaiting() # ============================================================================= # if num > 0: # time.sleep(0.1) # num = self.ser.inWaiting() #延时,再读一次数据,确保数据完整性 # ============================================================================= except: QMessageBox.critical(self, '串口异常', '串口接收数据异常,请重新连接设备!') self.port_close() return None if num > 0: #data = self.ser.read(num) data = self.ser.readline() num = len(data) # 时间显示 if self.checkBoxAddDate.isChecked(): nowTime = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") nowTime = nowTime[:-3] self.textEditReceive.insertPlainText(nowTime + " ") # HEX显示数据 if self.checkBoxHexReceive.checkState(): out_s = '' for i in range(0, len(data)): out_s = out_s + '{:02X}'.format(data[i]) + ' ' self.textEditReceive.insertPlainText(out_s) # ASCII显示数据 else: self.textEditReceive.insertPlainText(data.decode('utf-8',errors='replace')) # 接收换行 if self.checkBoxCRLF.isChecked(): self.textEditReceive.insertPlainText('\r\n') # 获取到text光标 textCursor = self.textEditReceive.textCursor() # 滚动到底部 textCursor.movePosition(textCursor.End) # 设置光标到text中去 self.textEditReceive.setTextCursor(textCursor) # 统计接收字符的数量 self.data_num_received += num self.lineEditReceiveNum.setText(str(self.data_num_received)) try: self.dataReceive = data.decode('utf-8') # 写入当前日期的文件 #print(data.decode('utf-8')) # 打开文件,如果文件不存在则创建,如果存在则追加内容 #============================================================================= with open(self.filename, 'a', encoding='utf-8',newline='') as file: #print (data.decode('utf-8')) saveData = (time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) + " " + data.decode('utf-8') file.write(saveData) # 写入内容 #============================================================================= # 发送标志位给显示页面预处理 self.dataReceive = data.decode('utf-8') if self.dataReceive[:2]=="A+": self.update_data_thread.SetFlag(1) self.update_data_thread.SetReceiveData(self.dataReceive) except: pass else: pass # %%定时发送数据 def data_send_timer(self): try: if 1<= int(self.lineEditTime.text()) <= 30000: # 定时时间1ms~30s内 if self.checkBoxReapitSend.isChecked(): self.timer_send.start(int(self.lineEditTime.text())) self.lineEditTime.setEnabled(False) else: self.timer_send.stop() self.lineEditTime.setEnabled(True) else: QMessageBox.critical(self, '定时发送数据异常', '定时发送数据周期仅可设置在30秒内!') except: QMessageBox.critical(self, '定时发送数据异常', '请设置正确的数值类型!') # %%发送数据 def data_send(self): if self.ser.isOpen(): input_s = self.textEditSend.toPlainText() # 判断是否为非空字符串 if input_s != "": # 时间显示 if self.checkBoxAddDate.isChecked(): self.textEditReceive.insertPlainText((time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) + " ") # HEX发送 if self.checkBoxHexSend.isChecked(): input_s = input_s.strip() send_list = [] while input_s != '': try: num = int(input_s[0:2], 16) except ValueError: QMessageBox.critical(self, '串口异常', '请输入规范十六进制数据,以空格分开!') return None input_s = input_s[2:].strip() send_list.append(num) input_s = bytes(send_list) # ASCII发送 else: input_s = (input_s).encode('utf-8') # HEX接收显示 if self.checkBoxHexReceive.isChecked(): out_s = '' for i in range(0, len(input_s)): out_s = out_s + '{:02X}'.format(input_s[i]) + ' ' self.textEditReceive.insertPlainText(out_s) # ASCII接收显示 else: self.textEditReceive.insertPlainText(input_s.decode('utf-8')) # 接收换行 if self.checkBoxCRLF.isChecked(): self.textEditReceive.insertPlainText('\r\n') # 获取到Text光标 textCursor = self.textEditReceive.textCursor() # 滚动到底部 textCursor.movePosition(textCursor.End) # 设置光标到Text中去 self.textEditReceive.setTextCursor(textCursor) # 统计发送字符数量 num = self.ser.write(input_s) self.data_num_sended += num self.lineEditSendNum.setText(str(self.data_num_sended)) else: pass # %%保存日志 def savefiles(self): dlg = QFileDialog() filenames = dlg.getSaveFileName(None, "保存日志文件", None, "Txt files(*.txt)") try: with open(file = filenames[0], mode='w', encoding='utf-8') as file: file.write(self.textEditReceive.toPlainText()) except: QMessageBox.critical(self, '日志异常', '保存日志文件失败!') #%% 加载日志 def openfiles(self): dlg = QFileDialog() filenames = dlg.getOpenFileName(None, "加载日志文件", None, "Txt files(*.txt)") try: with open(file = filenames[0], mode='r', encoding='utf-8') as file: self.textEditSend.setPlainText(file.read()) except: QMessageBox.critical(self, '日志异常', '加载日志文件失败!') # %%打开博客链接和公众号二维码 #webbrowser.open('https://blog.csdn.net/m0_38106923') # %%清除发送数据显示 def send_data_clear(self): self.textEditSend.setText("") self.data_num_sended = 0 self.lineEditSendNum.setText(str(self.data_num_sended)) # 清除接收数据显示 def receive_data_clear(self): self.textEditReceive.setText("") self.data_num_received = 0 self.lineEditSendNum.setText(str(self.data_num_received)) # 关闭串口 def port_close(self): try: self.timer.stop() self.timer_send.stop() self.ser.close() except: QMessageBox.critical(self, '串口异常', '关闭串口失败,请重启程序!') return None # 切换开关串口按钮使能状态和定时发送使能状态 self.pushButtonOpenSerial.setEnabled(True) self.pushButtonCloseSerial.setEnabled(False) self.lineEditTime.setEnabled(True) self.comboBoxBaudrate.setEnabled(True) self.comboBoxSerial.setEnabled(True) # 发送数据和接收数据数目置零 self.data_num_sended = 0 self.lineEditSendNum.setText(str(self.data_num_sended)) self.data_num_received = 0 self.lineEditReceiveNum.setText(str(self.data_num_received)) #self.formGroupBox1.setTitle("串口状态(关闭)") def btn_start_clicked(self): #开启按钮 self.filenameCsv= self.file + ".csv" with open(self.filenameCsv, mode='a', newline='') as file: # 创建一个写入器对象 writer = csv.writer(file) # 写入数据,这里假设CSV文件有标题行 writer.writerow(('time','Methane', 'Air Temp', 'Laser Temp', 'Laser Intensity')) # 线程里面写入处理好的数据 self.update_data_thread.SetFilenameCsv(self.filenameCsv) self.update_data_thread.start() self.pushButtonStartPlot.setEnabled(False) self.pushButtonStopPlot.setEnabled(True) self.radioButtonCH4QX.setEnabled(False) self.radioButtonCH4TF.setEnabled(False) self.comboBoxPlot.addItems(["甲烷浓度","环境温度","激光器温度","激光强度"]) self.comboBoxPlot.setCurrentText("甲烷浓度") self.update_data_thread.SetPlotItem(0) self.update_data_thread.restart() def btn_stop_clicked(self): self.update_data_thread.stop() self.pushButtonStartPlot.setEnabled(True) self.pushButtonStopPlot.setEnabled(False) self.radioButtonCH4QX.setEnabled(True) self.radioButtonCH4TF.setEnabled(True) self.comboBoxPlot.clear() def update_data_thread_slot(self, data): # 线程回调函数 #data = json.loads(data) self.plot_qchart.handle_update(float(data)) def plot_item_changed(self,index): print(index) self.update_data_thread.SetPlotItem(index) #self.plot_qchart.series.replace([]) #执行 if __name__ == '__main__': app = QtWidgets.QApplication(sys.argv) myshow = PyQt5Serial() myshow.show() sys.exit(app.exec_())