2024-07-12 08:52:27 +08:00

612 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
Created on Mon May 27 14:31:38 2024
@author: WANGXIBAO
"""
import sys
import serial
import serial.tools.list_ports
import time,datetime
from PyQt5 import QtWidgets
from PyQt5.Qt import QPainter
from PyQt5.QtWidgets import QMessageBox ,QFileDialog
from PyQt5.QtCore import QTimer
from PyUartUi import Ui_UartAssistant
from UartDataPolt import QChartViewPlot,UpdateDataThread,GetDataQX,GetDataTF,GetDataOther
from PyQt5.QtChart import QChartView
from PyQt5.QtGui import QIcon
class PyQt5Serial(QtWidgets.QWidget,Ui_UartAssistant):
# %%初始化程序
def __init__(self):
super(PyQt5Serial, self).__init__()
self.setupUi(self)
self.update_data_thread = UpdateDataThread() # 创建更新波形数据线程
self.get_data_qx = GetDataQX()
self.get_data_qx.IndOfReturn(0) #根据数据特点给一个初始值
self.get_data_tf = GetDataTF()
self.get_data_tf.IndOfReturn(0) #根据数据特点给一个初始值
self.get_data_other = GetDataOther()
self.get_data_other.IndOfReturn(0) #根据数据特点给一个初始值
self.init()
self.ser = serial.Serial() #创建一个空对象
self.port_check()
# 设置Logo和标题
self.setWindowIcon(QIcon('./favicon.ico'))
self.setWindowTitle("调试助手")
# 设置禁止拉伸窗口大小
self.setFixedSize(self.width(), self.height())
# 发送数据和接收数据数目置零
self.data_num_sended = 0
self.lineEditSendNum.setText(str(self.data_num_sended))
self.data_num_received = 0
self.lineEditReceiveNum.setText(str(self.data_num_received))
# 串口关闭按钮使能关闭
self.pushButtonCloseSerial.setEnabled(False)
# 发送框、文本框清除
self.textEditReceive.setText("")
self.textEditSend.setText("")
# %%第二个TAB 初始化画图页面表格
self.pushButtonStopPlot.setEnabled(False)
self.pushButtonStartPlot.setEnabled(True)
self.radioButtonCH4QX.setEnabled(True)
self.radioButtonCH4TF.setEnabled(True)
self.checkBoxAutoSaveCsv.setEnabled(False)
# 加载Qchart波形界面
self.plot_qchart = QChartViewPlot()
self.plot_view.setChart(self.plot_qchart)
self.plot_view.setRenderHint(QPainter.Antialiasing) # 抗锯齿
self.plot_view.setRubberBand(QChartView.RectangleRubberBand)
# 用于暂存接收的串口数据
self.buffer = b''
# 用于暂存解码数据
self.lineUtf8 = ""
# 用于标志是否开始存CSV
self.CsvFlag = 0
# =============================================================================
# def wheelEvent(self, event):
# if self.plot_view.underMouse:
# # 鼠标滚轮:缩放Qchart波形
# if event.angleDelta().y() >= 0:
# # 鼠标滚轮向上
# if event.x() < (
# self.plot_view.width() + self.plot_view.x()) and event.x() > self.plot_view.x():
# if event.y() < (
# self.plot_view.height() + self.plot_view.y()) and event.y() > self.plot_view.y():
# self.plot_qchart.zoomIn()
# else:
# # 鼠标滚轮向下
# if event.x() < (
# self.plot_view.width() + self.plot_view.x()) and event.x() > self.plot_view.x():
# if event.y() < (
# self.plot_view.height() + self.plot_view.y()) and event.y() > self.plot_view.y():
# self.plot_qchart.zoomOut()
# =============================================================================
#%% 重写关闭按钮
def closeEvent(self, event):
self.update_data_thread.stop()
# 可选:加入超时判断,防止线程未能正常退出导致程序无法关闭
if not self.update_data_thread.wait(1000): # 等待5秒
print("线程未在指定时间内退出,可能需要进一步处理")
# 如果串口已经打开,则关闭串口
if self.ser.is_open:
self.port_close()
# 调用父类的关闭事件处理函数
super().closeEvent(event)
# %%建立控件信号与槽关系
def init(self):
# 串口检测按钮
self.pushButtonTestSerial.clicked.connect(self.port_check)
# 串口打开按钮
self.pushButtonOpenSerial.clicked.connect(self.port_open)
# 串口关闭按钮
self.pushButtonCloseSerial.clicked.connect(self.port_close)
# 定时发送数据
self.timer_send = QTimer()
self.timer_send.timeout.connect(self.data_send)
self.checkBoxReapitSend.stateChanged.connect(self.data_send_timer)
# 发送数据按钮
self.pushButtonSend.clicked.connect(self.data_send)
# 保存日志
self.pushButtonLogSave.clicked.connect(self.savefiles)
# 加载日志
self.pushButtonLogLoad.clicked.connect(self.openfiles)
# 跳转链接
#self.commandLinkButton1.clicked.connect(self.link)
# 清除发送按钮
self.pushButtonClearSend.clicked.connect(self.send_data_clear)
# 清除接收按钮
self.pushButtonClearReceive.clicked.connect(self.receive_data_clear)
# 开始绘图
self.pushButtonStartPlot.clicked.connect(self.btn_start_clicked)
# 关闭绘图
self.pushButtonStopPlot.clicked.connect(self.btn_stop_clicked)
#线程信号发射
self.update_data_thread._signal_update.connect(self.update_data_thread_slot)
# 选择绘图
self.comboBoxPlot.currentIndexChanged.connect(self.plot_item_changed)
# 重置绘图
self.pushButtonResetPlot.clicked.connect(self.plot_reset)
# %% 串口检测
def port_check(self):
# 检测所有存在的串口,将信息存储在字典中
self.Com_Dict = {}
port_list = list(serial.tools.list_ports.comports())
self.comboBoxSerial.clear()
for port in port_list:
self.Com_Dict["%s" % port[0]] = "%s" % port[1]
self.comboBoxSerial.addItem(port[0])
# 无串口判断
if len(self.Com_Dict) == 0:
self.comboBoxSerial.addItem("无串口")
# %%打开串口
def port_open(self):
self.ser.port = self.comboBoxSerial.currentText() # 串口号
self.ser.baudrate = int(self.comboBoxBaudrate.currentText()) # 波特率
self.ser.timeout = 0.01
flag_data = int(self.comboBoxDataBits.currentText()) # 数据位
if flag_data == 5:
self.ser.bytesize = serial.FIVEBITS
elif flag_data == 6:
self.ser.bytesize = serial.SIXBITS
elif flag_data == 7:
self.ser.bytesize = serial.SEVENBITS
else:
self.ser.bytesize = serial.EIGHTBITS
flag_data = self.comboBoxCheckBit.currentText() # 校验位
if flag_data == "None":
self.ser.parity = serial.PARITY_NONE
elif flag_data == "Odd":
self.ser.parity = serial.PARITY_ODD
else:
self.ser.parity = serial.PARITY_EVEN
flag_data = int(self.comboBoxStopBit.currentText()) # 停止位
if flag_data == 1:
self.ser.stopbits = serial.STOPBITS_ONE
else:
self.ser.stopbits = serial.STOPBITS_TWO
flag_data = self.comboBoxFlow.currentText() # 流控
if flag_data == "No Ctrl Flow":
self.ser.xonxoff = False #软件流控
self.ser.dsrdtr = False #硬件流控 DTR
self.ser.rtscts = False #硬件流控 RTS
elif flag_data == "SW Ctrl Flow":
self.ser.xonxoff = True #软件流控
else:
if self.checkBoxDTR.isChecked():
self.ser.dsrdtr = True #硬件流控 DTR
if self.checkBoxRTS.isChecked():
self.ser.rtscts = True #硬件流控 RTS
try:
time.sleep(0.1)
if self.ser.is_open:
self.ser.close()
self.ser.open()
except:
QMessageBox.critical(self, "串口异常", "此串口不能被打开!")
return None
# 串口打开后,切换开关串口按钮使能状态,防止失误操作
if self.ser.isOpen():
self.pushButtonOpenSerial.setEnabled(False)
self.pushButtonCloseSerial.setEnabled(True)
self.comboBoxBaudrate.setEnabled(False)
self.comboBoxSerial.setEnabled(False)
#self.formGroupBox1.setTitle("串口状态(开启)")
#日志保存
# 格式化日期时间字符串,用于文件名
# 例如2024-05-28_12-34-56.txt
self.file =time.strftime("%Y%m%d%H%M%S", time.localtime())
self.filename = self.file + ".txt"
# 定时器接收数据
self.timer = QTimer()
self.timer.timeout.connect(self.data_receive)
# 打开串口接收定时器周期为1ms
self.timer.start(20)
# %%接收数据
def data_receive(self):
try:
num = self.ser.inWaiting()
if num>0 :
#print("接收数据",datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"))
data = self.ser.read(num)
self.buffer+=data
#print("接收完成",datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"))
if self.checkBoxAddDate.isChecked():
nowTime = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
nowTime = nowTime[:-3]
self.textEditReceive.insertPlainText(nowTime + " ")
# HEX显示数据
if self.checkBoxHexReceive.checkState():
out_s = ''
for i in range(0, len(data)):
out_s = out_s + '{:02X}'.format(data[i]) + ' '
self.textEditReceive.insertPlainText(out_s)
# ASCII显示数据
else:
#print("解码前",datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"))
self.textEditReceive.insertPlainText(data.decode('utf-8',errors='replace'))
#print("解码数据",datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"))
# 接收换行
if self.checkBoxCRLF.isChecked():
self.textEditReceive.insertPlainText('\r\n')
# 获取到text光标
textCursor = self.textEditReceive.textCursor()
# 滚动到底部
textCursor.movePosition(textCursor.End)
# 设置光标到text中去
self.textEditReceive.setTextCursor(textCursor)
# 统计接收字符的数量
self.data_num_received += num
self.lineEditReceiveNum.setText(str(self.data_num_received))
# 自动保存日志
if self.checkBoxAutoSaveLog.isChecked():
self.AutoSaveLog()
except:
QMessageBox.critical(self, '串口异常', '串口接收数据异常,请重新连接设备!')
self.port_close()
return None
# %%定时发送数据
def data_send_timer(self):
try:
if 1<= int(self.lineEditTime.text()) <= 30000: # 定时时间1ms~30s内
if self.checkBoxReapitSend.isChecked():
self.timer_send.start(int(self.lineEditTime.text()))
self.lineEditTime.setEnabled(False)
else:
self.timer_send.stop()
self.lineEditTime.setEnabled(True)
else:
QMessageBox.critical(self, '定时发送数据异常', '定时发送数据周期仅可设置在30秒内')
except:
QMessageBox.critical(self, '定时发送数据异常', '请设置正确的数值类型!')
# %%发送数据
def data_send(self):
if self.ser.isOpen():
input_s = self.textEditSend.toPlainText()
# 判断是否为非空字符串
if input_s != "":
# 时间显示
if self.checkBoxAddDate.isChecked():
self.textEditReceive.insertPlainText((time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) + " ")
# HEX发送
if self.checkBoxHexSend.isChecked():
input_s = input_s.strip()
send_list = []
while input_s != '':
try:
num = int(input_s[0:2], 16)
except ValueError:
QMessageBox.critical(self, '串口异常', '请输入规范十六进制数据,以空格分开!')
return None
input_s = input_s[2:].strip()
send_list.append(num)
input_s = bytes(send_list)
# ASCII发送
else:
input_s = (input_s).encode('utf-8')
# HEX接收显示
if self.checkBoxHexReceive.isChecked():
out_s = ''
for i in range(0, len(input_s)):
out_s = out_s + '{:02X}'.format(input_s[i]) + ' '
self.textEditReceive.insertPlainText(out_s)
# ASCII接收显示
else:
self.textEditReceive.insertPlainText(input_s.decode('utf-8'))
# 接收换行
if self.checkBoxCRLF.isChecked():
self.textEditReceive.insertPlainText('\r\n')
# 获取到Text光标
textCursor = self.textEditReceive.textCursor()
# 滚动到底部
textCursor.movePosition(textCursor.End)
# 设置光标到Text中去
self.textEditReceive.setTextCursor(textCursor)
# 统计发送字符数量
num = self.ser.write(input_s)
self.data_num_sended += num
self.lineEditSendNum.setText(str(self.data_num_sended))
else:
pass
# %%保存日志
def savefiles(self):
dlg = QFileDialog()
filenames = dlg.getSaveFileName(None, "保存日志文件", None, "Txt files(*.txt)")
try:
with open(file = filenames[0], mode='w', encoding='utf-8') as file:
file.write(self.textEditReceive.toPlainText())
except:
QMessageBox.critical(self, '日志异常', '保存日志文件失败!')
#%% 加载日志
def openfiles(self):
dlg = QFileDialog()
filenames = dlg.getOpenFileName(None, "加载日志文件", None, "Txt files(*.txt)")
try:
with open(file = filenames[0], mode='r', encoding='utf-8') as file:
self.textEditSend.setPlainText(file.read())
except:
QMessageBox.critical(self, '日志异常', '加载日志文件失败!')
# %%打开博客链接和公众号二维码
#webbrowser.open('https://blog.csdn.net/m0_38106923')
# %%清除发送数据显示
# 自动保存日志 log he csv
def AutoSaveLog(self):
try:
lines = self.buffer.split(b'\n') # 或者使用 b'\r\n' 根据你的需要
# 最后一个元素可能不包含完整的行,所以将其保留作为新的缓存
self.buffer = lines.pop()
# 处理每一行数据
for line in lines:
# 注意:每行数据可能不包含结尾的换行符,所以在处理前检查一下
if line.endswith(b'\r'):
line = line[:-1] # 移除回车
# 写入当前日期的文件,打开文件,如果文件不存在则创建,如果存在则追加内容
with open(self.filename, 'a', encoding='utf-8',newline='') as file:
#print (data.decode('utf-8'))
self.lineUtf8 = line.decode('utf-8')
saveData = (time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) + " " + self.lineUtf8 + "\r\n"
#print("saveData:",saveData)
file.write(saveData) # 写入内容
#判断选择的何种格式数据
if self.radioButtonCH4QX.isChecked() and self.lineUtf8[:2]=="A+":
print(self.lineUtf8)
dataSplit = self.get_data_qx.Transdata(self.lineUtf8)
self.filenameCsv= self.file + ".csv"
if self.checkBoxAutoSaveCsv.isChecked() and self.CsvFlag: #写入CSV文件
try:
self.get_data_qx.SaveCsv(self.filenameCsv)
except:
print("写入CSV失败")
pass
self.update_data_thread.SetFlag(1)
self.update_data_thread.SetReceiveData(dataSplit)
#更新当前数据
self.lineEditCurrentValue.setText(str(round(dataSplit,3)))
self.lineEditWindowMean.setText (str(round(self.plot_qchart.windowAverage,3)))
self.lineEditWindowMSE.setText (str(round(self.plot_qchart.windowStd,3)))
#print(dataSplit)
#print("接收发数",datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"))
elif self.radioButtonCH4TF.isChecked() :
print(self.lineUtf8)
dataSplit = self.get_data_tf.Transdata(self.lineUtf8)
#print("dataSplit",type(dataSplit))
if isinstance(dataSplit, (float, int)):
self.filenameCsv= self.file + ".csv"
if self.checkBoxAutoSaveCsv.isChecked() and self.CsvFlag: #写入CSV文件
try:
self.get_data_tf.SaveCsv(self.filenameCsv)
except:
print("写入CSV失败")
pass
self.update_data_thread.SetFlag(1)
#print("dataSplit",dataSplit)
self.update_data_thread.SetReceiveData(dataSplit)
#更新当前数据
self.lineEditCurrentValue.setText(str(round(dataSplit,3)))
self.lineEditWindowMean.setText (str(round(self.plot_qchart.windowAverage,3)))
self.lineEditWindowMSE.setText (str(round(self.plot_qchart.windowStd,3)))
else:
print("dataSplit type:",type(dataSplit))
elif self.radioButtonOtherData.isChecked() :
print(self.lineUtf8)
dataSplit = self.get_data_other.Transdata(self.lineUtf8)
#print("dataSplit",type(dataSplit))
if isinstance(dataSplit, (float, int)):
self.filenameCsv= self.file + ".csv"
if self.checkBoxAutoSaveCsv.isChecked() and self.CsvFlag: #写入CSV文件
try:
self.get_data_other.SaveCsv(self.filenameCsv)
except:
print("写入CSV失败")
pass
self.update_data_thread.SetFlag(1)
#print("dataSplit",dataSplit)
self.update_data_thread.SetReceiveData(dataSplit)
#更新当前数据
self.lineEditCurrentValue.setText(str(round(dataSplit,3)))
self.lineEditWindowMean.setText (str(round(self.plot_qchart.windowAverage,3)))
self.lineEditWindowMSE.setText (str(round(self.plot_qchart.windowStd,3)))
else:
print("dataSplit type:",type(dataSplit))
except Exception as e:
print(f"Error reading configuration: {e}")
print("自动保存日志失败")
pass
def send_data_clear(self):
self.textEditSend.setText("")
self.data_num_sended = 0
self.lineEditSendNum.setText(str(self.data_num_sended))
# 清除接收数据显示
def receive_data_clear(self):
self.textEditReceive.setText("")
self.data_num_received = 0
self.lineEditSendNum.setText(str(self.data_num_received))
# 关闭串口
def port_close(self):
try:
self.timer.stop()
self.timer_send.stop()
self.btn_stop_clicked() #执行停止绘图操作
self.ser.close()
except:
QMessageBox.critical(self, '串口异常', '关闭串口失败,请重启程序!')
return None
# 切换开关串口按钮使能状态和定时发送使能状态
self.pushButtonOpenSerial.setEnabled(True)
self.pushButtonCloseSerial.setEnabled(False)
self.lineEditTime.setEnabled(True)
self.comboBoxBaudrate.setEnabled(True)
self.comboBoxSerial.setEnabled(True)
# 发送数据和接收数据数目置零
self.data_num_sended = 0
self.lineEditSendNum.setText(str(self.data_num_sended))
self.data_num_received = 0
self.lineEditReceiveNum.setText(str(self.data_num_received))
#self.formGroupBox1.setTitle("串口状态(关闭)")
def btn_start_clicked(self):
#开启按钮
self.update_data_thread.start()
self.update_data_thread.restart()
self.pushButtonStartPlot.setEnabled(False)
self.pushButtonStopPlot.setEnabled(True)
self.radioButtonCH4QX.setEnabled(False)
self.radioButtonCH4TF.setEnabled(False)
self.radioButtonOtherData.setEnabled(False)
self.checkBoxAutoSaveCsv.setEnabled(True)
if self.radioButtonCH4QX.isChecked():
self.comboBoxPlot.addItems(["甲烷浓度","环境温度","激光器温度","激光强度"])
self.comboBoxPlot.setCurrentText("甲烷浓度")
self.get_data_qx.IndOfReturn(0)
elif self.radioButtonCH4TF.isChecked():
iterm = self.get_data_tf.rowTitle[1:]
print("iterm",iterm)
self.comboBoxPlot.addItems(iterm)
elif self.radioButtonOtherData.isChecked():
iterm = self.get_data_other.rowTitle[1:]
print("iterm",iterm)
self.comboBoxPlot.addItems(iterm)
#self.update_data_thread.SetPlotItem(0)
self.update_data_thread.restart()
self.CsvFlag = 1
def btn_stop_clicked(self):
self.update_data_thread.stop()
self.pushButtonStartPlot.setEnabled(True)
self.pushButtonStopPlot.setEnabled(False)
self.radioButtonCH4QX.setEnabled(True)
self.radioButtonCH4TF.setEnabled(True)
self.radioButtonOtherData.setEnabled(True)
self.checkBoxAutoSaveCsv.setEnabled(False)
self.comboBoxPlot.clear()
self.CsvFlag = 0
def update_data_thread_slot(self, data):
# 线程回调函数
#data = json.loads(data)
self.plot_qchart.handle_update(float(data))
#print("thread ",data)
def plot_item_changed(self,index):
print(index)
self.plot_qchart.clearSeries()
#self.update_data_thread.SetPlotItem(index)
self.get_data_qx.IndOfReturn(index)
#self.plot_qchart.series.replace([])
self.get_data_tf.IndOfReturn(index)
self.get_data_other.IndOfReturn(index)
def plot_reset(self):
self.plot_qchart.zoomReset()
#执行
if __name__ == '__main__':
app = QtWidgets.QApplication(sys.argv)
myshow = PyQt5Serial()
myshow.show()
sys.exit(app.exec_())