758 lines
32 KiB
Python
758 lines
32 KiB
Python
![]() |
import os
|
|||
|
import sys
|
|||
|
import time,datetime
|
|||
|
from configparser import ConfigParser
|
|||
|
|
|||
|
import serial,re
|
|||
|
import serial.tools.list_ports
|
|||
|
from PyQt5 import QtWidgets, QtGui
|
|||
|
from PyQt5.QtCore import QTimer
|
|||
|
from PyQt5.QtWidgets import QMessageBox, QTableWidgetItem
|
|||
|
|
|||
|
from LaserMethaneUI import Ui_MainWindow
|
|||
|
from serialUI import Ui_SerialSet
|
|||
|
|
|||
|
|
|||
|
class LaserMethanePyqt5(QtWidgets.QMainWindow, Ui_MainWindow):
|
|||
|
def __init__(self):
    """Create the main window, its serial-settings dialog and default state."""
    super().__init__()
    self.setupUi(self)

    # Defaults; funcode, position and log_time are re-read from the ini
    # file by CheckCfgIniData() below.
    self.IniPath = "setup.ini"
    self.funcode = "F4"
    self.position = 5
    self.log_time = 10
    # Number of rows in the hex-parsing table and of ini row/button entries.
    self.quick_num = 99

    # Per-port state, keyed by port name.
    self.serials = {}   # open serial objects
    self.buffers = {}   # most recently received bytes

    # Logging state: file name -> list of pending log lines.
    self.log_buffer = {}
    self.log_count = 0
    # Scratch string for decoded data.
    self.lineUtf8 = ""

    # The dialog must exist before init() wires its buttons, and the table
    # items created by init() must exist before the ini file is loaded.
    self.SerialWindow = PT_Serial()
    self.init()
    self.CheckCfgIniData()
|
|||
|
|
|||
|
|
|||
|
def init(self):
    """Set up static widget state and connect every signal to its slot."""
    self.setWindowTitle('LaserMethane')
    self.tableWidgetSerChannel.setRowCount(20)

    # QDoubleValidator restricts the temperature field to numeric text
    # (floating point allowed).
    self.lineEditTempra.setValidator(QtGui.QDoubleValidator())

    serial_dialog = self.SerialWindow
    connections = (
        (self.actionSerial.triggered, self.openSerialWindow),
        (serial_dialog.pushButtonOpenSerial.clicked, self.port_open),
        (serial_dialog.pushButtonCloseSerial.clicked, self.port_close),
        (self.pushButtonSend.clicked, self.data_send_form_textEdit),
        (self.pushButtonCalibTempra.clicked, self.tempra_calibrate),
        (self.pushButtonCheckTempra.clicked, self.tempra_calib_check),
    )
    for signal, slot in connections:
        signal.connect(slot)

    # Hex-parsing table: one unchecked checkbox item in column 0 of each of
    # the quick_num rows.
    self.tableWidget.setRowCount(self.quick_num)
    for row in range(self.quick_num):
        checkbox_item = QtWidgets.QTableWidgetItem()
        checkbox_item.setCheckState(0)
        self.tableWidget.setItem(row, 0, checkbox_item)

    # Channel-table selection buttons.
    self.pushButtonSerAll.clicked.connect(self.select_ser_channel_all)
    self.pushButtonSerNo.clicked.connect(self.select_ser_channel_none)
    self.pushButtonSerClear.clicked.connect(self.clear_ser_channel)
|
|||
|
|
|||
|
def closeEvent(self, event):
    """Save the configuration and close every port before the window closes.

    Args:
        event: the close event, forwarded unchanged to the base class.
    """
    # Persist the table contents first, then release the serial ports.
    self.SavConfig()
    self.port_all_close()

    # Let the base class finish handling the close event.
    super().closeEvent(event)
|
|||
|
|
|||
|
def openSerialWindow(self):
    """Show the serial-settings dialog."""
    dialog = self.SerialWindow
    dialog.show()
    # The port list is not refreshed here (dialog.port_check() is disabled).
|
|||
|
|
|||
|
def port_open(self):
    """Open the port configured in the serial dialog and start polling it.

    The dialog builds a fresh serial object; this method registers it under
    its port name, opens it, adds a row to the channel table, creates an
    empty receive buffer and makes sure the receive timer is running.

    Returns:
        None. On failure an error dialog is shown and the bookkeeping for
        the port is removed again.
    """
    self.SerialWindow.port_open()
    new_ser = self.SerialWindow.ser
    port_name = new_ser.port

    try:
        time.sleep(0.1)

        # Re-opening a port we already hold: release the old handle first.
        previous = self.serials.get(port_name)
        if previous is not None and previous.is_open:
            previous.close()

        self.serials[port_name] = new_ser
        new_ser.open()

        # Add the channel row and an empty receive buffer.
        self.add_item_ser_channel(port_name)
        self.buffers[port_name] = b''

        # One shared timer polls all ports; create it on first use.
        if not hasattr(self, 'timer'):
            self.timer = QTimer()
            self.timer.timeout.connect(self.data_receive)
        # Poll period is 50 ms.
        self.timer.start(50)

    except Exception as e:
        QMessageBox.critical(self, "串口异常", f"此串口不能被打开!错误信息: {str(e)}")
        # pop() instead of del: the handler must never raise itself, and no
        # stale buffer may outlive the serial object.
        self.serials.pop(port_name, None)
        self.buffers.pop(port_name, None)
        return None
|
|||
|
|
|||
|
def port_close(self):
    """Close the port currently selected in the serial dialog.

    The serial object is looked up in ``self.serials`` by the selected port
    name. The dialog's own ``ser`` attribute is only the most recently
    created object, so with several ports open it may belong to a different
    port; re-pointing its ``.port`` (as the dialog's port_close does) is
    therefore avoided here.

    Returns:
        None. An error dialog is shown when the port is not open or cannot
        be closed; in that case the bookkeeping is left untouched.
    """
    port_name = self.SerialWindow.comboBoxSerial.currentText()
    ser = self.serials.get(port_name)
    if ser is None:
        QMessageBox.critical(self, "串口异常", f"此串口不能被关闭!错误信息: {port_name} 未打开")
        return None

    try:
        ser.close()
    except Exception as e:
        QMessageBox.critical(self, "串口异常", f"此串口不能被关闭!错误信息: {str(e)}")
        return None

    # Drop the serial object, its channel row and its receive buffer.
    self.serials.pop(port_name, None)
    self.delete_item_ser_channel(port_name)
    self.buffers.pop(port_name, None)
    return None
|
|||
|
|
|||
|
def port_all_close(self):
    """Close every registered port and forget all per-port state.

    Closing is best effort: a port that fails to close is reported on
    stdout and the remaining ports are still closed.
    """
    for port_name, ser in list(self.serials.items()):
        try:
            ser.close()
        except Exception as e:
            print(f"{port_name} close failed: {e}")
        # pop() tolerates a port that never got a buffer.
        self.buffers.pop(port_name, None)
    self.serials = {}
|
|||
|
|
|||
|
|
|||
|
# %%接收数据
|
|||
|
def add_item_ser_channel(self, port_name):
    """Show ``port_name`` in the channel table as a checked, green row.

    The name goes into column 1 of the first row whose name cell is empty;
    when no row is free a new row is appended. Nothing happens if the name
    is already listed.

    Args:
        port_name: name of the port that was just opened.
    """
    table = self.tableWidgetSerChannel
    row_count = table.rowCount()

    # Single pass: detect a duplicate and remember the first blank row.
    first_blank = None
    for row in range(row_count):
        name_cell = table.item(row, 1)  # port names live in column index 1
        if name_cell is not None and name_cell.text() == port_name:
            print(f"{port_name} 已经存在于表格中")
            return
        if first_blank is None and (name_cell is None or not name_cell.text()):
            first_blank = row

    if first_blank is None:
        # No free row: append one at the end.
        table.insertRow(row_count)
        target_row = row_count
        message = f"{port_name} 已添加到表格末尾"
    else:
        target_row = first_blank
        message = f"{port_name} 已添加到表格中"

    table.setItem(target_row, 1, QTableWidgetItem(port_name))
    checkbox_item = QTableWidgetItem()
    checkbox_item.setCheckState(2)  # 2 == Qt.Checked
    table.setItem(target_row, 0, checkbox_item)
    table.item(target_row, 0).setBackground(QtGui.QBrush(QtGui.QColor(0, 255, 0)))
    print(message)

    # Fit the name column to its contents. The singular
    # resizeColumnToContents(column) is the call that takes a column index.
    table.resizeColumnToContents(1)
|
|||
|
|
|||
|
def delete_item_ser_channel(self, port_name):
    """Remove the channel-table row that lists ``port_name``, if there is one.

    Args:
        port_name: port name to look for in the table.
    """
    row = self.find_port_row(port_name)
    if row is None:
        print(f"{port_name} 不存在于表格中")
        return
    self.tableWidgetSerChannel.removeRow(row)
    print(f"{port_name} 已从表格中删除")
|
|||
|
|
|||
|
def find_port_row(self, port_name):
    """Return the row index whose name cell (column 1) equals ``port_name``.

    Returns:
        The first matching row index, or None when no row matches.
    """
    table = self.tableWidgetSerChannel
    name_cells = ((row, table.item(row, 1)) for row in range(table.rowCount()))
    return next(
        (row for row, cell in name_cells if cell and cell.text() == port_name),
        None,
    )
|
|||
|
def find_selected_port(self):
    """Return the port names of all channel rows whose checkbox is checked.

    Returns:
        list of str, in table order.
    """
    table = self.tableWidgetSerChannel
    chosen = []
    for row in range(table.rowCount()):
        checkbox_item = table.item(row, 0)
        if not checkbox_item or checkbox_item.checkState() != 2:
            continue
        chosen.append(table.item(row, 1).text())
    return chosen
|
|||
|
|
|||
|
|
|||
|
|
|||
|
def select_ser_channel_all(self):
    """Check the checkbox of every channel row that has one."""
    table = self.tableWidgetSerChannel
    checkbox_items = (table.item(row, 0) for row in range(table.rowCount()))
    for checkbox_item in checkbox_items:
        if checkbox_item:
            checkbox_item.setCheckState(2)
|
|||
|
def select_ser_channel_none(self):
    """Uncheck the checkbox of every channel row that has one."""
    table = self.tableWidgetSerChannel
    checkbox_items = (table.item(row, 0) for row in range(table.rowCount()))
    for checkbox_item in checkbox_items:
        if checkbox_item:
            checkbox_item.setCheckState(0)
|
|||
|
def clear_ser_channel(self):
    """Blank every channel row, then close all ports.

    For each row that has a checkbox item the box is unchecked, its
    background is reset to white and the port-name cell is emptied.
    """
    table = self.tableWidgetSerChannel
    for row in range(table.rowCount()):
        checkbox_item = table.item(row, 0)
        if not checkbox_item:
            continue
        checkbox_item.setCheckState(0)
        white_brush = QtGui.QBrush(QtGui.QColor(255, 255, 255))
        table.item(row, 0).setBackground(white_brush)
        table.setItem(row, 1, QTableWidgetItem(""))
    # NOTE(review): assumed to run once after the loop; the source listing
    # has lost its indentation -- confirm.
    self.port_all_close()
|
|||
|
def data_receive(self):
    """Timer slot: poll every open port, then display and log new data.

    For each port with pending bytes the data is stored in
    ``self.buffers``, echoed to the receive pane when the port's channel
    row is checked, and handed to AutoSaveLog() when auto-save is enabled.

    Only a failure while probing the port (``in_waiting``) is treated as a
    disconnect and triggers the close / wait / reopen sequence. Display and
    logging problems are reported without touching the port.
    """
    pane = self.plainTextEditReceive
    table = self.tableWidgetSerChannel

    # Iterate over a snapshot: the modal error dialog below runs a nested
    # event loop in which ports can be opened or closed.
    for port_name, ser in list(self.serials.items()):
        if self.serials.get(port_name) is not ser:
            continue  # port was closed or replaced while a dialog was up

        try:
            num = ser.in_waiting
        except Exception:
            cursor = pane.textCursor()
            cursor.movePosition(cursor.End)
            pane.setTextCursor(cursor)
            pane.insertPlainText(f"[{port_name}] 串口断开,重连中...\r\n")

            # The port may have no table row; guard before colouring it.
            ind = self.find_port_row(port_name)
            status_item = table.item(ind, 0) if ind is not None else None
            if status_item is not None:
                status_item.setBackground(QtGui.QBrush(QtGui.QColor(255, 0, 0)))

            try:
                ser.close()
                # NOTE(review): this sleep blocks the GUI thread for one
                # second on every failed poll of a disconnected port.
                time.sleep(1)
                if not ser.is_open:
                    ser.open()
                    pane.insertPlainText(f"[{port_name}] 重连成功\r\n")
                    if status_item is not None:
                        status_item.setBackground(QtGui.QBrush(QtGui.QColor(0, 255, 0)))
                else:
                    pane.insertPlainText(f"[{port_name}] 串口已连接,无需重连\r\n")
            except Exception as e:
                pane.insertPlainText(f"[{port_name}] 重连失败: {str(e)}\r\n")
            continue

        if num <= 0:
            continue

        try:
            data = ser.read(num)
        except Exception as e:
            QMessageBox.critical(self, '串口异常', f'读取数据时发生异常: {str(e)}')
            continue
        # The buffer holds only the latest chunk (it is replaced, not
        # appended to); check_calib_result() reads it.
        self.buffers[port_name] = data

        # Display, only for channels whose row is checked.
        ind = self.find_port_row(port_name)
        channel_item = table.item(ind, 0) if ind is not None else None
        if channel_item is not None and channel_item.checkState() == 2:
            cursor = pane.textCursor()
            cursor.movePosition(cursor.End)
            pane.setTextCursor(cursor)

            if self.checkBoxAddDate.checkState():
                nowTime = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
                pane.insertPlainText(f" {nowTime} ")
            # NOTE(review): the port label is assumed to be printed whether
            # or not the timestamp is enabled -- confirm against the
            # original indentation.
            pane.insertPlainText(f"[{port_name}] ")

            if self.checkBoxHexReceive.checkState():
                pane.insertPlainText(' '.join(f'{byte:02X}' for byte in data))
                # Feed the hex-parsing table.
                self.Disp_hex_receive(data.hex().upper())
            else:
                # errors='replace' never raises, so no try block is needed.
                pane.insertPlainText(data.decode('utf-8', errors='replace'))

            if self.checkBoxCRLF.isChecked():
                pane.insertPlainText('\r\n')

        # Auto-save is independent of the channel selection.
        if self.checkBoxAutoSaveLog.isChecked():
            try:
                self.AutoSaveLog(data, port_name)
            except Exception as e:
                pane.insertPlainText(f"[{port_name}] 日志保存失败: {e}\r\n")
|
|||
|
def AutoSaveLog(self, data, port_name):
    """Queue one received chunk for logging and flush full queues to disk.

    The file is chosen as follows:

    * Hex mode (hex-receive box checked): when the byte at index
      ``self.position`` equals ``self.funcode`` the record goes to
      ``<first 4 bytes as hex>_<date>.txt``, prefixed with ``alarm_`` when
      hex characters 14..17 are not ``0000``. The stored text skips the
      first four bytes. Otherwise it goes to ``<port>_<date>.log``.
    * Text mode: data of the form ``name,value`` goes to
      ``<name>_<date>.txt``; anything else goes to ``<port>_<date>.log``.

    Records are buffered per file and appended once ``self.log_time``
    records have accumulated.

    Args:
        data: bytes received from the port.
        port_name: name of the port the data came from.

    Raises:
        OSError: if the log file cannot be written.
    """
    now = datetime.datetime.now()
    day = now.strftime("%Y-%m-%d")
    stamp = now.strftime("%Y-%m-%dT%H:%M:%S.%f")[:23]
    port_log_name = port_name + '_' + day + '.log'

    if self.checkBoxHexReceive.checkState():
        hex_text = data.hex().upper()
        spaced = ' '.join(f'{b:02X}' for b in data)
        code_at = 2 * self.position
        # hex_text is upper case, so compare the function code likewise.
        if hex_text[code_at:code_at + 2] == str(self.funcode).upper():
            # A non-zero concentration field is recorded in a separate
            # alarm file.
            prefix = 'alarm_' if hex_text[14:18] != '0000' else ''
            sav_name = prefix + hex_text[0:8] + '_' + day + '.txt'
            # 12 characters == the first four "XX " groups.
            sav_str = stamp + " " + spaced[12:] + "\r\n"
        else:
            sav_name = port_log_name
            sav_str = stamp + " " + spaced + "\r\n"
    else:
        text = data.decode('utf-8', errors='replace')
        parts = text.split(',')
        if len(parts) == 2:
            sav_name = parts[0] + '_' + day + '.txt'
            sav_str = stamp + " " + parts[1]
        else:
            sav_name = port_log_name
            sav_str = stamp + " " + text

    # Port names such as /dev/ttyUSB0 and device-supplied text must not be
    # able to inject path separators or characters that are illegal in
    # file names.
    sav_name = re.sub(r'[\\/:*?"<>|\x00-\x1f]', '_', sav_name)

    # Queue the record for this file.
    pending = self.log_buffer.setdefault(sav_name, [])
    pending.append(sav_str)

    # Flush once enough records have accumulated.
    # NOTE(review): records still queued when the program exits are never
    # written by this method.
    if len(pending) >= self.log_time:
        with open(sav_name, mode='a', newline='', encoding='utf-8', errors='replace') as file:
            file.writelines(pending)
        pending.clear()
|
|||
|
def Disp_hex_receive(self, hexdata):
    """Fill the hex-parsing table from one received frame.

    For every checked row, columns 1-3 give the start byte, the byte count
    and the divisor. The selected hex substring is written to column 4 and
    its integer value divided by the divisor to column 5.

    A row with missing or invalid settings is reported on stdout and
    skipped; it does not stop later rows from being processed.

    Args:
        hexdata: the frame as an upper-case hex string, two characters per
            byte.
    """
    table = self.tableWidget
    for row in range(table.rowCount()):
        checkbox_item = table.item(row, 0)
        if checkbox_item is None or checkbox_item.checkState() != 2:
            continue
        try:
            start = int(table.item(row, 1).text())
            num = int(table.item(row, 2).text())
            scale = int(table.item(row, 3).text())

            hex_str = hexdata[start * 2:(start + num) * 2]
            table.setItem(row, 4, QTableWidgetItem(hex_str))

            value = int(hex_str, 16) / scale
            table.setItem(row, 5, QTableWidgetItem(str(value)))
        except (AttributeError, ValueError, ZeroDivisionError) as e:
            # AttributeError: a settings cell is empty (item() is None);
            # ValueError: non-numeric text or a slice beyond the frame;
            # ZeroDivisionError: the divisor is 0.
            print(f"row {row}: {e}")
|
|||
|
|
|||
|
def data_send(self, text_quick=None):
    """Send text to every open port whose channel row is checked.

    Args:
        text_quick: text to send; when None the contents of the send box
            are used. With the hex-send box checked the text is parsed as
            hex byte pairs (optionally followed by a Modbus CRC when the
            add-CRC box is checked); otherwise it is sent as UTF-8 with
            bare ``\\n`` converted to ``\\r\\n``.

    Returns:
        None. Problems are reported through message boxes.
    """
    selected_name = self.find_selected_port()
    # Ports that are both open and selected.
    selected_ports = [
        port_name
        for port_name, ser in self.serials.items()
        if ser.is_open and port_name in selected_name
    ]
    if not selected_ports:
        QMessageBox.critical(self, '串口异常', '没有打开的串口!')
        return None

    input_s = self.plainTextEditSend.toPlainText() if text_quick is None else text_quick
    if input_s == "":
        return None

    # Encode once, before anything is written: the payload is identical for
    # every port, and invalid hex is rejected before any port gets data.
    if self.checkBoxHexSend.isChecked():
        remaining = input_s.strip()
        send_list = []
        while remaining != '':
            try:
                num = int(remaining[0:2], 16)
            except ValueError:
                QMessageBox.critical(self, '串口异常', '请输入规范十六进制数据,以空格分开!')
                return None
            remaining = remaining[2:].strip()
            send_list.append(num)
        payload = bytes(send_list)
        if self.checkBoxAddCRC.isChecked():
            payload = payload + self.crc_rtu(payload)
    else:
        payload = re.sub(b'(?<!\r)\n', b'\r\n', input_s.encode('utf-8'))

    # Echo text shown in the receive pane, in hex or as text.
    if self.checkBoxHexReceive.isChecked():
        echo = ''.join(f'{byte:02X} ' for byte in payload)
    else:
        echo = payload.decode('utf-8', errors='replace')

    pane = self.plainTextEditReceive
    for port_name in selected_ports:
        # A message box below runs a nested event loop, so re-check that
        # the port is still registered and open.
        ser = self.serials.get(port_name)
        if ser is None or not ser.is_open:
            continue

        if self.checkBoxAddDate.isChecked():
            pane.insertPlainText(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + " ")
        pane.insertPlainText(echo)
        if self.checkBoxCRLF.isChecked():
            pane.insertPlainText('\r\n')

        # Keep the view scrolled to the end.
        cursor = pane.textCursor()
        cursor.movePosition(cursor.End)
        pane.setTextCursor(cursor)

        try:
            ser.write(payload)
        except Exception as e:
            # A failing port must not crash the slot or block the others.
            QMessageBox.critical(self, '串口异常', f'[{port_name}] 发送失败: {e}')
    return None
|
|||
|
def data_send_channel_warning(self):
    """Check the channel selection before sending.

    Shows an error when no channel is selected and asks for confirmation
    when more than one is selected.

    Returns:
        True when sending may proceed, False when nothing is selected or
        the user declined the multi-channel confirmation.
    """
    selected_num = len(self.find_selected_port())
    if selected_num == 0:
        QMessageBox.critical(self, '串口异常', '请先选择串口!')
        return False
    if selected_num > 1:
        reply = QMessageBox.question(None, "警告", "发送所有选中通道,是否继续执行?", QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
        if reply != QMessageBox.Yes:
            return False
    return True


def data_send_form_textEdit(self):
    """Slot of the send button: send the send-box text if the check passes."""
    # Honour the user's answer; previously the text was sent even after
    # "No" because the result of the check was ignored.
    if not self.data_send_channel_warning():
        return
    self.data_send()
|
|||
|
def crc_rtu(self, data):
    """Compute the CRC-16 (initial value 0xFFFF, polynomial 0xA001) of ``data``.

    Args:
        data: iterable of byte values (e.g. ``bytes``).

    Returns:
        The two CRC bytes, low byte first.
    """
    register = 0xFFFF
    for octet in data:
        register ^= octet
        for _bit in range(8):
            carry = register & 0x0001
            register >>= 1
            if carry:
                register ^= 0xA001
    return register.to_bytes(2, byteorder='little')
|
|||
|
def CheckCfgIniData(self):
    """Create the ini file if it is missing, then load settings from it.

    Loads ``funcode`` / ``position`` (section Modbus_config), ``log_time``
    (section Quick_config) and the rows of the hex-parsing table (section
    DisHex_config, one ``rowNN`` option per row, cells separated by ``|``
    with the checkbox state as the last field).

    A missing or malformed entry is reported on stdout and skipped; it no
    longer prevents the remaining settings from being loaded.
    """
    # Local import: only ConfigParser itself is imported at file level.
    from configparser import Error as ConfigError

    if not os.path.exists(self.IniPath):
        config = ConfigParser()
        # A UI_config section (port, hex_send, hex_receive, add_date, cr_lf,
        # auto_sav_log, modbus_csv) is currently disabled and not written.

        config.add_section('Modbus_config')
        config.set('Modbus_config', 'funcode', 'F4')   # identification code
        config.set('Modbus_config', 'position', '5')   # its byte position

        config.add_section('DisHex_config')
        for i in range(self.quick_num):
            config.set('DisHex_config', f'row{i:02}', '|||||0')

        config.add_section('Quick_config')
        config.set('Quick_config', 'log_time', '10')
        for i in range(self.quick_num):
            config.set('Quick_config', f'Button{i:02}', '')

        with open(self.IniPath, 'w', encoding='utf-8') as f:
            config.write(f)

    config = ConfigParser()
    config.read(self.IniPath, encoding='utf-8')

    # Scalar settings first, so a bad table row cannot block them.
    try:
        self.funcode = config.get('Modbus_config', 'funcode')
        self.position = int(config.get('Modbus_config', 'position'))
        self.log_time = int(config.get('Quick_config', 'log_time'))
    except (ConfigError, ValueError) as e:
        print(f"Error reading configuration: {e}")

    # Hex-parsing table rows.
    for i in range(self.quick_num):
        try:
            fields = config.get('DisHex_config', f'row{i:02}').split('|')
            # The checkbox state is always the last field.
            select = int(fields[-1])
        except (ConfigError, ValueError) as e:
            print(f"Error reading configuration: {e}")
            continue

        for col, text in enumerate(fields[:-1]):
            item = QtWidgets.QTableWidgetItem()
            item.setText(text)
            self.tableWidget.setItem(i, col, item)

        # Set the check state on the item now owned by the table so its
        # text is kept; create one if the row had no cell fields.
        checkbox_item = self.tableWidget.item(i, 0)
        if checkbox_item is None:
            checkbox_item = QtWidgets.QTableWidgetItem()
            self.tableWidget.setItem(i, 0, checkbox_item)
        checkbox_item.setCheckState(select)

    # Loading of the quick-button captions/commands (ButtonNN options) is
    # currently disabled.
|
|||
|
|
|||
|
def SavConfig(self):
    """Write the hex-parsing table back to the ini file.

    Each row is stored in section DisHex_config as ``rowNN`` with the text
    of every column except the last joined by ``|``, followed by the
    checkbox state of column 0. All other content of the ini file is
    preserved.
    """
    config = ConfigParser()
    config.read(self.IniPath, encoding='utf-8')
    # Saving of a UI_config section and of the quick buttons is currently
    # disabled.

    # The file may have been removed or edited while the program was
    # running; without the section config.set() raises NoSectionError.
    if not config.has_section('DisHex_config'):
        config.add_section('DisHex_config')

    table = self.tableWidget
    for i in range(self.quick_num):
        cells = []
        for col in range(table.columnCount() - 1):
            item = table.item(i, col)
            cells.append(item.text() if item is not None else '')

        checkbox_item = table.item(i, 0)
        state = int(checkbox_item.checkState()) if checkbox_item is not None else 0

        row_string = '|'.join(cells) + f'|{state}'
        # '%' starts an interpolation in ConfigParser; escape it so a cell
        # containing '%' round-trips instead of raising ValueError.
        config.set('DisHex_config', f'row{i:02}', row_string.replace('%', '%%'))

    with open(self.IniPath, 'w', encoding='utf-8') as f:
        config.write(f)
|
|||
|
|
|||
|
def tempra_calibrate(self):
    """Send the ambient-temperature calibration command, then poll the result.

    The temperature typed into the temperature field is multiplied by 100,
    encoded as four hex digits and appended to the command prefix
    ``0106012C``.
    """
    # Multi-channel warning. An explicit False from the check aborts;
    # any other result (including None) lets the send proceed.
    if self.data_send_channel_warning() is False:
        return

    log = self.plainTextEditStastusCalib.appendPlainText
    log('################')
    log('环境温度,开始标定')

    # The field carries a double validator, so decimals are legitimate
    # input; round() avoids float truncation (0.29 * 100 == 28.99...).
    try:
        temp_int = round(float(self.lineEditTempra.text()) * 100)
    except (ValueError, OverflowError):
        temp_int = -1
    # NOTE(review): negative temperatures are rejected (as before); how the
    # device would encode them is not visible here.
    if not 0 <= temp_int <= 0xFFFF:
        log('环境温度输入错误')
        return

    temp_str = format(temp_int, '04x')  # four hex digits, zero padded

    base_str = '0106012C'  # command prefix for temperature calibration
    self.str_send(base_str + temp_str)
    self.tempra_calib_check()
|
|||
|
|
|||
|
def tempra_calib_check(self):
    """Start polling the device for the temperature-calibration result.

    Clears the result column of every port, then sends the query command
    once per second through send_query_command(), which stops the timer
    after its sixth call.
    """
    # Reset the result cells of all ports.
    self.reset_port_colors()
    self.plainTextEditStastusCalib.appendPlainText('查询中...')
    check_str = '0103012C0001'

    # Stop a poll that is still running, so two timers never fire together.
    previous_timer = getattr(self, 'query_timer', None)
    if previous_timer is not None:
        previous_timer.stop()
        previous_timer.deleteLater()

    # Reset the attempt counter before the timer can fire.
    self.num = 0
    self.query_timer = QTimer(self)
    # send_query_command is the method that sends the query, counts the
    # attempts and stops the timer; no method named "ra" exists here.
    self.query_timer.timeout.connect(lambda: self.send_query_command(check_str))
    self.query_timer.start(1000)  # one query per second
|
|||
|
|
|||
|
|
|||
|
def laser_tempra_calibrate(self):
    """Run the multi-channel send warning; nothing else is done here."""
    warn = self.data_send_channel_warning
    warn()
|
|||
|
|
|||
|
def reset_port_colors(self):
    """Blank the result cell (column 2) of every port that has a buffer."""
    table = self.tableWidgetSerChannel
    for port_name in self.buffers.keys():
        row = self.find_port_row(port_name)
        if row is None:
            continue
        # Empty text on a white background.
        table.setItem(row, 2, QTableWidgetItem(''))
        white_brush = QtGui.QBrush(QtGui.QColor(255, 255, 255))
        table.item(row, 2).setBackground(white_brush)
|
|||
|
|
|||
|
def send_query_command(self, commond_str):
    """Send one query, evaluate the replies and stop polling after 6 tries.

    Args:
        commond_str: hex command string passed to str_send().
    """
    self.str_send(commond_str)
    self.num = self.num + 1
    self.check_calib_result()

    if self.num <= 5:
        return
    self.query_timer.stop()
    self.plainTextEditStastusCalib.appendPlainText('查询结束,在通道处查看结果')
|
|||
|
|
|||
|
def stop_query(self):
    """Stop the calibration query timer if it exists and is running."""
    if not hasattr(self, 'query_timer'):
        return
    timer = self.query_timer
    if timer.isActive():
        timer.stop()
|
|||
|
|
|||
|
def check_calib_result(self):
    """Interpret each port's latest reply as a calibration status.

    Buffers of at least 7 bytes are matched on their first five bytes
    against the three known replies. A match is logged to the calibration
    status pane and, when the port has a table row, written to column 2 of
    that row with a status colour.
    """
    # reply prefix -> (table label, log line includes port name, RGB colour)
    outcomes = {
        b'\x01\x03\x02\x00\x02': ('标定成功', True, (0, 255, 0)),
        b'\x01\x03\x02\x00\x01': ('标定中', False, (255, 255, 0)),
        b'\x01\x03\x02\x00\x03': ('标定失败', False, (255, 0, 0)),
    }
    table = self.tableWidgetSerChannel

    for port_name, buffer in self.buffers.items():
        if len(buffer) <= 6:
            continue
        ind = self.find_port_row(port_name)

        outcome = outcomes.get(bytes(buffer[:5]))
        if outcome is None:
            continue
        label, with_port, rgb = outcome

        log_line = f'{port_name}{label}' if with_port else label
        self.plainTextEditStastusCalib.appendPlainText(log_line)

        if ind is not None:
            table.setItem(ind, 2, QTableWidgetItem(label))  # column index 2
            brush = QtGui.QBrush(QtGui.QColor(*rgb))
            table.item(ind, 2).setBackground(brush)
|
|||
|
|
|||
|
def str_send(self, send_str):
    """Append the CRC to a hex command string and hand it to data_send().

    Args:
        send_str: command as a hex string without separators.

    NOTE(review): the frame is passed to data_send() as spaced hex text, so
    it is only transmitted as bytes when the hex-send box is checked, and
    data_send() adds a second CRC when the add-CRC box is checked.
    """
    frame = bytes.fromhex(send_str)
    frame += self.crc_rtu(frame)
    hex_pairs = [f'{byte:02X}' for byte in frame]
    self.data_send(' '.join(hex_pairs))
|
|||
|
|
|||
|
|
|||
|
|
|||
|
class PT_Serial(QtWidgets.QDialog, Ui_SerialSet):
|
|||
|
def __init__(self):
    """Build the serial-settings dialog and run its one-time setup."""
    super().__init__()
    # Create the widgets first, then populate and wire them.
    self.setupUi(self)
    self.init()
|
|||
|
|
|||
|
def init(self):
    """Fill the port list once and let the test button refresh it."""
    refresh = self.port_check
    refresh()
    self.pushButtonTestSerial.clicked.connect(self.port_check)
|
|||
|
|
|||
|
|
|||
|
def port_check(self):
    """Refresh the port combo box from the ports currently present.

    ``self.Com_Dict`` is rebuilt as a mapping of device name to
    description. When no port is found (or enumeration fails) the combo
    box shows a single placeholder entry and the mapping is empty.
    """
    try:
        # Enumerate every serial port on the system.
        found_ports = list(serial.tools.list_ports.comports())
    except Exception as e:
        print(f"Error fetching port list: {e}")
        found_ports = []

    combo = self.comboBoxSerial
    combo.clear()

    if found_ports:
        self.Com_Dict = {port.device: port.description for port in found_ports}
        for port in found_ports:
            combo.addItem(port.device)
        return

    # Nothing detected: show the placeholder entry.
    combo.addItem("无串口")
    self.Com_Dict = {}
|
|||
|
|
|||
|
|
|||
|
|
|||
|
|
|||
|
|
|||
|
# %%打开串口
|
|||
|
def port_open(self):
    """Build a serial object from the dialog settings without opening it.

    The configured object is stored in ``self.ser``; the caller opens it.
    ``self.ser`` is only replaced once every setting has been read, so a
    bad setting leaves the previous object in place.

    Raises:
        ValueError: if the baud rate, data bits or stop bits text is not
            numeric.
    """
    ser = serial.Serial()  # unopened, unconfigured object
    ser.port = self.comboBoxSerial.currentText()
    ser.baudrate = int(self.comboBoxBaudrate.currentText())
    ser.timeout = 0.001

    # Data bits: anything other than 5/6/7 means 8.
    data_bits = {
        5: serial.FIVEBITS,
        6: serial.SIXBITS,
        7: serial.SEVENBITS,
    }
    ser.bytesize = data_bits.get(
        int(self.comboBoxDataBits.currentText()), serial.EIGHTBITS)

    # Parity: unknown text falls back to even, as before. Mark/Space are
    # honoured in case the combo box offers them.
    parities = {
        "None": serial.PARITY_NONE,
        "Odd": serial.PARITY_ODD,
        "Even": serial.PARITY_EVEN,
        "Mark": serial.PARITY_MARK,
        "Space": serial.PARITY_SPACE,
    }
    ser.parity = parities.get(
        self.comboBoxCheckBit.currentText(), serial.PARITY_EVEN)

    # Stop bits: parsed as float so that "1.5" is accepted; anything other
    # than 1 or 1.5 means 2.
    stop_bits = {
        1.0: serial.STOPBITS_ONE,
        1.5: serial.STOPBITS_ONE_POINT_FIVE,
    }
    ser.stopbits = stop_bits.get(
        float(self.comboBoxStopBit.currentText()), serial.STOPBITS_TWO)

    # Flow control.
    flow = self.comboBoxFlow.currentText()
    if flow == "No Ctrl Flow":
        ser.xonxoff = False  # software flow control
        ser.dsrdtr = False   # hardware flow control, DSR/DTR
        ser.rtscts = False   # hardware flow control, RTS/CTS
    elif flow == "SW Ctrl Flow":
        ser.xonxoff = True
    else:
        if self.checkBoxDTR.isChecked():
            ser.dsrdtr = True
        if self.checkBoxRTS.isChecked():
            ser.rtscts = True

    self.ser = ser
|
|||
|
def port_close(self):
    """Point ``self.ser`` at the selected port name and close it.

    NOTE(review): ``self.ser`` is the object most recently built by
    port_open(), which is not necessarily the one that opened the selected
    port; pyserial documents that changing ``port`` on an open object
    reconfigures it -- confirm this is intended.
    """
    ser = self.ser
    ser.port = self.comboBoxSerial.currentText()
    ser.close()
|
|||
|
|
|||
|
|
|||
|
#执行
|
|||
|
if __name__ == '__main__':
    # Create the Qt application, show the main window and run the event
    # loop; its return code becomes the process exit status.
    app = QtWidgets.QApplication(sys.argv)
    main_window = LaserMethanePyqt5()
    main_window.show()
    exit_code = app.exec_()
    sys.exit(exit_code)
|