2025-03-14 15:46:15 +08:00

632 lines
28 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import sys
import time
from datetime import datetime
from PyQt5.QtCore import QTimer
from PyQt5.QtWidgets import QApplication, QMessageBox, QInputDialog
from calibration_manager import CalibrationManager
from config_manager import ConfigManager
from data_processor import DataProcessor
from log_config import setup_logging
from net_manager import NetManager
from serial_manager import SerialManager
from ui_mian import UiMain
from ui_net import UiNet
from ui_serial import UiSerial
# Runs once at import time, before the window class below is defined.
# setup_logging comes from the project's log_config module; the handlers and
# format it installs are defined there (not visible in this file).
setup_logging()
class LaserMethanePT(UiMain):
    """Main window that wires the serial/network managers, the data
    processor and the calibration manager to the widgets of ``UiMain``."""

    def __init__(self):
        """Create timers, child windows and managers, then bind the UI and
        load the stored configuration."""
        super().__init__()
        logging.info("LaserMethanePT 初始化开始")

        # The same dict object is handed to both transport managers below.
        self.buffers = {}

        # Polling timers; they are started later by port_open() / net_open().
        self.timer = QTimer()
        self.timer_net = QTimer()
        self.timer_check_channel = QTimer()
        for poller, slot in (
            (self.timer, self.data_receive),
            (self.timer_net, self.data_receive_net),
            (self.timer_check_channel, self.check_channel_status),
        ):
            poller.timeout.connect(slot)

        # Settings dialogs.
        self.serial_window = UiSerial()
        self.net_window = UiNet()

        # Back-end helpers.
        self.serial_manager = SerialManager(self.buffers)
        self.net_manager = NetManager(self.buffers)
        self.data_processor = DataProcessor()
        self.config_manager = ConfigManager()
        self.calibration_manager = CalibrationManager(
            self.serial_manager, self.data_processor, self
        )

        self.init_ui()
        self.check_cfg_ini()
def init_ui(self):
    """Connect menu actions and buttons to their slots.

    Connections are made in the same order as they are listed below:
    general window controls, the numbered quick-command buttons, then the
    calibration buttons.
    """
    serial_ui = self.serial_window
    net_ui = self.net_window

    general_bindings = (
        # Settings menu -> serial settings window and its open/close buttons.
        (self.actionSerial.triggered, self.open_serial_window),
        (serial_ui.pushButtonOpenSerial.clicked, self.port_open),
        (serial_ui.pushButtonCloseSerial.clicked, self.port_close),
        # Settings menu -> network settings window and its open/close buttons.
        (self.actionNet.triggered, self.open_net_window),
        (net_ui.pushButton_openNet.clicked, self.net_open),
        (net_ui.pushButton_closeNet.clicked, self.net_close),
        # Serial channel table: select all / select none / clear.
        (self.pushButtonSerAll.clicked, self.select_ser_channel_all),
        (self.pushButtonSerNo.clicked, self.select_ser_channel_none),
        (self.pushButtonSerClear.clicked, self.clear_ser_channel_all),
        # Network channel table: select all / select none / clear.
        (self.pushButtonNetAll.clicked, self.select_net_channel_all),
        (self.pushButtonNetNo.clicked, self.select_net_channel_none),
        (self.pushButtonNetClear.clicked, self.clear_net_channel_all),
        # Send button.
        (self.pushButtonSend.clicked, self.data_send_form_textEdit),
        # Show/hide the quick-command side panel.
        (self.pushButton_expend.clicked, self.adjust_sidebar),
        # Clear the send / receive text areas.
        (self.pushButtonClearSend.clicked, self.send_data_clear),
        (self.pushButtonClearReceive.clicked, self.receive_data_clear),
    )
    for signal, slot in general_bindings:
        signal.connect(slot)

    # Quick-command buttons are named pushButtonQuick_00, _01, ...;
    # quick_num is not defined in this file (presumably provided by UiMain).
    for number in range(self.quick_num):
        quick_button = getattr(self, f"pushButtonQuick_{number:02}")
        if quick_button:
            # Bind the index as a default argument so each button keeps its own.
            quick_button.clicked.connect(
                lambda checked, idx=number: self.onButtonClick(idx)
            )
    # Per-button click state used by onButtonClick to tell single from double clicks.
    self.double_click_timers = {}

    calibration_bindings = (
        # Ambient temperature: calibrate / read back.
        (self.pushButtonCalibTempra.clicked, self.air_tempera_calibration),
        (self.pushButtonCheckTempra.clicked, self.air_tempera_calib_result),
        # Laser temperature: calibrate / read back / stop.
        (self.pushButtonCalibLaserTemp.clicked, self.laser_tempera_calibration),
        (self.pushButtonCheckLaserTempra.clicked, self.laser_tempera_calib_result),
        (self.pushButtonCalibLaserStop.clicked, self.stop_laser_tempera_calibration),
        # Laser working temperature: read back.
        (self.pushButtonCheckWorkTempra.clicked, self.laser_work_tempra_result),
        # Laser start temperature: calibrate / read back.
        (self.pushButtonCalibTempraLaserStart.clicked, self.laser_tempera_start_calibration),
        (self.pushButtonCheckTempraLaserStart.clicked, self.laser_tempera_start_result),
        # Concentration: calibrate / read back.
        (self.pushButtonCalibConcen.clicked, self.conc_ppm_calibration),
        (self.pushButtonCheckConcen.clicked, self.conc_ppm_calib_result),
        # Concentration reset: calibrate / read back.
        (self.pushButtonCalibConcenReset.clicked, self.reset_conc_ppm_calibration),
        (self.pushButtonCheckConcenReset.clicked, self.reset_conc_ppm_result),
        # Output hex length: set / get.
        (self.pushButtonCalibHexLength.clicked, self.set_out_hex_length),
        (self.pushButtonCheckHexLength.clicked, self.get_out_hex_length),
    )
    for signal, slot in calibration_bindings:
        signal.connect(slot)
def closeEvent(self, event):
    """Persist the configuration, close all serial ports and close the window.

    A failure while saving the configuration is logged instead of being
    allowed to abort the handler, so the ports are always closed and the
    base-class close handling always runs.

    Args:
        event: The close event delivered by Qt; forwarded to the base class.
    """
    try:
        self.save_cfg_ini()
    except Exception:
        # Event-handler boundary: record the traceback and keep shutting down.
        logging.exception("Failed to save configuration on close")
    self.serial_manager.port_all_close()
    logging.info("LaserMethanePT 关闭")
    # Let the base class finish handling the close event.
    super().closeEvent(event)
def open_serial_window(self):
    """Show the serial settings window."""
    dialog = self.serial_window
    dialog.show()
def port_open(self):
    """Open the serial port configured in the serial settings window.

    Switches the stacked widget to page 1 and disables the network menu
    action before attempting to open. On success the port is added to the
    serial channel table and the 20 ms receive timer is started.
    """
    dialog = self.serial_window
    # Argument order expected by SerialManager.open_port:
    # port, baudrate, bytesize, parity, stopbits, flow control.
    settings = (
        dialog.get_port(),
        dialog.get_baudrate(),
        dialog.get_bytesize(),
        dialog.get_parity(),
        dialog.get_stopbit(),
        dialog.get_flow(),
    )

    self.stackedWidget.setCurrentIndex(1)
    self.actionNet.setEnabled(False)

    if not self.serial_manager.open_port(*settings):
        return
    self.add_item_ser_channel(settings[0])
    # Poll the serial ports every 20 ms.
    self.timer.start(20)
def port_close(self):
    """Close the port selected in the serial settings window and, if the
    manager reports success, remove it from the serial channel table."""
    selected_port = self.serial_window.get_port()
    if not self.serial_manager.close_port(selected_port):
        return
    self.delete_item_ser_channel(selected_port)
def clear_ser_channel_all(self):
    """Reset calibration port colours, empty the serial channel table and
    close every serial port."""
    calibration = self.calibration_manager
    calibration.reset_port_colors()
    self.clear_ser_channel()
    self.serial_manager.port_all_close()
def clear_net_channel_all(self):
    """Reset calibration port colours and empty the network channel table.

    Network connections are left open: the original call to
    ``net_manager.net_all_close()`` is disabled.
    """
    calibration = self.calibration_manager
    calibration.reset_port_colors()
    self.clear_net_channel()
def open_net_window(self):
    """Show the network settings window."""
    dialog = self.net_window
    dialog.show()
def net_open(self):
    """Open the network endpoint ``<local ip>:<port>`` from the network
    settings window and start the network timers.

    Switches the stacked widget to page 0 and disables the serial menu
    action.
    """
    dialog = self.net_window
    endpoint = ':'.join((dialog.get_local_ip(), dialog.get_port()))
    self.net_manager.open_net(endpoint)

    self.stackedWidget.setCurrentIndex(0)
    self.actionSerial.setEnabled(False)

    # Poll network data every 20 ms.
    self.timer_net.start(20)
    # Run the channel status check every 10 seconds.
    self.timer_check_channel.start(10000)
def net_close(self):
    """Close the network endpoint ``<local ip>:<port>`` currently shown in
    the network settings window."""
    dialog = self.net_window
    endpoint = ':'.join((dialog.get_local_ip(), dialog.get_port()))
    self.net_manager.close_net(endpoint)
def data_receive(self):
    """Poll every known serial port once; log and display what arrived.

    Slot of ``self.timer``. For each port:

    * a ``False`` result from ``read_data`` is treated as a lost link: the
      row is coloured red, a reopen is attempted and, on success, the row
      is coloured green again;
    * received bytes are optionally written to the auto-save log and, if
      the port's row is checked in the serial channel table, shown in the
      hex display table and appended to the receive text area.

    A port without a table row is skipped; the remaining ports are still
    processed.
    """
    # Iterate over a snapshot in case the manager changes the dict while a
    # port is being reopened.
    for port_name in list(self.serial_manager.serials):
        data = self.serial_manager.read_data(port_name)

        if data is False:
            # NOTE(review): this sleep blocks the GUI thread for a second.
            # It is kept because it also paces the reconnect attempts and
            # the messages written below.
            time.sleep(1)
            row = self.find_port_ser_row(port_name)
            self.plainTextEditReceive.insertPlainText(f"[{port_name}] 链接断开\r\n")
            if row is not None:
                self.set_background_color_ser_channel(row, 0, 'red')
            if self.serial_manager.reopen_port(port_name):
                self.plainTextEditReceive.insertPlainText(f"[{port_name}] 重连成功\r\n")
                if row is not None:
                    self.set_background_color_ser_channel(row, 0, 'green')
            continue

        if not data:
            continue

        # Only set in hex mode; drives the hex display table further down.
        hex_text = None
        if self.checkBoxHexReceive.isChecked():
            shown_text = self.data_processor.format_bytes_to_hexstr_space(data)
            hex_text = self.data_processor.format_bytes_to_hexstr(data)
            if self.checkBoxAutoSaveLog.isChecked():
                self.data_processor.auto_save_log_hex(data, port_name)
        else:
            shown_text = self.data_processor.decode_data(data)
            if self.checkBoxAutoSaveLog.isChecked():
                self.data_processor.auto_save_log_asc(shown_text, port_name)

        row = self.find_port_ser_row(port_name)
        if row is None:
            # No table row for this port: skip it, keep polling the others.
            continue
        # 2 is the Qt "checked" state; only checked channels are displayed.
        if self.tableWidgetSerChannel.item(row, 0).checkState() != 2:
            continue

        if hex_text is not None:
            self.disp_hex_receive(hex_text)
        # Append at the end regardless of where the user left the cursor.
        self.set_cursor_to_end()
        if self.checkBoxAddDate.isChecked():
            stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
            self.plainTextEditReceive.insertPlainText(f" {stamp} ")
        self.plainTextEditReceive.insertPlainText(f"[{port_name}] ")
        if self.checkBoxCRLF.isChecked():
            shown_text += '\r\n'
        self.plainTextEditReceive.insertPlainText(shown_text)
        self.set_cursor_to_end()
def data_send(self, text=None):
    """Send data to every selected open serial port and network channel.

    Args:
        text: Text to send. When ``None`` the content of the send text
            area is used.

    Returns:
        Always ``None``. Sending stops early when hex mode is enabled and
        the input cannot be converted to bytes.
    """
    selected_names = self.find_selected_ser_ports()
    # Selected ports that are actually open.
    open_ports = [
        name
        for name, ser in self.serial_manager.serials.items()
        if ser.is_open and name in selected_names
    ]
    selected_channels = self.find_selected_net_ports()

    # Page 1 of the stacked widget is the serial page (see port_open).
    if not open_ports and self.stackedWidget.currentIndex() == 1:
        self.plainTextEditReceive.insertPlainText('没有打开的串口!\r\n')
        logging.debug('没有打开的串口!')
    elif not self._send_to_targets(open_ports, text, self.serial_manager.write_data):
        return None
    self.set_cursor_to_end()

    # Page 0 of the stacked widget is the network page (see net_open).
    if not selected_channels and self.stackedWidget.currentIndex() == 0:
        logging.debug('没有打开的网络!')
        # Terminated with CRLF like the serial message above so that later
        # output starts on its own line.
        self.plainTextEditReceive.insertPlainText('没有打开的网络!\r\n')
        return None
    # NetManager.write_data takes (data, channel), the reverse of the serial one.
    self._send_to_targets(
        selected_channels,
        text,
        lambda channel, payload: self.net_manager.write_data(payload, channel),
    )
    return None


def _send_to_targets(self, targets, text, write):
    """Encode, echo and write the outgoing data once per target.

    Args:
        targets: Port or channel names to send to.
        text: Text to send, or ``None`` to read the send text area.
        write: Callable invoked as ``write(target, payload)``.

    Returns:
        ``False`` if the hex input could not be converted (nothing further
        is sent), otherwise ``True``.
    """
    for target in targets:
        input_s = self.plainTextEditSend.toPlainText() if text is None else text
        if input_s == "":
            continue
        payload = self._encode_for_send(input_s)
        if payload is None:
            return False
        self._echo_sent_data(payload)
        write(target, payload)
    return True


def _encode_for_send(self, input_s):
    """Convert the input text to bytes according to the hex/CRC checkboxes.

    Returns ``None`` when hex mode is on and the hex string is rejected by
    the data processor.
    """
    if not self.checkBoxHexSend.isChecked():
        return self.data_processor.encode_data(input_s)
    payload = self.data_processor.format_hexstr_space_to_bytes(input_s)
    if payload is None:
        return None
    if self.checkBoxAddCRC.isChecked():
        payload = payload + self.data_processor.crc_rtu(payload)
    return payload


def _echo_sent_data(self, payload):
    """Append the outgoing payload to the receive text area, honouring the
    timestamp, hex-display and CRLF checkboxes."""
    if self.checkBoxAddDate.isChecked():
        stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
        self.plainTextEditReceive.insertPlainText(f" {stamp} ")
    if self.checkBoxHexReceive.isChecked():
        shown = self.data_processor.format_bytes_to_hexstr_space(payload)
    else:
        shown = self.data_processor.decode_data(payload)
    self.plainTextEditReceive.insertPlainText(shown)
    if self.checkBoxCRLF.isChecked():
        self.plainTextEditReceive.insertPlainText('\r\n')
    self.set_cursor_to_end()
def data_receive_net(self):
    """Poll every network connection once; log and display what arrived.

    Slot of ``self.timer_net``. ``read_data`` results are used as
    ``(payload, (host, port))``. A sender not yet present in the network
    channel table is added; its row is coloured green, and in hex mode the
    first 8 hex characters are written to the row's status. Data from
    checked rows is also shown in the hex display table and appended to
    the receive text area.
    """
    # Iterate over a snapshot in case the manager changes its connection
    # collection while data is being read.
    for net_name in list(self.net_manager.net_connections):
        data = self.net_manager.read_data(net_name)
        if not data:
            continue

        payload, peer = data[0], data[1]
        channel_name = peer[0] + ':' + str(peer[1])

        # Only set in hex mode; drives the status column and hex display table.
        hex_text = None
        if self.checkBoxHexReceive.isChecked():
            shown_text = self.data_processor.format_bytes_to_hexstr_space(payload)
            hex_text = self.data_processor.format_bytes_to_hexstr(payload)
            if self.checkBoxAutoSaveLog.isChecked():
                self.data_processor.auto_save_log_hex(payload)
            if self.checkBoxAutoSaveDB.isChecked():
                self.data_processor.auto_save_hex_to_db(payload)
        else:
            shown_text = self.data_processor.decode_data(payload)
            logging.debug(f"Received data from {shown_text} ")
            if self.checkBoxAutoSaveLog.isChecked():
                self.data_processor.auto_save_log_asc(shown_text)

        # Compare against None explicitly: row index 0 is a valid row and
        # must not be mistaken for "not found".
        row = self.find_port_net_row(channel_name)
        if row is None:
            self.add_item_net_channel(channel_name)
            row = self.find_port_net_row(channel_name)
            if row is None:
                # Still no row: skip this sender, keep polling the others.
                continue

        self.set_background_color_net_channel(row, 0, 'green')
        if hex_text is not None:
            try:
                self.update_net_table_status(row, 0, hex_text[0:8], 'green')
            except Exception as e:
                logging.error(f"Error updating net table status: {e}")

        # 2 is the Qt "checked" state; only checked channels are displayed.
        if self.tableWidgetNetChannel.item(row, 0).checkState() != 2:
            continue

        if hex_text is not None:
            self.disp_hex_receive(hex_text)
        # Append at the end regardless of where the user left the cursor.
        self.set_cursor_to_end()
        if self.checkBoxAddDate.isChecked():
            stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
            self.plainTextEditReceive.insertPlainText(f" {stamp} ")
        self.plainTextEditReceive.insertPlainText(f"<{peer[0]}:{peer[1]} > ")
        if self.checkBoxCRLF.isChecked():
            shown_text += '\r\n'
        self.plainTextEditReceive.insertPlainText(shown_text)
        self.set_cursor_to_end()
def check_channel_status(self):
    """Reset the status of every network channel row to empty text on a
    white background. Slot of ``self.timer_check_channel``."""
    for name in self.find_all_net_ports():
        row = self.find_port_net_row(name)
        self.update_net_table_status(row, 0, '', 'white')
def data_send_channel_warning(self):
    """Check the channel selection before sending.

    Returns:
        ``QMessageBox.Yes`` when sending may proceed; ``None`` when no
        channel is selected or the user declines to send to several
        selected channels at once.
    """
    serial_count = len(self.find_selected_ser_ports())
    net_count = len(self.find_selected_net_ports())
    total = serial_count + net_count

    if total == 0:
        QMessageBox.critical(self, '串口异常', '请先选择串口或网口通道!')
        self.plainTextEditReceive.insertPlainText('请先选择串口或网口通道!\r\n')
        logging.debug('请先选择串口或网口通道!')
        return None

    if total > 1:
        # Several channels selected: ask before sending to all of them.
        answer = QMessageBox.question(
            None,
            "警告",
            "发送所有选中通道,是否继续执行?",
            QMessageBox.Yes | QMessageBox.No,
            QMessageBox.No,
        )
        if answer != QMessageBox.Yes:
            return None

    return QMessageBox.Yes
def data_send_form_textEdit(self):
    """Send the content of the send text area once the channel check passes."""
    if self.data_send_channel_warning() != QMessageBox.Yes:
        return
    self.data_send()
def air_tempera_calibration(self):
    """Pass the text of ``lineEditTempra`` to
    ``calibration_manager.calibrate_air_tempera`` once the channel check
    passes."""
    if self.data_send_channel_warning() != QMessageBox.Yes:
        return
    entered = self.lineEditTempra.text()
    self.calibration_manager.calibrate_air_tempera(entered)
def air_tempera_calib_result(self):
    """Call ``calibration_manager.check_air_tempera`` once the channel
    check passes."""
    if self.data_send_channel_warning() != QMessageBox.Yes:
        return
    self.calibration_manager.check_air_tempera()
def laser_tempera_calibration(self):
    """Call ``calibration_manager.calibrate_laser_tempra`` once the channel
    check passes."""
    if self.data_send_channel_warning() != QMessageBox.Yes:
        return
    self.calibration_manager.calibrate_laser_tempra()
def stop_laser_tempera_calibration(self):
    """Call ``calibration_manager.stop_calibrate_laser_tempra``; no channel
    check is performed for stopping."""
    calibration = self.calibration_manager
    calibration.stop_calibrate_laser_tempra()
def laser_tempera_calib_result(self):
    """Call ``calibration_manager.check_laser_tempra`` once the channel
    check passes."""
    if self.data_send_channel_warning() != QMessageBox.Yes:
        return
    self.calibration_manager.check_laser_tempra()
def laser_work_tempra_result(self):
    """Call ``calibration_manager.check_work_tempra`` once the channel
    check passes."""
    if self.data_send_channel_warning() != QMessageBox.Yes:
        return
    self.calibration_manager.check_work_tempra()
def laser_tempera_start_calibration(self):
    """Pass the text of ``lineEditTempraLaserStart`` to
    ``calibration_manager.calibrate_laser_tempera_start`` once the channel
    check passes."""
    if self.data_send_channel_warning() != QMessageBox.Yes:
        return
    entered = self.lineEditTempraLaserStart.text()
    self.calibration_manager.calibrate_laser_tempera_start(entered)
def laser_tempera_start_result(self):
    """Call ``calibration_manager.check_laser_tempera_start`` once the
    channel check passes."""
    if self.data_send_channel_warning() != QMessageBox.Yes:
        return
    self.calibration_manager.check_laser_tempera_start()
def conc_ppm_calibration(self):
    """Pass the text of ``lineEditConcen`` to
    ``calibration_manager.calibrate_conc_ppm`` once the channel check
    passes."""
    if self.data_send_channel_warning() != QMessageBox.Yes:
        return
    entered = self.lineEditConcen.text()
    self.calibration_manager.calibrate_conc_ppm(entered)
def conc_ppm_calib_result(self):
    """Call ``calibration_manager.check_conc_ppm`` once the channel check
    passes."""
    if self.data_send_channel_warning() != QMessageBox.Yes:
        return
    self.calibration_manager.check_conc_ppm()
def reset_conc_ppm_calibration(self):
    """Call ``calibration_manager.calibrate_reset_conc_ppm`` once the
    channel check passes."""
    if self.data_send_channel_warning() != QMessageBox.Yes:
        return
    self.calibration_manager.calibrate_reset_conc_ppm()
def reset_conc_ppm_result(self):
    """Call ``calibration_manager.check_reset_conc_ppm`` once the channel
    check passes."""
    if self.data_send_channel_warning() != QMessageBox.Yes:
        return
    self.calibration_manager.check_reset_conc_ppm()
def set_out_hex_length(self):
    """Pass the text of ``lineEditHexLenth`` to
    ``calibration_manager.set_out_hex_length`` once the channel check
    passes."""
    if self.data_send_channel_warning() != QMessageBox.Yes:
        return
    entered = self.lineEditHexLenth.text()
    self.calibration_manager.set_out_hex_length(entered)
def get_out_hex_length(self):
    """Call ``calibration_manager.get_out_hex_length`` once the channel
    check passes."""
    if self.data_send_channel_warning() != QMessageBox.Yes:
        return
    self.calibration_manager.get_out_hex_length()
def adjust_sidebar(self):
    """Toggle ``widget_2``: show it when hidden, hide it otherwise."""
    panel = self.widget_2
    toggle = panel.show if panel.isHidden() else panel.hide
    toggle()
def send_data_clear(self):
    """Clear the send text area."""
    send_box = self.plainTextEditSend
    send_box.clear()
def receive_data_clear(self):
    """Clear the receive text area."""
    receive_box = self.plainTextEditReceive
    receive_box.clear()
def onButtonClick(self, idx):
    """Handle a click on quick-command button ``idx``.

    The first click marks the button as pending and schedules
    ``onButtonSigleClick`` 200 ms later; a click arriving while the button
    is still pending is treated as a double click.
    """
    if self.double_click_timers.get(idx) is not None:
        self.onButtonDoubleClick(idx)
        return
    # Mark as pending; the delayed single-click handler clears the mark.
    self.double_click_timers[idx] = True
    QTimer.singleShot(200, lambda: self.onButtonSigleClick(idx))
def onButtonSigleClick(self, idx):
    """Send the command of quick-command button ``idx`` after a single click.

    Runs 200 ms after the click (scheduled by ``onButtonClick``). Does
    nothing if the pending mark was cleared in the meantime, which is what
    ``onButtonDoubleClick`` does. When ``checkBox_return`` is checked,
    CRLF is appended to the command text before sending.
    """
    if not self.double_click_timers[idx]:
        return
    self.double_click_timers[idx] = None

    text = getattr(self, f"lineEditQuick_{idx:02}").text()
    # Diagnostic output goes through logging like the rest of this class.
    logging.debug("Text in lineEdit_%s: %s", idx, text)
    if self.checkBox_return.isChecked():
        text = text + "\r\n"
    if self.data_send_channel_warning() == QMessageBox.Yes:
        self.data_send(text)
def onButtonDoubleClick(self, idx):
    """Rename quick-command button ``idx`` after a double click.

    Clears the pending mark so the delayed single-click handler does not
    send, then asks for a new caption and applies it when the dialog is
    accepted with non-empty text.
    """
    self.double_click_timers[idx] = None
    index_str = f"{idx:02}"
    # Diagnostic output goes through logging like the rest of this class.
    logging.debug("Double click detected on Button %s.", index_str)
    button = getattr(self, f"pushButtonQuick_{index_str}")
    new_name, ok = QInputDialog.getText(self, 'Button Rename', 'Enter new button name:')
    if ok and new_name:
        button.setText(new_name)
def check_cfg_ini(self):
    """Load the configuration (creating defaults if reading fails) and
    apply it to the main window, the settings dialogs, the data processor,
    the hex display table and the quick-command buttons."""
    cfg = self.config_manager
    row_count = self.tableWidget.rowCount()

    if cfg.read_config():
        cfg.read_config()
    else:
        cfg.create_default_config(row_count)

    # Main-window send/receive checkboxes (values are converted with int()).
    checkbox_settings = (
        (self.set_checkBoxHexReceive, cfg.get_hex_receive),
        (self.set_checkBoxHexSend, cfg.get_hex_send),
        (self.set_checkBoxCRLF, cfg.get_cr_lf),
        (self.set_checkBoxAddDate, cfg.get_add_date),
        (self.set_checkBoxAutoSaveDB, cfg.get_auto_sav_db),
        (self.set_checkBoxAutoSaveLog, cfg.get_auto_sav_log),
        (self.set_checkBoxAddCRC, cfg.get_add_crc_rtu),
    )
    for apply_state, read_state in checkbox_settings:
        apply_state(int(read_state()))

    # Serial settings dialog.
    serial_ui = self.serial_window
    serial_ui.set_port(cfg.get_port())
    serial_ui.set_baudrate(cfg.get_baudrate())
    serial_ui.set_parity(cfg.get_parity())

    # Network settings dialog.
    self.net_window.set_port(cfg.get_net_port())

    # Data processor settings.
    processor = self.data_processor
    processor.update_funcode(cfg.get_funcode())
    processor.update_position(cfg.get_position())
    processor.update_log_size(cfg.get_log_time())

    # Hex display table: each stored row is '|'-separated cell texts
    # followed by the row's selection state as the last field.
    for row in range(row_count):
        *cells, selected = cfg.get_dishex_row(row).split('|')
        for col, cell_text in enumerate(cells):
            self.set_disp_hex(row, col, cell_text)
        self.set_disp_select(row, int(selected))

    # Quick-command buttons: stored as '<command text>|<button caption>'.
    for number in range(self.quick_num):
        suffix = f"{number:02}"
        fields = cfg.get_quick_row(number).split('|')
        if len(fields) != 2:
            continue
        command, caption = fields
        getattr(self, f'pushButtonQuick_{suffix}').setText(caption)
        getattr(self, f'lineEditQuick_{suffix}').setText(command)
def save_cfg_ini(self):
    """Write the current UI state to the configuration manager.

    Covers the main-window checkboxes, the serial and network settings,
    each hex display table row (stored as '|'-separated cell texts followed
    by the check state of column 0) and the quick-command buttons (stored
    as '<command text>|<button caption>').
    """
    cfg = self.config_manager

    # Main-window send/receive checkboxes.
    cfg.set_hex_receive(str(self.get_checkBoxHexReceive()))
    cfg.set_hex_send(str(self.get_checkBoxHexSend()))
    cfg.set_cr_lf(str(self.get_checkBoxCRLF()))
    cfg.set_add_date(str(self.get_checkBoxAddDate()))
    cfg.set_auto_sav_db(str(self.get_checkBoxAutoSaveDB()))
    cfg.set_auto_sav_log(str(self.get_checkBoxAutoSaveLog()))
    cfg.set_add_crc_rtu(str(self.get_checkBoxAddCRC()))

    # Serial settings.
    cfg.set_port(self.serial_window.get_port())
    cfg.set_baudrate(str(self.serial_window.get_baudrate()))
    cfg.set_parity(self.serial_window.get_parity())

    # Network settings.
    cfg.set_net_port(self.net_window.get_port())

    # Hex display table; the last two columns are not stored.
    table = self.tableWidget
    stored_columns = table.columnCount() - 2
    for row in range(table.rowCount()):
        cells = []
        for col in range(stored_columns):
            item = table.item(row, col)
            # Cells without an item are stored as empty text.
            cells.append(item.text() if item is not None else '')
        # Column 0 carries the row's check box. A row without an item there
        # is stored as '0' (unchecked) instead of raising on None.
        select_item = table.item(row, 0)
        select_state = str(select_item.checkState()) if select_item is not None else '0'
        cfg.set_dishex_row(row, '|'.join(cells) + f'|{select_state}')

    # Quick-command buttons.
    for number in range(self.quick_num):
        suffix = f"{number:02}"
        command = getattr(self, f"lineEditQuick_{suffix}").text()
        caption = getattr(self, f"pushButtonQuick_{suffix}").text()
        cfg.set_quick_buttion(number, command + "|" + caption)
# Script entry point: create the Qt application, show the main window and
# exit the process with the event loop's return code.
if __name__ == "__main__":
    app = QApplication(sys.argv)
    window = LaserMethanePT()
    window.show()
    exit_code = app.exec_()
    sys.exit(exit_code)