324 lines
14 KiB
Python
324 lines
14 KiB
Python
![]() |
import logging
|
|||
|
import time
|
|||
|
from PyQt5.QtCore import QTimer, pyqtSignal, QObject
|
|||
|
from PyQt5.QtGui import QColor
|
|||
|
|
|||
|
|
|||
|
class CalibrationManager(QObject):
    """Send Modbus-RTU calibration commands and poll the channels for replies.

    Write requests (function 0x06) and read requests (function 0x03) are
    framed here, completed with a CRC from ``data_processor`` and handed to
    ``ui.data_send``.  Every read request also starts ``query_timer``, which
    polls ``ui.buffers`` once per ``QUERY_INTERVAL`` until no channel is
    selected any more or ``MAX_QUERY_ATTEMPTS`` polls have elapsed.
    """

    # Emitted with (port name, validated response frame) for each answered query.
    query_finished = pyqtSignal(str, bytes)

    # Constants
    WRITE_SINGLE_REGISTER = 0x06
    READ_HOLDING_REGISTERS = 0x03
    QUERY_INTERVAL = 1000  # polling interval of the query timer, in milliseconds

    SLAVE_ADDRESS = 0x01  # first byte of every frame built by this class
    MAX_QUERY_ATTEMPTS = 10  # polls allowed before a query is reported as timed out
    SETTLE_DELAY = 1000  # pause between a write and its follow-up read, in milliseconds

    # Maps the ``log_level`` names accepted by write_plain_text_edit.
    _LOG_LEVELS = {
        'info': logging.INFO,
        'error': logging.ERROR,
        'debug': logging.DEBUG,
    }

    # Read requests whose answer is a calibration status code.
    _STATUS_QUERIES = (
        b'\x01\x03\x01\x2C\x00\x01',  # ambient temperature calibration
        b'\x01\x03\x01\x2D\x00\x01',  # laser temperature calibration
        b'\x01\x03\x01\x2E\x00\x01',  # concentration calibration
    )

    # Read requests whose answer is a plain number: request -> (label, divisor).
    # A divisor of None keeps the raw integer (no float formatting).
    _VALUE_QUERIES = {
        # NOTE(review): register 0x0016 is written by calibrate_laser_tempera_start,
        # yet the label says "ambient temperature" - confirm the wording.
        b'\x01\x03\x00\x16\x00\x01': ('环境温度:查询结果', 100),
        b'\x01\x03\x01\x2F\x00\x01': ('浓度标定:重置结果', None),
        b'\x01\x03\x00\x64\x00\x01': ('浓度标定:查询结果', None),
        b'\x01\x03\x00\x01\x00\x01': ('激光温度:查询结果', 100),
        # Register 0x0017 is the output hex length (see set_out_hex_length); the
        # previous label was a copy of the laser temperature one.
        b'\x01\x03\x00\x17\x00\x01': ('输出hex长度:查询结果', None),
    }

    def __init__(self, serial_manager, data_processor, ui):
        """Store the collaborators and wire the polling timer.

        Args:
            serial_manager: object exposing ``buffers`` (read by
                reset_port_colors).
            data_processor: object providing ``crc_rtu`` and
                ``format_bytes_to_hexstr_space``.
            ui: main window; provides the status text box, channel tables,
                ``buffers`` and ``data_send``.
        """
        super().__init__()
        self.serial_manager = serial_manager
        self.data_processor = data_processor
        self.ui = ui

        self.num = 0  # number of polls performed for the current query
        self.modbus_data = [0] * 20  # scratch copy of the last frame built (first 6 items)
        self.query_modbus_bytes = None  # last read request, without CRC
        self.query_timer = QTimer(self.ui)
        self.query_timer.timeout.connect(self.check_query_value)  # periodic poll
        self.query_finished.connect(self.process_query_value)  # handle a reply

    def write_plain_text_edit(self, text, log_level='info'):
        """Append *text* to plainTextEditStastusCalib and log it.

        Args:
            text: message to show and log.
            log_level: 'info', 'error' or 'debug'; any other value only
                displays the text without logging it.
        """
        self.ui.plainTextEditStastusCalib.appendPlainText(text)
        level = self._LOG_LEVELS.get(log_level)
        if level is not None:
            logging.log(level, text)

    @staticmethod
    def _encode_register_value(text, scale=1, allow_fraction=False):
        """Turn user input into the two big-endian bytes of a register value.

        Args:
            text: the text typed by the user.
            scale: factor applied before rounding (100 for centi-units).
            allow_fraction: accept one decimal point followed by digits.

        Returns:
            A 2-byte ``bytes`` object, or None when *text* is not a
            non-negative number or the scaled value exceeds 0xFFFF.
        """
        if not isinstance(text, str):
            return None
        text = text.strip()
        whole, dot, fraction = text.partition('.')
        if not whole.isdecimal():
            return None
        if dot and not (allow_fraction and fraction.isdecimal()):
            return None
        scaled = float(text) * scale
        # Checked before rounding so absurdly long inputs (inf) never reach round().
        if scaled > 0xFFFF:
            return None
        # round() instead of int(): 25.13 * 100 is 2512.9999999999995 as a float.
        return round(scaled).to_bytes(2, 'big')

    def _check_after_settle(self, check):
        """Call *check* once after SETTLE_DELAY ms without blocking the event loop."""
        QTimer.singleShot(self.SETTLE_DELAY, check)

    def calibrate_air_tempera(self, temp_str):
        """Start the ambient temperature calibration with the given temperature.

        The value is sent in hundredths to register 0x012C; the calibration
        status is queried about one second later.
        """
        self.write_plain_text_edit('################\r\n环境温度,开始标定')
        payload = self._encode_register_value(temp_str, scale=100, allow_fraction=True)
        if payload is None:
            self.write_plain_text_edit('环境温度输入错误', log_level='error')
            return
        self.write_register(0x01, 0x2C, payload)
        self._check_after_settle(self.check_air_tempera)

    def check_air_tempera(self):
        """Query the ambient temperature calibration status (register 0x012C)."""
        self.reset_port_colors()
        self.write_plain_text_edit('################\r\n空气温度标定状态:')
        self.read_register(0x01, 0x2C, 0x00, 0x01)

    def calibrate_laser_tempra(self):
        """Start the laser temperature calibration (writes 1 to register 0x012D)."""
        self.write_plain_text_edit('################')
        self.write_plain_text_edit('laser温度标定')
        self.write_register(0x01, 0x2D, b'\x00\x01')
        self._check_after_settle(self.check_laser_tempra)

    def stop_calibrate_laser_tempra(self):
        """Stop the laser temperature calibration (writes 0 to register 0x012D)."""
        self.write_plain_text_edit('################')
        self.write_register(0x01, 0x2D, b'\x00\x00')
        self.write_plain_text_edit('laser温度标定结束')

    def check_laser_tempra(self):
        """Query the laser temperature calibration status (register 0x012D)."""
        self.reset_port_colors()
        self.write_plain_text_edit('查询中...')
        self.read_register(0x01, 0x2D, 0x00, 0x01)

    def check_work_tempra(self):
        """Query the current laser working point (register 0x0001)."""
        self.reset_port_colors()
        self.write_plain_text_edit('查询当前激光工作点')
        self.read_register(0x00, 0x01, 0x00, 0x01)

    def calibrate_laser_tempera_start(self, temp_str):
        """Change the start point of the laser temperature calibration.

        The value is sent in hundredths to register 0x0016 and read back
        about one second later.
        """
        self.write_plain_text_edit('################')
        self.write_plain_text_edit('laser温度标定起始点修改')
        payload = self._encode_register_value(temp_str, scale=100, allow_fraction=True)
        if payload is None:
            self.write_plain_text_edit('环境温度输入错误')
            return
        self.write_register(0x00, 0x16, payload)
        self._check_after_settle(self.check_laser_tempera_start)

    def check_laser_tempera_start(self):
        """Query the start point of the laser temperature calibration (0x0016)."""
        self.reset_port_colors()
        self.write_plain_text_edit('################')
        self.write_plain_text_edit('laser温度标定起始点查询')
        self.read_register(0x00, 0x16, 0x00, 0x01)

    def calibrate_conc_ppm(self, ppm_str):
        """Start the concentration calibration with the given integer ppm value.

        The value is written to register 0x012E; the calibration status is
        queried about one second later.
        """
        self.write_plain_text_edit('################')
        self.write_plain_text_edit('浓度标定')
        payload = self._encode_register_value(ppm_str)
        if payload is None:
            self.write_plain_text_edit('浓度输入错误')
            return
        self.write_register(0x01, 0x2E, payload)
        self._check_after_settle(self.check_conc_ppm)

    def check_conc_ppm(self):
        """Query the concentration calibration status (register 0x012E)."""
        self.reset_port_colors()
        self.write_plain_text_edit('################')
        self.write_plain_text_edit('浓度标定,查询')
        self.read_register(0x01, 0x2E, 0x00, 0x01)

    def calibrate_reset_conc_ppm(self):
        """Reset the concentration calibration (writes 0 to register 0x012F)."""
        self.write_plain_text_edit('################')
        self.write_plain_text_edit('浓度标定,重置')
        self.write_register(0x01, 0x2F, b'\x00\x00')
        self._check_after_settle(self.check_reset_conc_ppm)

    def check_reset_conc_ppm(self):
        """Query the concentration calibration flag after a reset.

        NOTE(review): the reset writes register 0x012F but this reads 0x0064;
        confirm that 0x0064 is the intended read-back register.
        """
        self.reset_port_colors()
        self.write_plain_text_edit('################')
        self.write_plain_text_edit('浓度标定,重置')
        self.read_register(0x00, 0x64, 0x00, 0x01)

    def set_out_hex_length(self, length_str):
        """Set the length of the hex output (register 0x0017) and read it back."""
        self.write_plain_text_edit('设置输出的hex长度')
        payload = self._encode_register_value(length_str)
        if payload is None:
            self.write_plain_text_edit('参数输入错误')
            return
        self.write_register(0x00, 0x17, payload)
        self._check_after_settle(self.get_out_hex_length)

    def get_out_hex_length(self):
        """Query the length of the hex output (register 0x0017)."""
        self.reset_port_colors()
        self.write_plain_text_edit('查询输出的hex长度')
        self.read_register(0x00, 0x17, 0x00, 0x01)

    def write_register(self, address_hi, address_lo, value):
        """Send a write-single-register request.

        Args:
            address_hi: high byte of the register address.
            address_lo: low byte of the register address.
            value: two bytes (high, low) to store in the register.
        """
        frame = [
            self.SLAVE_ADDRESS,
            self.WRITE_SINGLE_REGISTER,
            address_hi,
            address_lo,
            value[0],
            value[1],
        ]
        self.modbus_data[0:6] = frame
        self.send_bytes(bytes(frame))

    def read_register(self, address_hi, address_lo, add_num_hi, add_num_lo):
        """Send a read-holding-registers request and start polling for the reply.

        Args:
            address_hi: high byte of the first register address.
            address_lo: low byte of the first register address.
            add_num_hi: high byte of the number of registers to read.
            add_num_lo: low byte of the number of registers to read.
        """
        frame = [
            self.SLAVE_ADDRESS,
            self.READ_HOLDING_REGISTERS,
            address_hi,
            address_lo,
            add_num_hi,
            add_num_lo,
        ]
        self.modbus_data[0:6] = frame
        # Remembered so parse_query_value knows which request a reply answers.
        self.query_modbus_bytes = bytes(frame)

        self.send_bytes(self.query_modbus_bytes)
        self.write_plain_text_edit('查询中...')
        self.num = 0
        self.start_query_timer()

    def send_bytes(self, modbus_bytes):
        """Append the CRC to *modbus_bytes* and send it as a spaced hex string."""
        checksum = self.data_processor.crc_rtu(modbus_bytes)
        framed = modbus_bytes + checksum
        self.ui.data_send(self.data_processor.format_bytes_to_hexstr_space(framed))

    def validate_response(self, buffer):
        """Extract the first CRC-valid read response from *buffer*.

        Every occurrence of the ``01 03`` header is tried in turn, so a stray
        match inside echoed or noisy bytes does not hide a real frame that
        follows it.

        Args:
            buffer: bytes (or bytearray) received on one channel.

        Returns:
            The complete frame as ``bytes`` (address, function, byte count,
            data, CRC), or False when no valid frame is present.
        """
        self.write_plain_text_edit(f'开始捕获:{buffer.hex().upper()}')
        header = bytes((self.SLAVE_ADDRESS, self.READ_HOLDING_REGISTERS))
        start = buffer.find(header)
        while start != -1:
            # Address + function + byte count + 2 CRC bytes is the minimum frame.
            if len(buffer) >= start + 5:
                self.write_plain_text_edit(f'捕获成功,开始验证:{buffer.hex().upper()}')
                byte_count = buffer[start + 2]
                end = start + 5 + byte_count
                if len(buffer) >= end:
                    expected = self.data_processor.crc_rtu(buffer[start:end - 2])
                    if buffer[end - 2:end] == expected:
                        self.write_plain_text_edit('验证成功')
                        return bytes(buffer[start:end])
                    self.write_plain_text_edit('验证失败')
            start = buffer.find(header, start + 1)
        self.write_plain_text_edit('捕获失败')
        return False

    def check_query_value(self):
        """Poll the selected channels once; slot of ``query_timer.timeout``.

        Stops the timer after MAX_QUERY_ATTEMPTS polls, or as soon as no
        channel is selected any more.  Each channel whose buffer holds a valid
        response gets ``query_finished`` emitted and its buffer cleared.

        Returns:
            False when polling was stopped, otherwise None.
        """
        self.num += 1
        self.write_plain_text_edit(f'执行查询次数:{self.num}', log_level='debug')
        if self.num > self.MAX_QUERY_ATTEMPTS:
            self.num = 0
            self.stop_query()
            self.write_plain_text_edit('查询超时')
            self.set_cursor_to_end()
            return False

        channel_ports = self.ui.find_selected_ser_ports() or []
        channel_nets = self.ui.find_selected_net_ports() or []

        if not channel_ports and not channel_nets:
            self.stop_query()
            self.write_plain_text_edit('查询结束,在通道处查看结果')
            # With a single channel, tick it again for the next query.
            channel_all_nets = self.ui.find_all_net_ports() or []
            channel_all_ports = self.ui.find_all_ser_ports() or []
            if len(channel_all_nets) == 1:
                self.ui.select_net_channel_single(channel_all_nets[0], 2)
            if len(channel_all_ports) == 1:
                self.ui.select_ser_channel_single(channel_all_ports[0], 2)
            return False

        # Iterate over a snapshot: the connected slot runs during emit() and the
        # buffer entry is replaced right after it.
        for port_name, buffer in list(self.ui.buffers.items()):
            if port_name not in channel_ports and port_name not in channel_nets:
                continue
            frame = self.validate_response(buffer)
            if frame:
                self.write_plain_text_edit('发射信号')
                self.query_finished.emit(port_name, frame)
                self.ui.buffers[port_name] = b''

    def process_query_value(self, port_name, buffer):
        """Show the result of a validated response; slot of ``query_finished``.

        Args:
            port_name: name of the channel the response arrived on.
            buffer: the validated response frame.
        """
        self.write_plain_text_edit(
            f'数据通道:{port_name},数据: {buffer.hex().upper()}', log_level='debug')

        status = None
        # A one-register reply is 7 bytes; anything shorter carries no value.
        if len(buffer) > 6:
            status = self.parse_query_value(self.query_modbus_bytes, buffer)
        if not status:
            self.write_plain_text_edit(f'{port_name}:查询失败')
            return

        message, cell_text, color = status
        ser_row = self.ui.find_port_ser_row(port_name)
        net_row = self.ui.find_port_net_row(port_name)
        if ser_row is not None:
            self.ui.update_ser_table_status(ser_row, 2, cell_text, color)
            self.write_plain_text_edit(f'{port_name}:{message}')
            self.ui.select_ser_channel_single(port_name, 0)
        if net_row is not None:
            self.ui.update_net_table_status(net_row, 2, cell_text, color)
            self.write_plain_text_edit(f'{port_name}:{message}')
            self.ui.select_net_channel_single(port_name, 0)

    def parse_query_value(self, modbus_bytes, buffer):
        """Interpret a response frame according to the request it answers.

        Args:
            modbus_bytes: the read request (6 bytes, without CRC).
            buffer: the response frame, starting at the slave address.

        Returns:
            A tuple ``(log message, table text, QColor)``, or None when the
            request or the status code is not recognised.
        """
        if isinstance(modbus_bytes, (bytes, bytearray)):
            query = bytes(modbus_bytes)
        else:
            query = None

        if query in self._STATUS_QUERIES:
            status_map = {
                b'\x01\x03\x02\x00\x00': ('未标定', '未标定', QColor(255, 255, 255)),
                b'\x01\x03\x02\x00\x01': ('标定中', '标定中', QColor(255, 255, 0)),
                b'\x01\x03\x02\x00\x02': ('标定成功', '标定成功', QColor(0, 255, 0)),
                b'\x01\x03\x02\x00\x03': ('标定失败', '标定失败', QColor(255, 0, 0)),
                b'\x01\x03\x02\x00\x04': ('标定失败,浓度太低', '浓度太低', QColor(255, 0, 0)),
                b'\x01\x03\x02\x00\x05': ('标定失败,间距太小', '标定间隔太小', QColor(255, 0, 0)),
            }
            # Address, function, byte count and the two data bytes identify the status.
            key = bytes(buffer[:5])
            status = status_map.get(key)
            if status is not None:
                self.write_plain_text_edit(
                    f'标定识别,查询指令:{key.hex().upper()} ,查询结果{buffer.hex().upper()}')
            return status

        entry = self._VALUE_QUERIES.get(query)
        if entry is None:
            return None
        label, divisor = entry
        byte_count = buffer[2]
        raw = int.from_bytes(buffer[3:3 + byte_count], 'big')
        value = raw / divisor if divisor else raw
        return f'{label}:{value}', f'{value}', QColor(0, 255, 0)

    def start_query_timer(self):
        """(Re)start the polling timer with QUERY_INTERVAL."""
        # QTimer.start() stops and restarts a timer that is already running.
        self.query_timer.start(self.QUERY_INTERVAL)
        self.write_plain_text_edit('timer启动', log_level='debug')

    def stop_query(self):
        """Stop the polling timer.

        Returns:
            The status text when a running timer was stopped, otherwise None.
        """
        if self.query_timer and self.query_timer.isActive():
            self.query_timer.stop()
            self.write_plain_text_edit('timer已经关闭')
            return 'timer已经关闭'
        return None

    def reset_port_colors(self):
        """Reset the row colour of every serial port and selected net channel."""
        for port_name in list(self.serial_manager.buffers):
            row = self.ui.find_port_ser_row(port_name)
            if row is not None:
                self.ui.reset_ser_port_color(row)

        # check_query_value treats None as a possible result of this lookup.
        for channel_name in self.ui.find_selected_net_ports() or []:
            row = self.ui.find_port_net_row(channel_name)
            if row is not None:
                self.ui.reset_net_port_color(row)

    def set_cursor_to_end(self):
        """Move the cursor of the status text box to the end of its content."""
        cursor = self.ui.plainTextEditStastusCalib.textCursor()
        cursor.movePosition(cursor.End)
        self.ui.plainTextEditStastusCalib.setTextCursor(cursor)
|