#!/usr/bin/python3 # -*- coding: utf-8 -*- # dbc文件调用 import time import can import cantools import rospy from msg_gen.msg import Control import signal import os import yaml is_exit = False wirecontrol_can_channel = None bus = None messages = [ { 'message_id': 0x0C01A7A5, 'message_name': 'New_ZT_A7A5_zhuanxiang', 'signals': { 'Steering_wheel_steering_angle': 100, 'Steering_wheel_angular_speed': 350, 'Steering_superimposed_torque': 17.8, 'Reserve_2': 0, 'Reserve_1': 0, 'Operating_mode_command': 1, 'life': 0, 'ECU_status': 1, 'Corner_state':1 } }, { 'message_id': 0x0C02A7A5, 'message_name': 'New_ZT_A7A5_qudong', 'signals': { 'Accelerator_anti_false_stepping': 0, 'Braking_torque': -5000, 'Decision_system_failure': 0, 'Driving_torque': 0, 'Gear_request': 2, 'life_2': 0 } }, { 'message_id': 0x0C03A7A5, 'message_name': 'New_ZT_A7A5_zhidong', 'signals': { 'Target_deceleration': -1/2048, 'Reserve_3': 0, 'Parking': 1, 'life3': 0, 'control_model': 16 } }, { 'message_id': 0x0C04A7A5, 'message_name': 'New_ZT_A7A5_dengguang', 'signals': { 'Wiper_control': 0, 'Horn_control': 0, 'Mittertor': 0, 'front_door': 0, 'backup_ight': 0, 'Hazard_warning_lamp': 0, 'High_and_low_beam_lamp': 0, 'cornering_lamp': 0, 'Driving_mode': 1, 'Self_inspection': 1, 'Reserve_8': 0, 'Reserve_7': 0, 'Reserve_6': 0, 'Reserve_5': 0, 'Reserve_4': 0 } } ] #加载dbc文件 database = None def init_config(): global wirecontrol_can_channel global database script_dir = os.path.dirname(os.path.abspath(__file__)) project_root = os.path.abspath(os.path.join(script_dir, '../../..')) all_path_list_yaml = os.path.join(project_root, 'ConfigFile', 'yaml', 'AllPathList.yaml') with open(all_path_list_yaml, 'r', encoding='utf-8') as f: config = yaml.safe_load(f) print("找到AllPathList.yaml文件") adas_dbc_file = config.get('adas_dbc', '') wirecontrol_yaml_path = config.get('wirecontrol_yaml', '') if adas_dbc_file and os.path.exists(adas_dbc_file): print(f"成功读取DBC文件绝对路径: {adas_dbc_file}") else: raise FileNotFoundError(f"配置中未找到有效路径或文件不存在: {adas_dbc_file}") if wirecontrol_yaml_path and os.path.exists(wirecontrol_yaml_path): print(f"成功读取YAML文件绝对路径: {wirecontrol_yaml_path}") try: with open(wirecontrol_yaml_path, 'r', encoding='utf-8') as f: wc = yaml.safe_load(f) if wc is not None and 'wirecontrol_can_channel' in wc: wirecontrol_can_channel = wc['wirecontrol_can_channel'] print(f"从配置文件读取的CAN通道: {wirecontrol_can_channel}") else: print("配置文件中未找到wirecontrol_can_channel参数") raise KeyError('wirecontrol_can_channel') except Exception as e: print(f"读取配置文件时发生错误: {str(e)}") raise else: raise FileNotFoundError(f"配置中未找到有效路径或文件不存在: {wirecontrol_yaml_path}") database = cantools.database.load_file(adas_dbc_file) def init_bus(): can_interface = "socketcan" return can.interface.Bus(channel=wirecontrol_can_channel, interface=can_interface) def signal_handler(signal, frame): global is_exit is_exit = True def call_back_front_control(Control_DataMsg): global messages messages[0]['signals']['Steering_wheel_steering_angle'] = Control_DataMsg.wheelAngle messages[1]['signals']['Driving_torque'] = Control_DataMsg.AccPedal_per # messages[1]['signals']['Driving_torque'] = 1 print("control_DataMsg.GearCommand:",Control_DataMsg.GearCommand) if(Control_DataMsg.GearCommand == 0):# Neutral messages[1]['signals']['Gear_request'] = 0 if(Control_DataMsg.GearCommand == 1):# Drive messages[1]['signals']['Gear_request'] = 1 if(Control_DataMsg.GearCommand == -1):# Reverse messages[1]['signals']['Gear_request'] = 2 if(Control_DataMsg.GearCommand == -2):# Park messages[1]['signals']['Gear_request'] = 0 messages[2]['signals']['Target_deceleration'] = Control_DataMsg.BrakeDecel if(Control_DataMsg.light_right): messages[3]['signals']['cornering_lamp'] = 2 elif(Control_DataMsg.light_left): messages[3]['signals']['cornering_lamp'] = 1 else: messages[3]['signals']['cornering_lamp'] = 0 messages[3]['signals']['High_and_low_beam_lamp'] = Control_DataMsg.light_near #发送can信号 def send_can_messages(messages): # 发送多个CAN消息 for message in messages: # 创建CAN消息对象 msg = database.get_message_by_name(message['message_name']) if(message['message_id'] == 0x0C01A7A5): print("Steer:\t", message['signals']['Steering_wheel_steering_angle']) if(message['message_id'] == 0x0C02A7A5): print("Torque:\t", message['signals']['Driving_torque']) print("Gear:\t", message['signals']['Gear_request']) if(message['message_id'] == 0x0C03A7A5): print("Break:\t", message['signals']['Target_deceleration']) print('----------------------------') data = msg.encode(message['signals']) # print(data) data1 = list(data) # data = bytes(message['signals']) # print(data1) can_message = can.Message(arbitration_id=msg.frame_id, data=data1, is_extended_id=True) # 发送CAN消息 bus.send(can_message) def main(): global is_exit global bus signal.signal(signal.SIGINT, signal_handler) rospy.init_node('wirecontrol_pub', anonymous=False) rospy.Subscriber('/To_WireContorl_publisher', Control, call_back_front_control) pub_control = rospy.Publisher('Controlrviz_msg', Control, queue_size=10) ControlMsg = Control() init_config() bus = init_bus() while not is_exit: send_can_messages(messages) pub_control.publish(ControlMsg) time.sleep(0.01) # 关闭CAN总线 bus.shutdown() print('\nall threads were ended by Ctrl-C') if __name__ == '__main__': main()