diff --git a/catkin_truck /wirecontrol_pub.py b/catkin_truck /wirecontrol_pub.py deleted file mode 100644 index ea179b3..0000000 --- a/catkin_truck /wirecontrol_pub.py +++ /dev/null @@ -1,207 +0,0 @@ -#!/usr/bin/python3 -# -*- coding: utf-8 -*- - -# dbc文件调用 -import time -import can -import cantools -import rospy -from msg_gen.msg import Control -import signal -import os -import yaml - - - -is_exit = False - -wirecontrol_can_channel = None -bus = None - -messages = [ - { - 'message_id': 0x0C01A7A5, - 'message_name': 'New_ZT_A7A5_zhuanxiang', - 'signals': { - 'Steering_wheel_steering_angle': 100, - 'Steering_wheel_angular_speed': 350, - 'Steering_superimposed_torque': 17.8, - 'Reserve_2': 0, - 'Reserve_1': 0, - 'Operating_mode_command': 1, - 'life': 0, - 'ECU_status': 1, - 'Corner_state':1 - } - }, - { - 'message_id': 0x0C02A7A5, - 'message_name': 'New_ZT_A7A5_qudong', - 'signals': { - 'Accelerator_anti_false_stepping': 0, - 'Braking_torque': -5000, - 'Decision_system_failure': 0, - 'Driving_torque': 0, - 'Gear_request': 2, - 'life_2': 0 - } - }, - { - 'message_id': 0x0C03A7A5, - 'message_name': 'New_ZT_A7A5_zhidong', - 'signals': { - 'Target_deceleration': -1/2048, - 'Reserve_3': 0, - 'Parking': 1, - 'life3': 0, - 'control_model': 16 - } - }, - { - 'message_id': 0x0C04A7A5, - 'message_name': 'New_ZT_A7A5_dengguang', - 'signals': { - 'Wiper_control': 0, - 'Horn_control': 0, - 'Mittertor': 0, - 'front_door': 0, - 'backup_ight': 0, - 'Hazard_warning_lamp': 0, - 'High_and_low_beam_lamp': 0, - 'cornering_lamp': 0, - 'Driving_mode': 1, - 'Self_inspection': 1, - 'Reserve_8': 0, - 'Reserve_7': 0, - 'Reserve_6': 0, - 'Reserve_5': 0, - 'Reserve_4': 0 - } - } - -] -#加载dbc文件 -database = None - -def init_config(): - global wirecontrol_can_channel - global database - - script_dir = os.path.dirname(os.path.abspath(__file__)) - project_root = os.path.abspath(os.path.join(script_dir, '../../..')) - all_path_list_yaml = os.path.join(project_root, 'ConfigFile', 'yaml', 'AllPathList.yaml') - - with open(all_path_list_yaml, 'r', encoding='utf-8') as f: - config = yaml.safe_load(f) - print("找到AllPathList.yaml文件") - - adas_dbc_file = config.get('adas_dbc', '') - wirecontrol_yaml_path = config.get('wirecontrol_yaml', '') - - if adas_dbc_file and os.path.exists(adas_dbc_file): - print(f"成功读取DBC文件绝对路径: {adas_dbc_file}") - else: - raise FileNotFoundError(f"配置中未找到有效路径或文件不存在: {adas_dbc_file}") - - if wirecontrol_yaml_path and os.path.exists(wirecontrol_yaml_path): - print(f"成功读取YAML文件绝对路径: {wirecontrol_yaml_path}") - try: - with open(wirecontrol_yaml_path, 'r', encoding='utf-8') as f: - wc = yaml.safe_load(f) - if wc is not None and 'wirecontrol_can_channel' in wc: - wirecontrol_can_channel = wc['wirecontrol_can_channel'] - print(f"从配置文件读取的CAN通道: {wirecontrol_can_channel}") - else: - print("配置文件中未找到wirecontrol_can_channel参数") - raise KeyError('wirecontrol_can_channel') - except Exception as e: - print(f"读取配置文件时发生错误: {str(e)}") - raise - else: - raise FileNotFoundError(f"配置中未找到有效路径或文件不存在: {wirecontrol_yaml_path}") - - database = cantools.database.load_file(adas_dbc_file) - -def init_bus(): - can_interface = "socketcan" - return can.interface.Bus(channel=wirecontrol_can_channel, interface=can_interface) - -def signal_handler(signal, frame): - global is_exit - is_exit = True - -def call_back_front_control(Control_DataMsg): - global messages - messages[0]['signals']['Steering_wheel_steering_angle'] = Control_DataMsg.wheelAngle - messages[1]['signals']['Driving_torque'] = Control_DataMsg.AccPedal_per - # messages[1]['signals']['Driving_torque'] = 1 - print("control_DataMsg.GearCommand:",Control_DataMsg.GearCommand) - if(Control_DataMsg.GearCommand == 0):# Neutral - messages[1]['signals']['Gear_request'] = 0 - if(Control_DataMsg.GearCommand == 1):# Drive - messages[1]['signals']['Gear_request'] = 1 - if(Control_DataMsg.GearCommand == -1):# Reverse - messages[1]['signals']['Gear_request'] = 2 - if(Control_DataMsg.GearCommand == -2):# Park - messages[1]['signals']['Gear_request'] = 0 - - messages[2]['signals']['Target_deceleration'] = Control_DataMsg.BrakeDecel - - if(Control_DataMsg.light_right): - messages[3]['signals']['cornering_lamp'] = 2 - elif(Control_DataMsg.light_left): - messages[3]['signals']['cornering_lamp'] = 1 - else: - messages[3]['signals']['cornering_lamp'] = 0 - - messages[3]['signals']['High_and_low_beam_lamp'] = Control_DataMsg.light_near - - - -#发送can信号 -def send_can_messages(messages): - # 发送多个CAN消息 - for message in messages: - # 创建CAN消息对象 - msg = database.get_message_by_name(message['message_name']) - if(message['message_id'] == 0x0C01A7A5): - print("Steer:\t", message['signals']['Steering_wheel_steering_angle']) - if(message['message_id'] == 0x0C02A7A5): - print("Torque:\t", message['signals']['Driving_torque']) - print("Gear:\t", message['signals']['Gear_request']) - if(message['message_id'] == 0x0C03A7A5): - print("Break:\t", message['signals']['Target_deceleration']) - print('----------------------------') - data = msg.encode(message['signals']) - # print(data) - data1 = list(data) - # data = bytes(message['signals']) - # print(data1) - can_message = can.Message(arbitration_id=msg.frame_id, data=data1, is_extended_id=True) - - # 发送CAN消息 - bus.send(can_message) - - - -def main(): - global is_exit - global bus - signal.signal(signal.SIGINT, signal_handler) - rospy.init_node('wirecontrol_pub', anonymous=False) - rospy.Subscriber('/To_WireContorl_publisher', Control, call_back_front_control) - pub_control = rospy.Publisher('Controlrviz_msg', Control, queue_size=10) - ControlMsg = Control() - init_config() - bus = init_bus() - while not is_exit: - send_can_messages(messages) - pub_control.publish(ControlMsg) - time.sleep(0.01) - # 关闭CAN总线 - bus.shutdown() - print('\nall threads were ended by Ctrl-C') - - -if __name__ == '__main__': - main()