python/LaserMehtanePT_RB/pythonProject-v2/calibration_manager.py
2025-03-14 15:46:15 +08:00

324 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import time
from PyQt5.QtCore import QTimer, pyqtSignal, QObject
from PyQt5.QtGui import QColor
class CalibrationManager(QObject):
query_finished = pyqtSignal(str, bytes)
# 定义常量
WRITE_SINGLE_REGISTER = 0x06
READ_HOLDING_REGISTERS = 0x03
QUERY_INTERVAL = 1000 # 查询间隔时间,单位为毫秒
def __init__(self, serial_manager, data_processor, ui):
super().__init__()
self.serial_manager = serial_manager
self.data_processor = data_processor
self.ui = ui
self.num = 0
self.modbus_data = [0] * 20
self.query_modbus_bytes = None
self.query_timer = QTimer(self.ui)
self.query_timer.timeout.connect(self.check_query_value) # 定时查询
self.query_finished.connect(self.process_query_value) # 处理数据
def write_plain_text_edit(self, text, log_level='info'):
"""统一处理 plainTextEditStastusCalib 的文本写入和日志记录"""
self.ui.plainTextEditStastusCalib.appendPlainText(text)
if log_level == 'info':
logging.info(text)
elif log_level == 'error':
logging.error(text)
elif log_level == 'debug':
logging.debug(text)
def calibrate_air_tempera(self, temp_str):
"""开始温度标定"""
self.write_plain_text_edit('################\r\n环境温度,开始标定')
if not temp_str.isdigit():
self.write_plain_text_edit('环境温度输入错误', log_level='error')
return
temp_int = int(float(temp_str) * 100)
temp_hex = temp_int.to_bytes(2, 'big')
self.write_register(0x01, 0x2C, temp_hex)
time.sleep(1)
self.check_air_tempera()
def check_air_tempera(self):
self.reset_port_colors()
self.write_plain_text_edit('################\r\n空气温度标定状态:')
self.read_register(0x01, 0x2C, 0x00, 0x01)
def calibrate_laser_tempra(self):
self.write_plain_text_edit('################')
self.write_plain_text_edit('laser温度标定')
self.write_register(0x01, 0x2D, b'\x00\x01')
time.sleep(1)
self.check_laser_tempra()
def stop_calibrate_laser_tempra(self):
self.write_plain_text_edit('################')
self.write_register(0x01, 0x2D, b'\x00\x00')
self.write_plain_text_edit('laser温度标定结束')
def check_laser_tempra(self):
self.reset_port_colors()
self.write_plain_text_edit('查询中...')
self.read_register(0x01, 0x2D, 0x00, 0x01)
def check_work_tempra(self):
self.reset_port_colors()
self.write_plain_text_edit('查询当前激光工作点')
self.read_register(0x00, 0x01, 0x00, 0x01)
def calibrate_laser_tempera_start(self, temp_str):
self.write_plain_text_edit('################')
self.write_plain_text_edit('laser温度标定起始点修改')
if not temp_str.isdigit():
self.write_plain_text_edit('环境温度输入错误')
return
temp_int = int(float(temp_str) * 100)
temp_hex = temp_int.to_bytes(2, 'big')
self.write_register(0x00, 0x16, temp_hex)
time.sleep(1)
self.check_laser_tempera_start()
def check_laser_tempera_start(self):
self.reset_port_colors()
self.write_plain_text_edit('################')
self.write_plain_text_edit('laser温度标定起始点查询')
self.read_register(0x00, 0x16, 0x00, 0x01)
def calibrate_conc_ppm(self, ppm_str):
self.write_plain_text_edit('################')
self.write_plain_text_edit('浓度标定')
if not ppm_str.isdigit():
self.write_plain_text_edit('浓度输入错误')
return
ppm_int = int(float(ppm_str))
ppm_hex = ppm_int.to_bytes(2, 'big')
self.write_register(0x01, 0x2E, ppm_hex)
time.sleep(1)
self.check_conc_ppm()
def check_conc_ppm(self):
self.reset_port_colors()
self.write_plain_text_edit('################')
self.write_plain_text_edit('浓度标定,查询')
self.read_register(0x01, 0x2E, 0x00, 0x01)
def calibrate_reset_conc_ppm(self):
self.write_plain_text_edit('################')
self.write_plain_text_edit('浓度标定,重置')
self.write_register(0x01, 0x2F, b'\x00\x00')
time.sleep(1)
self.check_reset_conc_ppm()
def check_reset_conc_ppm(self):
self.reset_port_colors()
self.write_plain_text_edit('################')
self.write_plain_text_edit('浓度标定,重置')
self.read_register(0x00, 0x64, 0x00, 0x01)
def set_out_hex_length(self,length_str):
"""设置输出的hex长度"""
self.write_plain_text_edit('设置输出的hex长度')
if not length_str.isdigit():
self.write_plain_text_edit('参数输入错误')
return
length_int = int(float(length_str))
length_hex = length_int.to_bytes(2, 'big')
self.write_register(0x00, 0x17, length_hex)
time.sleep(1)
self.get_out_hex_length()
def get_out_hex_length(self):
self.reset_port_colors()
self.write_plain_text_edit('查询输出的hex长度')
self.read_register(0x00, 0x17, 0x00, 0x01)
def write_register(self, address_hi, address_lo, value):
"""写入寄存器"""
self.modbus_data[0:6] = [0x01, self.WRITE_SINGLE_REGISTER, address_hi, address_lo, value[0],
value[1]]
self.send_bytes(bytes(self.modbus_data[:6]))
def read_register(self, address_hi, address_lo, add_num_hi, add_num_lo):
"""读取寄存器"""
self.modbus_data[0:6] = [0x01, self.READ_HOLDING_REGISTERS, address_hi, address_lo, add_num_hi, add_num_lo]
self.query_modbus_bytes = bytes(self.modbus_data[:6])
self.send_bytes(self.query_modbus_bytes)
self.write_plain_text_edit('查询中...')
self.num = 0
self.start_query_timer()
def send_bytes(self, modbus_bytes):
crc = self.data_processor.crc_rtu(modbus_bytes)
modbus_bytes = modbus_bytes + crc
modbus_send_str = self.data_processor.format_bytes_to_hexstr_space(modbus_bytes)
self.ui.data_send(modbus_send_str)
def validate_response(self, buffer):
"""验证响应"""
self.write_plain_text_edit(f'开始捕获:{buffer.hex().upper()}')
start_index = buffer.find(b'\x01\x03')
if start_index != -1 and len(buffer) >= start_index + 5:
self.write_plain_text_edit(f'捕获成功,开始验证:{buffer.hex().upper()}')
byte_count = buffer[start_index + 2]
frame_length = start_index + 5 + byte_count
if len(buffer) >= frame_length:
crc = self.data_processor.crc_rtu(buffer[start_index:frame_length - 2])
if buffer[frame_length - 2:frame_length] == crc:
self.write_plain_text_edit('验证成功')
return buffer[start_index:frame_length]
else:
self.write_plain_text_edit('验证失败')
self.write_plain_text_edit('捕获失败')
return False
def check_query_value(self):
self.num += 1
self.write_plain_text_edit(f'执行查询次数:{self.num}', log_level='debug')
if self.num > 10:
self.num = 0
timer_status = self.stop_query()
self.write_plain_text_edit('查询超时')
self.set_cursor_to_end()
return False
channel_ports = self.ui.find_selected_ser_ports()
channel_nets = self.ui.find_selected_net_ports()
if channel_ports is None:
channel_ports = []
if channel_nets is None:
channel_nets = []
if len(channel_ports) == 0 and len(channel_nets) == 0:
self.stop_query()
self.write_plain_text_edit('查询结束,在通道处查看结果')
# 如果只有一个通道,重新勾选
channel_all_nets = self.ui.find_all_net_ports()
channel_all_ports = self.ui.find_all_ser_ports()
if len(channel_all_nets) == 1:
self.ui.select_net_channel_single(channel_all_nets[0], 2)
if len(channel_all_ports) == 1:
self.ui.select_ser_channel_single(channel_all_ports[0], 2)
return False
for port_name, buffer in self.ui.buffers.items():
if port_name in channel_ports or port_name in channel_nets:
buffer_valid = self.validate_response(buffer)
if buffer_valid:
self.write_plain_text_edit('发射信号')
self.query_finished.emit(port_name, buffer_valid)
self.ui.buffers[port_name] = b''
def process_query_value(self, port_name, buffer):
"""处理查询数据"""
self.write_plain_text_edit(f'数据通道:{port_name},数据: {buffer.hex().upper()}', log_level='debug')
if len(buffer) > 6:
status = self.parse_query_value(self.query_modbus_bytes, buffer)
ind = self.ui.find_port_ser_row(port_name)
ind_net = self.ui.find_port_net_row(port_name)
if status:
if ind is not None:
self.ui.update_ser_table_status(ind, 2, status[1], status[2])
self.write_plain_text_edit(f'{port_name}{status[0]}')
self.ui.select_ser_channel_single(port_name, 0)
if ind_net is not None:
self.ui.update_net_table_status(ind_net, 2, status[1], status[2])
self.write_plain_text_edit(f'{port_name}{status[0]}')
self.ui.select_net_channel_single(port_name, 0)
else:
self.write_plain_text_edit(f'{port_name}:查询失败')
def parse_query_value(self, modbus_bytes, buffer):
"""解析查询结果"""
if self.query_modbus_bytes in [
b'\x01\x03\x01\x2C\x00\x01', # 查询环境温度标定
b'\x01\x03\x01\x2D\x00\x01', # 查询激光温度标定
b'\x01\x03\x01\x2E\x00\x01' # 查询浓度标定
]:
status_map = {
b'\x01\x03\x02\x00\x00': ('未标定', '未标定', QColor(255, 255, 255)),
b'\x01\x03\x02\x00\x01': ('标定中', '标定中', QColor(255, 255, 0)),
b'\x01\x03\x02\x00\x02': ('标定成功', '标定成功', QColor(0, 255, 0)),
b'\x01\x03\x02\x00\x03': ('标定失败', '标定失败', QColor(255, 0, 0)),
b'\x01\x03\x02\x00\x04': ('标定失败,浓度太低', '浓度太低', QColor(255, 0, 0)),
b'\x01\x03\x02\x00\x05': ('标定失败,间距太小', '标定间隔太小', QColor(255, 0, 0))
}
for key, value in status_map.items():
if key in buffer:
self.write_plain_text_edit(f'标定识别,查询指令:{key.hex().upper()} ,查询结果{buffer.hex().upper()}')
return value
return None
elif modbus_bytes == b'\x01\x03\x00\x16\x00\x01':
bytes_count = buffer[2]
query_data = buffer[3:3 + bytes_count]
air_tempera = int.from_bytes(query_data, 'big') / 100
return f'环境温度:查询结果:{air_tempera}', f'{air_tempera}', QColor(0, 255, 0)
elif modbus_bytes == b'\x01\x03\x01\x2F\x00\x01':
bytes_count = buffer[2]
query_data = buffer[3:3 + bytes_count]
conc_flag = int.from_bytes(query_data, 'big')
return f'浓度标定:重置结果:{conc_flag}', f'{conc_flag}', QColor(0, 255, 0)
elif modbus_bytes == b'\x01\x03\x00\x64\x00\x01':
bytes_count = buffer[2]
query_data = buffer[3:3 + bytes_count]
conc_calib_flag = int.from_bytes(query_data, 'big')
return f'浓度标定:查询结果:{conc_calib_flag}', f'{conc_calib_flag}', QColor(0, 255, 0)
elif modbus_bytes == b'\x01\x03\x00\x01\x00\x01':
bytes_count = buffer[2]
query_data = buffer[3:3 + bytes_count]
laser_work_tempera = int.from_bytes(query_data, 'big') / 100
return f'激光温度:查询结果:{laser_work_tempera}', f'{laser_work_tempera}', QColor(0, 255, 0)
elif modbus_bytes == b'\x01\x03\x00\x17\x00\x01':
bytes_count = buffer[2]
query_data = buffer[3:3 + bytes_count]
laser_work_tempera = int.from_bytes(query_data, 'big')
return f'激光温度:查询结果:{laser_work_tempera}', f'{laser_work_tempera}', QColor(0, 255, 0)
else:
return None
def start_query_timer(self):
"""启动查询定时器"""
if self.query_timer.isActive():
self.query_timer.stop()
self.query_timer.start(self.QUERY_INTERVAL)
self.write_plain_text_edit('timer启动', log_level='debug')
def stop_query(self):
"""停止查询定时器"""
if self.query_timer and self.query_timer.isActive():
self.query_timer.stop()
self.write_plain_text_edit('timer已经关闭')
return 'timer已经关闭'
def reset_port_colors(self):
"""重置串口通道颜色"""
for port_name in self.serial_manager.buffers.keys():
ind = self.ui.find_port_ser_row(port_name)
if ind is not None:
self.ui.reset_ser_port_color(ind)
selected_channels = self.ui.find_selected_net_ports()
for channel_name in selected_channels:
ind = self.ui.find_port_net_row(channel_name)
if ind is not None:
self.ui.reset_net_port_color(ind)
def set_cursor_to_end(self):
textCursor = self.ui.plainTextEditStastusCalib.textCursor()
textCursor.movePosition(textCursor.End)
self.ui.plainTextEditStastusCalib.setTextCursor(textCursor)