删除 catkin_truck /wirecontrol_pub.py

This commit is contained in:
G
2026-02-24 08:23:53 +08:00
parent 465123e5e6
commit e4d9c9adc8

View File

@@ -1,207 +0,0 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# dbc文件调用
import time
import can
import cantools
import rospy
from msg_gen.msg import Control
import signal
import os
import yaml
is_exit = False
wirecontrol_can_channel = None
bus = None
messages = [
{
'message_id': 0x0C01A7A5,
'message_name': 'New_ZT_A7A5_zhuanxiang',
'signals': {
'Steering_wheel_steering_angle': 100,
'Steering_wheel_angular_speed': 350,
'Steering_superimposed_torque': 17.8,
'Reserve_2': 0,
'Reserve_1': 0,
'Operating_mode_command': 1,
'life': 0,
'ECU_status': 1,
'Corner_state':1
}
},
{
'message_id': 0x0C02A7A5,
'message_name': 'New_ZT_A7A5_qudong',
'signals': {
'Accelerator_anti_false_stepping': 0,
'Braking_torque': -5000,
'Decision_system_failure': 0,
'Driving_torque': 0,
'Gear_request': 2,
'life_2': 0
}
},
{
'message_id': 0x0C03A7A5,
'message_name': 'New_ZT_A7A5_zhidong',
'signals': {
'Target_deceleration': -1/2048,
'Reserve_3': 0,
'Parking': 1,
'life3': 0,
'control_model': 16
}
},
{
'message_id': 0x0C04A7A5,
'message_name': 'New_ZT_A7A5_dengguang',
'signals': {
'Wiper_control': 0,
'Horn_control': 0,
'Mittertor': 0,
'front_door': 0,
'backup_ight': 0,
'Hazard_warning_lamp': 0,
'High_and_low_beam_lamp': 0,
'cornering_lamp': 0,
'Driving_mode': 1,
'Self_inspection': 1,
'Reserve_8': 0,
'Reserve_7': 0,
'Reserve_6': 0,
'Reserve_5': 0,
'Reserve_4': 0
}
}
]
#加载dbc文件
database = None
def init_config():
global wirecontrol_can_channel
global database
script_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.abspath(os.path.join(script_dir, '../../..'))
all_path_list_yaml = os.path.join(project_root, 'ConfigFile', 'yaml', 'AllPathList.yaml')
with open(all_path_list_yaml, 'r', encoding='utf-8') as f:
config = yaml.safe_load(f)
print("找到AllPathList.yaml文件")
adas_dbc_file = config.get('adas_dbc', '')
wirecontrol_yaml_path = config.get('wirecontrol_yaml', '')
if adas_dbc_file and os.path.exists(adas_dbc_file):
print(f"成功读取DBC文件绝对路径: {adas_dbc_file}")
else:
raise FileNotFoundError(f"配置中未找到有效路径或文件不存在: {adas_dbc_file}")
if wirecontrol_yaml_path and os.path.exists(wirecontrol_yaml_path):
print(f"成功读取YAML文件绝对路径: {wirecontrol_yaml_path}")
try:
with open(wirecontrol_yaml_path, 'r', encoding='utf-8') as f:
wc = yaml.safe_load(f)
if wc is not None and 'wirecontrol_can_channel' in wc:
wirecontrol_can_channel = wc['wirecontrol_can_channel']
print(f"从配置文件读取的CAN通道: {wirecontrol_can_channel}")
else:
print("配置文件中未找到wirecontrol_can_channel参数")
raise KeyError('wirecontrol_can_channel')
except Exception as e:
print(f"读取配置文件时发生错误: {str(e)}")
raise
else:
raise FileNotFoundError(f"配置中未找到有效路径或文件不存在: {wirecontrol_yaml_path}")
database = cantools.database.load_file(adas_dbc_file)
def init_bus():
can_interface = "socketcan"
return can.interface.Bus(channel=wirecontrol_can_channel, interface=can_interface)
def signal_handler(signal, frame):
global is_exit
is_exit = True
def call_back_front_control(Control_DataMsg):
global messages
messages[0]['signals']['Steering_wheel_steering_angle'] = Control_DataMsg.wheelAngle
messages[1]['signals']['Driving_torque'] = Control_DataMsg.AccPedal_per
# messages[1]['signals']['Driving_torque'] = 1
print("control_DataMsg.GearCommand:",Control_DataMsg.GearCommand)
if(Control_DataMsg.GearCommand == 0):# Neutral
messages[1]['signals']['Gear_request'] = 0
if(Control_DataMsg.GearCommand == 1):# Drive
messages[1]['signals']['Gear_request'] = 1
if(Control_DataMsg.GearCommand == -1):# Reverse
messages[1]['signals']['Gear_request'] = 2
if(Control_DataMsg.GearCommand == -2):# Park
messages[1]['signals']['Gear_request'] = 0
messages[2]['signals']['Target_deceleration'] = Control_DataMsg.BrakeDecel
if(Control_DataMsg.light_right):
messages[3]['signals']['cornering_lamp'] = 2
elif(Control_DataMsg.light_left):
messages[3]['signals']['cornering_lamp'] = 1
else:
messages[3]['signals']['cornering_lamp'] = 0
messages[3]['signals']['High_and_low_beam_lamp'] = Control_DataMsg.light_near
#发送can信号
def send_can_messages(messages):
# 发送多个CAN消息
for message in messages:
# 创建CAN消息对象
msg = database.get_message_by_name(message['message_name'])
if(message['message_id'] == 0x0C01A7A5):
print("Steer:\t", message['signals']['Steering_wheel_steering_angle'])
if(message['message_id'] == 0x0C02A7A5):
print("Torque:\t", message['signals']['Driving_torque'])
print("Gear:\t", message['signals']['Gear_request'])
if(message['message_id'] == 0x0C03A7A5):
print("Break:\t", message['signals']['Target_deceleration'])
print('----------------------------')
data = msg.encode(message['signals'])
# print(data)
data1 = list(data)
# data = bytes(message['signals'])
# print(data1)
can_message = can.Message(arbitration_id=msg.frame_id, data=data1, is_extended_id=True)
# 发送CAN消息
bus.send(can_message)
def main():
global is_exit
global bus
signal.signal(signal.SIGINT, signal_handler)
rospy.init_node('wirecontrol_pub', anonymous=False)
rospy.Subscriber('/To_WireContorl_publisher', Control, call_back_front_control)
pub_control = rospy.Publisher('Controlrviz_msg', Control, queue_size=10)
ControlMsg = Control()
init_config()
bus = init_bus()
while not is_exit:
send_can_messages(messages)
pub_control.publish(ControlMsg)
time.sleep(0.01)
# 关闭CAN总线
bus.shutdown()
print('\nall threads were ended by Ctrl-C')
if __name__ == '__main__':
main()