python/PyUartAssistant/UartDataPolt.py
2025-01-25 11:13:39 +08:00

225 lines
7.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
Created on Tue May 28 15:40:48 2024
@author: WANGXIBAO
"""
# -*- coding: utf-8 -*-
import time,math,os
import re,csv
from PyQt5.QtCore import QThread, pyqtSignal,Qt
from PyQt5.QtWidgets import QMessageBox
from PyQt5.QtChart import QChart, QValueAxis, QSplineSeries,QLineSeries
from PyQt5.QtGui import QPen
# 波形显示
class QChartViewPlot(QChart):
# 相位振动波形
def __init__(self, parent=None):
super(QChartViewPlot, self).__init__(parent)
self.window = parent
self.xRange = 60
self.counter = 0
self.seriesList = []
self.legend().show()
self.axisX = QValueAxis()
self.axisX.setRange(0, self.xRange)
self.addAxis(self.axisX, Qt.AlignBottom)
# self.setAxisX(self.axisX, series)
# 增加X轴网格设置
self.axisX.setTickCount(13)
grid_pen_x = QPen(Qt.black, 0.5, Qt.DotLine) # 设置网格线的颜色、宽度和样式
self.axisX.setGridLinePen(grid_pen_x)
self.y_min = 0
self.y_max = 100
self.axisY = QValueAxis()
self.axisY.setRange(self.y_min, self.y_max)
self.addAxis(self.axisY, Qt.AlignLeft)
# self.setAxisY(self.axisY, series)
# 增加Y轴网格设置
self.axisY.setTickCount(11)
grid_pen_y = QPen(Qt.black, 0.5, Qt.DotLine) # 设置网格线的颜色、宽度和样式
self.axisY.setGridLinePen(grid_pen_y)
#self.series = QSplineSeries()
self.series = QLineSeries()
self.series.setName("波形")
self.series.setUseOpenGL(True)
self.addSeries(self.series)
self.series.attachAxis(self.axisX)
self.series.attachAxis(self.axisY)
self.curretValue = 0
self.windowAverage =0
self.windowStd = 0
def handle_update(self, ydata):
# 更新y值
if self.counter < self.xRange:
self.series.append(self.counter, ydata)
self.counter += 1
points = self.series.pointsVector()
self.y_min = min(points, key=lambda point: point.y()).y()
self.y_max = max(points, key=lambda point: point.y()).y()
else:
points = self.series.pointsVector()
for i in range(self.xRange - 1):
points[i].setY(points[i + 1].y())
points[-1].setY(ydata)
self.y_min = min(points, key=lambda point: point.y()).y()
self.y_max = max(points, key=lambda point: point.y()).y()
self.series.replace(points)
# 计算总和
total_sum = sum(point.y() for point in points)
# 计算点的数量
num_points = len(points)
# 计算均值,确保除数不为零
self.windowAverage = total_sum / num_points if num_points > 0 else 0
print("windowMean:",self.windowAverage)
# 计算每个点与均值的差的平方,然后求和
variance_sum = sum((point.y() - self.windowAverage) ** 2 for point in points)
# 计算方差,除以数据点的总数
variance = variance_sum / len(points)
self.axisY.setRange(math.floor(self.y_min-0.1),math.ceil( self.y_max+0.1))
# 计算标准差,即方差的平方根
self.windowStd = variance ** 0.5
print("standard_deviation", self.windowStd)
def clearSeries(self):
self.series.clear()
self.counter = 0
# 使用线程不断更新波形数据
class UpdateDataThread(QThread):
_signal_update = pyqtSignal(float) # 信号
def __init__(self, parent=None):
super(UpdateDataThread, self).__init__(parent)
self.is_exit = False
self.x_range = 1024
self.flag = 0
def run(self):
print("run start")
while not self.is_exit: # 直接在循环中检查退出标志
# for i in range(self.x_range):
# print(i,self.sin.get_data(i))
# self._signal_update.emit(str(self.sin.get_data(i))) # 发射信号
# time.sleep(0.01) # 实际应用中推荐使用QTimer代替
# 这里应该有一个 flag 默认是 0 外部调用可以传紧来1没循环完自己置零while 只起到监控左右
if self.flag == 1:
#print("flag",self.flag)
#data = self.setReceiveData
#print("发数",datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"))
print("data receive ",self.setReceiveData)
# print(f"{data:.3f}")
try:
self._signal_update.emit(self.setReceiveData)
self.flag = 0
except Exception:
QMessageBox.critical(self, '数据异常', '数据格式不对,不能绘图')
pass
def SetReceiveData(self,data_in):
self.setReceiveData = data_in
def SetFlag(self,flag):
self.flag = flag
def SetFilenameCsv(self,filename):
self.filenameCsv = filename
def SetPlotItem(self,ind): #设置显示项目
self.ind = ind
def stop(self):
self.is_exit = True # 设置退出标志
print("停止绘图")
def restart(self):
self.is_exit = False
class GetDataTF(): #读取费加罗或者自定义数据
def __init__(self, parent=None):
super().__init__()
self.indOfReturn =0
self.TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
self.headStr = ""
self.rowTitle=[]
self.csv_buffer = []
def SetConfig(self,config_in):
self.regex= re.compile(config_in[0]) #正则表达
self.headStr = config_in[1]
self.rowTitleStr = config_in[2]
#print(self.rowTitleStr)
self.rowTitle = self.rowTitleStr.split(',')
def Transdata(self,data):
if data[:2] ==self.headStr:
self.data2csv = self.regex.findall(data)
returnData = float(self.data2csv[self.indOfReturn])
print("returnData",returnData)
return returnData
else:
return "noNum"
def SaveCsv(self,filenameCsv,num):
# 打开一个文件用于写入,如果文件不存在则创建
# 增加num 写入文件间隔
if os.path.isfile(filenameCsv) == 0:
# 文件为空,需要写入表头
with open(filenameCsv, mode='w', newline='') as file:
writer = csv.writer(file)
writer.writerow(self.rowTitle)
else:
timeCrvt=time.strftime(self.TIME_FORMAT, time.localtime())
self.data2csv.insert(0, timeCrvt)
print(self.data2csv)
self.csv_buffer.append(self.data2csv)
if len(self.csv_buffer)>= num:
with open(filenameCsv, mode='a', newline='') as file:
# 创建一个写入器对象
writer = csv.writer(file)
# 写入数据行
writer.writerows(self.csv_buffer)
self.csv_buffer.clear()
def IndOfReturn(self,ind):
self.indOfReturn = ind