diff --git a/catkin_truck /wirecontrol_sub.py b/catkin_truck /wirecontrol_sub.py deleted file mode 100644 index 6f5e20e..0000000 --- a/catkin_truck /wirecontrol_sub.py +++ /dev/null @@ -1,173 +0,0 @@ -#!/usr/bin/python3 -# -*- coding: utf-8 -*- - -# 作者: 郭帆、张贺达 - -# 功能描述: -# 通过can通道接受线控反馈数据 -# 根据can协议(dbc文件)进行数据解析 -# 解析后的数据按照wirecontrol_feed_back_msg数据格式进行发布 - -#最后修改日期:2023-08-23 -import time -import can -import cantools -import rospy -import rosparam -import signal -import os -import yaml - -from msg_gen.msg import ToControl -from msg_gen.msg import Power -from msg_gen.msg import State -from msg_gen.msg import Break -from msg_gen.msg import Engine -from msg_gen.msg import Driving -from msg_gen.msg import MileMeter - - -Running_Flag = True - -def signal_handler(signal, frame): - global Running_Flag - Running_Flag = False - -def main(): - wirecontrol_can_channel = None - global Running_Flag - - signal.signal(signal.SIGINT, signal_handler) - rospy.init_node("wirecontrol_sub") - # 按照 GPS 接收脚本的方式读取 AllPathList.yaml - script_dir = os.path.dirname(os.path.abspath(__file__)) - project_root = os.path.abspath(os.path.join(script_dir, '../../..')) - all_path_list_yaml = os.path.join(project_root, 'ConfigFile', 'yaml', 'AllPathList.yaml') - - with open(all_path_list_yaml, 'r', encoding='utf-8') as f: - config = yaml.safe_load(f) - print("找到AllPathList.yaml文件") - - feedback_dbc_file = config.get('NewZt_zhd_1005_dbc', '') - wirecontrol_yaml_path = config.get('wirecontrol_yaml', '') - - if feedback_dbc_file and os.path.exists(feedback_dbc_file): - print(f"成功读取DBC文件绝对路径: {feedback_dbc_file}") - else: - print(f"配置中未找到有效路径或文件不存在: {feedback_dbc_file}") - return - - if wirecontrol_yaml_path and os.path.exists(wirecontrol_yaml_path): - print(f"成功读取YAML文件绝对路径: {wirecontrol_yaml_path}") - try: - with open(wirecontrol_yaml_path, 'r', encoding='utf-8') as f: - wc = yaml.safe_load(f) - if wc is not None and 'wirecontrol_can_channel' in wc: - wirecontrol_can_channel = wc['wirecontrol_can_channel'] - print(f"从配置文件读取的CAN通道: {wirecontrol_can_channel}") - else: - print("配置文件中未找到wirecontrol_can_channel参数") - return - except Exception as e: - print(f"读取配置文件时发生错误: {str(e)}") - return - else: - print(f"配置中未找到有效路径或文件不存在: {wirecontrol_yaml_path}") - return - - pub_tocontrol = rospy.Publisher('/ToControl_msg', ToControl, queue_size=10) - pub_v2n_1 = rospy.Publisher('/Power_msg', Power, queue_size=10) - pub_v2n_2 = rospy.Publisher('/State_msg', State, queue_size=10) - pub_v2n_3 = rospy.Publisher('/Break_msg', Break, queue_size=10) - pub_v2n_4 = rospy.Publisher('/Engine_msg', Engine, queue_size=10) - pub_v2n_5 = rospy.Publisher('/Driving_msg', Driving, queue_size=10) - pub_v2n_6 = rospy.Publisher('/MileMeter_msg', MileMeter, queue_size=10) - - ToControlMsg = ToControl() - power0 = Power() - state0 = State() - Break0 = Break() - Engine0 = Engine() - Drive0 = Driving() - MileMeter0 = MileMeter() - - # 验证DBC文件是否存在(上方已验证路径有效,这里直接加载) - db = cantools.db.load_file(feedback_dbc_file) - - # 与 GPS 接收脚本一致,使用 bustype 参数 - try: - bus = can.interface.Bus(channel=wirecontrol_can_channel, bustype='socketcan') - connect_flag = True - except: - connect_flag = False - - # 打印DBC文件中已知的消息ID,用于调试 - known_ids = list(db._frame_id_to_message.keys()) - print(f"DBC文件中已知的消息ID: {[hex(id) for id in known_ids]}") - - # 我们关心的特定ID - target_ids = [0x0C01A5A7, 0x0C02A5A7] - print(f"我们关心的目标ID: {[hex(id) for id in target_ids]}") - - try: - while Running_Flag and connect_flag and not rospy.is_shutdown(): - message = bus.recv() - - # 跳过无效消息 - if message is None: - continue - - if message.arbitration_id not in target_ids: - # 静默跳过非目标ID - # print("esc") - continue - try: - if message.arbitration_id == 0x0C01A5A7: - decoded_message = db.decode_message(message.arbitration_id, message.data) - # print(decoded_message) - ToControlMsg.wheel_angle = decoded_message["steer_angle_fb"] - # print("getin") - - ToControlMsg.LinkState = True - # print(f"解码 0x0C01A5A7: 方向盘角度 = {ToControlMsg.wheel_angle}") - - elif message.arbitration_id == 0x0C02A5A7: - decoded_message = db.decode_message(message.arbitration_id, message.data) - ToControlMsg.Car_Control_Mode = decoded_message["Veh_running_state"] - ToControlMsg.EPB_Parking_State = decoded_message["Parking_Status"] - if (decoded_message["Gear_signal"] == 2): - decoded_message["Gear_signal"] = -1 #todo - ToControlMsg.current_Gear = decoded_message["Gear_signal"] - ToControlMsg.Gas_pedal_percentage = decoded_message["ACC_pedal"] - ToControlMsg.Break_pedal_percentage = decoded_message["Break_padel"] - ToControlMsg.Speed_byGear = decoded_message["V_speed"] - # print(f"解码 0x0C02A5A7: 速度 = {ToControlMsg.Speed_byGear}, 档位 = {ToControlMsg.current_Gear}") - - # 发布消息 - pub_tocontrol.publish(ToControlMsg) - pub_v2n_1.publish(power0) - pub_v2n_2.publish(state0) - pub_v2n_3.publish(Break0) - pub_v2n_4.publish(Engine0) - pub_v2n_5.publish(Drive0) - pub_v2n_6.publish(MileMeter0) - - except KeyError as e: - print(f"KeyError: 消息ID 0x{message.arbitration_id:X} 在DBC中未找到对应的定义") - print(f"错误详情: {e}") - continue - except Exception as e: - print(f"解码消息时出错 (ID: 0x{message.arbitration_id:X}): {e}") - continue - - except KeyboardInterrupt: - print("程序被用户中断") - except Exception as e: - print(f"程序运行出错: {e}") - finally: - if 'bus' in locals(): - bus.shutdown() - print("程序退出") - -if __name__ == '__main__': - main() \ No newline at end of file