import logging
import socket


class NetManager:
    """Manage a set of non-blocking UDP sockets keyed by ``"host:port"``.

    ``net_connections`` maps a local address string (``net_name``) to the
    UDP socket bound to it.  ``buffers`` is a mapping supplied by the caller
    and mutated in place; ``read_data`` stores received payload bytes in it
    under the sender's ``"host:port"`` string (``channel_name``).
    """

    # Maximum datagram size requested from ``recvfrom``.
    RECV_SIZE = 1024
    # Maximum number of bytes kept per channel buffer; older bytes are dropped.
    MAX_BUFFER_LENGTH = 256

    def __init__(self, buffers):
        """Create a manager with no open sockets.

        Args:
            buffers: Mutable mapping of channel name to accumulated bytes.
                The same object is kept and updated by this manager.
        """
        self.net_connections = {}
        self.buffers = buffers

    @staticmethod
    def _parse_addr(name):
        """Split ``"host:port"`` into a ``(host, port)`` tuple.

        Raises:
            IndexError: If ``name`` contains no ``':'``.
            ValueError: If the port part is not an integer.
        """
        parts = name.split(':')
        return parts[0], int(parts[1])

    def open_net(self, net_name):
        """Bind a non-blocking UDP socket to ``net_name`` and register it.

        An already registered socket for the same name is closed first.

        Args:
            net_name: Local address in ``"host:port"`` form.

        Returns:
            True on success, False if the name is malformed or the socket
            could not be created or bound.
        """
        sock = None
        try:
            udp_addr = self._parse_addr(net_name)
            if net_name in self.net_connections:
                self.close_net(net_name)
            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            sock.bind(udp_addr)
            # Put the socket into non-blocking mode.
            sock.setblocking(False)
            # Register only once the socket is fully usable, so a failed bind
            # never leaves a dead socket in the registry.
            self.net_connections[net_name] = sock
            logging.info(f'{net_name}开始监听')
            return True
        except Exception as e:
            if sock is not None:
                sock.close()
            logging.error(f'{net_name}打开失败')
            logging.error(e)
            return False

    def close_net(self, net_name):
        """Close and unregister the socket bound to ``net_name``.

        Any buffer stored under ``net_name`` is discarded as well; a missing
        buffer is not an error.

        Returns:
            True if the socket was closed or was not open, False if closing
            raised an exception.
        """
        try:
            if net_name in self.net_connections:
                self.net_connections[net_name].close()
                del self.net_connections[net_name]
                # Buffers are filled under the remote peer's name, so there
                # is often no entry for the local name; pop() tolerates that.
                self.buffers.pop(net_name, None)
                logging.info(f'{net_name}已关闭')
            return True
        except Exception as e:
            logging.error(f'{net_name}关闭失败')
            logging.error(e)
            return False

    def net_all_close(self):
        """Close every registered socket and empty the registry."""
        for net_name, sock in list(self.net_connections.items()):
            sock.close()
            logging.info(f'{net_name}已关闭')
        # Drop the closed sockets so later calls do not try to reuse them.
        self.net_connections.clear()

    def read_data(self, net_name):
        """Receive one datagram from the socket registered as ``net_name``.

        The payload is appended to ``buffers[channel_name]`` where
        ``channel_name`` is the sender's ``"host:port"``; only the most
        recent ``MAX_BUFFER_LENGTH`` bytes are kept.

        Returns:
            The ``(payload, address)`` tuple returned by ``recvfrom``, or
            ``''`` when the socket is not registered, no datagram is
            pending, or reading failed.
        """
        sock = self.net_connections.get(net_name)
        if sock is None:
            return ''
        try:
            data = sock.recvfrom(self.RECV_SIZE)
            payload, sender = data[0], data[1]
            channel_name = sender[0] + ':' + str(sender[1])
            buffered = self.buffers.get(channel_name, b'') + payload
            # Keep only the newest bytes once the buffer exceeds its limit.
            if len(buffered) > self.MAX_BUFFER_LENGTH:
                buffered = buffered[-self.MAX_BUFFER_LENGTH:]
            self.buffers[channel_name] = buffered
            return data
        except BlockingIOError:
            # Nothing to read right now on a non-blocking socket.
            return ''
        except Exception as e:
            # Best effort: callers treat '' as "no data"; keep the detail
            # available at debug level instead of discarding it.
            logging.debug(f'{net_name}读取失败')
            logging.debug(e)
            return ''

    def write_data(self, data, channel_name):
        """Send ``data`` to ``channel_name`` through the first open socket.

        The buffer stored for ``channel_name`` is reset to empty before the
        datagram is sent.

        Args:
            data: Bytes to send.
            channel_name: Destination address in ``"host:port"`` form.

        Returns:
            True if the datagram was handed to the socket, False if no
            socket is open, the channel name is malformed, or sending failed.
        """
        if not self.net_connections:
            return False
        # Only the first registered socket is ever used for sending.
        net_name, sock = next(iter(self.net_connections.items()))
        try:
            udp_addr = self._parse_addr(channel_name)
            # Clear the channel buffer before sending.
            self.buffers[channel_name] = b''
            sock.sendto(data, udp_addr)
            logging.info(f'{net_name}写入成功')
            return True
        except Exception as e:
            logging.error(f'{net_name}写入失败')
            logging.error(e)
            return False