ROS1从入门到精通 6: 参数服务器与动态参数(配置管理最佳实践)
【ROS1从入门到精通】第6篇:参数服务器与动态参数(配置管理最佳实践)
🎯 本文目标:深入理解ROS参数服务器机制,掌握参数的设置、获取和管理方法,学会使用动态参数进行运行时配置,能够设计和实现灵活的配置管理系统。
📑 目录
- 参数服务器概述
- 参数基本操作
- 参数文件管理
- 动态参数配置
- 参数命名空间
- Launch文件参数配置
- 参数服务器进阶
- 配置管理最佳实践
- 实战案例:智能配置管理系统
- 总结与展望
1. 参数服务器概述
1.1 什么是参数服务器?
参数服务器(Parameter Server)是ROS Master提供的共享字典服务,用于存储和管理节点间共享的配置参数。它采用键值对(key-value)的方式存储数据,支持多种数据类型。
1.2 参数服务器特点
| 特性 | 说明 |
|---|---|
| 全局共享 | 所有节点都可以访问 |
| 持久存储 | 参数在Master运行期间持续存在 |
| 多种类型 | 支持整数、浮点、字符串、列表、字典等 |
| 层次结构 | 支持命名空间和嵌套结构 |
| 动态更新 | 可以在运行时修改参数 |
| 默认值支持 | 获取参数时可指定默认值 |
1.3 参数类型
ROS参数服务器支持以下数据类型:
# 基本类型
int_param = 42 # 整数
float_param = 3.14 # 浮点数
string_param = "hello" # 字符串
bool_param = True # 布尔值
# 复合类型
list_param = [1, 2, 3] # 列表
dict_param = {'a': 1, 'b': 2} # 字典
# 嵌套结构
nested_param = {
'sensor': {
'camera': {'resolution': [640, 480], 'fps': 30},
'lidar': {'range': 30.0, 'angle': 360}
}
}
2. 参数基本操作
2.1 C++参数操作
// parameter_operations.cpp
#include
#include
#include
#include
class ParameterManager {
public:
ParameterManager() : nh_("~") {
// 获取参数的各种方式
demonstrateParameterOperations();
}
void demonstrateParameterOperations() {
// 1. 基本参数获取
getBasicParameters();
// 2. 带默认值的参数获取
getParametersWithDefaults();
// 3. 必须参数获取
getRequiredParameters();
// 4. 设置参数
setParameters();
// 5. 删除参数
deleteParameters();
// 6. 参数存在性检查
checkParameterExistence();
// 7. 获取所有参数
getAllParameters();
// 8. 参数搜索
searchParameters();
// 9. 复杂数据结构
handleComplexParameters();
}
private:
void getBasicParameters() {
ROS_INFO("=== Basic Parameter Operations ===");
// 获取不同类型的参数
int int_param;
double double_param;
std::string string_param;
bool bool_param;
// 从全局参数服务器获取
ros::param::get("/global_int", int_param);
ros::param::get("/global_double", double_param);
ros::param::get("/global_string", string_param);
ros::param::get("/global_bool", bool_param);
// 从私有命名空间获取
nh_.getParam("private_int", int_param);
nh_.getParam("private_double", double_param);
nh_.getParam("private_string", string_param);
nh_.getParam("private_bool", bool_param);
ROS_INFO("Int: %d, Double: %.2f, String: %s, Bool: %s",
int_param, double_param, string_param.c_str(),
bool_param ? "true" : "false");
}
void getParametersWithDefaults() {
ROS_INFO("=== Parameters with Defaults ===");
// 方法1: param函数(推荐)
int max_retries;
double timeout;
std::string robot_name;
nh_.param("max_retries", max_retries, 3);
nh_.param("timeout", timeout, 5.0);
nh_.param<std::string>("robot_name", robot_name, "default_robot");
// 方法2: getParam with fallback
if (!nh_.getParam("optional_param", max_retries)) {
max_retries = 10; // 默认值
}
// 方法3: ros::param::param
ros::param::param<double>("/global_timeout", timeout, 10.0);
ROS_INFO("Max retries: %d, Timeout: %.1f, Robot: %s",
max_retries, timeout, robot_name.c_str());
}
void getRequiredParameters() {
ROS_INFO("=== Required Parameters ===");
// 必须参数(如果不存在则报错)
std::string config_file;
if (!nh_.getParam("config_file", config_file)) {
ROS_FATAL("Required parameter 'config_file' not found!");
ros::shutdown();
return;
}
// 使用断言检查
double critical_threshold;
ROS_ASSERT(nh_.getParam("critical_threshold", critical_threshold));
ROS_INFO("Config file: %s, Threshold: %.2f",
config_file.c_str(), critical_threshold);
}
void setParameters() {
ROS_INFO("=== Setting Parameters ===");
// 设置不同类型的参数
nh_.setParam("my_int", 42);
nh_.setParam("my_double", 3.14);
nh_.setParam("my_string", "hello");
nh_.setParam("my_bool", true);
// 设置全局参数
ros::param::set("/global_param", "global_value");
// 设置嵌套参数
nh_.setParam("robot/speed", 1.0);
nh_.setParam("robot/name", "robot_01");
// 设置向量参数
std::vector<double> gains = {1.0, 0.1, 0.01};
nh_.setParam("pid_gains", gains);
// 设置映射参数
std::map<std::string, double> sensors;
sensors["temperature"] = 25.5;
sensors["pressure"] = 101.3;
// 注意:C++ API不直接支持map,需要逐个设置
nh_.setParam("sensors/temperature", sensors["temperature"]);
nh_.setParam("sensors/pressure", sensors["pressure"]);
ROS_INFO("Parameters set successfully");
}
void deleteParameters() {
ROS_INFO("=== Deleting Parameters ===");
// 删除参数
if (ros::param::has("/temp_param")) {
ros::param::del("/temp_param");
ROS_INFO("Deleted /temp_param");
}
// 删除命名空间下的所有参数
if (nh_.hasParam("temp_namespace")) {
nh_.deleteParam("temp_namespace");
ROS_INFO("Deleted temp_namespace");
}
}
void checkParameterExistence() {
ROS_INFO("=== Checking Parameter Existence ===");
// 检查参数是否存在
if (ros::param::has("/some_param")) {
ROS_INFO("Parameter /some_param exists");
} else {
ROS_INFO("Parameter /some_param does not exist");
}
// 使用NodeHandle检查
if (nh_.hasParam("local_param")) {
ROS_INFO("Local parameter exists");
}
}
void getAllParameters() {
ROS_INFO("=== Getting All Parameters ===");
// 获取所有参数名称
std::vector<std::string> param_names;
ros::param::getParamNames(param_names);
ROS_INFO("Total parameters: %zu", param_names.size());
// 打印前10个参数
for (size_t i = 0; i < std::min(size_t(10), param_names.size()); ++i) {
ROS_INFO(" %s", param_names[i].c_str());
}
}
void searchParameters() {
ROS_INFO("=== Searching Parameters ===");
// 搜索参数(从当前命名空间向上查找)
std::string param_value;
if (ros::param::search("robot_name", param_value)) {
ROS_INFO("Found robot_name at: %s", param_value.c_str());
}
// 使用searchParam
std::string param_key;
if (nh_.searchParam("config_file", param_key)) {
std::string value;
nh_.getParam(param_key, value);
ROS_INFO("Config file found at %s: %s",
param_key.c_str(), value.c_str());
}
}
void handleComplexParameters() {
ROS_INFO("=== Complex Parameters ===");
// 处理向量参数
std::vector<double> vector_param;
if (nh_.getParam("vector_param", vector_param)) {
ROS_INFO("Vector size: %zu", vector_param.size());
for (size_t i = 0; i < vector_param.size(); ++i) {
ROS_INFO(" [%zu]: %.2f", i, vector_param[i]);
}
}
// 处理嵌套参数
XmlRpc::XmlRpcValue complex_param;
if (nh_.getParam("complex_param", complex_param)) {
if (complex_param.getType() == XmlRpc::XmlRpcValue::TypeStruct) {
for (auto it = complex_param.begin();
it != complex_param.end(); ++it) {
ROS_INFO(" %s: %s", it->first.c_str(),
std::string(it->second).c_str());
}
}
}
// 处理YAML配置
handleYAMLConfig();
}
void handleYAMLConfig() {
// 处理YAML格式的配置
XmlRpc::XmlRpcValue robot_config;
if (nh_.getParam("robot_config", robot_config)) {
// 检查类型
if (robot_config.getType() == XmlRpc::XmlRpcValue::TypeStruct) {
// 获取嵌套参数
if (robot_config.hasMember("sensors")) {
XmlRpc::XmlRpcValue sensors = robot_config["sensors"];
if (sensors.hasMember("camera")) {
XmlRpc::XmlRpcValue camera = sensors["camera"];
if (camera.hasMember("resolution")) {
XmlRpc::XmlRpcValue resolution = camera["resolution"];
if (resolution.getType() == XmlRpc::XmlRpcValue::TypeArray) {
int width = static_cast<int>(resolution[0]);
int height = static_cast<int>(resolution[1]);
ROS_INFO("Camera resolution: %dx%d", width, height);
}
}
}
}
}
}
}
ros::NodeHandle nh_;
};
// 参数监听器
class ParameterListener {
public:
ParameterListener() : nh_("~") {
// 创建定时器周期性检查参数变化
param_check_timer_ = nh_.createTimer(
ros::Duration(1.0),
&ParameterListener::checkParameterChanges, this);
// 初始化参数值
nh_.param("monitored_param", last_value_, 0.0);
ROS_INFO("Parameter listener started");
}
private:
void checkParameterChanges(const ros::TimerEvent& event) {
double current_value;
nh_.param("monitored_param", current_value, 0.0);
if (std::abs(current_value - last_value_) > 0.001) {
ROS_INFO("Parameter changed: %.2f -> %.2f",
last_value_, current_value);
onParameterChanged(current_value);
last_value_ = current_value;
}
}
void onParameterChanged(double new_value) {
// 响应参数变化
ROS_INFO("Responding to parameter change: %.2f", new_value);
// 执行相应的操作
}
ros::NodeHandle nh_;
ros::Timer param_check_timer_;
double last_value_;
};
int main(int argc, char** argv) {
ros::init(argc, argv, "parameter_demo");
ParameterManager manager;
ParameterListener listener;
ros::spin();
return 0;
}
2.2 Python参数操作
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import rospy
import yaml
import json
from threading import Lock
class ParameterManager:
"""参数管理器类"""
def __init__(self):
rospy.init_node('parameter_manager')
# 演示所有参数操作
self.demonstrate_parameter_operations()
# 启动参数监听器
self.parameter_listener = ParameterListener()
def demonstrate_parameter_operations(self):
"""演示参数操作"""
self.get_basic_parameters()
self.get_parameters_with_defaults()
self.get_required_parameters()
self.set_parameters()
self.delete_parameters()
self.check_parameter_existence()
self.get_all_parameters()
self.search_parameters()
self.handle_complex_parameters()
self.handle_yaml_config()
def get_basic_parameters(self):
"""基本参数获取"""
rospy.loginfo("=== Basic Parameter Operations ===")
# 获取不同类型的参数
try:
# 从全局参数服务器获取
global_int = rospy.get_param('/global_int')
global_float = rospy.get_param('/global_float')
global_string = rospy.get_param('/global_string')
global_bool = rospy.get_param('/global_bool')
# 从私有命名空间获取
private_int = rospy.get_param('~private_int')
private_float = rospy.get_param('~private_float')
private_string = rospy.get_param('~private_string')
private_bool = rospy.get_param('~private_bool')
rospy.loginfo(f"Global - Int: {global_int}, Float: {global_float}, "
f"String: {global_string}, Bool: {global_bool}")
rospy.loginfo(f"Private - Int: {private_int}, Float: {private_float}, "
f"String: {private_string}, Bool: {private_bool}")
except KeyError as e:
rospy.logwarn(f"Parameter not found: {e}")
def get_parameters_with_defaults(self):
"""带默认值的参数获取"""
rospy.loginfo("=== Parameters with Defaults ===")
# 方法1: 使用get_param的default参数
max_retries = rospy.get_param('~max_retries', 3)
timeout = rospy.get_param('~timeout', 5.0)
robot_name = rospy.get_param('~robot_name', 'default_robot')
# 方法2: 使用try-except
try:
optional_param = rospy.get_param('~optional_param')
except KeyError:
optional_param = 'default_value'
# 方法3: 使用条件判断
if rospy.has_param('~another_param'):
another_param = rospy.get_param('~another_param')
else:
another_param = 100
rospy.loginfo(f"Max retries: {max_retries}, Timeout: {timeout}, "
f"Robot: {robot_name}")
rospy.loginfo(f"Optional: {optional_param}, Another: {another_param}")
def get_required_parameters(self):
"""获取必需参数"""
rospy.loginfo("=== Required Parameters ===")
# 必需参数(如果不存在则报错)
try:
config_file = rospy.get_param('~config_file')
critical_threshold = rospy.get_param('~critical_threshold')
rospy.loginfo(f"Config file: {config_file}, "
f"Threshold: {critical_threshold}")
except KeyError as e:
rospy.logfatal(f"Required parameter not found: {e}")
rospy.signal_shutdown("Missing required parameter")
def set_parameters(self):
"""设置参数"""
rospy.loginfo("=== Setting Parameters ===")
# 设置不同类型的参数
rospy.set_param('~my_int', 42)
rospy.set_param('~my_float', 3.14)
rospy.set_param('~my_string', 'hello')
rospy.set_param('~my_bool', True)
# 设置列表参数
rospy.set_param('~my_list', [1, 2, 3, 4, 5])
# 设置字典参数
rospy.set_param('~my_dict', {
'key1': 'value1',
'key2': 123,
'key3': True
})
# 设置嵌套参数
rospy.set_param('~robot', {
'name': 'robot_01',
'speed': 1.0,
'sensors': {
'camera': {
'resolution': [640, 480],
'fps': 30
},
'lidar': {
'range': 30.0,
'angle': 360
}
}
})
# 设置全局参数
rospy.set_param('/global_config', {
'version': '1.0',
'debug': True
})
rospy.loginfo("Parameters set successfully")
def delete_parameters(self):
"""删除参数"""
rospy.loginfo("=== Deleting Parameters ===")
# 创建临时参数
rospy.set_param('~temp_param', 'will_be_deleted')
# 删除参数
if rospy.has_param('~temp_param'):
rospy.delete_param('~temp_param')
rospy.loginfo("Deleted temp_param")
# 删除不存在的参数(不会报错)
try:
rospy.delete_param('~non_existent_param')
except KeyError:
rospy.loginfo("Parameter does not exist")
def check_parameter_existence(self):
"""检查参数存在性"""
rospy.loginfo("=== Checking Parameter Existence ===")
# 检查参数是否存在
params_to_check = [
'~my_int',
'~non_existent',
'/global_config',
'/rosversion'
]
for param in params_to_check:
if rospy.has_param(param):
rospy.loginfo(f"Parameter {param} exists")
value = rospy.get_param(param)
rospy.loginfo(f" Value: {value}")
else:
rospy.loginfo(f"Parameter {param} does not exist")
def get_all_parameters(self):
"""获取所有参数"""
rospy.loginfo("=== Getting All Parameters ===")
# 获取所有参数名称
param_names = rospy.get_param_names()
rospy.loginfo(f"Total parameters: {len(param_names)}")
# 按命名空间分组
namespaces = {}
for param in param_names:
namespace = '/'.join(param.split('/')[:-1])
if namespace not in namespaces:
namespaces[namespace] = []
namespaces[namespace].append(param)
# 打印前几个命名空间
for ns, params in list(namespaces.items())[:5]:
rospy.loginfo(f"Namespace: {ns if ns else '/'}")
for param in params[:3]:
rospy.loginfo(f" - {param}")
def search_parameters(self):
"""搜索参数"""
rospy.loginfo("=== Searching Parameters ===")
# 搜索参数(从当前命名空间向上查找)
search_keys = ['robot_name', 'config_file', 'rosversion']
for key in search_keys:
resolved_key = rospy.search_param(key)
if resolved_key:
value = rospy.get_param(resolved_key)
rospy.loginfo(f"Found '{key}' at '{resolved_key}': {value}")
else:
rospy.loginfo(f"Parameter '{key}' not found")
def handle_complex_parameters(self):
"""处理复杂参数"""
rospy.loginfo("=== Complex Parameters ===")
# 处理列表参数
list_param = rospy.get_param('~my_list', [])
rospy.loginfo(f"List parameter: {list_param}")
# 处理字典参数
dict_param = rospy.get_param('~my_dict', {})
rospy.loginfo(f"Dict parameter: {dict_param}")
# 处理嵌套结构
robot_config = rospy.get_param('~robot', {})
if robot_config:
# 访问嵌套值
robot_name = robot_config.get('name', 'unknown')
camera_res = robot_config.get('sensors', {}).get('camera', {}).get('resolution', [])
rospy.loginfo(f"Robot name: {robot_name}")
rospy.loginfo(f"Camera resolution: {camera_res}")
# 批量获取参数
self.batch_get_parameters()
def batch_get_parameters(self):
"""批量获取参数"""
# 定义参数配置
param_config = {
'max_speed': 1.0,
'min_speed': 0.1,
'acceleration': 0.5,
'deceleration': 0.8,
'control_frequency': 10.0
}
# 批量获取并使用默认值
params = {}
for key, default in param_config.items():
params[key] = rospy.get_param(f'~{key}', default)
rospy.loginfo(f"Batch parameters: {params}")
def handle_yaml_config(self):
"""处理YAML配置"""
rospy.loginfo("=== YAML Configuration ===")
# 创建YAML配置
yaml_config = """
robot:
name: my_robot
version: 1.0
specifications:
max_speed: 2.0
payload: 10.0
battery: 24.0
sensors:
- type: camera
model: RGB-D
fps: 30
- type: lidar
model: 2D
range: 30
- type: imu
model: 6DOF
rate: 100
"""
# 解析YAML
config = yaml.safe_load(yaml_config)
# 设置为参数
rospy.set_param('~yaml_config', config)
# 读取并使用
loaded_config = rospy.get_param('~yaml_config', {})
if loaded_config:
robot_name = loaded_config['robot']['name']
sensors = loaded_config['robot']['sensors']
rospy.loginfo(f"Robot: {robot_name}")
rospy.loginfo(f"Number of sensors: {len(sensors)}")
for sensor in sensors:
rospy.loginfo(f" - {sensor['type']}: {sensor['model']}")
class ParameterListener:
"""参数监听器"""
def __init__(self):
self.lock = Lock()
self.monitored_params = {}
# 要监控的参数列表
self.params_to_monitor = [
'~my_int',
'~my_float',
'~my_string'
]
# 初始化参数值
self.initialize_params()
# 创建定时器检查参数变化
self.check_timer = rospy.Timer(
rospy.Duration(1.0),
self.check_parameter_changes
)
rospy.loginfo("Parameter listener started")
def initialize_params(self):
"""初始化参数值"""
with self.lock:
for param in self.params_to_monitor:
try:
value = rospy.get_param(param)
self.monitored_params[param] = value
except KeyError:
self.monitored_params[param] = None
def check_parameter_changes(self, event):
"""检查参数变化"""
with self.lock:
for param in self.params_to_monitor:
try:
current_value = rospy.get_param(param)
if param in self.monitored_params:
if current_value != self.monitored_params[param]:
self.on_parameter_changed(
param,
self.monitored_params[param],
current_value
)
self.monitored_params[param] = current_value
else:
# 新参数
self.monitored_params[param] = current_value
rospy.loginfo(f"New parameter detected: {param} = {current_value}")
except KeyError:
if param in self.monitored_params and self.monitored_params[param] is not None:
# 参数被删除
rospy.loginfo(f"Parameter deleted: {param}")
self.monitored_params[param] = None
def on_parameter_changed(self, param_name, old_value, new_value):
"""参数变化回调"""
rospy.loginfo(f"Parameter changed: {param_name}")
rospy.loginfo(f" Old value: {old_value}")
rospy.loginfo(f" New value: {new_value}")
# 根据参数类型执行相应操作
if 'speed' in param_name:
self.update_speed_limits(new_value)
elif 'config' in param_name:
self.reload_configuration(new_value)
def update_speed_limits(self, new_speed):
"""更新速度限制"""
rospy.loginfo(f"Updating speed limits to: {new_speed}")
# 实现速度限制更新逻辑
def reload_configuration(self, new_config):
"""重新加载配置"""
rospy.loginfo("Reloading configuration...")
# 实现配置重载逻辑
def main():
try:
manager = ParameterManager()
rospy.spin()
except rospy.ROSInterruptException:
pass
if __name__ == '__main__':
main()
3. 参数文件管理
3.1 YAML参数文件
# config/robot_params.yaml
# 机器人基本信息
robot_info:
name: "my_robot"
model: "MR-100"
version: "2.1.0"
serial_number: "MR100-2024-001"
# 运动参数
motion:
max_linear_velocity: 2.0 # m/s
max_angular_velocity: 3.14 # rad/s
acceleration_limit: 1.0 # m/s^2
deceleration_limit: 2.0 # m/s^2
# 轮子参数
wheels:
diameter: 0.1 # m
distance: 0.5 # m
encoder_resolution: 1024 # ticks/revolution
# 传感器配置
sensors:
# 相机配置
camera:
enabled: true
device: "/dev/video0"
resolution: [640, 480]
fps: 30
calibration:
fx: 554.254691191187
fy: 554.254691191187
cx: 320.5
cy: 240.5
distortion: [0.0, 0.0, 0.0, 0.0, 0.0]
# 激光雷达配置
lidar:
enabled: true
port: "/dev/ttyUSB0"
baud_rate: 115200
range_min: 0.15
range_max: 12.0
angle_min: -3.14159
angle_max: 3.14159
angle_increment: 0.00872665 # 0.5 degree
# IMU配置
imu:
enabled: true
port: "/dev/ttyUSB1"
baud_rate: 9600
calibration:
accel_offset: [0.01, -0.02, 0.03]
gyro_offset: [0.001, -0.002, 0.0]
# 控制器参数
controllers:
# PID控制器
pid:
linear:
kp: 1.0
ki: 0.1
kd: 0.05
min_output: -1.0
max_output: 1.0
angular:
kp: 2.0
ki: 0.2
kd: 0.1
min_output: -3.14
max_output: 3.14
# 轨迹跟踪
trajectory:
lookahead_distance: 0.5
goal_tolerance: 0.1
angle_tolerance: 0.05
# 导航参数
navigation:
# 全局规划器
global_planner:
algorithm: "astar" # astar, dijkstra, rrt
resolution: 0.05
timeout: 5.0
# 局部规划器
local_planner:
algorithm: "dwa" # dwa, teb, mpc
sim_time: 2.0
vx_samples: 10
vtheta_samples: 20
# 代价地图
costmap:
global:
width: 100.0
height: 100.0
resolution: 0.05
origin_x: -50.0
origin_y: -50.0
local:
width: 5.0
height: 5.0
resolution: 0.025
rolling_window: true
# 安全参数
safety:
emergency_stop_distance: 0.3
warning_distance: 1.0
max_temperature: 60.0
min_battery_voltage: 22.0
watchdog_timeout: 1.0
# 调试参数
debug:
verbose: false
log_level: "info" # debug, info, warn, error
save_logs: true
log_directory: "/home/robot/logs"
visualization: true
3.2 加载参数文件
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import rospy
import rosparam
import yaml
import os
class ParameterFileManager:
"""参数文件管理器"""
def __init__(self):
rospy.init_node('parameter_file_manager')
# 演示参数文件操作
self.load_parameter_file()
self.save_parameter_file()
self.merge_parameters()
self.validate_parameters()
def load_parameter_file(self):
"""加载参数文件"""
rospy.loginfo("=== Loading Parameter Files ===")
# 方法1:使用rosparam.load_file
config_file = os.path.expanduser("~/catkin_ws/src/my_package/config/robot_params.yaml")
if os.path.exists(config_file):
# 加载到私有命名空间
paramlist = rosparam.load_file(config_file, default_namespace="~")
for params, ns in paramlist:
rosparam.upload_params(ns, params)
rospy.loginfo(f"Loaded parameters from {config_file}")
# 方法2:使用yaml.load
with open(config_file, 'r') as f:
config = yaml.safe_load(f)
# 将配置设置为参数
rospy.set_param('~loaded_config', config)
# 方法3:加载特定部分
self.load_specific_section(config_file, 'sensors')
def load_specific_section(self, file_path, section):
"""加载配置文件的特定部分"""
with open(file_path, 'r') as f:
config = yaml.safe_load(f)
if section in config:
rospy.set_param(f'~{section}', config[section])
rospy.loginfo(f"Loaded section '{section}'")
def save_parameter_file(self):
"""保存参数到文件"""
rospy.loginfo("=== Saving Parameters to File ===")
# 获取当前参数
params_to_save = {
'timestamp': rospy.Time.now().to_sec(),
'node_name': rospy.get_name(),
'parameters': {}
}
# 收集要保存的参数
param_names = rospy.get_param_names()
for param in param_names:
if param.startswith(rospy.get_name()):
# 只保存当前节点的参数
key = param.replace(rospy.get_name() + '/', '')
params_to_save['parameters'][key] = rospy.get_param(param)
# 保存到文件
output_file = os.path.expanduser("~/saved_params.yaml")
with open(output_file, 'w') as f:
yaml.dump(params_to_save, f, default_flow_style=False)
rospy.loginfo(f"Parameters saved to {output_file}")
# 使用rosparam dump
rosparam.dump_params(output_file, rospy.get_name())
def merge_parameters(self):
"""合并多个参数文件"""
rospy.loginfo("=== Merging Parameter Files ===")
# 基础配置
base_config = {
'robot': {
'name': 'default_robot',
'speed': 1.0
},
'sensors': {
'camera': {'enabled': False},
'lidar': {'enabled': False}
}
}
# 覆盖配置
override_config = {
'robot': {
'speed': 2.0
},
'sensors': {
'camera': {'enabled': True, 'fps': 30}
},
'debug': True
}
# 深度合并配置
merged_config = self.deep_merge(base_config, override_config)
# 设置合并后的参数
rospy.set_param('~merged_config', merged_config)
rospy.loginfo(f"Merged configuration: {merged_config}")
def deep_merge(self, dict1, dict2):
"""深度合并两个字典"""
result = dict1.copy()
for key, value in dict2.items():
if key in result and isinstance(result[key], dict) and isinstance(value, dict):
result[key] = self.deep_merge(result[key], value)
else:
result[key] = value
return result
def validate_parameters(self):
"""验证参数"""
rospy.loginfo("=== Validating Parameters ===")
# 定义参数规则
param_rules = {
'~motion/max_linear_velocity': {
'type': float,
'min': 0.0,
'max': 5.0,
'required': True
},
'~motion/max_angular_velocity': {
'type': float,
'min': 0.0,
'max': 6.28,
'required': True
},
'~sensors/camera/fps': {
'type': int,
'min': 1,
'max': 60,
'required': False,
'default': 30
}
}
# 验证参数
for param_name, rules in param_rules.items():
self.validate_single_parameter(param_name, rules)
def validate_single_parameter(self, param_name, rules):
"""验证单个参数"""
# 检查参数是否存在
if rospy.has_param(param_name):
value = rospy.get_param(param_name)
# 类型检查
if 'type' in rules and not isinstance(value, rules['type']):
rospy.logwarn(f"Parameter {param_name} has wrong type. "
f"Expected {rules['type']}, got {type(value)}")
return False
# 范围检查
if 'min' in rules and value < rules['min']:
rospy.logwarn(f"Parameter {param_name} = {value} is below minimum {rules['min']}")
return False
if 'max' in rules and value > rules['max']:
rospy.logwarn(f"Parameter {param_name} = {value} exceeds maximum {rules['max']}")
return False
rospy.loginfo(f"Parameter {param_name} = {value} is valid")
return True
else:
# 参数不存在
if rules.get('required', False):
rospy.logerr(f"Required parameter {param_name} not found!")
return False
else:
# 使用默认值
if 'default' in rules:
rospy.set_param(param_name, rules['default'])
rospy.loginfo(f"Set {param_name} to default value {rules['default']}")
return True
4. 动态参数配置
4.1 动态配置文件
# cfg/DynamicParams.cfg
#!/usr/bin/env python
from dynamic_reconfigure.parameter_generator_catkin import *
gen = ParameterGenerator()
# 添加参数
# gen.add(name, type, level, description, default, min, max)
# 整数参数
gen.add("int_param", int_t, 0,
"An integer parameter", 50, 0, 100)
# 浮点参数
gen.add("double_param", double_t, 0,
"A double parameter", 0.5, 0.0, 1.0)
# 字符串参数
gen.add("str_param", str_t, 0,
"A string parameter", "Hello World")
# 布尔参数
gen.add("bool_param", bool_t, 0,
"A boolean parameter", True)
# 枚举参数
size_enum = gen.enum([gen.const("Small", int_t, 0, "Small size"),
gen.const("Medium", int_t, 1, "Medium size"),
gen.const("Large", int_t, 2, "Large size")],
"Size options")
gen.add("size", int_t, 0, "Select size", 1, 0, 2, edit_method=size_enum)
# 分组参数
group_motion = gen.add_group("Motion")
group_motion.add("max_speed", double_t, 0, "Maximum speed", 1.0, 0.0, 10.0)
group_motion.add("acceleration", double_t, 0, "Acceleration", 0.5, 0.0, 5.0)
group_sensor = gen.add_group("Sensors")
group_sensor.add("enable_camera", bool_t, 0, "Enable camera", True)
group_sensor.add("camera_fps", int_t, 0, "Camera FPS", 30, 1, 60)
# 生成必要的文件
exit(gen.generate("my_package", "dynamic_param_node", "DynamicParams"))
4.2 动态参数服务器
// dynamic_param_server.cpp
#include
#include
#include
class DynamicParamServer {
public:
DynamicParamServer() : nh_("~") {
// 设置动态参数回调
dynamic_reconfigure::Server<my_package::DynamicParamsConfig>::CallbackType f;
f = boost::bind(&DynamicParamServer::configCallback, this, _1, _2);
server_.setCallback(f);
ROS_INFO("Dynamic parameter server started");
}
void configCallback(my_package::DynamicParamsConfig &config, uint32_t level) {
ROS_INFO("Reconfigure Request:");
ROS_INFO(" int_param: %d", config.int_param);
ROS_INFO(" double_param: %f", config.double_param);
ROS_INFO(" str_param: %s", config.str_param.c_str());
ROS_INFO(" bool_param: %s", config.bool_param ? "true" : "false");
ROS_INFO(" size: %d", config.size);
ROS_INFO(" max_speed: %f", config.max_speed);
ROS_INFO(" Level: %d", level);
// 保存配置
current_config_ = config;
// 根据level判断哪些参数改变了
if (level & 0x1) {
updateMotionParameters(config);
}
if (level & 0x2) {
updateSensorParameters(config);
}
}
private:
void updateMotionParameters(const my_package::DynamicParamsConfig &config) {
ROS_INFO("Updating motion parameters...");
// 更新运动控制参数
max_speed_ = config.max_speed;
acceleration_ = config.acceleration;
}
void updateSensorParameters(const my_package::DynamicParamsConfig &config) {
ROS_INFO("Updating sensor parameters...");
// 更新传感器参数
camera_enabled_ = config.enable_camera;
camera_fps_ = config.camera_fps;
}
ros::NodeHandle nh_;
dynamic_reconfigure::Server<my_package::DynamicParamsConfig> server_;
my_package::DynamicParamsConfig current_config_;
double max_speed_;
double acceleration_;
bool camera_enabled_;
int camera_fps_;
};
4.3 Python动态参数
#!/usr/bin/env python
import rospy
from dynamic_reconfigure.server import Server
from my_package.cfg import DynamicParamsConfig
class DynamicParameterServer:
"""动态参数服务器"""
def __init__(self):
rospy.init_node('dynamic_parameter_server')
# 当前配置
self.current_config = None
# 创建动态参数服务器
self.server = Server(DynamicParamsConfig, self.reconfigure_callback)
rospy.loginfo("Dynamic parameter server started")
def reconfigure_callback(self, config, level):
"""动态参数回调"""
rospy.loginfo("Reconfigure Request:")
rospy.loginfo(f" int_param: {config.int_param}")
rospy.loginfo(f" double_param: {config.double_param}")
rospy.loginfo(f" str_param: {config.str_param}")
rospy.loginfo(f" bool_param: {config.bool_param}")
rospy.loginfo(f" Level: {level}")
# 检查哪些参数改变了
if self.current_config:
self.check_parameter_changes(self.current_config, config)
# 保存当前配置
self.current_config = config
# 应用参数
self.apply_parameters(config, level)
return config
def check_parameter_changes(self, old_config, new_config):
"""检查参数变化"""
for key in dir(new_config):
if not key.startswith('_'):
old_value = getattr(old_config, key)
new_value = getattr(new_config, key)
if old_value != new_value:
rospy.loginfo(f"Parameter '{key}' changed: {old_value} -> {new_value}")
def apply_parameters(self, config, level):
"""应用参数"""
# 根据level判断参数类型
if level & 0x1: # 运动参数
self.update_motion_parameters(config)
if level & 0x2: # 传感器参数
self.update_sensor_parameters(config)
def update_motion_parameters(self, config):
"""更新运动参数"""
rospy.loginfo("Updating motion parameters...")
# 实现运动参数更新逻辑
def update_sensor_parameters(self, config):
"""更新传感器参数"""
rospy.loginfo("Updating sensor parameters...")
# 实现传感器参数更新逻辑
def main():
server = DynamicParameterServer()
rospy.spin()
if __name__ == '__main__':
main()
5. 参数命名空间
5.1 命名空间管理
#!/usr/bin/env python
import rospy
class NamespaceManager:
"""命名空间管理器"""
def __init__(self):
rospy.init_node('namespace_manager')
# 演示命名空间操作
self.demonstrate_namespaces()
def demonstrate_namespaces(self):
"""演示命名空间"""
# 1. 全局命名空间
rospy.set_param('/global_param', 'global_value')
# 2. 相对命名空间
rospy.set_param('relative_param', 'relative_value')
# 3. 私有命名空间
rospy.set_param('~private_param', 'private_value')
# 4. 嵌套命名空间
rospy.set_param('~robot/sensors/camera/enabled', True)
rospy.set_param('~robot/sensors/lidar/enabled', True)
# 5. 多机器人命名空间
for i in range(3):
robot_ns = f'/robot_{i}'
rospy.set_param(f'{robot_ns}/id', f'robot_{i}')
rospy.set_param(f'{robot_ns}/position/x', i * 2.0)
rospy.set_param(f'{robot_ns}/position/y', 0.0)
rospy.set_param(f'{robot_ns}/status', 'idle')
# 打印命名空间结构
self.print_namespace_tree()
def print_namespace_tree(self):
"""打印命名空间树"""
rospy.loginfo("=== Parameter Namespace Tree ===")
# 获取所有参数
params = rospy.get_param_names()
# 构建树结构
tree = {}
for param in params:
parts = param.split('/')
current = tree
for part in parts[:-1]:
if part not in current:
current[part] = {}
current = current[part]
# 添加叶子节点
if parts[-1]:
current[parts[-1]] = rospy.get_param(param)
# 打印树
self.print_tree(tree, indent=0)
def print_tree(self, tree, indent=0):
"""递归打印树"""
for key, value in tree.items():
if isinstance(value, dict):
rospy.loginfo(" " * indent + f"/{key}")
self.print_tree(value, indent + 1)
else:
rospy.loginfo(" " * indent + f"{key}: {value}")
6. Launch文件参数配置
6.1 Launch文件参数示例
<launch>
<param name="robot_name" value="my_robot" />
<param name="max_speed" value="2.0" />
<param name="debug_mode" value="true" />
<rosparam file="$(find my_package)/config/robot_params.yaml" command="load" />
<group ns="robot1">
<param name="id" value="robot_001" />
<param name="type" value="mobile" />
group>
<node name="my_node" pkg="my_package" type="my_node" output="screen">
<param name="private_param" value="private_value" />
<rosparam file="$(find my_package)/config/node_config.yaml" command="load" />
node>
<arg name="use_simulator" default="false" />
<param name="simulator_enabled" value="$(arg use_simulator)" />
<param name="home_directory" value="$(env HOME)" />
<param name="ros_version" value="$(env ROS_VERSION)" />
<param name="calculated_value" value="$(eval 2 * 3.14159)" />
<param name="conditional_value" value="$(eval 'simulation' if arg('use_simulator') else 'real')" />
<rosparam>
controllers:
pid:
kp: 1.0
ki: 0.1
kd: 0.05
trajectory:
lookahead: 0.5
tolerance: 0.1
rosparam>
<rosparam subst_value="true">
config_path: $(find my_package)/config
data_path: $(find my_package)/data
rosparam>
<include file="$(find my_package)/launch/sub_launch.launch">
<arg name="robot_id" value="robot_001" />
include>
launch>
7. 参数服务器进阶
7.1 参数缓存和优化
#!/usr/bin/env python
import rospy
import threading
import time
class ParameterCache:
"""参数缓存器"""
def __init__(self, cache_duration=1.0):
self.cache = {}
self.cache_duration = cache_duration
self.lock = threading.Lock()
def get(self, param_name, default=None):
"""获取缓存的参数"""
with self.lock:
# 检查缓存
if param_name in self.cache:
value, timestamp = self.cache[param_name]
# 检查是否过期
if time.time() - timestamp < self.cache_duration:
return value
# 从参数服务器获取
try:
value = rospy.get_param(param_name, default)
self.cache[param_name] = (value, time.time())
return value
except:
return default
def invalidate(self, param_name=None):
"""使缓存失效"""
with self.lock:
if param_name:
self.cache.pop(param_name, None)
else:
self.cache.clear()
class ParameterWatcher:
"""参数监视器"""
def __init__(self):
self.callbacks = {}
self.watching = True
self.thread = threading.Thread(target=self.watch_loop)
self.thread.daemon = True
self.thread.start()
def watch(self, param_name, callback):
"""监视参数变化"""
if param_name not in self.callbacks:
self.callbacks[param_name] = []
self.callbacks[param_name].append(callback)
# 获取初始值
try:
initial_value = rospy.get_param(param_name)
callback(param_name, None, initial_value)
except:
pass
def watch_loop(self):
"""监视循环"""
last_values = {}
while self.watching and not rospy.is_shutdown():
for param_name, callbacks in self.callbacks.items():
try:
current_value = rospy.get_param(param_name)
if param_name in last_values:
if current_value != last_values[param_name]:
# 参数改变
for callback in callbacks:
callback(param_name, last_values[param_name], current_value)
last_values[param_name] = current_value
except KeyError:
if param_name in last_values:
# 参数被删除
for callback in callbacks:
callback(param_name, last_values[param_name], None)
del last_values[param_name]
rospy.sleep(0.1)
def stop(self):
"""停止监视"""
self.watching = False
self.thread.join()
8. 配置管理最佳实践
8.1 配置管理系统
#!/usr/bin/env python
import rospy
import yaml
import json
import os
from enum import Enum
class ConfigLevel(Enum):
"""配置级别"""
DEFAULT = 0
SYSTEM = 1
USER = 2
RUNTIME = 3
class ConfigurationManager:
"""配置管理器"""
def __init__(self):
rospy.init_node('configuration_manager')
# 配置层级
self.configs = {
ConfigLevel.DEFAULT: {},
ConfigLevel.SYSTEM: {},
ConfigLevel.USER: {},
ConfigLevel.RUNTIME: {}
}
# 加载配置
self.load_configurations()
# 参数验证器
self.validator = ParameterValidator()
# 参数监视器
self.watcher = ParameterWatcher()
self.setup_watchers()
def load_configurations(self):
"""加载配置文件"""
# 加载默认配置
default_config_path = self.get_config_path('default.yaml')
self.load_config_file(default_config_path, ConfigLevel.DEFAULT)
# 加载系统配置
system_config_path = self.get_config_path('system.yaml')
self.load_config_file(system_config_path, ConfigLevel.SYSTEM)
# 加载用户配置
user_config_path = os.path.expanduser('~/.robot/config.yaml')
self.load_config_file(user_config_path, ConfigLevel.USER)
# 合并配置
self.merge_configurations()
def get_config_path(self, filename):
"""获取配置文件路径"""
try:
import rospkg
rospack = rospkg.RosPack()
package_path = rospack.get_path('my_package')
return os.path.join(package_path, 'config', filename)
except:
return filename
def load_config_file(self, filepath, level):
"""加载配置文件"""
if os.path.exists(filepath):
with open(filepath, 'r') as f:
config = yaml.safe_load(f)
self.configs[level] = config
rospy.loginfo(f"Loaded {level.name} config from {filepath}")
def merge_configurations(self):
"""合并配置层级"""
merged = {}
# 按优先级合并
for level in [ConfigLevel.DEFAULT, ConfigLevel.SYSTEM,
ConfigLevel.USER, ConfigLevel.RUNTIME]:
merged = self.deep_update(merged, self.configs[level])
# 设置到参数服务器
for key, value in self.flatten_dict(merged).items():
rospy.set_param(key, value)
rospy.loginfo("Configuration merged and loaded")
def deep_update(self, base_dict, update_dict):
"""深度更新字典"""
for key, value in update_dict.items():
if isinstance(value, dict) and key in base_dict:
base_dict[key] = self.deep_update(base_dict.get(key, {}), value)
else:
base_dict[key] = value
return base_dict
def flatten_dict(self, d, parent_key='~', sep='/'):
"""扁平化字典"""
items = []
for k, v in d.items():
new_key = f"{parent_key}{sep}{k}" if parent_key else k
if isinstance(v, dict):
items.extend(self.flatten_dict(v, new_key, sep=sep).items())
else:
items.append((new_key, v))
return dict(items)
def get_config(self, key, level=None):
"""获取配置"""
if level:
return self.get_nested_value(self.configs[level], key)
else:
# 从参数服务器获取
return rospy.get_param(f"~{key}")
def set_config(self, key, value, level=ConfigLevel.RUNTIME):
"""设置配置"""
# 验证参数
if not self.validator.validate(key, value):
rospy.logwarn(f"Invalid value for {key}: {value}")
return False
# 设置到指定级别
self.set_nested_value(self.configs[level], key, value)
# 更新参数服务器
rospy.set_param(f"~{key}", value)
return True
def get_nested_value(self, d, key):
"""获取嵌套值"""
keys = key.split('/')
for k in keys:
d = d.get(k, {})
return d
def set_nested_value(self, d, key, value):
"""设置嵌套值"""
keys = key.split('/')
for k in keys[:-1]:
d = d.setdefault(k, {})
d[keys[-1]] = value
def save_configuration(self, level=ConfigLevel.USER):
"""保存配置"""
if level == ConfigLevel.USER:
filepath = os.path.expanduser('~/.robot/config.yaml')
os.makedirs(os.path.dirname(filepath), exist_ok=True)
with open(filepath, 'w') as f:
yaml.dump(self.configs[level], f, default_flow_style=False)
rospy.loginfo(f"Configuration saved to {filepath}")
def setup_watchers(self):
"""设置参数监视器"""
# 监视关键参数
self.watcher.watch('~safety/emergency_stop', self.on_emergency_stop_changed)
self.watcher.watch('~motion/max_speed', self.on_max_speed_changed)
def on_emergency_stop_changed(self, name, old_value, new_value):
"""紧急停止参数变化"""
if new_value:
rospy.logwarn("EMERGENCY STOP ACTIVATED")
# 执行紧急停止逻辑
def on_max_speed_changed(self, name, old_value, new_value):
"""最大速度参数变化"""
rospy.loginfo(f"Max speed changed from {old_value} to {new_value}")
# 更新速度限制
class ParameterValidator:
"""参数验证器"""
def __init__(self):
# 定义验证规则
self.rules = {
'motion/max_speed': {
'type': float,
'min': 0.0,
'max': 10.0
},
'sensors/camera/fps': {
'type': int,
'min': 1,
'max': 120
},
'robot/name': {
'type': str,
'pattern': r'^[a-zA-Z0-9_]+$'
}
}
def validate(self, param_name, value):
"""验证参数"""
if param_name not in self.rules:
return True # 没有规则,默认通过
rule = self.rules[param_name]
# 类型检查
if 'type' in rule:
if not isinstance(value, rule['type']):
return False
# 范围检查
if 'min' in rule and value < rule['min']:
return False
if 'max' in rule and value > rule['max']:
return False
# 模式匹配
if 'pattern' in rule:
import re
if not re.match(rule['pattern'], str(value)):
return False
return True
9. 实战案例:智能配置管理系统
9.1 完整的配置管理系统
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import rospy
import yaml
import json
import os
import threading
from datetime import datetime
from dynamic_reconfigure.server import Server
from my_package.cfg import SystemConfig
class IntelligentConfigSystem:
"""智能配置管理系统"""
def __init__(self):
rospy.init_node('intelligent_config_system')
# 系统组件
self.config_manager = ConfigurationManager()
self.profile_manager = ProfileManager()
self.adaptation_engine = AdaptationEngine()
# 动态配置服务器
self.dyn_server = Server(SystemConfig, self.dynamic_callback)
# 启动系统
self.initialize_system()
rospy.loginfo("Intelligent Configuration System started")
def initialize_system(self):
"""初始化系统"""
# 加载配置文件
self.config_manager.load_all_configs()
# 加载配置档案
self.profile_manager.load_profiles()
# 启动自适应引擎
self.adaptation_engine.start()
# 设置服务
self.setup_services()
def setup_services(self):
"""设置ROS服务"""
rospy.Service('~save_config', SaveConfig, self.handle_save_config)
rospy.Service('~load_config', LoadConfig, self.handle_load_config)
rospy.Service('~switch_profile', SwitchProfile, self.handle_switch_profile)
rospy.Service('~get_config_info', GetConfigInfo, self.handle_get_config_info)
def dynamic_callback(self, config, level):
"""动态配置回调"""
rospy.loginfo(f"Dynamic reconfigure: level={level}")
# 更新配置
self.config_manager.update_from_dynamic(config)
# 触发自适应
self.adaptation_engine.adapt(config)
return config
def handle_save_config(self, req):
"""处理保存配置请求"""
success = self.config_manager.save_config(req.name, req.description)
return SaveConfigResponse(success=success)
def handle_load_config(self, req):
"""处理加载配置请求"""
success = self.config_manager.load_config(req.name)
return LoadConfigResponse(success=success)
def handle_switch_profile(self, req):
"""处理切换配置档案请求"""
success = self.profile_manager.switch_profile(req.profile_name)
return SwitchProfileResponse(success=success)
def handle_get_config_info(self, req):
"""处理获取配置信息请求"""
info = self.config_manager.get_info()
return GetConfigInfoResponse(info=info)
class ConfigurationManager:
"""配置管理器"""
def __init__(self):
self.configs = {}
self.current_config = None
self.config_history = []
self.lock = threading.Lock()
def load_all_configs(self):
"""加载所有配置"""
config_dir = self.get_config_directory()
for filename in os.listdir(config_dir):
if filename.endswith('.yaml'):
filepath = os.path.join(config_dir, filename)
self.load_config_file(filepath)
def get_config_directory(self):
"""获取配置目录"""
return os.path.expanduser('~/.robot/configs')
def load_config_file(self, filepath):
"""加载配置文件"""
with self.lock:
try:
with open(filepath, 'r') as f:
config = yaml.safe_load(f)
config_name = os.path.basename(filepath).replace('.yaml', '')
self.configs[config_name] = config
rospy.loginfo(f"Loaded configuration: {config_name}")
except Exception as e:
rospy.logerr(f"Failed to load config {filepath}: {e}")
def save_config(self, name, description=""):
"""保存当前配置"""
with self.lock:
# 获取当前参数
current_params = self.get_current_parameters()
# 添加元数据
config = {
'metadata': {
'name': name,
'description': description,
'timestamp': datetime.now().isoformat(),
'ros_version': rospy.get_param('/rosdistro', 'unknown')
},
'parameters': current_params
}
# 保存到文件
filepath = os.path.join(self.get_config_directory(), f"{name}.yaml")
with open(filepath, 'w') as f:
yaml.dump(config, f, default_flow_style=False)
self.configs[name] = config
rospy.loginfo(f"Configuration saved: {name}")
return True
def get_current_parameters(self):
"""获取当前参数"""
params = {}
# 获取所有参数
for param_name in rospy.get_param_names():
if not param_name.startswith('/rosout'):
params[param_name] = rospy.get_param(param_name)
return params
def load_config(self, name):
"""加载配置"""
with self.lock:
if name not in self.configs:
rospy.logerr(f"Configuration '{name}' not found")
return False
config = self.configs[name]
# 保存当前配置到历史
if self.current_config:
self.config_history.append(self.current_config)
# 应用新配置
for param_name, value in config['parameters'].items():
rospy.set_param(param_name, value)
self.current_config = name
rospy.loginfo(f"Configuration loaded: {name}")
return True
def update_from_dynamic(self, dynamic_config):
"""从动态配置更新"""
# 将动态配置转换为参数
for attr in dir(dynamic_config):
if not attr.startswith('_'):
value = getattr(dynamic_config, attr)
rospy.set_param(f"~{attr}", value)
def get_info(self):
"""获取配置信息"""
info = {
'current_config': self.current_config,
'available_configs': list(self.configs.keys()),
'history_size': len(self.config_history)
}
return json.dumps(info)
class ProfileManager:
"""配置档案管理器"""
def __init__(self):
self.profiles = {}
self.current_profile = None
self.load_default_profiles()
def load_default_profiles(self):
"""加载默认档案"""
# 性能模式
self.profiles['performance'] = {
'motion/max_speed': 5.0,
'motion/acceleration': 2.0,
'sensors/camera/fps': 60,
'control/frequency': 100.0
}
# 节能模式
self.profiles['power_saving'] = {
'motion/max_speed': 1.0,
'motion/acceleration': 0.5,
'sensors/camera/fps': 15,
'control/frequency': 10.0
}
# 精确模式
self.profiles['precision'] = {
'motion/max_speed': 0.5,
'motion/acceleration': 0.2,
'sensors/camera/fps': 30,
'control/frequency': 50.0
}
# 安全模式
self.profiles['safety'] = {
'motion/max_speed': 0.3,
'motion/acceleration': 0.1,
'safety/obstacle_distance': 2.0,
'safety/emergency_stop_distance': 0.5
}
def load_profiles(self):
"""加载用户档案"""
profile_dir = os.path.expanduser('~/.robot/profiles')
if os.path.exists(profile_dir):
for filename in os.listdir(profile_dir):
if filename.endswith('.yaml'):
filepath = os.path.join(profile_dir, filename)
with open(filepath, 'r') as f:
profile = yaml.safe_load(f)
profile_name = os.path.basename(filename).replace('.yaml', '')
self.profiles[profile_name] = profile
def switch_profile(self, profile_name):
"""切换档案"""
if profile_name not in self.profiles:
rospy.logerr(f"Profile '{profile_name}' not found")
return False
profile = self.profiles[profile_name]
# 应用档案参数
for param_name, value in profile.items():
rospy.set_param(f"~{param_name}", value)
self.current_profile = profile_name
rospy.loginfo(f"Switched to profile: {profile_name}")
return True
def create_profile(self, name, params):
"""创建新档案"""
self.profiles[name] = params
# 保存到文件
profile_dir = os.path.expanduser('~/.robot/profiles')
os.makedirs(profile_dir, exist_ok=True)
filepath = os.path.join(profile_dir, f"{name}.yaml")
with open(filepath, 'w') as f:
yaml.dump(params, f, default_flow_style=False)
rospy.loginfo(f"Profile created: {name}")
class AdaptationEngine:
"""自适应引擎"""
def __init__(self):
self.rules = []
self.metrics = {}
self.running = False
self.thread = None
self.setup_adaptation_rules()
def setup_adaptation_rules(self):
"""设置自适应规则"""
# 规则1:低电量时切换到节能模式
self.add_rule(
condition=lambda: self.get_metric('battery_level', 100) < 20,
action=lambda: self.switch_to_profile('power_saving'),
name="low_battery_rule"
)
# 规则2:高温时降低性能
self.add_rule(
condition=lambda: self.get_metric('temperature', 0) > 60,
action=lambda: self.reduce_performance(),
name="high_temperature_rule"
)
# 规则3:障碍物接近时降速
self.add_rule(
condition=lambda: self.get_metric('min_obstacle_distance', 10) < 1.0,
action=lambda: self.reduce_speed(),
name="obstacle_avoidance_rule"
)
def add_rule(self, condition, action, name=""):
"""添加自适应规则"""
self.rules.append({
'name': name,
'condition': condition,
'action': action,
'last_triggered': None
})
def start(self):
"""启动自适应引擎"""
self.running = True
self.thread = threading.Thread(target=self.adaptation_loop)
self.thread.daemon = True
self.thread.start()
def stop(self):
"""停止自适应引擎"""
self.running = False
if self.thread:
self.thread.join()
def adaptation_loop(self):
"""自适应循环"""
while self.running and not rospy.is_shutdown():
# 更新指标
self.update_metrics()
# 检查规则
for rule in self.rules:
try:
if rule['condition']():
# 避免频繁触发
now = rospy.Time.now()
if rule['last_triggered'] is None or
(now - rule['last_triggered']).to_sec() > 5.0:
rospy.loginfo(f"Triggering adaptation rule: {rule['name']}")
rule['action']()
rule['last_triggered'] = now
except Exception as e:
rospy.logerr(f"Error in adaptation rule {rule['name']}: {e}")
rospy.sleep(1.0)
def update_metrics(self):
"""更新系统指标"""
# 从各种来源收集指标
self.metrics['battery_level'] = self.get_battery_level()
self.metrics['temperature'] = self.get_temperature()
self.metrics['min_obstacle_distance'] = self.get_min_obstacle_distance()
self.metrics['cpu_usage'] = self.get_cpu_usage()
self.metrics['memory_usage'] = self.get_memory_usage()
def get_metric(self, name, default=0):
"""获取指标值"""
return self.metrics.get(name, default)
def adapt(self, config):
"""根据配置触发自适应"""
# 实现自适应逻辑
pass
def switch_to_profile(self, profile_name):
"""切换到指定档案"""
rospy.loginfo(f"Adaptation: Switching to {profile_name} profile")
# 实现档案切换
def reduce_performance(self):
"""降低性能"""
current_speed = rospy.get_param('~motion/max_speed', 1.0)
new_speed = current_speed * 0.8
rospy.set_param('~motion/max_speed', new_speed)
rospy.loginfo(f"Adaptation: Reduced speed to {new_speed}")
def reduce_speed(self):
"""降低速度"""
rospy.set_param('~motion/max_speed', 0.3)
rospy.loginfo("Adaptation: Speed reduced for obstacle avoidance")
# 指标获取方法
def get_battery_level(self):
"""获取电池电量"""
return rospy.get_param('~battery_level', 100.0)
def get_temperature(self):
"""获取温度"""
return rospy.get_param('~temperature', 25.0)
def get_min_obstacle_distance(self):
"""获取最近障碍物距离"""
return rospy.get_param('~min_obstacle_distance', 10.0)
def get_cpu_usage(self):
"""获取CPU使用率"""
try:
import psutil
return psutil.cpu_percent()
except:
return 0.0
def get_memory_usage(self):
"""获取内存使用率"""
try:
import psutil
return psutil.virtual_memory().percent
except:
return 0.0
def main():
try:
system = IntelligentConfigSystem()
rospy.spin()
except rospy.ROSInterruptException:
pass
if __name__ == '__main__':
main()
10. 总结与展望
10.1 本文总结
通过本文的学习,你已经掌握了:
- ✅ 参数服务器原理:理解参数存储和共享机制
- ✅ 参数基本操作:C++和Python的参数读写方法
- ✅ 参数文件管理:YAML配置文件的使用
- ✅ 动态参数配置:运行时参数调整
- ✅ 参数命名空间:层次化参数组织
- ✅ Launch文件配置:启动文件中的参数设置
- ✅ 高级特性:缓存、监视、验证等
- ✅ 最佳实践:配置管理系统设计
- ✅ 实战应用:智能配置管理系统
10.2 关键要点回顾
| 概念 | 要点 | 最佳实践 |
|---|---|---|
| 参数类型 | 支持基本类型和复合类型 | 使用合适的数据结构 |
| 命名空间 | 全局、相对、私有 | 合理组织参数层次 |
| 动态配置 | 运行时修改参数 | 提供合理的参数范围 |
| 参数文件 | YAML格式配置 | 分层管理配置文件 |
| 参数验证 | 类型和范围检查 | 设置默认值和边界 |
| 性能优化 | 缓存和批量操作 | 减少参数服务器访问 |
10.3 常见陷阱与解决方案
-
参数类型不匹配
- 问题:C++和Python类型差异
- 解决:使用XmlRpc处理复杂类型
-
参数更新延迟
- 问题:参数改变不能立即生效
- 解决:实现参数监听机制
-
命名空间混乱
- 问题:参数路径不清晰
- 解决:制定命名规范
-
配置文件冲突
- 问题:多个配置源覆盖
- 解决:定义优先级规则
10.4 下一步学习
在下一篇文章中,我们将学习:
- 自定义消息类型:创建复杂的消息结构
- 消息序列化:消息的编码和解码
- 消息版本管理:处理消息演化
- 性能优化:消息传输优化技巧
版权声明:本文为原创文章,转载请注明出处
💡 学习建议:参数管理是ROS系统配置的核心。建议先掌握基本的参数操作,再学习动态配置和配置管理系统。在实际项目中,合理的参数设计可以大大提高系统的灵活性和可维护性。
下一篇预告:《【ROS1从入门到精通】第7篇:消息类型自定义(打造专属通信协议)》
敬请期待!








