# python/PyMaiHak/PyHex.py
#
# 153 lines
# 5.7 KiB
# Python
# Raw Normal View History
#
# 2024-11-20 13:04:28 +08:00
# -*- coding: utf-8 -*-
"""
Created on Thu Sep 26 13:44:27 2024
@author: WANGXIBAO
"""
from PyQt5 import QtWidgets,QtGui
import sys,socket, os,datetime
from configparser import ConfigParser
from PyHexUI import Ui_Form
from PyQt5.QtCore import QTimer
class PyHexQT(QtWidgets.QWidget, Ui_Form):
    """UDP hex monitor window.

    Binds a UDP socket to the address stored in ``pyHex.ini``, polls it from a
    Qt timer, appends every received datagram to ``textEdit`` as
    space-separated upper-case hex bytes, shows one scaled big-endian field of
    the datagram in ``textBrowser`` and appends time-stamped lines to a daily
    ``YYYY-MM-DD.log`` file.

    ``textEdit`` and ``textBrowser`` are presumably created by
    ``Ui_Form.setupUi`` -- confirm against the UI module.
    """

    # Option/value pairs written to a freshly created configuration file.
    _DEFAULT_CONFIG = (
        ('IP', '127.0.0.1'),
        ('port', '9000'),
        ('bytes_start', '3'),
        ('bytes_length', '4'),
        ('num_scale', '1000'),
        ('log_time', '1'),
    )

    def __init__(self):
        """Build the UI, load the configuration and start listening."""
        super(PyHexQT, self).__init__()
        self.setupUi(self)
        self.udp = "pyHex.ini"  # default path of the configuration file
        # Create the configuration file with defaults when it is missing and
        # report (but do not fix) unreadable values.
        self.CheckCfgIniData()
        # Load the network and decoding settings from the configuration file.
        config = ConfigParser()
        config.read(self.udp, encoding='utf-8')
        self.IP = config.get('NetConfig', 'IP')
        self.port = config.get('NetConfig', 'port')  # kept as str; converted in main()
        self.bytes_start = config.getint('NetConfig', 'bytes_start')
        self.bytes_length = config.getint('NetConfig', 'bytes_length')
        self.num_scale = config.getint('NetConfig', 'num_scale')
        self.log_time = config.getint('NetConfig', 'log_time')
        self.log_count = 0
        self.log_buffer = []
        # Timer that polls the UDP socket for incoming datagrams.
        self.timer = QTimer()
        self.timer.timeout.connect(self.data_receive)
        self.main()

    def closeEvent(self, event):
        """Stop polling, close the socket and flush pending log lines."""
        if hasattr(self, 'udp_socket'):
            self.timer.stop()
            self.udp_socket.close()
            print("关闭网络连接")
        # Lines still waiting in the buffer would otherwise be lost on exit.
        if self.log_buffer:
            self._save_log_safely()
        # Let the base class finish the regular close handling.
        super().closeEvent(event)

    def main(self):
        """Create and bind the UDP socket, then start the polling timer.

        On a bind failure the error is printed, the socket is closed and the
        timer is not started. Always returns ``None``.
        """
        udp_addr = (self.IP, int(self.port))
        self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        print("绑定地址", udp_addr)
        try:
            self.udp_socket.bind(udp_addr)
        except (OSError, OverflowError) as e:
            # OverflowError is raised for a port outside 0-65535.
            print(f"Error binding UDP socket {udp_addr}: {e}")
            self.udp_socket.close()
            return None
        # The socket is read from a timer slot on the GUI thread; a blocking
        # recvfrom() would freeze the window until a datagram arrives.
        self.udp_socket.setblocking(False)
        print("开始监听。。。")
        # Poll the socket every 20 ms.
        self.timer.start(20)
        return None

    def data_receive(self):
        """Timer slot: read one pending datagram, display it and log it.

        Returns immediately when no datagram is waiting. The decoded value is
        shown only when the datagram contains the complete configured field.
        """
        try:
            # 1024 is the maximum number of bytes read per datagram.
            payload, sender = self.udp_socket.recvfrom(1024)
        except BlockingIOError:
            return  # nothing received since the last poll
        except OSError as e:
            print(f"Socket error: {e}")
            return

        hex_text = ' '.join('{:02X}'.format(b) for b in payload)

        # Move the cursor to the end so new data is always appended.
        edit_cursor = self.textEdit.textCursor()
        edit_cursor.movePosition(QtGui.QTextCursor.End)
        self.textEdit.setTextCursor(edit_cursor)
        self.textEdit.insertPlainText(hex_text)
        self.textEdit.insertPlainText('\r\n')
        print("[From %s:%d]:%s" % (sender[0], sender[1], hex_text))

        # Buffer a millisecond-resolution time-stamped line for the log file.
        stamp = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")[:23]
        self.log_buffer.append(stamp + " " + hex_text + "\r\n")
        self.log_count += 1
        if self.log_count > self.log_time:
            self._save_log_safely()

        # Show the decoded value in large red text; an undecodable datagram
        # leaves the display empty rather than showing a stale value.
        self.textBrowser.clear()
        value = self._decode_value(payload)
        if value is None:
            return
        char_format = QtGui.QTextCharFormat()
        char_format.setFont(QtGui.QFont("SimSun", 28))
        char_format.setForeground(QtGui.QBrush(QtGui.QColor("red")))
        self.textBrowser.textCursor().insertText(str(value), char_format)

    def _decode_value(self, payload):
        """Return the configured field of *payload* as a scaled number.

        The field is ``bytes_length`` bytes starting at ``bytes_start``, read
        as an unsigned big-endian integer and divided by ``num_scale``.
        Returns ``None`` when the field is not fully present in *payload* or
        the configuration cannot produce a value (non-positive length,
        negative start, zero scale).
        """
        if self.bytes_start < 0 or self.bytes_length <= 0 or self.num_scale == 0:
            return None
        field = payload[self.bytes_start:self.bytes_start + self.bytes_length]
        if len(field) != self.bytes_length:
            return None
        return int.from_bytes(field, 'big') / self.num_scale

    def _save_log_safely(self):
        """Call data_save() and report, instead of raising, file errors."""
        try:
            self.data_save()
        except OSError as e:
            # An exception escaping a Qt slot or event handler can abort the
            # application, so a failed write is only reported.
            print(f"Error writing log file: {e}")

    def data_save(self):
        """Append the buffered lines to today's log file and reset the buffer."""
        sav_name = datetime.datetime.now().strftime("%Y-%m-%d") + '.log'
        # newline='' keeps the explicit '\r\n' line endings unchanged.
        with open(sav_name, mode='a', newline='', encoding='utf-8', errors='replace') as file:
            file.writelines(self.log_buffer)
        self.log_buffer.clear()
        self.log_count = 0

    def CheckCfgIniData(self):
        """Create the configuration file if missing and validate its values.

        A missing file is written with the default ``NetConfig`` section.
        The file is then read back and every option used by ``__init__`` is
        parsed; any problem is printed, not raised.
        """
        if not os.path.exists(self.udp):
            config = ConfigParser()
            config.add_section('NetConfig')
            for option, value in self._DEFAULT_CONFIG:
                config.set('NetConfig', option, value)
            with open(self.udp, 'w', encoding='utf-8') as f:
                config.write(f)
        config = ConfigParser()
        try:
            config.read(self.udp, encoding='utf-8')
            IP = config.get('NetConfig', 'IP')
            port = config.get('NetConfig', 'port')
            print(f"Configuration read successfully: {IP}, {port}")
            # Parse every integer option so malformed values are reported here.
            for option in ('bytes_start', 'bytes_length', 'num_scale', 'log_time'):
                config.getint('NetConfig', option)
        except Exception as e:
            # Best-effort check: parsing, decoding and lookup errors are all
            # reported the same way.
            print(f"Error reading configuration: {e}")
# if __name__ == '__main__':
# main()
# Ui_Form()
if __name__ == '__main__':
    # Script entry point: create the Qt application, show the monitor window
    # and run the event loop until the window is closed.
    qt_app = QtWidgets.QApplication(sys.argv)
    window = PyHexQT()
    window.show()
    exit_code = qt_app.exec_()
    sys.exit(exit_code)