153 lines
5.7 KiB
Python
153 lines
5.7 KiB
Python
![]() |
# -*- coding: utf-8 -*-
|
|||
|
"""
|
|||
|
Created on Thu Sep 26 13:44:27 2024
|
|||
|
|
|||
|
@author: WANGXIBAO
|
|||
|
"""
|
|||
|
from PyQt5 import QtWidgets,QtGui
|
|||
|
import sys,socket, os,datetime
|
|||
|
from configparser import ConfigParser
|
|||
|
from PyHexUI import Ui_Form
|
|||
|
from PyQt5.QtCore import QTimer
|
|||
|
|
|||
|
class PyHexQT(QtWidgets.QWidget, Ui_Form):
    """UDP hex monitor widget.

    Reads its settings from the ``NetConfig`` section of ``pyHex.ini``, binds
    a UDP socket to the configured address and polls it from a ``QTimer``.
    Every received datagram is appended to ``self.textEdit`` as space-separated
    upper-case hex, buffered for a dated ``.log`` file, and one integer field
    of the datagram (``bytes_length`` bytes starting at ``bytes_start``,
    divided by ``num_scale``) is shown in ``self.textBrowser``.

    ``textEdit`` and ``textBrowser`` are not created in this class; presumably
    they are set up by ``Ui_Form.setupUi`` -- confirm against PyHexUI.
    """

    # Period of the receive timer, in milliseconds.
    POLL_INTERVAL_MS = 20
    # Maximum number of bytes read per datagram.
    RECV_BUFSIZE = 1024
    # Values written to a freshly created configuration file.
    DEFAULT_CONFIG = (
        ('IP', '127.0.0.1'),
        ('port', '9000'),
        ('bytes_start', '3'),
        ('bytes_length', '4'),
        ('num_scale', '1000'),
        ('log_time', '1'),
    )

    def __init__(self):
        """Build the UI, load the configuration and start listening.

        Raises:
            configparser.Error: if a required option is missing.
            ValueError: if a numeric option is not an integer.
        """
        super(PyHexQT, self).__init__()
        self.setupUi(self)
        self.udp = "pyHex.ini"  # default path of the configuration file

        # Create the configuration file if needed and report problems early.
        self.CheckCfgIniData()

        # Load the address and the decoding/logging parameters.
        config = ConfigParser()
        config.read(self.udp, encoding='utf-8')
        self.IP = config.get('NetConfig', 'IP')
        self.port = config.get('NetConfig', 'port')  # kept as str; see main()
        self.bytes_start = config.getint('NetConfig', 'bytes_start')
        self.bytes_length = config.getint('NetConfig', 'bytes_length')
        self.num_scale = config.getint('NetConfig', 'num_scale')
        self.log_time = config.getint('NetConfig', 'log_time')

        # Log lines are buffered and written once more than ``log_time``
        # of them have accumulated.
        self.log_count = 0
        self.log_buffer = []

        # The socket is polled from a timer on the GUI thread.
        self.timer = QTimer()
        self.timer.timeout.connect(self.data_receive)

        self.main()

    def closeEvent(self, event):
        """Stop polling, close the socket and flush pending log lines."""
        if hasattr(self, 'udp_socket'):
            # Stop the timer first so data_receive never sees a closed socket.
            self.timer.stop()
            self.udp_socket.close()
            print("关闭网络连接")

        # Lines still below the flush threshold would otherwise be lost.
        if self.log_buffer:
            try:
                self.data_save()
            except OSError as e:
                print(f"Error writing log file: {e}")

        # Let the base class finish the normal close handling.
        super().closeEvent(event)

    def main(self):
        """Create and bind the UDP socket, then start the receive timer.

        Returns:
            None. If binding fails the error is printed and the timer is not
            started.
        """
        udp_addr = (self.IP, int(self.port))
        self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        # data_receive runs on the GUI thread; a blocking recvfrom would
        # freeze the window until the next datagram arrives.
        self.udp_socket.setblocking(False)
        print("绑定地址", udp_addr)

        try:
            self.udp_socket.bind(udp_addr)
        except (OSError, OverflowError) as e:
            # OverflowError: port outside 0-65535.
            print(f"Error binding UDP socket: {e}")
            return None
        print("开始监听。。。")

        # Poll the socket periodically for incoming datagrams.
        self.timer.start(self.POLL_INTERVAL_MS)

    def data_receive(self):
        """Timer slot: handle at most one pending datagram.

        Appends the datagram to the hex view, prints it, buffers a timestamped
        log line (flushing when the threshold is exceeded) and refreshes the
        decoded value. Returns silently when nothing is pending.
        """
        try:
            payload, sender = self.udp_socket.recvfrom(self.RECV_BUFSIZE)
        except socket.error:
            # Nothing pending on the non-blocking socket (or a socket error);
            # try again on the next tick.
            return

        line_hex = payload.hex()
        out_s = ' '.join('{:02X}'.format(b) for b in payload)

        # Append at the end of the hex view regardless of the user's cursor.
        cursor = self.textEdit.textCursor()
        cursor.movePosition(QtGui.QTextCursor.End)
        self.textEdit.setTextCursor(cursor)
        self.textEdit.insertPlainText(out_s + '\r\n')
        print("[From %s:%d]:%s" % (sender[0], sender[1], out_s))

        # Buffer a millisecond-resolution log line.
        stamp = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")[:23]
        self.log_buffer.append(stamp + " " + out_s + "\r\n")
        self.log_count += 1
        if self.log_count > self.log_time:
            try:
                self.data_save()
            except OSError as e:
                # Keep the buffer; the write is retried on the next datagram.
                print(f"Error writing log file: {e}")

        num_out = self._extract_value(line_hex)
        if num_out is None:
            # Leave the previously shown value untouched.
            print(f"Cannot decode value from {len(payload)}-byte datagram")
            return
        self._show_value(num_out)

    def _extract_value(self, line_hex):
        """Return the scaled big-endian field of *line_hex*, or None.

        None is returned when the configured field does not lie completely
        inside the datagram or when ``num_scale`` is zero.
        """
        start = self.bytes_start * 2  # two hex digits per byte
        end = start + self.bytes_length * 2
        if start < 0 or end <= start or end > len(line_hex):
            return None
        if self.num_scale == 0:
            return None
        return int(line_hex[start:end], 16) / self.num_scale

    def _show_value(self, num_out):
        """Replace the content of ``textBrowser`` with *num_out* in red."""
        self.textBrowser.clear()

        char_format = QtGui.QTextCharFormat()
        char_format.setFont(QtGui.QFont("SimSun", 28))
        char_format.setForeground(QtGui.QBrush(QtGui.QColor("red")))

        cursor = self.textBrowser.textCursor()
        cursor.insertText(str(num_out), char_format)

    def data_save(self):
        """Append the buffered log lines to ``<YYYY-MM-DD>.log`` and reset.

        Raises:
            OSError: if the log file cannot be opened or written; the buffer
                is left intact in that case.
        """
        sav_name = datetime.datetime.now().strftime("%Y-%m-%d") + '.log'
        # newline='' keeps the explicit "\r\n" line endings untranslated.
        with open(sav_name, mode='a', newline='', encoding='utf-8',
                  errors='replace') as file:
            file.writelines(self.log_buffer)
        self.log_buffer.clear()
        self.log_count = 0

    def CheckCfgIniData(self):
        """Create the configuration file if missing and sanity-check it.

        Problems found while reading are only printed; nothing is raised for
        them here.
        """
        if not os.path.exists(self.udp):
            config = ConfigParser()
            config.add_section('NetConfig')
            for option, value in self.DEFAULT_CONFIG:
                config.set('NetConfig', option, value)
            with open(self.udp, 'w', encoding='utf-8') as f:
                config.write(f)

        config = ConfigParser()
        try:
            config.read(self.udp, encoding='utf-8')
            IP = config.get('NetConfig', 'IP')
            port = config.get('NetConfig', 'port')
            print(f"Configuration read successfully: {IP}, {port}")
            # Only checked for presence and integer syntax.
            for option in ('bytes_start', 'bytes_length', 'num_scale',
                           'log_time'):
                config.getint('NetConfig', option)
        except Exception as e:
            # Best-effort diagnostic: report and let the caller proceed.
            print(f"Error reading configuration: {e}")
|
|||
|
|
|||
|
# if __name__ == '__main__':
|
|||
|
# main()
|
|||
|
# Ui_Form()
|
|||
|
if __name__ == '__main__':
    # Script entry point: create the Qt application, show the monitor window
    # and exit the process with the event loop's return code.
    qt_app = QtWidgets.QApplication(sys.argv)
    window = PyHexQT()
    window.show()
    exit_code = qt_app.exec_()
    sys.exit(exit_code)
|