import serial import serial.tools.list_ports class SerialManager: def __init__(self, buffers): self.serials = {} self.buffers = buffers def open_port(self, port_name, baudrate=9600,bytesize = 8,parity = 'N',stopbits = 1,flowcontrol = 'None'): """打开串口""" try: if port_name in self.serials and self.serials[port_name].is_open: print(f"串口 {port_name} 已经打开,正在关闭...") self.serials[port_name].close() ser = serial.Serial(port_name, baudrate, bytesize, parity, stopbits, flowcontrol) self.serials[port_name] = ser self.buffers[port_name] = b'' # 初始化缓冲区 print(f"创建串口对象 {port_name} 成功,当前状态: is_open={ser.is_open}") return True except Exception as e: print(f"串口打开失败: {e}") return False def close_port(self, port_name): """关闭串口""" try: if port_name in self.serials and self.serials[port_name].is_open: self.serials[port_name].close() del self.serials[port_name] del self.buffers[port_name] return True except Exception as e: print(f"串口关闭失败: {e}") return False def port_all_close(self): """关闭所有串口""" for port_name in self.serials: self.serials[port_name].close() del self.buffers[port_name] self.serials = {} def reopen_port(self, port_name): """重新打开串口""" try: if port_name in self.serials: self.serials[port_name].close() self.serials[port_name].open() return True else: print(f"串口 {port_name} 不存在或未打开") return False except Exception as e: print(f"串口重新打开失败: {e}") def read_data(self, port_name): """读取串口数据""" try: ser = self.serials[port_name] num = ser.in_waiting if num > 0: data = ser.read(num) if port_name not in self.buffers: self.buffers[port_name] = b'' # 初始化为空字 self.buffers[port_name] += data # 设置 buffer 的最大大小(元素数量) max_buffer_length = 256 # 根据需要调整最大长度 # 计算当前 buffer 的总长度 current_length = len(self.buffers[port_name]) # 检查并限制 buffer 的长度 if current_length > max_buffer_length: # 移除最早添加的字节,直到总长度不超过 max_buffer_length while current_length > max_buffer_length and self.buffers[port_name]: removed_byte = self.buffers[port_name][0:1] self.buffers[port_name] = self.buffers[port_name][1:] current_length -= len(removed_byte) return data except Exception as e: print(f"读取串口数据失败: {e}") return False def write_data(self, port_name, data): """向串口发送数据""" try: ser = self.serials[port_name] ser.write(data) return True except Exception as e: print(f"写入串口数据失败: {e}") return False