python/LaserMehtanePT_RB/pythonProject/laser_methane_pt.py
2025-03-14 16:15:51 +08:00

318 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import sys
import time
from datetime import datetime
from PyQt5.QtCore import QTimer
from PyQt5.QtWidgets import QApplication, QMessageBox
from calibration_manager import CalibrationManager
from config_manager import ConfigManager
from data_processor import DataProcessor
from log_config import setup_logging
from serial_manager import SerialManager
from ui_mian import UiMain
from ui_serial import UiSerial
# Configure logging once at import time, before the main window class is
# defined or instantiated (setup_logging comes from the project's log_config).
setup_logging()
class LaserMethanePT(UiMain):
    """Main window of the LaserMethanePT application.

    Connects the widgets inherited from ``UiMain`` to the serial-port,
    data-processing, configuration and calibration helper objects.
    """

    # Polling period of the receive timer, in milliseconds.
    _RECEIVE_POLL_MS = 100

    def __init__(self):
        """Create the helper objects, wire the UI and load the saved config."""
        super().__init__()
        logging.info("LaserMethanePT 初始化开始")
        # Instantiate the collaborating modules.
        self.serial_window = UiSerial()
        self.serial_manager = SerialManager()
        self.data_processor = DataProcessor()
        self.config_manager = ConfigManager()
        self.calibration_manager = CalibrationManager(self.serial_manager, self.data_processor, self)
        self.init_ui()
        self.check_cfg_ini()

    def init_ui(self):
        """Connect every button/menu signal to its handler method."""
        # Configuration menu: open the serial-port settings window.
        self.actionSerial.triggered.connect(self.open_serial_window)
        # Open / close the port chosen in the settings window.
        self.serial_window.pushButtonOpenSerial.clicked.connect(self.port_open)
        self.serial_window.pushButtonCloseSerial.clicked.connect(self.port_close)

        # Serial channel table: select all / select none / clear, and send.
        self.pushButtonSerAll.clicked.connect(self.select_ser_channel_all)
        self.pushButtonSerNo.clicked.connect(self.select_ser_channel_none)
        self.pushButtonSerClear.clicked.connect(self.clear_ser_channel_all)
        self.pushButtonSend.clicked.connect(self.data_send_form_textEdit)

        # Calibration: air temperature (start / check result).
        self.pushButtonCalibTempra.clicked.connect(self.air_tempera_calibration)
        self.pushButtonCheckTempra.clicked.connect(self.air_tempera_calib_result)
        # Calibration: laser temperature (start / check result / stop).
        self.pushButtonCalibLaserTemp.clicked.connect(self.laser_tempera_calibration)
        self.pushButtonCheckLaserTempra.clicked.connect(self.laser_tempera_calib_result)
        self.pushButtonCalibLaserStop.clicked.connect(self.stop_laser_tempera_calibration)
        # Calibration: laser start temperature (start / check result).
        self.pushButtonCalibTempraLaserStart.clicked.connect(self.laser_tempera_start_calibration)
        self.pushButtonCheckTempraLaserStart.clicked.connect(self.laser_tempera_start_result)
        # Calibration: concentration (start / check result).
        self.pushButtonCalibConcen.clicked.connect(self.conc_ppm_calibration)
        self.pushButtonCheckConcen.clicked.connect(self.conc_ppm_calib_result)
        # Calibration: concentration reset (start / check result).
        self.pushButtonCalibConcenReset.clicked.connect(self.reset_conc_ppm_calibration)
        self.pushButtonCheckConcenReset.clicked.connect(self.reset_conc_ppm_result)

    def closeEvent(self, event):
        """Persist the configuration and release the ports before closing."""
        # Stop polling first so data_receive() cannot run against ports that
        # are being closed.
        timer = getattr(self, 'timer', None)
        if timer is not None:
            timer.stop()
        self.save_cfg_ini()
        self.serial_manager.port_all_close()
        # Let the parent class finish handling the close event.
        super().closeEvent(event)

    def open_serial_window(self):
        """Show the serial-port settings window."""
        self.serial_window.show()

    def port_open(self):
        """Open the port configured in the settings window and start polling."""
        port_name = self.serial_window.get_port()
        baudrate = self.serial_window.get_baudrate()
        bytesize = self.serial_window.get_bytesize()
        parity = self.serial_window.get_parity()
        stopbits = self.serial_window.get_stopbit()
        flowcontrol = self.serial_window.get_flow()
        if self.serial_manager.open_port(port_name, baudrate, bytesize, parity, stopbits, flowcontrol):
            self.add_item_ser_channel(port_name)
            # The receive timer is created lazily, the first time a port opens.
            if not hasattr(self, 'timer'):
                self.timer = QTimer()
                self.timer.timeout.connect(self.data_receive)
            # Poll all open ports every _RECEIVE_POLL_MS (100 ms).
            self.timer.start(self._RECEIVE_POLL_MS)

    def port_close(self):
        """Close the port selected in the settings window."""
        port_name = self.serial_window.get_port()
        if self.serial_manager.close_port(port_name):
            self.delete_item_ser_channel(port_name)

    def clear_ser_channel_all(self):
        """Empty the channel table and close every port."""
        self.clear_ser_channel()
        self.serial_manager.port_all_close()

    def _handle_port_disconnect(self, port_name):
        """Report a lost connection on *port_name* and try to reopen it."""
        # NOTE(review): this sleep runs in the timer slot, i.e. on the GUI
        # thread, and freezes the UI for a second per disconnected port.
        # Kept as-is to preserve the existing reconnect pacing.
        time.sleep(1)
        row = self.find_port_row(port_name)
        self.plainTextEditReceive.insertPlainText(f"[{port_name}] 链接断开\r\n")
        # The port may have no row in the table; only colour it when found.
        if row is not None:
            self.set_background_color(row, 0, 'red')
        if self.serial_manager.reopen_port(port_name):
            self.plainTextEditReceive.insertPlainText(f"[{port_name}] 重连成功\r\n")
            if row is not None:
                self.set_background_color(row, 0, 'green')

    def data_receive(self):
        """Timer slot: read every port, log the data and display it.

        ``read_data`` returning ``False`` is treated as a lost connection and
        triggers a reconnect attempt; any other falsy value means "no data".
        """
        # Iterate over a snapshot of the keys: reconnecting may modify the
        # ``serials`` mapping, which must not change size during iteration.
        for port_name in list(self.serial_manager.serials):
            if port_name not in self.serial_manager.serials:
                continue  # removed while an earlier port was being handled
            data = self.serial_manager.read_data(port_name)
            if data is False:
                self._handle_port_disconnect(port_name)
                continue
            if not data:
                continue

            if self.checkBoxHexReceive.isChecked():
                # HEX mode: show spaced hex in the log view, feed the compact
                # hex string to the hex display, and save the raw bytes.
                decoded_data = self.data_processor.format_bytes_to_hexstr_space(data)
                data_hex_str = self.data_processor.format_bytes_to_hexstr(data)
                self.disp_hex_receive(data_hex_str)
                self.data_processor.auto_save_log_hex(port_name, data)
            else:
                decoded_data = self.data_processor.decode_data(data)
                self.data_processor.auto_save_log_asc(port_name, decoded_data)

            # Only ports whose table row is checked are echoed to the view.
            row = self.find_port_row(port_name)
            if row is None:
                continue  # no matching row: skip this port, keep polling the rest
            item = self.tableWidgetSerChannel.item(row, 0)
            # 2 is the value of Qt.Checked.
            if item is None or item.checkState() != 2:
                continue

            self.set_cursor_to_end()
            if self.checkBoxAddDate.isChecked():
                now_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
                self.plainTextEditReceive.insertPlainText(f" {now_time} ")
            self.plainTextEditReceive.insertPlainText(f"[{port_name}] ")
            if self.checkBoxCRLF.isChecked():
                decoded_data += '\r\n'
            self.plainTextEditReceive.insertPlainText(decoded_data)
            self.set_cursor_to_end()

    def data_send(self, text=None):
        """Send *text* (or the send box content) to every selected open port.

        Args:
            text: String to send. When ``None`` the content of
                ``plainTextEditSend`` is used instead.

        Returns:
            Always ``None``. Nothing is sent when no selected port is open,
            when the text is empty, or when HEX parsing of the text fails.
        """
        selected_names = self.find_selected_ports()
        # Keep only the ports that are both open and selected in the table.
        target_ports = [
            name for name, ser in self.serial_manager.serials.items()
            if ser.is_open and name in selected_names
        ]
        if not target_ports:
            QMessageBox.critical(self, '串口异常', '没有打开的串口!')
            return None

        input_s = self.plainTextEditSend.toPlainText() if text is None else text
        if input_s == "":
            self.set_cursor_to_end()
            return None

        # Build the payload once; it is identical for every target port.
        if self.checkBoxHexSend.isChecked():
            send_data = self.data_processor.format_hexstr_space_to_bytes(input_s)
            if send_data is None:
                return None
            if self.checkBoxAddCRC.isChecked():
                send_data = send_data + self.data_processor.crc_rtu(send_data)
        else:
            send_data = self.data_processor.encode_data(input_s)

        # The echo follows the *receive* display mode (HEX or ASCII).
        if self.checkBoxHexReceive.isChecked():
            echo_text = self.data_processor.format_bytes_to_hexstr_space(send_data)
        else:
            echo_text = self.data_processor.decode_data(send_data)

        for port_name in target_ports:
            if not self.serial_manager.serials[port_name].is_open:
                continue
            if self.checkBoxAddDate.isChecked():
                now_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
                self.plainTextEditReceive.insertPlainText(f" {now_time} ")
            self.plainTextEditReceive.insertPlainText(echo_text)
            if self.checkBoxCRLF.isChecked():
                self.plainTextEditReceive.insertPlainText('\r\n')
            self.set_cursor_to_end()
            self.serial_manager.write_data(port_name, send_data)
        self.set_cursor_to_end()
        return None

    def data_send_channel_warning(self):
        """Check the channel selection before a send or calibration.

        Shows an error when no channel is selected and asks for confirmation
        when more than one is selected.

        Returns:
            ``True`` when the caller may proceed, ``False`` when nothing is
            selected or the user declined the multi-channel confirmation.
        """
        selected_num = len(self.find_selected_ports())
        if selected_num == 0:
            QMessageBox.critical(self, '串口异常', '请先选择串口!')
            return False
        if selected_num > 1:
            reply = QMessageBox.question(None, "警告", "发送所有选中通道,是否继续执行?",
                                         QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
            if reply != QMessageBox.Yes:
                return False
        return True

    def data_send_form_textEdit(self):
        """Send the content of the send box after the selection check passes."""
        if not self.data_send_channel_warning():
            return
        self.data_send()

    def air_tempera_calibration(self):
        """Start air-temperature calibration with the value typed by the user."""
        if not self.data_send_channel_warning():
            return
        temp_str = self.lineEditTempra.text()
        self.calibration_manager.calibrate_air_tempera(temp_str)

    def air_tempera_calib_result(self):
        """Check the air-temperature calibration result."""
        if not self.data_send_channel_warning():
            return
        self.calibration_manager.check_air_tempera()

    def laser_tempera_calibration(self):
        """Start laser-temperature calibration."""
        if not self.data_send_channel_warning():
            return
        self.calibration_manager.calibrate_laser_tempra()

    def stop_laser_tempera_calibration(self):
        """Stop a running laser-temperature calibration (no selection check)."""
        self.calibration_manager.stop_calibrate_laser_tempra()

    def laser_tempera_calib_result(self):
        """Check the laser-temperature calibration result."""
        if not self.data_send_channel_warning():
            return
        self.calibration_manager.check_laser_tempra()

    def laser_tempera_start_calibration(self):
        """Calibrate the laser start temperature with the value typed by the user."""
        if not self.data_send_channel_warning():
            return
        text_str = self.lineEditTempraLaserStart.text()
        self.calibration_manager.calibrate_laser_tempera_start(text_str)

    def laser_tempera_start_result(self):
        """Check the laser start-temperature calibration result."""
        if not self.data_send_channel_warning():
            return
        self.calibration_manager.check_laser_tempera_start()

    def conc_ppm_calibration(self):
        """Start concentration calibration with the ppm value typed by the user."""
        if not self.data_send_channel_warning():
            return
        ppm_str = self.lineEditConcen.text()
        self.calibration_manager.calibrate_conc_ppm(ppm_str)

    def conc_ppm_calib_result(self):
        """Check the concentration calibration result."""
        if not self.data_send_channel_warning():
            return
        self.calibration_manager.check_conc_ppm()

    def reset_conc_ppm_calibration(self):
        """Reset the concentration calibration."""
        if not self.data_send_channel_warning():
            return
        self.calibration_manager.calibrate_reset_conc_ppm()

    def reset_conc_ppm_result(self):
        """Check the result of the concentration-calibration reset."""
        if not self.data_send_channel_warning():
            return
        self.calibration_manager.check_reset_conc_ppm()

    def check_cfg_ini(self):
        """Load the configuration (creating defaults if needed) and apply it."""
        row_count = self.tableWidget.rowCount()
        # A single read is enough; fall back to a default config when it fails.
        if not self.config_manager.read_config():
            self.config_manager.create_default_config(row_count)

        # Serial settings.
        self.serial_window.set_port(self.config_manager.get_port())
        self.serial_window.set_baudrate(self.config_manager.get_baudrate())
        # Modbus / logging settings.
        self.data_processor.update_funcode(self.config_manager.get_funcode())
        self.data_processor.update_position(self.config_manager.get_position())
        self.data_processor.update_log_time(self.config_manager.get_log_time())

        # Each saved row is "cell|cell|...|checkState" (see save_cfg_ini).
        for i in range(row_count):
            *cells, select_flag = self.config_manager.get_dishex_row(i).split('|')
            for j, cell_text in enumerate(cells):
                self.set_disp_hex(i, j, cell_text)
            try:
                check_state = int(select_flag)
            except ValueError:
                # A damaged or empty row must not prevent the window from starting.
                logging.warning("Invalid check state %r in saved row %d; selection left unchanged",
                                select_flag, i)
                continue
            self.set_disp_select(i, check_state)

    def save_cfg_ini(self):
        """Write the current port settings and table rows to the configuration."""
        # Currently selected port and baud rate.
        self.config_manager.set_port(self.serial_window.get_port())
        self.config_manager.set_baudrate(str(self.serial_window.get_baudrate()))

        # The last two columns are not persisted.
        saved_columns = self.tableWidget.columnCount() - 2
        for i in range(self.tableWidget.rowCount()):
            cells = []
            for col in range(saved_columns):
                item = self.tableWidget.item(i, col)
                # Empty cells are stored as empty strings.
                cells.append(item.text() if item is not None else '')
            first_item = self.tableWidget.item(i, 0)
            # A missing first cell is stored as '0' (unchecked) instead of crashing.
            select_str = str(first_item.checkState()) if first_item is not None else '0'
            row_string = '|'.join(cells) + f'|{select_str}'
            self.config_manager.set_dishex_row(i, row_string)
if __name__ == "__main__":
    # Script entry point: create the Qt application, show the main window and
    # hand control to the event loop; its exit status becomes the process's.
    app = QApplication(sys.argv)
    window = LaserMethanePT()
    window.show()
    sys.exit(app.exec_())