# -*- coding: utf-8 -*-
"""
Created on Mon May 27 14:31:38 2024

@author: WANGXIBAO
"""

import csv
import datetime
import logging
import sys
import time

import serial
import serial.tools.list_ports
from PyQt5 import QtWidgets
from PyQt5.Qt import QPainter
from PyQt5.QtChart import QChart, QValueAxis, QChartView, QSplineSeries
from PyQt5.QtCore import QTimer
from PyQt5.QtGui import QIcon
from PyQt5.QtWidgets import QMessageBox, QFileDialog

from PyUartUi import Ui_UartAssistant
from UartDataPolt import QChartViewPlot, UpdateDataThread, GetDataQX


class PyQt5Serial(QtWidgets.QWidget, Ui_UartAssistant):
    """Main window of the serial-port debugging assistant.

    The widget wires the controls created by ``Ui_UartAssistant`` to a
    ``serial.Serial`` instance: it lists and opens ports, polls the open port
    from a ``QTimer``, echoes traffic into ``textEditReceive``, optionally
    appends received lines to a timestamp-named log file, and forwards lines
    starting with ``"A+"`` to the plotting helpers (``GetDataQX`` and
    ``UpdateDataThread``).
    """

    _log = logging.getLogger(__name__)

    # Combo-box value -> pyserial constant.  Values not listed here fall back
    # to 8 data bits / even parity in port_open().
    _BYTESIZES = {
        5: serial.FIVEBITS,
        6: serial.SIXBITS,
        7: serial.SEVENBITS,
    }
    _PARITIES = {
        "None": serial.PARITY_NONE,
        "Odd": serial.PARITY_ODD,
    }

    # %% Initialisation
    def __init__(self):
        """Build the UI, connect signals and put every control in its idle state."""
        super(PyQt5Serial, self).__init__()
        self.setupUi(self)
        self.update_data_thread = UpdateDataThread()  # worker feeding the chart
        self.get_data_qx = GetDataQX()
        self.init()
        self.ser = serial.Serial()  # unconfigured, closed port object
        # The log/CSV names must exist even when no port has been opened yet,
        # because btn_start_clicked() derives the CSV name from self.file.
        self._refresh_log_names()
        self.port_check()

        # Window icon and title.
        # NOTE(review): absolute, machine-specific icon path; a missing file
        # just yields an empty icon.
        self.setWindowIcon(QIcon('D:/workspace/py/PyUartAssistant/favicon.png'))
        self.setWindowTitle("调试助手")
        # Lock the window to its designed size.
        self.setFixedSize(self.width(), self.height())

        # Sent/received byte counters start at zero.
        self.data_num_sended = 0
        self.lineEditSendNum.setText(str(self.data_num_sended))
        self.data_num_received = 0
        self.lineEditReceiveNum.setText(str(self.data_num_received))

        # Nothing to close until a port has been opened.
        self.pushButtonCloseSerial.setEnabled(False)

        # Start with empty receive and send boxes.
        self.textEditReceive.setText("")
        self.textEditSend.setText("")

        # %% Second tab: plotting controls
        self.pushButtonStopPlot.setEnabled(False)
        self.pushButtonStartPlot.setEnabled(True)
        self.radioButtonCH4QX.setEnabled(True)
        self.radioButtonCH4TF.setEnabled(True)

        # Attach the chart to its view.
        self.plot_qchart = QChartViewPlot()
        self.plot_view.setChart(self.plot_qchart)
        self.plot_view.setRenderHint(QPainter.Antialiasing)
        self.plot_view.setRubberBand(QChartView.RectangleRubberBand)

    # %% Window close
    def closeEvent(self, event):
        """Stop the plot worker and release the serial port before closing."""
        self.update_data_thread.stop()
        # Bounded wait (1000 ms) so a stuck worker cannot block shutdown.
        if not self.update_data_thread.wait(1000):
            self._log.warning("线程未在指定时间内退出,可能需要进一步处理")

        if self.ser.is_open:
            self.port_close()
        super().closeEvent(event)

    # %% Signal/slot wiring
    def init(self):
        """Create the timers and connect every widget signal to its slot."""
        # Port detect / open / close buttons.
        self.pushButtonTestSerial.clicked.connect(self.port_check)
        self.pushButtonOpenSerial.clicked.connect(self.port_open)
        self.pushButtonCloseSerial.clicked.connect(self.port_close)

        # Receive polling timer.  Created once here so port_close() can always
        # stop it, even if no port was ever opened.
        self.timer = QTimer()
        self.timer.timeout.connect(self.data_receive)

        # Periodic sending.
        self.timer_send = QTimer()
        self.timer_send.timeout.connect(self.data_send)
        self.checkBoxReapitSend.stateChanged.connect(self.data_send_timer)

        # Manual send.
        self.pushButtonSend.clicked.connect(self.data_send)

        # Save / load the log.
        self.pushButtonLogSave.clicked.connect(self.savefiles)
        self.pushButtonLogLoad.clicked.connect(self.openfiles)

        # Clear buttons.
        self.pushButtonClearSend.clicked.connect(self.send_data_clear)
        self.pushButtonClearReceive.clicked.connect(self.receive_data_clear)

        # Plot start / stop.
        self.pushButtonStartPlot.clicked.connect(self.btn_start_clicked)
        self.pushButtonStopPlot.clicked.connect(self.btn_stop_clicked)
        # Samples emitted by the worker thread.
        self.update_data_thread._signal_update.connect(self.update_data_thread_slot)

        self.comboBoxPlot.currentIndexChanged.connect(self.plot_item_changed)

    # %% Internal helpers
    def _refresh_log_names(self):
        """Derive the log base name (``self.file``) and log file name from the current time."""
        self.file = time.strftime("%Y%m%d%H%M%S", time.localtime())
        self.filename = self.file + ".txt"

    def _scroll_receive_to_end(self):
        """Move the receive box cursor to the end so the newest text is visible."""
        cursor = self.textEditReceive.textCursor()
        cursor.movePosition(cursor.End)
        self.textEditReceive.setTextCursor(cursor)

    @staticmethod
    def _hex_dump(payload):
        """Return *payload* as upper-case hex pairs, each followed by one space."""
        return ''.join('{:02X} '.format(byte) for byte in payload)

    @staticmethod
    def _parse_hex(text):
        """Convert user-typed hex text into bytes.

        The text is consumed two characters at a time, with surrounding
        whitespace stripped between items.

        Raises:
            ValueError: if an item is not valid hex or is outside 0..255.
        """
        remaining = text.strip()
        values = []
        while remaining:
            values.append(int(remaining[0:2], 16))
            remaining = remaining[2:].strip()
        return bytes(values)

    # %% Serial port: detect
    def port_check(self):
        """Refresh the port combo box and ``self.Com_Dict`` (device -> description)."""
        self.Com_Dict = {}
        self.comboBoxSerial.clear()
        for port in serial.tools.list_ports.comports():
            self.Com_Dict["%s" % port[0]] = "%s" % port[1]
            self.comboBoxSerial.addItem(port[0])

        # Placeholder entry when no port was found.
        if not self.Com_Dict:
            self.comboBoxSerial.addItem("无串口")

    # %% Serial port: open
    def port_open(self):
        """Configure ``self.ser`` from the UI and open it.

        On success the open/close buttons and port selectors are toggled, a
        fresh timestamp-based log file name is chosen and receive polling
        starts.  On any configuration or open error a message box is shown
        and the UI is left unchanged.
        """
        flow = self.comboBoxFlow.currentText()
        hardware_flow = flow not in ("No Ctrl Flow", "SW Ctrl Flow")
        try:
            if self.ser.is_open:
                self.ser.close()

            self.ser.port = self.comboBoxSerial.currentText()
            self.ser.baudrate = int(self.comboBoxBaudrate.currentText())
            self.ser.timeout = 0.1
            self.ser.bytesize = self._BYTESIZES.get(
                int(self.comboBoxDataBits.currentText()), serial.EIGHTBITS)
            self.ser.parity = self._PARITIES.get(
                self.comboBoxCheckBit.currentText(), serial.PARITY_EVEN)
            if int(self.comboBoxStopBit.currentText()) == 1:
                self.ser.stopbits = serial.STOPBITS_ONE
            else:
                self.ser.stopbits = serial.STOPBITS_TWO

            # Assign all three flow-control flags on every open so a previous
            # selection cannot leak into this session.
            self.ser.xonxoff = flow == "SW Ctrl Flow"
            self.ser.dsrdtr = hardware_flow and self.checkBoxDTR.isChecked()
            self.ser.rtscts = hardware_flow and self.checkBoxRTS.isChecked()

            # Short settle delay kept from the original implementation.
            # NOTE(review): purpose not visible here - confirm it is needed.
            time.sleep(0.1)
            self.ser.open()
        except (ValueError, serial.SerialException, OSError):
            self._log.exception("Could not open serial port %r", self.ser.port)
            QMessageBox.critical(self, "串口异常", "此串口不能被打开!")
            return None

        # Port is open: lock the controls that must not change meanwhile.
        self.pushButtonOpenSerial.setEnabled(False)
        self.pushButtonCloseSerial.setEnabled(True)
        self.comboBoxBaudrate.setEnabled(False)
        self.comboBoxSerial.setEnabled(False)

        # One log file per session, named e.g. 20240528123456.txt.
        self._refresh_log_names()

        # Poll the port every 1 ms.
        self.timer.start(1)

    # %% Receive
    def data_receive(self):
        """Timer slot: read one line from the port, display it, log it and feed the plot."""
        try:
            if self.ser.in_waiting <= 0:
                return None
            print("接收数据", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"))
            data = self.ser.readline()
        except (serial.SerialException, OSError):
            self._log.exception("Serial read failed")
            # Stop polling before the modal dialog; otherwise the timer keeps
            # firing inside the dialog's event loop and stacks more dialogs.
            self.timer.stop()
            QMessageBox.critical(self, '串口异常', '串口接收数据异常,请重新连接设备!')
            self.port_close()
            return None

        self._show_received(data)

        # Update the received-byte counter.
        self.data_num_received += len(data)
        self.lineEditReceiveNum.setText(str(self.data_num_received))

        try:
            text = data.decode('utf-8')
        except UnicodeDecodeError:
            # Not valid UTF-8: already displayed above, but it is neither
            # logged nor parsed as a text frame.
            return None
        self.dataReceive = text
        print("二次解码数据", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"))

        if self.checkBoxAutoSaveLog.isChecked():
            self._append_to_log(text)
        self._forward_to_plot(text)

    def _show_received(self, data):
        """Append received bytes to the receive box (optional timestamp, HEX or text)."""
        if self.checkBoxAddDate.isChecked():
            # Millisecond resolution: drop the last three microsecond digits.
            stamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
            self.textEditReceive.insertPlainText(stamp + " ")

        if self.checkBoxHexReceive.isChecked():
            self.textEditReceive.insertPlainText(self._hex_dump(data))
        else:
            self.textEditReceive.insertPlainText(data.decode('utf-8', errors='replace'))
            print("解码数据", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"))

        if self.checkBoxCRLF.isChecked():
            self.textEditReceive.insertPlainText('\r\n')

        self._scroll_receive_to_end()

    def _append_to_log(self, text):
        """Append one timestamped line to the session log file; failures are logged, not raised."""
        stamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        try:
            with open(self.filename, 'a', encoding='utf-8', newline='') as log_file:
                log_file.write(stamp + " " + text)
        except OSError:
            self._log.exception("Could not append to log file %r", self.filename)

    def _forward_to_plot(self, text):
        """Hand lines starting with ``"A+"`` to the plot helpers."""
        if not text.startswith("A+"):
            return
        try:
            parsed = self.get_data_qx.Transdata(text)
            self.update_data_thread.SetFlag(1)
            self.update_data_thread.SetReceiveData(parsed)
        except Exception:
            # Best effort: a malformed frame must not take down the GUI slot,
            # but it should no longer disappear silently.
            self._log.exception("Could not process plot frame %r", text)
            return
        print("接收发数", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"))

    # %% Periodic sending
    def data_send_timer(self):
        """Start or stop periodic sending according to the repeat check box.

        The period is read from ``lineEditTime`` and must be an integer in
        the range 1..30000 (milliseconds).
        """
        if not self.checkBoxReapitSend.isChecked():
            # Un-checking always stops the timer, whatever the period box holds.
            self.timer_send.stop()
            self.lineEditTime.setEnabled(True)
            return

        try:
            period_ms = int(self.lineEditTime.text())
        except ValueError:
            QMessageBox.critical(self, '定时发送数据异常', '请设置正确的数值类型!')
            return

        if not 1 <= period_ms <= 30000:
            QMessageBox.critical(self, '定时发送数据异常', '定时发送数据周期仅可设置在30秒内!')
            return

        self.timer_send.start(period_ms)
        self.lineEditTime.setEnabled(False)

    # %% Send
    def data_send(self):
        """Send the content of the send box and echo it into the receive box.

        Does nothing when the port is closed or the send box is empty.  In HEX
        mode the text is parsed as hex byte pairs; otherwise it is sent as
        UTF-8.
        """
        if not self.ser.is_open:
            return None

        typed = self.textEditSend.toPlainText()
        if typed == "":
            return None

        # Build the payload first so invalid input leaves no partial echo.
        if self.checkBoxHexSend.isChecked():
            try:
                payload = self._parse_hex(typed)
            except ValueError:
                QMessageBox.critical(self, '串口异常', '请输入规范十六进制数据,以空格分开!')
                return None
        else:
            payload = typed.encode('utf-8')

        try:
            written = self.ser.write(payload)
        except (serial.SerialException, OSError):
            self._log.exception("Serial write failed")
            # Stop the repeat timer before the modal dialog (see data_receive).
            self.timer_send.stop()
            QMessageBox.critical(self, '串口异常', '串口发送数据异常,请重新连接设备!')
            self.port_close()
            return None

        # Echo what was sent.
        if self.checkBoxAddDate.isChecked():
            self.textEditReceive.insertPlainText(
                time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + " ")
        if self.checkBoxHexReceive.isChecked():
            self.textEditReceive.insertPlainText(self._hex_dump(payload))
        else:
            # HEX input may not be valid UTF-8; never raise while echoing.
            self.textEditReceive.insertPlainText(payload.decode('utf-8', errors='replace'))
        if self.checkBoxCRLF.isChecked():
            self.textEditReceive.insertPlainText('\r\n')
        self._scroll_receive_to_end()

        # Update the sent-byte counter.
        self.data_num_sended += written
        self.lineEditSendNum.setText(str(self.data_num_sended))

    # %% Save log
    def savefiles(self):
        """Ask for a file name and write the receive box content to it."""
        path = QFileDialog.getSaveFileName(None, "保存日志文件", None, "Txt files(*.txt)")[0]
        if not path:
            return  # dialog cancelled

        try:
            with open(path, mode='w', encoding='utf-8') as out_file:
                out_file.write(self.textEditReceive.toPlainText())
        except OSError:
            self._log.exception("Could not save log to %r", path)
            QMessageBox.critical(self, '日志异常', '保存日志文件失败!')

    # %% Load log
    def openfiles(self):
        """Ask for a text file and load its content into the send box."""
        path = QFileDialog.getOpenFileName(None, "加载日志文件", None, "Txt files(*.txt)")[0]
        if not path:
            return  # dialog cancelled

        try:
            with open(path, mode='r', encoding='utf-8') as in_file:
                self.textEditSend.setPlainText(in_file.read())
        except (OSError, UnicodeError):
            self._log.exception("Could not load log from %r", path)
            QMessageBox.critical(self, '日志异常', '加载日志文件失败!')

    # %% Clear send box
    def send_data_clear(self):
        """Empty the send box and reset the sent-byte counter."""
        self.textEditSend.setText("")
        self.data_num_sended = 0
        self.lineEditSendNum.setText(str(self.data_num_sended))

    # %% Clear receive box
    def receive_data_clear(self):
        """Empty the receive box and reset the received-byte counter."""
        self.textEditReceive.setText("")
        self.data_num_received = 0
        self.lineEditReceiveNum.setText(str(self.data_num_received))

    # %% Serial port: close
    def port_close(self):
        """Stop both timers, close the port and return the UI to its idle state."""
        self.timer.stop()
        self.timer_send.stop()
        try:
            self.ser.close()
        except (serial.SerialException, OSError):
            self._log.exception("Could not close serial port")
            QMessageBox.critical(self, '串口异常', '关闭串口失败,请重启程序!')
            return None

        # Re-enable the controls locked by port_open().
        self.pushButtonOpenSerial.setEnabled(True)
        self.pushButtonCloseSerial.setEnabled(False)
        self.lineEditTime.setEnabled(True)
        self.comboBoxBaudrate.setEnabled(True)
        self.comboBoxSerial.setEnabled(True)

        # Reset both byte counters.
        self.data_num_sended = 0
        self.lineEditSendNum.setText(str(self.data_num_sended))
        self.data_num_received = 0
        self.lineEditReceiveNum.setText(str(self.data_num_received))

    # %% Plot: start
    def btn_start_clicked(self):
        """Start the plot worker, fill the plot selector and prepare the CSV file."""
        self.update_data_thread.start()

        self.pushButtonStartPlot.setEnabled(False)
        self.pushButtonStopPlot.setEnabled(True)
        self.radioButtonCH4QX.setEnabled(False)
        self.radioButtonCH4TF.setEnabled(False)

        self.comboBoxPlot.addItems(["甲烷浓度", "环境温度", "激光器温度", "激光强度"])
        self.comboBoxPlot.setCurrentText("甲烷浓度")
        self.get_data_qx.IndOfReturn(0)
        self.update_data_thread.restart()

        self.filenameCsv = self.file + ".csv"
        if not self.checkBoxAutoSaveCsv.isChecked():
            return

        try:
            with open(self.filenameCsv, mode='a', newline='') as csv_file:
                # Append mode: write the header only into a new/empty file so
                # repeated starts do not duplicate it.
                if csv_file.tell() == 0:
                    csv.writer(csv_file).writerow(
                        ('time', 'Methane', 'Air Temp', 'Laser Temp', 'Laser Intensity'))
        except OSError:
            self._log.exception("Could not prepare CSV file %r", self.filenameCsv)
            return

        # NOTE(review): SaveCsv is assumed to belong under the auto-save check;
        # the original nesting was lost with the file's indentation - confirm.
        try:
            self.get_data_qx.SaveCsv(self.filenameCsv)
        except Exception:
            self._log.exception("SaveCsv failed for %r", self.filenameCsv)

    # %% Plot: stop
    def btn_stop_clicked(self):
        """Stop the plot worker and restore the plot controls."""
        self.update_data_thread.stop()
        self.pushButtonStartPlot.setEnabled(True)
        self.pushButtonStopPlot.setEnabled(False)
        self.radioButtonCH4QX.setEnabled(True)
        self.radioButtonCH4TF.setEnabled(True)
        self.comboBoxPlot.clear()

    def update_data_thread_slot(self, data):
        """Slot for the worker's update signal: append one sample to the chart."""
        try:
            sample = float(data)
        except (TypeError, ValueError):
            self._log.warning("Ignoring non-numeric plot sample: %r", data)
            return
        self.plot_qchart.handle_update(sample)
        print("thread ", data)

    def plot_item_changed(self, index):
        """Slot for the plot selector: pass the selected index to ``GetDataQX.IndOfReturn``."""
        print(index)
        self.get_data_qx.IndOfReturn(index)


# Script entry point: create the Qt application, show the main window and
# exit with the event loop's return code.
if __name__ == '__main__':
    qt_app = QtWidgets.QApplication(sys.argv)
    main_window = PyQt5Serial()
    main_window.show()
    exit_code = qt_app.exec_()
    sys.exit(exit_code)