Files
linshi/catkin_truck /wirecontrol_sub.py

173 lines
6.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# 作者: 郭帆、张贺达
# 功能描述:
# 通过can通道接受线控反馈数据
# 根据can协议dbc文件进行数据解析
# 解析后的数据按照wirecontrol_feed_back_msg数据格式进行发布
#最后修改日期:2023-08-23
import time
import can
import cantools
import rospy
import rosparam
import signal
import os
import yaml
from msg_gen.msg import ToControl
from msg_gen.msg import Power
from msg_gen.msg import State
from msg_gen.msg import Break
from msg_gen.msg import Engine
from msg_gen.msg import Driving
from msg_gen.msg import MileMeter
Running_Flag = True
def signal_handler(signal, frame):
global Running_Flag
Running_Flag = False
def main():
wirecontrol_can_channel = None
global Running_Flag
signal.signal(signal.SIGINT, signal_handler)
rospy.init_node("wirecontrol_sub")
# 按照 GPS 接收脚本的方式读取 AllPathList.yaml
script_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.abspath(os.path.join(script_dir, '../../..'))
all_path_list_yaml = os.path.join(project_root, 'ConfigFile', 'yaml', 'AllPathList.yaml')
with open(all_path_list_yaml, 'r', encoding='utf-8') as f:
config = yaml.safe_load(f)
print("找到AllPathList.yaml文件")
feedback_dbc_file = config.get('NewZt_zhd_1005_dbc', '')
wirecontrol_yaml_path = config.get('wirecontrol_yaml', '')
if feedback_dbc_file and os.path.exists(feedback_dbc_file):
print(f"成功读取DBC文件绝对路径: {feedback_dbc_file}")
else:
print(f"配置中未找到有效路径或文件不存在: {feedback_dbc_file}")
return
if wirecontrol_yaml_path and os.path.exists(wirecontrol_yaml_path):
print(f"成功读取YAML文件绝对路径: {wirecontrol_yaml_path}")
try:
with open(wirecontrol_yaml_path, 'r', encoding='utf-8') as f:
wc = yaml.safe_load(f)
if wc is not None and 'wirecontrol_can_channel' in wc:
wirecontrol_can_channel = wc['wirecontrol_can_channel']
print(f"从配置文件读取的CAN通道: {wirecontrol_can_channel}")
else:
print("配置文件中未找到wirecontrol_can_channel参数")
return
except Exception as e:
print(f"读取配置文件时发生错误: {str(e)}")
return
else:
print(f"配置中未找到有效路径或文件不存在: {wirecontrol_yaml_path}")
return
pub_tocontrol = rospy.Publisher('/ToControl_msg', ToControl, queue_size=10)
pub_v2n_1 = rospy.Publisher('/Power_msg', Power, queue_size=10)
pub_v2n_2 = rospy.Publisher('/State_msg', State, queue_size=10)
pub_v2n_3 = rospy.Publisher('/Break_msg', Break, queue_size=10)
pub_v2n_4 = rospy.Publisher('/Engine_msg', Engine, queue_size=10)
pub_v2n_5 = rospy.Publisher('/Driving_msg', Driving, queue_size=10)
pub_v2n_6 = rospy.Publisher('/MileMeter_msg', MileMeter, queue_size=10)
ToControlMsg = ToControl()
power0 = Power()
state0 = State()
Break0 = Break()
Engine0 = Engine()
Drive0 = Driving()
MileMeter0 = MileMeter()
# 验证DBC文件是否存在上方已验证路径有效这里直接加载
db = cantools.db.load_file(feedback_dbc_file)
# 与 GPS 接收脚本一致,使用 bustype 参数
try:
bus = can.interface.Bus(channel=wirecontrol_can_channel, bustype='socketcan')
connect_flag = True
except:
connect_flag = False
# 打印DBC文件中已知的消息ID用于调试
known_ids = list(db._frame_id_to_message.keys())
print(f"DBC文件中已知的消息ID: {[hex(id) for id in known_ids]}")
# 我们关心的特定ID
target_ids = [0x0C01A5A7, 0x0C02A5A7]
print(f"我们关心的目标ID: {[hex(id) for id in target_ids]}")
try:
while Running_Flag and connect_flag and not rospy.is_shutdown():
message = bus.recv()
# 跳过无效消息
if message is None:
continue
if message.arbitration_id not in target_ids:
# 静默跳过非目标ID
# print("esc")
continue
try:
if message.arbitration_id == 0x0C01A5A7:
decoded_message = db.decode_message(message.arbitration_id, message.data)
# print(decoded_message)
ToControlMsg.wheel_angle = decoded_message["steer_angle_fb"]
# print("getin")
ToControlMsg.LinkState = True
# print(f"解码 0x0C01A5A7: 方向盘角度 = {ToControlMsg.wheel_angle}")
elif message.arbitration_id == 0x0C02A5A7:
decoded_message = db.decode_message(message.arbitration_id, message.data)
ToControlMsg.Car_Control_Mode = decoded_message["Veh_running_state"]
ToControlMsg.EPB_Parking_State = decoded_message["Parking_Status"]
if (decoded_message["Gear_signal"] == 2):
decoded_message["Gear_signal"] = -1 #todo
ToControlMsg.current_Gear = decoded_message["Gear_signal"]
ToControlMsg.Gas_pedal_percentage = decoded_message["ACC_pedal"]
ToControlMsg.Break_pedal_percentage = decoded_message["Break_padel"]
ToControlMsg.Speed_byGear = decoded_message["V_speed"]
# print(f"解码 0x0C02A5A7: 速度 = {ToControlMsg.Speed_byGear}, 档位 = {ToControlMsg.current_Gear}")
# 发布消息
pub_tocontrol.publish(ToControlMsg)
pub_v2n_1.publish(power0)
pub_v2n_2.publish(state0)
pub_v2n_3.publish(Break0)
pub_v2n_4.publish(Engine0)
pub_v2n_5.publish(Drive0)
pub_v2n_6.publish(MileMeter0)
except KeyError as e:
print(f"KeyError: 消息ID 0x{message.arbitration_id:X} 在DBC中未找到对应的定义")
print(f"错误详情: {e}")
continue
except Exception as e:
print(f"解码消息时出错 (ID: 0x{message.arbitration_id:X}): {e}")
continue
except KeyboardInterrupt:
print("程序被用户中断")
except Exception as e:
print(f"程序运行出错: {e}")
finally:
if 'bus' in locals():
bus.shutdown()
print("程序退出")
if __name__ == '__main__':
main()