获取角度修正文件
1 概述
角度修正文件是系统标定、点云解算和空间定位的方向基准。为确保测量精度,禾赛对每台激光雷达出厂时进行独立角度标定,每台的标定结果不同,该唯一修正数据记录为“角度修正文件”并随激光雷达提供。
激光雷达连接方式分为以太网连接和串口连接;角度文件格式包括CSV 格式和DAT 格式;具体型号参见表格。
激光雷达型号与连接方式对应关系表:
| 连接方式 | 以太网连接 | 串口连接 |
|---|---|---|
| 产品系列 | Pandar、OT、QT、XT、AT | JT |
| 适用产品 | Pandar40P、Pandar64、Pandar128E3X、OT128、PandarQT、QT128C2X、XT32、XT16、XT32M2X、AT128E2X、AT128P、ATX | JT16 |
激光雷达型号与文件格式对应关系表:
| 文件格式 | CSV 格式 | DAT 格式 |
|---|---|---|
| 产品系列 | Pandar、OT、QT、XT | AT、JT |
| 适用产品 | Pandar40P、Pandar64、Pandar128E3X、OT128、PandarQT、QT128C2X、XT32、XT16、XT32M2X | AT128E2X、AT128P、ATX、JT16 |
2 使用方法
2.1 使用 PandarView 2 客户端
2.2 使用 API 指令
2.2.1 以太网连接方式
CSV 格式
DAT 格式
2.2.2 串口连接方式
串口连接
本文将分别介绍两种方法获取角度修正文件的相应实例,可根据实际情况选择其中一种或多种进行使用。
3 实例介绍
注:以下实例基于 XT32 、AT128P 和 JT16 的用户及指令手册实现,实际各个型号激光雷达的配置可能有所不同,具体内容请参考对应型号的用户手册。
3.1 使用 PandarView 2 客户端获取角度修正文件
1. 访问禾赛官网 下载并安装 PandarView 2 客户端;
2. 将激光雷达连接至电脑主机,确保其能够与主机正常建立通信;(以太网连接方法详见如何配置以太网;串口连接方法详见相关激光雷达用户手册)
3. 打开 PandarView 2 客户端,点击左上角工具栏 [ Listen for Data] 按钮 ,激光雷达连接方式分为以太网连接和串口连接,两种方式配置方式略有不同,请按照激光雷达实际参数对应填写。
注:串口连接时 PandarView 2 版本须大于 2.1.7,弹出页选择 Serial Port 选项并填写 RS485 串口地址与波特率,参考型号:JT16
完成上述参数配置后点击[ OK ] 确认;
4. 点击工具栏 [ Correction ] 按钮,进入 Angle Correction 栏并点击 Export,弹出的窗口中输入文件名并选择保存位置,然后点击确认以完成导出。
注:Ubuntu 系统下保存文件时必须添加正确的文件后缀 (.csv/.dat)
3.2 使用 API 指令获取角度修正文件
3.2.1 以太网连接方式
CSV 格式
参考型号:XT32
1. 本机与连接激光雷达后,使用 IDE 打开下方脚本;
import socket, ssl, pprint, time
import random
import struct
default_host = "192.168.1.201" # Change this to your Lidar IP
default_port = 9347 # Change this to your Lidar PTC Port
class PTC:
def __init__(self, host=default_host, port=default_port):
self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
lp = random.randint(10000, 30000)
self.s.bind(('0.0.0.0', lp))
self.s.settimeout(30)
self.s.connect((host, port))
def closeSocket(self):
self.s.shutdown(1)
# self.s.close()
def ByteToHex(self, h):
return ''.join(["%02x" % x for x in h]).strip()
def read_bytes(self, payload_size):
chunks = []
bytes_received = 0
while bytes_received < payload_size:
chunk = self.s.recv(payload_size - bytes_received)
if chunk == b"":
raise RuntimeError("Socket has been unexpectedly closed")
chunks.append(chunk)
bytes_received = bytes_received + len(chunk)
return b"".join(chunks)
def sender(self, cmd_code, payload):
"""
:param cmd_code:
:param payload: payload should input like '015d010207'
:return:
"""
if cmd_code in [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e',
'f']:
cmd_code = "0" + str(cmd_code)
print("payload :")
print(payload)
if not payload or payload.upper() == "NONE":
payload_len = 0
p = '4774' + str(cmd_code) + "00" + struct.pack('>L', payload_len).hex()
else:
payload_len = len(bytes.fromhex(payload))
p = '4774' + str(cmd_code) + "00" + struct.pack('>L', payload_len).hex() + payload
data = bytes.fromhex(p)
self.s.send(data)
response = self.s.recv(8)
print("response: ")
print(response)
r_cmd = bytes.hex(response[2:3])
r_returnCode = bytes.hex(response[3:4])
if bytes.hex(response[4:8]) == "\x00\x00\x00\x00":
r_length = 0
response_payload = ""
else:
r_length = int(bytes.hex(response[4:8]), 16)
response_payload = self.read_bytes(r_length)
print("command is: %s, get return code: %s, return length: %s, \nreturn string:\n%s" % (
r_cmd, r_returnCode, r_length, response_payload))
final_response = {
"response_command": r_cmd,
"response_return_code": r_returnCode,
"response_payload_length": r_length,
"response_payload": response_payload
}
return final_response
if __name__ == "__main__":
ss = PTC()
print(ss.sender('05', None)) # Example command to 'Get Current Fault Log', change the command code and payload as needed
# save it as a CSV file
response = ss.sender('05', None)
correction_data = response.get("response_payload", b"")
with open("angle_correction.csv", "wb") as f:
f.write(correction_data)
print("The data has been saved as angle_correction.csv.")
ss.closeSocket()
2. 根据实际情况配置本机 Python 环境;在脚本中找到第 5、6 行代码:
default_host = "192.168.1.201" 、default_port = 9347→ 将 "192.168.1.201" 和 9347 修改为激光雷达的实际 IP 地址和端口号,然后运行代码。
运行成功后,将在当前目录下生成名为 angle_correction.csv 的角度文件。
DAT 格式
参考型号:AT128P
1. 本机与连接激光雷达后,使用 IDE 打开下方脚本;
from socket import *
sock = socket(AF_INET,SOCK_STREAM)
message = bytearray()
message1 = bytearray()
SN = bytearray()
# header
message.append(0x47)
message.append(0x74)
# command
##################################################################
message.append(0x05) #0x05 for Get Angle Correction File
##################################################################
# return code
message.append(0x00)
# Tail
message.append(0x00)
message.append(0x00)
message.append(0x00)
message.append(0x00) #0x01 payload data length for the command
message.append(0x00) #payload data value, 0x01 means PTP STATUS
sock.connect(("192.168.1.201",9347))#the ip of device and Port you need to call(fixed)
sock.send(message)
response=sock.recv(8)
r_cmd = int.from_bytes(response[2:3], 'big')
r_returnCode = int.from_bytes(response[3:4], 'big')
r_length = int.from_bytes(response[4:8], 'big')
response_payload=b''
while len(response_payload) < r_length:
response_payload += sock.recv(r_length)
print(response_payload)
with open("corrections.dat","wb") as f:
f.write(response_payload)
f.close()
2. 根据实际情况配置本机 Python 环境,在代码中找到第 24 行代码: sock.connect(("192.168.1.201",9347)) → 将 "192.168.1.201" 和 9347 修改为激光雷达的实际 IP 地址和端口号,然后运行代码。;
运行成功后,将在当前目录下生成名为 corrections.dat 的角度文件。
3.2.2 串口连接方式
串口连接
参考型号:JT16
1. 根据激光雷达用户手册完成与本机连接后,使用 IDE 打开下方脚本;
import struct
import csv
import serial
import time
from datetime import datetime
from threading import Thread, Event
def parse_angle_data(valid_data):
# Ensure valid_data length is 68 bytes
if len(valid_data) != 68:
raise ValueError("Valid data length must be 68 bytes")
# Initialize lists to store angle values
azimuth_values = []
elevation_values = []
# Parse Azimuth data (first 32 bytes)
for i in range(16):
azimuth_bytes = valid_data[i * 2:(i + 1) * 2]
azimuth_value = struct.unpack('<h', azimuth_bytes)[0] * 0.01 # Little-endian, short type, unit 0.01°
azimuth_values.append(azimuth_value)
# Parse Elevation data (next 32 bytes)
for i in range(16):
elevation_bytes = valid_data[32 + i * 2:32 + (i + 1) * 2]
elevation_value = struct.unpack('<h', elevation_bytes)[0] * 0.01 # Little-endian, short type, unit 0.01°
elevation_values.append(elevation_value)
return azimuth_values, elevation_values
def save_to_csv(elevation_values, azimuth_values, output_file):
with open(output_file, 'w', newline='') as csvfile:
fieldnames = ['Channel', 'Elevation (°)', 'Azimuth (°)']
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for i in range(16):
writer.writerow({
'Channel': i + 1,
'Elevation (°)': elevation_values[i],
'Azimuth (°)': azimuth_values[i]
})
def get_angle_correction(send_port: str, receive_port: str, baudrate_UART: int, baudrate_485: int):
# Command configuration
COMMAND = bytes([
0x24, 0x4C, 0x44, 0x43, 0x4D, 0x44, 0x2C, 0x03,
0x02, 0x07, 0x00, 0x95, 0xB3, 0x2C, 0x0A,
0xD3, 0x86, 0x1F, 0xA3, 0xEE, 0xFF
])
# Response pattern configuration
START_MARKER = bytes([0x24, 0x4C, 0x44, 0x41, 0x43, 0x4B,
0x2C, 0x48, 0x02, 0x07, 0x00])
DATA_START_OFFSET = 12 # Start position of valid data
DATA_LENGTH = 68 # Length of data to extract
ERROR_CODE_OFFSET = 80 # Position of error code
TOTAL_TIMEOUT = 8 # Total timeout in seconds
PRE_READ_TIME = 0.5 # Pre-read time in seconds
class AsyncReader:
def __init__(self, ser):
self.ser = ser
self.buffer = bytearray()
self.stop_event = Event()
self.thread = None
def start(self):
self.thread = Thread(target=self._read_loop)
self.thread.start()
def _read_loop(self):
while not self.stop_event.is_set():
if self.ser.in_waiting > 0:
chunk = self.ser.read(self.ser.in_waiting)
self.buffer.extend(chunk)
def stop(self):
self.stop_event.set()
if self.thread.is_alive():
self.thread.join(timeout=1)
def get_buffer(self):
return bytes(self.buffer)
try:
# Generate timestamped filenames
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
output_file = f"JT16_angle_correction_{timestamp}.dat"
log_file = f"JT16_angle_log_{timestamp}.txt"
csv_file = f"JT16_angle_correction_{timestamp}.csv"
# Open receiving port first
with serial.Serial(receive_port, baudrate_485, timeout=0) as ser_recv: # Non-blocking mode
reader = AsyncReader(ser_recv)
reader.start()
# Start receiving data in advance
print(f"[{timestamp}] Starting pre-reception...")
time.sleep(PRE_READ_TIME) # Pre-receive for 500ms
# Send command
with serial.Serial(send_port, baudrate_UART, timeout=1) as ser_send:
print(f"[{timestamp}] Sending command to {send_port}...")
ser_send.write(COMMAND)
ser_send.flush()
# Main processing loop
start_time = time.time()
found = False
with open(log_file, 'w') as log:
log.write(f"Angle Correction Process Log - {timestamp}\n")
log.write("=" * 50 + "\n")
while (time.time() - start_time) < TOTAL_TIMEOUT:
# Get current buffer
current_buffer = reader.get_buffer()
# Find start marker
pos = current_buffer.find(START_MARKER)
if pos != -1:
# Calculate required end position
required_end = pos + DATA_START_OFFSET + DATA_LENGTH + 1
if len(current_buffer) >= required_end:
# Extract response segment
response = current_buffer[pos:required_end]
# Verify data integrity
if (response[11] == 0x44 and
response[ERROR_CODE_OFFSET] == 0x00):
# Extract valid data
valid_data = response[DATA_START_OFFSET:DATA_START_OFFSET + DATA_LENGTH]
# Save as DAT file
with open(output_file, 'wb') as f:
f.write(valid_data)
# # Parse angle data
# azimuth_values, elevation_values = parse_angle_data(valid_data)
#
# # Save as CSV file
# save_to_csv(elevation_values, azimuth_values, csv_file)
log.write(f"Data saved to {output_file}\n")
print(f"Correction file saved successfully: {output_file}")
print(f"CSV file saved successfully: {csv_file}")
found = True
break
# Clean up processed buffer
reader.buffer = reader.buffer[required_end:]
time.sleep(0.01) # Reduce CPU usage
reader.stop()
if not found:
error_msg = "No valid response received" if len(current_buffer) == 0 \
else f"Received {len(current_buffer)} bytes but no valid data found"
log.write(f"Error: {error_msg}\n")
raise TimeoutError(error_msg)
except Exception as e:
error_msg = f"{type(e).__name__}: {str(e)}"
print(f"An error occurred: {error_msg}")
with open(log_file, 'a') as log:
log.write(f"Error: {error_msg}\n")
raise
if __name__ == "__main__":
# Enter UART port, RS485 port, UART baud rate, RS485 baud rate respectively
# Example: Windows: COM*; Ubuntu: /dev/ttyUSB*
get_angle_correction("COM4", "COM5", 9600, 3000000)
2.请根据实际情况配置本机 Python 环境,并修改脚本最后一行中的 RS485 和 RS232 端口号及波特率。注意同一串口在 Windows 和 Ubuntu 系统中的设备名称不同,例如:Windows 系统显示为 COM*,Ubuntu 系统显示为 /dev/ttyUSB*。
3. 运行该脚本可在当前目录生成JT16_angle_correction_ *当前时间*.dat的角度修正文件以及对应的 log 文件。