跳到主要内容

获取角度修正文件

1 概述

角度修正文件是系统标定、点云解算和空间定位的方向基准。为确保测量精度,禾赛对每台激光雷达出厂时进行独立角度标定,每台的标定结果不同,该唯一修正数据记录为“角度修正文件”并随激光雷达提供。

激光雷达连接方式分为以太网连接串口连接;角度文件格式包括CSV 格式DAT 格式;具体型号参见表格。

激光雷达型号与连接方式对应关系表:

连接方式以太网连接串口连接
产品系列Pandar、OT、QT、XT、ATJT
适用产品Pandar40P、Pandar64、Pandar128E3X、OT128、PandarQT、QT128C2X、XT32、XT16、XT32M2X、AT128E2X、AT128P、ATXJT16

激光雷达型号与文件格式对应关系表:

文件格式CSV 格式DAT 格式
产品系列Pandar、OT、QT、XTAT、JT
适用产品Pandar40P、Pandar64、Pandar128E3X、OT128、PandarQT、QT128C2X、XT32、XT16、XT32M2XAT128E2X、AT128P、ATX、JT16

2 使用方法

2.1 使用 PandarView 2 客户端

2.2 使用 API 指令

2.2.1 以太网连接方式

CSV 格式
DAT 格式

2.2.2 串口连接方式

串口连接

本文将分别介绍两种方法获取角度修正文件的相应实例,可根据实际情况选择其中一种或多种进行使用。

3 实例介绍

注:以下实例基于 XT32 、AT128P 和 JT16 的用户及指令手册实现,实际各个型号激光雷达的配置可能有所不同,具体内容请参考对应型号的用户手册。

3.1 使用 PandarView 2 客户端获取角度修正文件

1. 访问禾赛官网 下载并安装 PandarView 2 客户端;

2. 将激光雷达连接至电脑主机,确保其能够与主机正常建立通信;(以太网连接方法详见如何配置以太网;串口连接方法详见相关激光雷达用户手册)

3. 打开 PandarView 2 客户端,点击左上角工具栏 [ Listen for Data] 按钮 ,激光雷达连接方式分为以太网连接串口连接,两种方式配置方式略有不同,请按照激光雷达实际参数对应填写。

注:串口连接时 PandarView 2 版本须大于 2.1.7,弹出页选择 Serial Port 选项并填写 RS485 串口地址与波特率,参考型号:JT16

连接配置界面

完成上述参数配置后点击[ OK ] 确认;

4. 点击工具栏 [ Correction ] 按钮,进入 Angle Correction 栏并点击 Export,弹出的窗口中输入文件名并选择保存位置,然后点击确认以完成导出。

注:Ubuntu 系统下保存文件时必须添加正确的文件后缀 (.csv/.dat)

文件导出界面

3.2 使用 API 指令获取角度修正文件

3.2.1 以太网连接方式

CSV 格式

参考型号:XT32

1. 本机与连接激光雷达后,使用 IDE 打开下方脚本;

import socket, ssl, pprint, time
import random
import struct

default_host = "192.168.1.201" # Change this to your Lidar IP
default_port = 9347 # Change this to your Lidar PTC Port

class PTC:
def __init__(self, host=default_host, port=default_port):
self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
lp = random.randint(10000, 30000)
self.s.bind(('0.0.0.0', lp))
self.s.settimeout(30)
self.s.connect((host, port))

def closeSocket(self):
self.s.shutdown(1)
# self.s.close()

def ByteToHex(self, h):
return ''.join(["%02x" % x for x in h]).strip()

def read_bytes(self, payload_size):
chunks = []
bytes_received = 0
while bytes_received < payload_size:
chunk = self.s.recv(payload_size - bytes_received)
if chunk == b"":
raise RuntimeError("Socket has been unexpectedly closed")
chunks.append(chunk)
bytes_received = bytes_received + len(chunk)

return b"".join(chunks)

def sender(self, cmd_code, payload):
"""
:param cmd_code:
:param payload: payload should input like '015d010207'
:return:
"""
if cmd_code in [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e',
'f']:
cmd_code = "0" + str(cmd_code)
print("payload :")
print(payload)
if not payload or payload.upper() == "NONE":
payload_len = 0
p = '4774' + str(cmd_code) + "00" + struct.pack('>L', payload_len).hex()
else:
payload_len = len(bytes.fromhex(payload))
p = '4774' + str(cmd_code) + "00" + struct.pack('>L', payload_len).hex() + payload
data = bytes.fromhex(p)
self.s.send(data)
response = self.s.recv(8)
print("response: ")
print(response)
r_cmd = bytes.hex(response[2:3])
r_returnCode = bytes.hex(response[3:4])
if bytes.hex(response[4:8]) == "\x00\x00\x00\x00":
r_length = 0
response_payload = ""
else:
r_length = int(bytes.hex(response[4:8]), 16)
response_payload = self.read_bytes(r_length)
print("command is: %s, get return code: %s, return length: %s, \nreturn string:\n%s" % (
r_cmd, r_returnCode, r_length, response_payload))
final_response = {
"response_command": r_cmd,
"response_return_code": r_returnCode,
"response_payload_length": r_length,
"response_payload": response_payload
}
return final_response

if __name__ == "__main__":
ss = PTC()
print(ss.sender('05', None)) # Example command to 'Get Current Fault Log', change the command code and payload as needed

# save it as a CSV file
response = ss.sender('05', None)
correction_data = response.get("response_payload", b"")
with open("angle_correction.csv", "wb") as f:
f.write(correction_data)
print("The data has been saved as angle_correction.csv.")

ss.closeSocket()

2. 根据实际情况配置本机 Python 环境;在脚本中找到第 5、6 行代码:

default_host = "192.168.1.201"default_port = 9347→ 将 "192.168.1.201"9347 修改为激光雷达的实际 IP 地址和端口号,然后运行代码。

运行成功后,将在当前目录下生成名为 angle_correction.csv 的角度文件。

DAT 格式

参考型号:AT128P

1. 本机与连接激光雷达后,使用 IDE 打开下方脚本;

from socket import *

sock = socket(AF_INET,SOCK_STREAM)

message = bytearray()
message1 = bytearray()
SN = bytearray()

# header

message.append(0x47)
message.append(0x74)

# command

##################################################################
message.append(0x05) #0x05 for Get Angle Correction File
##################################################################

# return code

message.append(0x00)

# Tail

message.append(0x00)
message.append(0x00)
message.append(0x00)
message.append(0x00) #0x01 payload data length for the command
message.append(0x00) #payload data value, 0x01 means PTP STATUS

sock.connect(("192.168.1.201",9347))#the ip of device and Port you need to call(fixed)

sock.send(message)
response=sock.recv(8)
r_cmd = int.from_bytes(response[2:3], 'big')
r_returnCode = int.from_bytes(response[3:4], 'big')
r_length = int.from_bytes(response[4:8], 'big')
response_payload=b''
while len(response_payload) < r_length:
response_payload += sock.recv(r_length)

print(response_payload)
with open("corrections.dat","wb") as f:
f.write(response_payload)
f.close()

2. 根据实际情况配置本机 Python 环境,在代码中找到第 24 行代码: sock.connect(("192.168.1.201",9347)) → 将 "192.168.1.201"9347 修改为激光雷达的实际 IP 地址和端口号,然后运行代码。;

运行成功后,将在当前目录下生成名为 corrections.dat 的角度文件。

3.2.2 串口连接方式

串口连接

参考型号:JT16

1. 根据激光雷达用户手册完成与本机连接后,使用 IDE 打开下方脚本;

import struct
import csv
import serial
import time
from datetime import datetime
from threading import Thread, Event


def parse_angle_data(valid_data):
# Ensure valid_data length is 68 bytes
if len(valid_data) != 68:
raise ValueError("Valid data length must be 68 bytes")

# Initialize lists to store angle values
azimuth_values = []
elevation_values = []

# Parse Azimuth data (first 32 bytes)
for i in range(16):
azimuth_bytes = valid_data[i * 2:(i + 1) * 2]
azimuth_value = struct.unpack('<h', azimuth_bytes)[0] * 0.01 # Little-endian, short type, unit 0.01°
azimuth_values.append(azimuth_value)

# Parse Elevation data (next 32 bytes)
for i in range(16):
elevation_bytes = valid_data[32 + i * 2:32 + (i + 1) * 2]
elevation_value = struct.unpack('<h', elevation_bytes)[0] * 0.01 # Little-endian, short type, unit 0.01°
elevation_values.append(elevation_value)

return azimuth_values, elevation_values


def save_to_csv(elevation_values, azimuth_values, output_file):
with open(output_file, 'w', newline='') as csvfile:
fieldnames = ['Channel', 'Elevation (°)', 'Azimuth (°)']
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)

writer.writeheader()
for i in range(16):
writer.writerow({
'Channel': i + 1,
'Elevation (°)': elevation_values[i],
'Azimuth (°)': azimuth_values[i]
})


def get_angle_correction(send_port: str, receive_port: str, baudrate_UART: int, baudrate_485: int):
# Command configuration
COMMAND = bytes([
0x24, 0x4C, 0x44, 0x43, 0x4D, 0x44, 0x2C, 0x03,

0x02, 0x07, 0x00, 0x95, 0xB3, 0x2C, 0x0A,

0xD3, 0x86, 0x1F, 0xA3, 0xEE, 0xFF
])

# Response pattern configuration
START_MARKER = bytes([0x24, 0x4C, 0x44, 0x41, 0x43, 0x4B,
0x2C, 0x48, 0x02, 0x07, 0x00])
DATA_START_OFFSET = 12 # Start position of valid data
DATA_LENGTH = 68 # Length of data to extract
ERROR_CODE_OFFSET = 80 # Position of error code
TOTAL_TIMEOUT = 8 # Total timeout in seconds
PRE_READ_TIME = 0.5 # Pre-read time in seconds

class AsyncReader:
def __init__(self, ser):
self.ser = ser
self.buffer = bytearray()
self.stop_event = Event()
self.thread = None

def start(self):
self.thread = Thread(target=self._read_loop)
self.thread.start()

def _read_loop(self):
while not self.stop_event.is_set():
if self.ser.in_waiting > 0:
chunk = self.ser.read(self.ser.in_waiting)
self.buffer.extend(chunk)

def stop(self):
self.stop_event.set()
if self.thread.is_alive():
self.thread.join(timeout=1)

def get_buffer(self):
return bytes(self.buffer)

try:
# Generate timestamped filenames
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
output_file = f"JT16_angle_correction_{timestamp}.dat"
log_file = f"JT16_angle_log_{timestamp}.txt"
csv_file = f"JT16_angle_correction_{timestamp}.csv"
# Open receiving port first
with serial.Serial(receive_port, baudrate_485, timeout=0) as ser_recv: # Non-blocking mode
reader = AsyncReader(ser_recv)
reader.start()

# Start receiving data in advance
print(f"[{timestamp}] Starting pre-reception...")
time.sleep(PRE_READ_TIME) # Pre-receive for 500ms
# Send command
with serial.Serial(send_port, baudrate_UART, timeout=1) as ser_send:
print(f"[{timestamp}] Sending command to {send_port}...")
ser_send.write(COMMAND)
ser_send.flush()

# Main processing loop
start_time = time.time()
found = False

with open(log_file, 'w') as log:
log.write(f"Angle Correction Process Log - {timestamp}\n")
log.write("=" * 50 + "\n")

while (time.time() - start_time) < TOTAL_TIMEOUT:
# Get current buffer
current_buffer = reader.get_buffer()

# Find start marker
pos = current_buffer.find(START_MARKER)
if pos != -1:
# Calculate required end position
required_end = pos + DATA_START_OFFSET + DATA_LENGTH + 1

if len(current_buffer) >= required_end:
# Extract response segment
response = current_buffer[pos:required_end]

# Verify data integrity
if (response[11] == 0x44 and
response[ERROR_CODE_OFFSET] == 0x00):
# Extract valid data
valid_data = response[DATA_START_OFFSET:DATA_START_OFFSET + DATA_LENGTH]

# Save as DAT file
with open(output_file, 'wb') as f:
f.write(valid_data)

# # Parse angle data
# azimuth_values, elevation_values = parse_angle_data(valid_data)
#
# # Save as CSV file
# save_to_csv(elevation_values, azimuth_values, csv_file)

log.write(f"Data saved to {output_file}\n")
print(f"Correction file saved successfully: {output_file}")
print(f"CSV file saved successfully: {csv_file}")
found = True
break

# Clean up processed buffer
reader.buffer = reader.buffer[required_end:]

time.sleep(0.01) # Reduce CPU usage

reader.stop()

if not found:
error_msg = "No valid response received" if len(current_buffer) == 0 \
else f"Received {len(current_buffer)} bytes but no valid data found"
log.write(f"Error: {error_msg}\n")
raise TimeoutError(error_msg)

except Exception as e:
error_msg = f"{type(e).__name__}: {str(e)}"
print(f"An error occurred: {error_msg}")
with open(log_file, 'a') as log:
log.write(f"Error: {error_msg}\n")
raise


if __name__ == "__main__":
# Enter UART port, RS485 port, UART baud rate, RS485 baud rate respectively
# Example: Windows: COM*; Ubuntu: /dev/ttyUSB*
get_angle_correction("COM4", "COM5", 9600, 3000000)

2.请根据实际情况配置本机 Python 环境,并修改脚本最后一行中的 RS485RS232 端口号及波特率。注意同一串口在 Windows 和 Ubuntu 系统中的设备名称不同,例如:Windows 系统显示为 COM*,Ubuntu 系统显示为 /dev/ttyUSB*。

3. 运行该脚本可在当前目录生成JT16_angle_correction_ *当前时间*.dat的角度修正文件以及对应的 log 文件。