python/PyUartAssistant/UartDataPolt.py
2024-09-23 11:50:17 +08:00

433 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
Created on Tue May 28 15:40:48 2024
@author: WANGXIBAO
"""
# -*- coding: utf-8 -*-
import time,math,os
import re,csv,datetime
from PyQt5.QtCore import QThread, pyqtSignal,Qt
from PyQt5.QtWidgets import QMessageBox
from PyQt5.QtChart import QChart, QValueAxis, QSplineSeries,QLineSeries
from PyQt5.QtGui import QPen
from configparser import ConfigParser
# 波形显示
class QChartViewPlot(QChart):
# 相位振动波形
def __init__(self, parent=None):
super(QChartViewPlot, self).__init__(parent)
self.window = parent
self.xRange = 60
self.counter = 0
self.seriesList = []
self.legend().show()
self.axisX = QValueAxis()
self.axisX.setRange(0, self.xRange)
self.addAxis(self.axisX, Qt.AlignBottom)
# self.setAxisX(self.axisX, series)
# 增加X轴网格设置
self.axisX.setTickCount(13)
grid_pen_x = QPen(Qt.black, 0.5, Qt.DotLine) # 设置网格线的颜色、宽度和样式
self.axisX.setGridLinePen(grid_pen_x)
self.y_min = 0
self.y_max = 100
self.axisY = QValueAxis()
self.axisY.setRange(self.y_min, self.y_max)
self.addAxis(self.axisY, Qt.AlignLeft)
# self.setAxisY(self.axisY, series)
# 增加Y轴网格设置
self.axisY.setTickCount(11)
grid_pen_y = QPen(Qt.black, 0.5, Qt.DotLine) # 设置网格线的颜色、宽度和样式
self.axisY.setGridLinePen(grid_pen_y)
#self.series = QSplineSeries()
self.series = QLineSeries()
self.series.setName("波形")
self.series.setUseOpenGL(True)
self.addSeries(self.series)
self.series.attachAxis(self.axisX)
self.series.attachAxis(self.axisY)
self.curretValue = 0
self.windowAverage =0
self.windowStd = 0
def handle_update(self, ydata):
# 更新y值
if self.counter < self.xRange:
self.series.append(self.counter, ydata)
self.counter += 1
points = self.series.pointsVector()
self.y_min = min(points, key=lambda point: point.y()).y()
self.y_max = max(points, key=lambda point: point.y()).y()
else:
points = self.series.pointsVector()
for i in range(self.xRange - 1):
points[i].setY(points[i + 1].y())
points[-1].setY(ydata)
self.y_min = min(points, key=lambda point: point.y()).y()
self.y_max = max(points, key=lambda point: point.y()).y()
self.series.replace(points)
# 计算总和
total_sum = sum(point.y() for point in points)
# 计算点的数量
num_points = len(points)
# 计算均值,确保除数不为零
self.windowAverage = total_sum / num_points if num_points > 0 else 0
print("windowMean:",self.windowAverage)
# 计算每个点与均值的差的平方,然后求和
variance_sum = sum((point.y() - self.windowAverage) ** 2 for point in points)
# 计算方差,除以数据点的总数
variance = variance_sum / len(points)
self.axisY.setRange(math.floor(self.y_min-0.1),math.ceil( self.y_max+0.1))
# 计算标准差,即方差的平方根
self.windowStd = variance ** 0.5
print("standard_deviation", self.windowStd)
def clearSeries(self):
self.series.clear()
self.counter = 0
# 使用线程不断更新波形数据
class UpdateDataThread(QThread):
_signal_update = pyqtSignal(float) # 信号
def __init__(self, parent=None):
super(UpdateDataThread, self).__init__(parent)
self.is_exit = False
self.x_range = 1024
self.getDateQx = GetDataQX()
self.flag = 0
def run(self):
print("run start")
while not self.is_exit: # 直接在循环中检查退出标志
# for i in range(self.x_range):
# print(i,self.sin.get_data(i))
# self._signal_update.emit(str(self.sin.get_data(i))) # 发射信号
# time.sleep(0.01) # 实际应用中推荐使用QTimer代替
# 这里应该有一个 flag 默认是 0 外部调用可以传紧来1没循环完自己置零while 只起到监控左右
if self.flag == 1:
#print("flag",self.flag)
#data = self.setReceiveData
#print("发数",datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"))
print("data receive ",self.setReceiveData)
# print(f"{data:.3f}")
try:
self._signal_update.emit(self.setReceiveData)
self.flag = 0
except Exception:
QMessageBox.critical(self, '数据异常', '数据格式不对,不能绘图')
pass
def SetReceiveData(self,data_in):
self.setReceiveData = data_in
def SetFlag(self,flag):
self.flag = flag
def SetFilenameCsv(self,filename):
self.filenameCsv = filename
def SetPlotItem(self,ind): #设置显示项目
self.ind = ind
def stop(self):
self.is_exit = True # 设置退出标志
print("停止绘图")
def restart(self):
self.is_exit = False
class GetDataQX(): #读取请芯数据
def __init__(self, parent=None):
super().__init__()
self.indOfReturn =0
self.regexQx = re.compile(r'(A\+|B\+|\s)+') # 编译正则表达式
self.TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
def Transdata(self,data):
parts = re.split(self.regexQx,data)
# 过滤掉空字符串
parts = [part for part in parts if part]
#print(parts)
methane = float(parts[1])
airTemp = float(parts[3])
laserTemp = float( parts[5])/1
laerIntensity = float(parts[7])*10
timeCrvt=time.strftime(self.TIME_FORMAT, time.localtime())
self.data2csv =[(timeCrvt,methane,airTemp,laserTemp,laerIntensity)]
#print(self.data2csv)
if self.indOfReturn == 0:
returnData = methane
elif self.indOfReturn == 1:
returnData = airTemp
elif self.indOfReturn == 2:
returnData = laserTemp
elif self.indOfReturn == 3:
returnData = laerIntensity
print(returnData)
return returnData
def SaveCsv(self,filenameCsv):
# 打开一个文件用于写入,如果文件不存在则创建
if os.path.isfile(filenameCsv) == 0:
# 文件为空,需要写入表头
with open(filenameCsv, mode='w', newline='') as file:
writer = csv.writer(file)
writer.writerow(('time', 'Methane', 'Air Temp', 'Laser Temp', 'Laser Intensity'))
else:
with open(filenameCsv, mode='a', newline='') as file:
# 创建一个写入器对象
writer = csv.writer(file)
# 写入数据行
writer.writerows(self.data2csv)
def IndOfReturn(self,ind):
self.indOfReturn = ind
class GetDataTF(): #读取费加罗或者自定义数据
def __init__(self, parent=None):
super().__init__()
self.indOfReturn =0
#self.regex = re.compile(r'(A\+|B\+|\s)+') # 编译正则表达式
self.TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
self.regularIniPath = "regular.ini" # 配置文件默认路径
self.headStr = ""
self.rowTitle=[]
self.buttons_config = [[None for _ in range(3)] for _ in range(20)]
#存放快捷按键配置
self.CheckCfgIniData() # 初始化配置文件
def Transdata(self,data):
if data[:2] ==self.headStr:
# parts = re.split(self.regex,data)
# # 过滤掉空字符串
# parts = [part for part in parts if part]
#print(parts)
# methane = float(parts[1])
# airTemp = float(parts[3])
# laserTemp = float( parts[5])/1
# laerIntensity = float(parts[7])*10
# self.data2csv =[(timeCrvt,methane,airTemp,laserTemp,laerIntensity)]
self.data2csv = self.regex.findall(data)
#print(self.data2csv)
# if self.indOfReturn == 0:
# returnData = methane
# elif self.indOfReturn == 1:
# returnData = airTemp
# elif self.indOfReturn == 2:
# returnData = laserTemp
# elif self.indOfReturn == 3:
# returnData = laerIntensity
returnData = float(self.data2csv[self.indOfReturn])
print("returnData",returnData)
return returnData
else:
return "noNum"
def SaveCsv(self,filenameCsv):
# 打开一个文件用于写入,如果文件不存在则创建
if os.path.isfile(filenameCsv) == 0:
# 文件为空,需要写入表头
with open(filenameCsv, mode='w', newline='') as file:
writer = csv.writer(file)
#writer.writerow(('time', 'Methane', 'Air Temp', 'Laser Temp', 'Laser Intensity'))
writer.writerow(self.rowTitle)
else:
with open(filenameCsv, mode='a', newline='') as file:
# 创建一个写入器对象
writer = csv.writer(file)
# 写入数据行
timeCrvt=time.strftime(self.TIME_FORMAT, time.localtime())
self.data2csv.insert(0, timeCrvt)
print(self.data2csv)
writer.writerows([self.data2csv])
def IndOfReturn(self,ind):
self.indOfReturn = ind
def CheckCfgIniData(self):
if not os.path.exists(self.regularIniPath):
config = ConfigParser()
config.add_section('TF_config')
config.set('TF_config', 'regular', '\+?-?\d+(?:\.\d+)?')
#config.set('TF_config', 'regular', '(A\+|B\+|\s)+')
config.set('TF_config', 'headStr', 'A+')
config.set('TF_config', 'rowTitle', 'time,Methane,Air Temp,Laser Temp,Laser Intensity,amplification,NL,ND,Sinal,SNR,PEAK,Best Piont')
config.add_section('Quick_config')
config.set('Quick_config', 'Button00', 'write 0,2024100101|序列号')
config.set('Quick_config', 'Button01', 'write 1,65000|激光温度')
config.set('Quick_config', 'Button02', 'write 2,1000|K*1000')
config.set('Quick_config', 'Button03', 'write 3,0|B*1000')
config.set('Quick_config', 'Button04', 'write 4,0|可调电阻')
config.set('Quick_config', 'Button05', 'write 5,30|电阻抽头')
config.set('Quick_config', 'Button06', 'write 6,0|输出温度')
config.set('Quick_config', 'Button07', 'write 7,0|输出激光')
config.set('Quick_config', 'Button08', 'write 8,1|输出状态')
config.set('Quick_config', 'Button09', 'write 9,0|激光温补')
config.set('Quick_config', 'Button10', 'write 10,100|噪声长度')
config.set('Quick_config', 'Button11', 'write 11,600|扫描长度')
config.set('Quick_config', 'Button12', '')
config.set('Quick_config', 'Button13', 'write 13,30500|气温校准')
config.set('Quick_config', 'Button14', 'write 14,1|滑动开关')
config.set('Quick_config', 'Button15', 'write 15,10|滑动标准差')
config.set('Quick_config', 'Button16', 'write 16,1|浓度温补')
config.set('Quick_config', 'Button17', '')
config.set('Quick_config', 'Button18', '')
config.set('Quick_config', 'Button19', '')
with open(self.regularIniPath, 'w' ,encoding='utf-8') as f:
config.write(f)
config = ConfigParser()
config.read(self.regularIniPath, encoding='utf-8')
try:
self.headStr = config.get('TF_config', 'headStr')
self.regex = re.compile(config.get('TF_config', 'regular'))
self.rowTitleStr = config.get('TF_config', 'rowTitle')
self.rowTitle = self.rowTitleStr.split(',')
print(type(self.rowTitle))
print(f"Configuration read successfully: {self.headStr}, {self.regex}, {self.rowTitle}")
# 创建一个空字典来存储按钮名称和对应的配置
# 循环遍历按钮编号从0到19
for i in range(20):
button_name = f'Button{i:02}' # 格式化按钮名称,确保两位数
# 使用 get 方法安全地获取配置,如果不存在则返回空字符串
config_value = config.get('Quick_config', button_name, fallback='')
config_value_split = config_value.split('|')
# 将按钮名称和配置信息存储在字典中
self.buttons_config[i][0] = button_name
if len(config_value_split) == 2:
self.buttons_config[i][1] = config_value_split[0]
self.buttons_config[i][2] = config_value_split[1]
# 打印字典查看结果
for i in range(20):
print(f'{self.buttons_config[i][0]}: {self.buttons_config[i][1]}|{self.buttons_config[i][2]}')
except Exception as e:
print(f"Error reading configuration: {e}")
def SetCfgIniData(self,button_name,set_text):
config = ConfigParser()
config.read(self.regularIniPath, encoding='utf-8')
config.set('Quick_config', button_name, set_text)
with open(self.regularIniPath, 'w' ,encoding='utf-8') as f:
config.write(f)
class GetDataOther(): #读取费加罗或者自定义数据
def __init__(self, parent=None):
super().__init__()
self.indOfReturn =0
#self.regex = re.compile(r'(A\+|B\+|\s)+') # 编译正则表达式
self.TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
self.OtherIniPath = "OtherData.ini" # 配置文件默认路径
self.headStr = ""
self.rowTitle=[]
self.CheckCfgIniData() # 初始化配置文件
def Transdata(self,data):
if data[:2] ==self.headStr:
self.data2csv = self.regex.findall(data)
returnData = float(self.data2csv[self.indOfReturn])
print("returnData",returnData)
return returnData
else:
return "noNum"
def SaveCsv(self,filenameCsv):
# 打开一个文件用于写入,如果文件不存在则创建
if os.path.isfile(filenameCsv) == 0:
# 文件为空,需要写入表头
with open(filenameCsv, mode='w', newline='') as file:
writer = csv.writer(file)
#writer.writerow(('time', 'Methane', 'Air Temp', 'Laser Temp', 'Laser Intensity'))
writer.writerow(self.rowTitle)
else:
with open(filenameCsv, mode='a', newline='') as file:
# 创建一个写入器对象
writer = csv.writer(file)
# 写入数据行
timeCrvt=time.strftime(self.TIME_FORMAT, time.localtime())
self.data2csv.insert(0, timeCrvt)
print(self.data2csv)
writer.writerows([self.data2csv])
def IndOfReturn(self,ind):
self.indOfReturn = ind
def CheckCfgIniData(self):
if not os.path.exists(self.OtherIniPath):
config = ConfigParser()
config.add_section('OtherData_config')
config.set('OtherData_config', 'regular', '\+?-?\d+(?:\.\d+)?')
#config.set('TF_config', 'regular', '(A\+|B\+|\s)+')
config.set('OtherData_config', 'headStr', 'A+')
config.set('OtherData_config', 'rowTitle', 'time,DATA1,DATA2')
with open(self.OtherIniPath, 'w') as f:
config.write(f)
config = ConfigParser()
config.read(self.OtherIniPath, encoding='utf-8')
try:
self.headStr = config.get('OtherData_config', 'headStr')
self.regex = re.compile(config.get('OtherData_config', 'regular'))
self.rowTitleStr = config.get('OtherData_config', 'rowTitle')
self.rowTitle = self.rowTitleStr.split(',')
print(type(self.rowTitle))
print(f"Configuration read successfully: {self.headStr}, {self.regex}, {self.rowTitle}")
except Exception as e:
print(f"Error reading configuration: {e}")