代码简化

Signed-off-by: STT
This commit is contained in:
STT 2021-12-01 10:25:14 +08:00
parent c223291005
commit 85704aead2
1 changed files with 39 additions and 77 deletions

View File

@ -4,8 +4,6 @@ import struct
from PySide2.QtGui import QPolygon
import socket
from SelfBuiltModul.func import to_signed_number
class GroundControlStation:
def __init__(self, com):
@ -14,7 +12,6 @@ class GroundControlStation:
'desired_latitude': 0.0, 'desired_longitude': 0.0, 'rudder': 0, 'thrust': 0, 'ignition': 0,
'buffer_err': 0}
self.gcs = serial.Serial(com, 57600, timeout=0, write_timeout=0)
self.buffer = []
self.errors = 0
@ -32,6 +29,7 @@ class GroundControlStation:
packet_data_length = self.buffer[2]
packet_length = packet_data_length + 5
if len(self.buffer) < packet_length:
break
if (self.buffer[3] | self.buffer[4] << 8) != calculate_crc16_ccitt(self.buffer[5:], packet_data_length):
@ -43,55 +41,44 @@ class GroundControlStation:
packet_data = self.buffer[5:packet_length]
if packet_id == 0 and packet_data_length == 8: # 心跳包
print('心跳包'+str(time.time()))
print('心跳包' + str(time.time()))
self.heart_beat['timestamp'] = time.time()
self.heart_beat['buffer_err'] = self.errors
del self.buffer[:packet_length]
continue
elif (packet_data[0] | packet_data[1] << 8) != usv.control.pid['id']: # 数据过滤
print('数据过滤')
del self.buffer[:packet_length]
continue
elif packet_id == 1 and packet_data_length == 40: # Command
print('命令包')
command = struct.unpack('<Bffddhhb', bytes(packet_data[10:packet_data_length]))
self.command['timestamp'] = time.time()
self.command['setting'] = packet_data[10]
self.command['desired_heading'] = packet_data[11] | packet_data[12] << 8 | packet_data[13] << 16 | \
packet_data[14] << 24
self.command['desired_speed'] = packet_data[15] | packet_data[16] << 8 | packet_data[17] << 16 | \
packet_data[18] << 24
self.command['desired_latitude'] = packet_data[19] | packet_data[20] << 8 | packet_data[21] << 16 | \
packet_data[22] << 24 | packet_data[23] << 32 | packet_data[
24] << 40 | packet_data[25] << 48 | packet_data[26] << 56
self.command['desired_longitude'] = packet_data[27] | packet_data[28] << 8 | packet_data[29] << 16 | \
packet_data[30] << 24 | packet_data[31] << 32 | packet_data[
32] << 40 | packet_data[33] << 48 | packet_data[34] << 56
self.command['desired_rudder'] = to_signed_number(packet_data[35] | packet_data[36] << 8, 2)
self.command['desired_thrust'] = to_signed_number(packet_data[37] | packet_data[38] << 8, 2)
self.command['ignition'] = packet_data[39]
self.command['setting'] = command[0]
self.command['desired_heading'] = command[1]
self.command['desired_speed'] = command[2]
self.command['desired_latitude'] = command[3]
self.command['desired_longitude'] = command[4]
self.command['desired_rudder'] = command[5]
self.command['desired_thrust'] = command[6]
self.command['ignition'] = command[7]
del self.buffer[:packet_length]
continue
elif packet_id == 2 and packet_data_length == 10: # 参数请求
print('参数请求包')
send_packet_list = [0, 17, 46] # 参数包id=17包长46
send_packet_data_list = [usv.control.pid['id'], time.time(), usv.control.pid['heading_p'],
usv.control.pid['heading_i'], usv.control.pid['heading_d'],
usv.control.pid['speed_p'], usv.control.pid['speed_i'],
usv.control.pid['speed_d'], usv.control.pid['position_p'],
usv.control.pid['position_i'], usv.control.pid['position_d'], ]
data_bytes = struct.pack('<Hdfffffffff', send_packet_data_list[0], send_packet_data_list[1],
send_packet_data_list[2], send_packet_data_list[3], send_packet_data_list[4],
send_packet_data_list[5], send_packet_data_list[6], send_packet_data_list[7],
send_packet_data_list[8], send_packet_data_list[9], send_packet_data_list[10])
data_bytes = struct.pack('<Hdfffffffff', usv.control.pid['id'], time.time(),
usv.control.pid['heading_p'], usv.control.pid['heading_i'],
usv.control.pid['heading_d'], usv.control.pid['speed_p'],
usv.control.pid['speed_i'], usv.control.pid['speed_d'],
usv.control.pid['position_p'], usv.control.pid['position_i'],
usv.control.pid['position_d'])
crc16 = calculate_crc16_ccitt(data_bytes, len(data_bytes))
send_packet_list.append(crc16 & 0xff)
send_packet_list.append(crc16 // 0x100)
send_packet_list[0] = calculate_header_lrc(send_packet_list)
head_bytes = struct.pack('<BBBBB', send_packet_list[0], send_packet_list[1],
send_packet_list[2], send_packet_list[3], send_packet_list[4], )
send_data = head_bytes + data_bytes
header_bytes = calculate_header_bytes(17, 46, crc16) # 参数包id=17包长46
send_data = header_bytes + data_bytes
self.gcs.write(send_data)
del self.buffer[:packet_length]
continue
@ -109,23 +96,15 @@ class GroundControlStation:
usv.control.pid['position_p'] = pid[6]
usv.control.pid['position_i'] = pid[7]
usv.control.pid['position_d'] = pid[8]
# 设置回应
send_packet_list = [0, 18, 11] # 回应包id=18包长11
send_packet_data_list = [usv.control.pid['id'], time.time(), 0xff]
data_bytes = struct.pack('<HdB', send_packet_data_list[0], send_packet_data_list[1],
send_packet_data_list[2])
data_bytes = struct.pack('<HdB', usv.control.pid['id'], time.time(), 0xff)
crc16 = calculate_crc16_ccitt(data_bytes, len(data_bytes))
send_packet_list.append(crc16 & 0xff)
send_packet_list.append(crc16 // 0x100)
send_packet_list[0] = calculate_header_lrc(send_packet_list)
head_bytes = struct.pack('<BBBBB', send_packet_list[0], send_packet_list[1],
send_packet_list[2], send_packet_list[3], send_packet_list[4], )
send_data = head_bytes + data_bytes
header_bytes = calculate_header_bytes(18, 11, crc16) # 回应包id=18包长11
send_data = header_bytes + data_bytes
self.gcs.write(send_data)
del self.buffer[:packet_length]
continue
elif packet_id == 4:
print('路点包')
data_type = packet_data[10]
@ -136,26 +115,17 @@ class GroundControlStation:
if packet_data_length == 11 and data_type == 2:
pass
# 返回成功信号
send_packet_list = [0, 19, 11] # 回应包id=19包长11
send_packet_data_list = [usv.control.pid['id'], time.time(), 0xff]
data_bytes = struct.pack('<HdB', send_packet_data_list[0], send_packet_data_list[1],
send_packet_data_list[2])
data_bytes = struct.pack('<HdB', usv.control.pid['id'], time.time(), 0xff)
crc16 = calculate_crc16_ccitt(data_bytes, len(data_bytes))
send_packet_list.append(crc16 & 0xff)
send_packet_list.append(crc16 // 0x100)
send_packet_list[0] = calculate_header_lrc(send_packet_list)
head_bytes = struct.pack('<BBBBB', send_packet_list[0], send_packet_list[1],
send_packet_list[2], send_packet_list[3], send_packet_list[4], )
send_data = head_bytes + data_bytes
header_bytes = calculate_header_bytes(19, 11, crc16) # 回应包id=19包长11
send_data = header_bytes + data_bytes
self.gcs.write(send_data)
del self.buffer[:packet_length]
continue
def send_status(self, usv):
# print('发送状态包')
send_packet_list = [0, 16, 100] # 回应包id=16包长100
send_packet_data_list = [usv.control.pid['id'], time.time(), usv.control.status,
data_bytes = struct.pack('<HdBdddffffffffffffHHffhhb', usv.control.pid['id'], time.time(), usv.control.status,
usv.navigation.data['location']['latitude'],
usv.navigation.data['location']['longitude'],
usv.navigation.data['location']['altitude'], usv.navigation.data['posture']['roll'],
@ -166,27 +136,19 @@ class GroundControlStation:
usv.navigation.data['accelerometer']['X'], usv.navigation.data['accelerometer']['Y'],
usv.navigation.data['accelerometer']['Z'], usv.navigation.data['system_status'],
usv.navigation.data['filter_status'], usv.control.depth, usv.control.battery,
usv.control.data['rudder'], usv.control.data['thrust'], usv.control.data['ignition']]
data_bytes = struct.pack('<HdBdddffffffffffffHHffhhb', send_packet_data_list[0], send_packet_data_list[1],
send_packet_data_list[2], send_packet_data_list[3], send_packet_data_list[4],
send_packet_data_list[5], send_packet_data_list[6], send_packet_data_list[7],
send_packet_data_list[8], send_packet_data_list[9], send_packet_data_list[10],
send_packet_data_list[11], send_packet_data_list[12], send_packet_data_list[13],
send_packet_data_list[14], send_packet_data_list[15], send_packet_data_list[16],
send_packet_data_list[17], send_packet_data_list[18], send_packet_data_list[19],
send_packet_data_list[20], send_packet_data_list[21], send_packet_data_list[22],
send_packet_data_list[23], send_packet_data_list[24])
usv.control.data['rudder'], usv.control.data['thrust'], usv.control.data['ignition'])
crc16 = calculate_crc16_ccitt(data_bytes, len(data_bytes))
send_packet_list.append(crc16 & 0xff)
send_packet_list.append(crc16 // 0x100)
send_packet_list[0] = calculate_header_lrc(send_packet_list)
head_bytes = struct.pack('<BBBBB', send_packet_list[0], send_packet_list[1],
send_packet_list[2], send_packet_list[3], send_packet_list[4], )
send_data = head_bytes + data_bytes
header_bytes = calculate_header_bytes(16, 100, crc16) # 回应包id=16包长100
send_data = header_bytes + data_bytes
self.gcs.write(send_data)
def calculate_header_bytes(id, length, crc16):
header_list = [0, id, length, crc16 & 0xff, crc16 // 0x100]
header_list[0] = calculate_header_lrc(header_list)
return struct.pack('<BBBBB', header_list[0], header_list[1], header_list[2], header_list[3], header_list[4])
def calculate_header_lrc(data_list):
return ((((data_list[1] + data_list[2] + data_list[3] + data_list[4]) & 0xff) ^ 0xff) + 1) & 0xff