commit e07f4bb24b778f639f7fcdecaf8fc6dd0ca04308 Author: lidongding <864877956@qq.com> Date: Mon Feb 23 15:13:29 2026 +0800 上传文件至「catkin_truck 」 diff --git a/catkin_truck /catkin_truck.zip b/catkin_truck /catkin_truck.zip new file mode 100644 index 0000000..0200baf Binary files /dev/null and b/catkin_truck /catkin_truck.zip differ diff --git a/catkin_truck /shakehand.py b/catkin_truck /shakehand.py new file mode 100644 index 0000000..c5c0c3b --- /dev/null +++ b/catkin_truck /shakehand.py @@ -0,0 +1,214 @@ +#!/usr/bin/python3 +# -*- coding: utf-8 -*- + +import time +import can +import cantools +import rospy +import signal +import os +import threading +import std_msgs + +is_exit = False + +messages = [ + { + 'message_id': 0x1820FF2F, #messages1 + 'message_name': 'Handshaking', + 'signals': { + 'HandShake_name1': 0x74, + 'HandShake_name2': 0x63, + 'HandShake_name3': 0x64, + 'HandShake_name4': 0x72, + 'HandShake_name5': 0x69, + 'HandShake_name6': 0x30, + 'HandShake_name7': 0x30, + 'HandShake_name8': 0x31 + } + }, + { + 'message_id': 0x0C20112F, # messages4 + 'message_name': 'ElectronicHandbrake_State', + 'signals': { + 'EleHandbrakeStatus': 0, + 'Ele_activate_Status': 0, + 'Ele_checksum': 0, + 'Ele_count': 1, + 'reserve14': 0, + 'reserve15': 0, + 'reserve16': 0, + 'reserve17': 0, + 'reserve18': 0 + } + }, + { + 'message_id': 0x0C20302F, # messages7 + 'message_name': 'GearPosition', + 'signals': { + 'GearCommand': 0, + 'GearHighest': 6, + 'gear_activate': 0, + 'gear_checksum': 0, + 'gear_count': 1, + 'reserve19': 0, + 'reserve20': 0, + 'reserve21': 0, + 'reserve22': 0 + } + }, + { + 'message_id': 0x0C20102F, #messages3 + 'message_name': 'Brake_decel', + 'signals': { + 'BrakeDecel': 0, + 'Decel_activate_Status': 0, + 'Decel_checksum': 0, + 'Decel_count': 1, + 'reserve5':225, + 'reserve6':225, + 'reserve7':225, + 'reserve8':225 + } + },#41 74 FF FF FF FF 00 3F + { + 'message_id': 0x0C20122F, #messages5 + 'message_name': 'RetarderTorque_Percent', + 'signals': { + 'Retarder_per': 0, + 'Retarder_activate': 0, + 'Retarder_checksum': 0, + 'Retarder_count': 1, + 'reserve23':0, + 'reserve24':0, + 'reserve25':0, + 'reserve26':0, + 'reserve27':0 + + } + }, + { + 'message_id': 0x0C20002F, # messages2 + 'message_name': 'AccPedal_percent', + 'signals': { + 'AccPedal_per': 0, + 'Acc_activate_Status': 0, + 'Acc_activate': 0, + 'Acc_checksum': 0, + 'Acc_count': 1, + 'reserve1': 0, + 'reserve2': 0, + 'reserve3': 0, + 'reserve4': 0 + } + }, + { + 'message_id': 0x0C20212F, #messages6 + 'message_name': 'SteerAngle_Control', + 'signals': { + 'Curvature_command': 0, + 'steer_control': 0, + 'steer_activate': 0, + 'steer_checksum': 0, + 'steer_count': 1, + 'steercontrol_model': 1, + 'reserve28': 0 + + } + }, + { + 'message_id': 0x0C20402F, #messages8 + 'message_name': 'Bucket_State', + 'signals': { + 'bucket_command': 0, + 'bucket_activate': 0, + 'bucket_checksum': 0, + 'bucket_count': 1, + 'bucketspeed_model':0, + 'reserve11':0, + 'reserve12':0, + 'reserve13':0, + 'reserve9':0 + } + } +] +#加载dbc文件 +db_file = os.path.join(os.path.abspath('.'), 'src/wirecontrol/config/kk_wiredcontrol.dbc') + +database = cantools.database.load_file(db_file) + +can_interface = 'socketcan' #根据can接口类型进行更改 +bus = can.interface.Bus(channel= 'can0', bustype = can_interface, bitrate = 500000) + +cnt = 0 +hands_flag = False + +def signal_handler(signal, frame): + global is_exit + is_exit = True + +def callback_from_rviz(msg): + global hands_flag + print("hand brake!!!!!") + hands_flag = False + +def send_can_messages(messages): + global cnt + if cnt >= 15: + cnt = 1 + else: + cnt += 1 + # 发送多个CAN消息 + for message in messages: + # 创建CAN消息对象 + print(message) + msg = database.get_message_by_name(message['message_name']) + data = msg.encode(message['signals']) + data1 = list(data) + if message['message_id'] != 0x1820FF2F: + len0 = len(data1) - 1 + i = 0 + sum = 0 + while i < len0: + sum = sum + data1[i] + i = i + 1 + sum = sum + cnt + data1[len0] = (cnt << 4) | ((sum >> 4) + sum) & 0x0f + else: + pass + can_message = can.Message(arbitration_id=msg.frame_id, data=data1, is_extended_id=True) + + # 发送CAN消息 + bus.send(can_message) + +def shakehand_Task(): + global hands_flag + global messages + + cnt1 = 0 + while cnt1 <= 0.5: + cnt1 += 0.01 + send_can_messages(messages) + time.sleep(0.01) + print("shakehand task finish!") + +def main(): + global is_exit + global hands_flag + + signal.signal(signal.SIGINT, signal_handler) + rospy.init_node('wirecontrol_sh', anonymous=False) + rospy.Subscriber("/Handshack", std_msgs.msg.String, callback_from_rviz) + + while not is_exit: + if hands_flag == False: + hands_flag = True + ShakehandTask_threading = threading.Thread(target = shakehand_Task) + ShakehandTask_threading.start() + + time.sleep(1) + rospy.spin() + + +if __name__ == '__main__': + main() diff --git a/catkin_truck /wirecontrol_pub.py b/catkin_truck /wirecontrol_pub.py new file mode 100644 index 0000000..ea179b3 --- /dev/null +++ b/catkin_truck /wirecontrol_pub.py @@ -0,0 +1,207 @@ +#!/usr/bin/python3 +# -*- coding: utf-8 -*- + +# dbc文件调用 +import time +import can +import cantools +import rospy +from msg_gen.msg import Control +import signal +import os +import yaml + + + +is_exit = False + +wirecontrol_can_channel = None +bus = None + +messages = [ + { + 'message_id': 0x0C01A7A5, + 'message_name': 'New_ZT_A7A5_zhuanxiang', + 'signals': { + 'Steering_wheel_steering_angle': 100, + 'Steering_wheel_angular_speed': 350, + 'Steering_superimposed_torque': 17.8, + 'Reserve_2': 0, + 'Reserve_1': 0, + 'Operating_mode_command': 1, + 'life': 0, + 'ECU_status': 1, + 'Corner_state':1 + } + }, + { + 'message_id': 0x0C02A7A5, + 'message_name': 'New_ZT_A7A5_qudong', + 'signals': { + 'Accelerator_anti_false_stepping': 0, + 'Braking_torque': -5000, + 'Decision_system_failure': 0, + 'Driving_torque': 0, + 'Gear_request': 2, + 'life_2': 0 + } + }, + { + 'message_id': 0x0C03A7A5, + 'message_name': 'New_ZT_A7A5_zhidong', + 'signals': { + 'Target_deceleration': -1/2048, + 'Reserve_3': 0, + 'Parking': 1, + 'life3': 0, + 'control_model': 16 + } + }, + { + 'message_id': 0x0C04A7A5, + 'message_name': 'New_ZT_A7A5_dengguang', + 'signals': { + 'Wiper_control': 0, + 'Horn_control': 0, + 'Mittertor': 0, + 'front_door': 0, + 'backup_ight': 0, + 'Hazard_warning_lamp': 0, + 'High_and_low_beam_lamp': 0, + 'cornering_lamp': 0, + 'Driving_mode': 1, + 'Self_inspection': 1, + 'Reserve_8': 0, + 'Reserve_7': 0, + 'Reserve_6': 0, + 'Reserve_5': 0, + 'Reserve_4': 0 + } + } + +] +#加载dbc文件 +database = None + +def init_config(): + global wirecontrol_can_channel + global database + + script_dir = os.path.dirname(os.path.abspath(__file__)) + project_root = os.path.abspath(os.path.join(script_dir, '../../..')) + all_path_list_yaml = os.path.join(project_root, 'ConfigFile', 'yaml', 'AllPathList.yaml') + + with open(all_path_list_yaml, 'r', encoding='utf-8') as f: + config = yaml.safe_load(f) + print("找到AllPathList.yaml文件") + + adas_dbc_file = config.get('adas_dbc', '') + wirecontrol_yaml_path = config.get('wirecontrol_yaml', '') + + if adas_dbc_file and os.path.exists(adas_dbc_file): + print(f"成功读取DBC文件绝对路径: {adas_dbc_file}") + else: + raise FileNotFoundError(f"配置中未找到有效路径或文件不存在: {adas_dbc_file}") + + if wirecontrol_yaml_path and os.path.exists(wirecontrol_yaml_path): + print(f"成功读取YAML文件绝对路径: {wirecontrol_yaml_path}") + try: + with open(wirecontrol_yaml_path, 'r', encoding='utf-8') as f: + wc = yaml.safe_load(f) + if wc is not None and 'wirecontrol_can_channel' in wc: + wirecontrol_can_channel = wc['wirecontrol_can_channel'] + print(f"从配置文件读取的CAN通道: {wirecontrol_can_channel}") + else: + print("配置文件中未找到wirecontrol_can_channel参数") + raise KeyError('wirecontrol_can_channel') + except Exception as e: + print(f"读取配置文件时发生错误: {str(e)}") + raise + else: + raise FileNotFoundError(f"配置中未找到有效路径或文件不存在: {wirecontrol_yaml_path}") + + database = cantools.database.load_file(adas_dbc_file) + +def init_bus(): + can_interface = "socketcan" + return can.interface.Bus(channel=wirecontrol_can_channel, interface=can_interface) + +def signal_handler(signal, frame): + global is_exit + is_exit = True + +def call_back_front_control(Control_DataMsg): + global messages + messages[0]['signals']['Steering_wheel_steering_angle'] = Control_DataMsg.wheelAngle + messages[1]['signals']['Driving_torque'] = Control_DataMsg.AccPedal_per + # messages[1]['signals']['Driving_torque'] = 1 + print("control_DataMsg.GearCommand:",Control_DataMsg.GearCommand) + if(Control_DataMsg.GearCommand == 0):# Neutral + messages[1]['signals']['Gear_request'] = 0 + if(Control_DataMsg.GearCommand == 1):# Drive + messages[1]['signals']['Gear_request'] = 1 + if(Control_DataMsg.GearCommand == -1):# Reverse + messages[1]['signals']['Gear_request'] = 2 + if(Control_DataMsg.GearCommand == -2):# Park + messages[1]['signals']['Gear_request'] = 0 + + messages[2]['signals']['Target_deceleration'] = Control_DataMsg.BrakeDecel + + if(Control_DataMsg.light_right): + messages[3]['signals']['cornering_lamp'] = 2 + elif(Control_DataMsg.light_left): + messages[3]['signals']['cornering_lamp'] = 1 + else: + messages[3]['signals']['cornering_lamp'] = 0 + + messages[3]['signals']['High_and_low_beam_lamp'] = Control_DataMsg.light_near + + + +#发送can信号 +def send_can_messages(messages): + # 发送多个CAN消息 + for message in messages: + # 创建CAN消息对象 + msg = database.get_message_by_name(message['message_name']) + if(message['message_id'] == 0x0C01A7A5): + print("Steer:\t", message['signals']['Steering_wheel_steering_angle']) + if(message['message_id'] == 0x0C02A7A5): + print("Torque:\t", message['signals']['Driving_torque']) + print("Gear:\t", message['signals']['Gear_request']) + if(message['message_id'] == 0x0C03A7A5): + print("Break:\t", message['signals']['Target_deceleration']) + print('----------------------------') + data = msg.encode(message['signals']) + # print(data) + data1 = list(data) + # data = bytes(message['signals']) + # print(data1) + can_message = can.Message(arbitration_id=msg.frame_id, data=data1, is_extended_id=True) + + # 发送CAN消息 + bus.send(can_message) + + + +def main(): + global is_exit + global bus + signal.signal(signal.SIGINT, signal_handler) + rospy.init_node('wirecontrol_pub', anonymous=False) + rospy.Subscriber('/To_WireContorl_publisher', Control, call_back_front_control) + pub_control = rospy.Publisher('Controlrviz_msg', Control, queue_size=10) + ControlMsg = Control() + init_config() + bus = init_bus() + while not is_exit: + send_can_messages(messages) + pub_control.publish(ControlMsg) + time.sleep(0.01) + # 关闭CAN总线 + bus.shutdown() + print('\nall threads were ended by Ctrl-C') + + +if __name__ == '__main__': + main() diff --git a/catkin_truck /wirecontrol_sub.py b/catkin_truck /wirecontrol_sub.py new file mode 100644 index 0000000..6f5e20e --- /dev/null +++ b/catkin_truck /wirecontrol_sub.py @@ -0,0 +1,173 @@ +#!/usr/bin/python3 +# -*- coding: utf-8 -*- + +# 作者: 郭帆、张贺达 + +# 功能描述: +# 通过can通道接受线控反馈数据 +# 根据can协议(dbc文件)进行数据解析 +# 解析后的数据按照wirecontrol_feed_back_msg数据格式进行发布 + +#最后修改日期:2023-08-23 +import time +import can +import cantools +import rospy +import rosparam +import signal +import os +import yaml + +from msg_gen.msg import ToControl +from msg_gen.msg import Power +from msg_gen.msg import State +from msg_gen.msg import Break +from msg_gen.msg import Engine +from msg_gen.msg import Driving +from msg_gen.msg import MileMeter + + +Running_Flag = True + +def signal_handler(signal, frame): + global Running_Flag + Running_Flag = False + +def main(): + wirecontrol_can_channel = None + global Running_Flag + + signal.signal(signal.SIGINT, signal_handler) + rospy.init_node("wirecontrol_sub") + # 按照 GPS 接收脚本的方式读取 AllPathList.yaml + script_dir = os.path.dirname(os.path.abspath(__file__)) + project_root = os.path.abspath(os.path.join(script_dir, '../../..')) + all_path_list_yaml = os.path.join(project_root, 'ConfigFile', 'yaml', 'AllPathList.yaml') + + with open(all_path_list_yaml, 'r', encoding='utf-8') as f: + config = yaml.safe_load(f) + print("找到AllPathList.yaml文件") + + feedback_dbc_file = config.get('NewZt_zhd_1005_dbc', '') + wirecontrol_yaml_path = config.get('wirecontrol_yaml', '') + + if feedback_dbc_file and os.path.exists(feedback_dbc_file): + print(f"成功读取DBC文件绝对路径: {feedback_dbc_file}") + else: + print(f"配置中未找到有效路径或文件不存在: {feedback_dbc_file}") + return + + if wirecontrol_yaml_path and os.path.exists(wirecontrol_yaml_path): + print(f"成功读取YAML文件绝对路径: {wirecontrol_yaml_path}") + try: + with open(wirecontrol_yaml_path, 'r', encoding='utf-8') as f: + wc = yaml.safe_load(f) + if wc is not None and 'wirecontrol_can_channel' in wc: + wirecontrol_can_channel = wc['wirecontrol_can_channel'] + print(f"从配置文件读取的CAN通道: {wirecontrol_can_channel}") + else: + print("配置文件中未找到wirecontrol_can_channel参数") + return + except Exception as e: + print(f"读取配置文件时发生错误: {str(e)}") + return + else: + print(f"配置中未找到有效路径或文件不存在: {wirecontrol_yaml_path}") + return + + pub_tocontrol = rospy.Publisher('/ToControl_msg', ToControl, queue_size=10) + pub_v2n_1 = rospy.Publisher('/Power_msg', Power, queue_size=10) + pub_v2n_2 = rospy.Publisher('/State_msg', State, queue_size=10) + pub_v2n_3 = rospy.Publisher('/Break_msg', Break, queue_size=10) + pub_v2n_4 = rospy.Publisher('/Engine_msg', Engine, queue_size=10) + pub_v2n_5 = rospy.Publisher('/Driving_msg', Driving, queue_size=10) + pub_v2n_6 = rospy.Publisher('/MileMeter_msg', MileMeter, queue_size=10) + + ToControlMsg = ToControl() + power0 = Power() + state0 = State() + Break0 = Break() + Engine0 = Engine() + Drive0 = Driving() + MileMeter0 = MileMeter() + + # 验证DBC文件是否存在(上方已验证路径有效,这里直接加载) + db = cantools.db.load_file(feedback_dbc_file) + + # 与 GPS 接收脚本一致,使用 bustype 参数 + try: + bus = can.interface.Bus(channel=wirecontrol_can_channel, bustype='socketcan') + connect_flag = True + except: + connect_flag = False + + # 打印DBC文件中已知的消息ID,用于调试 + known_ids = list(db._frame_id_to_message.keys()) + print(f"DBC文件中已知的消息ID: {[hex(id) for id in known_ids]}") + + # 我们关心的特定ID + target_ids = [0x0C01A5A7, 0x0C02A5A7] + print(f"我们关心的目标ID: {[hex(id) for id in target_ids]}") + + try: + while Running_Flag and connect_flag and not rospy.is_shutdown(): + message = bus.recv() + + # 跳过无效消息 + if message is None: + continue + + if message.arbitration_id not in target_ids: + # 静默跳过非目标ID + # print("esc") + continue + try: + if message.arbitration_id == 0x0C01A5A7: + decoded_message = db.decode_message(message.arbitration_id, message.data) + # print(decoded_message) + ToControlMsg.wheel_angle = decoded_message["steer_angle_fb"] + # print("getin") + + ToControlMsg.LinkState = True + # print(f"解码 0x0C01A5A7: 方向盘角度 = {ToControlMsg.wheel_angle}") + + elif message.arbitration_id == 0x0C02A5A7: + decoded_message = db.decode_message(message.arbitration_id, message.data) + ToControlMsg.Car_Control_Mode = decoded_message["Veh_running_state"] + ToControlMsg.EPB_Parking_State = decoded_message["Parking_Status"] + if (decoded_message["Gear_signal"] == 2): + decoded_message["Gear_signal"] = -1 #todo + ToControlMsg.current_Gear = decoded_message["Gear_signal"] + ToControlMsg.Gas_pedal_percentage = decoded_message["ACC_pedal"] + ToControlMsg.Break_pedal_percentage = decoded_message["Break_padel"] + ToControlMsg.Speed_byGear = decoded_message["V_speed"] + # print(f"解码 0x0C02A5A7: 速度 = {ToControlMsg.Speed_byGear}, 档位 = {ToControlMsg.current_Gear}") + + # 发布消息 + pub_tocontrol.publish(ToControlMsg) + pub_v2n_1.publish(power0) + pub_v2n_2.publish(state0) + pub_v2n_3.publish(Break0) + pub_v2n_4.publish(Engine0) + pub_v2n_5.publish(Drive0) + pub_v2n_6.publish(MileMeter0) + + except KeyError as e: + print(f"KeyError: 消息ID 0x{message.arbitration_id:X} 在DBC中未找到对应的定义") + print(f"错误详情: {e}") + continue + except Exception as e: + print(f"解码消息时出错 (ID: 0x{message.arbitration_id:X}): {e}") + continue + + except KeyboardInterrupt: + print("程序被用户中断") + except Exception as e: + print(f"程序运行出错: {e}") + finally: + if 'bus' in locals(): + bus.shutdown() + print("程序退出") + +if __name__ == '__main__': + main() \ No newline at end of file