ros2-development

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

ROS2 Development Skill

ROS2开发技能

When to Use This Skill

何时使用本技能

  • Building ROS2 packages, nodes, or component containers
  • Setting up colcon workspaces, ament_cmake, or ament_python packages
  • Writing CMakeLists.txt, package.xml, or setup.py for ROS2
  • Defining custom messages, services, or actions
  • Writing Python launch files with conditional logic
  • Configuring DDS middleware and QoS profiles
  • Implementing lifecycle (managed) nodes
  • Working with Nav2, MoveIt2, or other ROS2 frameworks
  • Debugging DDS discovery, QoS mismatches, or build failures
  • Deploying ROS2 to production or embedded systems (micro-ROS)
  • Setting up CI/CD for ROS2 packages
  • 构建ROS2包、节点或组件容器
  • 设置colcon工作区、ament_cmake或ament_python包
  • 为ROS2编写CMakeLists.txt、package.xml或setup.py
  • 定义自定义消息、服务或动作
  • 编写带有条件逻辑的Python启动文件
  • 配置DDS中间件和QoS配置文件
  • 实现生命周期(托管)节点
  • 使用Nav2、MoveIt2或其他ROS2框架
  • 调试DDS发现、QoS不匹配或构建失败问题
  • 将ROS2部署到生产环境或嵌入式系统(micro-ROS)
  • 为ROS2包设置CI/CD

Core Architecture

核心架构

1. Node Design Patterns

1. 节点设计模式

Basic Node (rclpy):
python
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
from std_msgs.msg import String

class PerceptionNode(Node):
    def __init__(self):
        super().__init__('perception_node')

        # 1. Declare parameters with types and descriptions
        self.declare_parameter('rate_hz', 30.0,
            descriptor=ParameterDescriptor(
                description='Processing rate in Hz',
                floating_point_range=[FloatingPointRange(
                    from_value=1.0, to_value=120.0, step=0.0
                )]
            ))
        self.declare_parameter('confidence_threshold', 0.7)
        self.declare_parameter('frame_id', 'camera_link')

        # 2. Read parameters
        rate_hz = self.get_parameter('rate_hz').value
        self.threshold = self.get_parameter('confidence_threshold').value
        self.frame_id = self.get_parameter('frame_id').value

        # 3. Set up QoS profiles
        sensor_qos = QoSProfile(
            reliability=ReliabilityPolicy.BEST_EFFORT,
            history=HistoryPolicy.KEEP_LAST,
            depth=1
        )
        reliable_qos = QoSProfile(
            reliability=ReliabilityPolicy.RELIABLE,
            history=HistoryPolicy.KEEP_LAST,
            depth=10
        )

        # 4. Publishers first, then subscribers
        self.det_pub = self.create_publisher(
            DetectionArray, 'detections', reliable_qos)

        self.image_sub = self.create_subscription(
            Image, 'camera/image_raw', self.image_callback, sensor_qos)

        # 5. Timers for periodic work
        self.timer = self.create_timer(1.0 / rate_hz, self.timer_callback)

        # 6. Parameter change callback
        self.add_on_set_parameters_callback(self.param_callback)

        self.get_logger().info(
            f'Perception node started at {rate_hz}Hz, '
            f'threshold={self.threshold}')

    def param_callback(self, params):
        """Handle runtime parameter changes (replaces dynamic_reconfigure)"""
        for param in params:
            if param.name == 'confidence_threshold':
                self.threshold = param.value
                self.get_logger().info(f'Threshold updated to {param.value}')
        return SetParametersResult(successful=True)

    def image_callback(self, msg):
        # Process incoming images
        pass

    def timer_callback(self):
        # Periodic work
        pass

def main(args=None):
    rclpy.init(args=args)
    node = PerceptionNode()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()
Basic Node (rclcpp):
cpp
#include <rclcpp/rclcpp.hpp>
#include <sensor_msgs/msg/image.hpp>
#include <memory>

class PerceptionNode : public rclcpp::Node {
public:
    PerceptionNode() : Node("perception_node") {
        // Declare and get parameters
        this->declare_parameter("rate_hz", 30.0);
        this->declare_parameter("confidence_threshold", 0.7);
        double rate_hz = this->get_parameter("rate_hz").as_double();

        // QoS
        auto sensor_qos = rclcpp::SensorDataQoS();
        auto reliable_qos = rclcpp::QoS(10).reliable();

        // Publishers and subscribers
        det_pub_ = this->create_publisher<DetectionArray>("detections", reliable_qos);
        image_sub_ = this->create_subscription<sensor_msgs::msg::Image>(
            "camera/image_raw", sensor_qos,
            std::bind(&PerceptionNode::image_callback, this, std::placeholders::_1));

        timer_ = this->create_wall_timer(
            std::chrono::milliseconds(static_cast<int>(1000.0 / rate_hz)),
            std::bind(&PerceptionNode::timer_callback, this));

        RCLCPP_INFO(this->get_logger(), "Perception node started at %.1fHz", rate_hz);
    }

private:
    void image_callback(const sensor_msgs::msg::Image::SharedPtr msg) {
        // Use shared_ptr for zero-copy potential
    }
    void timer_callback() {}

    rclcpp::Publisher<DetectionArray>::SharedPtr det_pub_;
    rclcpp::Subscription<sensor_msgs::msg::Image>::SharedPtr image_sub_;
    rclcpp::TimerBase::SharedPtr timer_;
};

int main(int argc, char** argv) {
    rclcpp::init(argc, argv);
    rclcpp::spin(std::make_shared<PerceptionNode>());
    rclcpp::shutdown();
    return 0;
}
基础节点(rclpy):
python
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
from std_msgs.msg import String

class PerceptionNode(Node):
    def __init__(self):
        super().__init__('perception_node')

        # 1. Declare parameters with types and descriptions
        self.declare_parameter('rate_hz', 30.0,
            descriptor=ParameterDescriptor(
                description='Processing rate in Hz',
                floating_point_range=[FloatingPointRange(
                    from_value=1.0, to_value=120.0, step=0.0
                )]
            ))
        self.declare_parameter('confidence_threshold', 0.7)
        self.declare_parameter('frame_id', 'camera_link')

        # 2. Read parameters
        rate_hz = self.get_parameter('rate_hz').value
        self.threshold = self.get_parameter('confidence_threshold').value
        self.frame_id = self.get_parameter('frame_id').value

        # 3. Set up QoS profiles
        sensor_qos = QoSProfile(
            reliability=ReliabilityPolicy.BEST_EFFORT,
            history=HistoryPolicy.KEEP_LAST,
            depth=1
        )
        reliable_qos = QoSProfile(
            reliability=ReliabilityPolicy.RELIABLE,
            history=HistoryPolicy.KEEP_LAST,
            depth=10
        )

        # 4. Publishers first, then subscribers
        self.det_pub = self.create_publisher(
            DetectionArray, 'detections', reliable_qos)

        self.image_sub = self.create_subscription(
            Image, 'camera/image_raw', self.image_callback, sensor_qos)

        # 5. Timers for periodic work
        self.timer = self.create_timer(1.0 / rate_hz, self.timer_callback)

        # 6. Parameter change callback
        self.add_on_set_parameters_callback(self.param_callback)

        self.get_logger().info(
            f'Perception node started at {rate_hz}Hz, '
            f'threshold={self.threshold}')

    def param_callback(self, params):
        """Handle runtime parameter changes (replaces dynamic_reconfigure)"""
        for param in params:
            if param.name == 'confidence_threshold':
                self.threshold = param.value
                self.get_logger().info(f'Threshold updated to {param.value}')
        return SetParametersResult(successful=True)

    def image_callback(self, msg):
        # Process incoming images
        pass

    def timer_callback(self):
        # Periodic work
        pass

def main(args=None):
    rclpy.init(args=args)
    node = PerceptionNode()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()
基础节点(rclcpp):
cpp
#include <rclcpp/rclcpp.hpp>
#include <sensor_msgs/msg/image.hpp>
#include <memory>

class PerceptionNode : public rclcpp::Node {
public:
    PerceptionNode() : Node("perception_node") {
        // Declare and get parameters
        this->declare_parameter("rate_hz", 30.0);
        this->declare_parameter("confidence_threshold", 0.7);
        double rate_hz = this->get_parameter("rate_hz").as_double();

        // QoS
        auto sensor_qos = rclcpp::SensorDataQoS();
        auto reliable_qos = rclcpp::QoS(10).reliable();

        // Publishers and subscribers
        det_pub_ = this->create_publisher<DetectionArray>("detections", reliable_qos);
        image_sub_ = this->create_subscription<sensor_msgs::msg::Image>(
            "camera/image_raw", sensor_qos,
            std::bind(&PerceptionNode::image_callback, this, std::placeholders::_1));

        timer_ = this->create_wall_timer(
            std::chrono::milliseconds(static_cast<int>(1000.0 / rate_hz)),
            std::bind(&PerceptionNode::timer_callback, this));

        RCLCPP_INFO(this->get_logger(), "Perception node started at %.1fHz", rate_hz);
    }

private:
    void image_callback(const sensor_msgs::msg::Image::SharedPtr msg) {
        // Use shared_ptr for zero-copy potential
    }
    void timer_callback() {}

    rclcpp::Publisher<DetectionArray>::SharedPtr det_pub_;
    rclcpp::Subscription<sensor_msgs::msg::Image>::SharedPtr image_sub_;
    rclcpp::TimerBase::SharedPtr timer_;
};

int main(int argc, char** argv) {
    rclcpp::init(argc, argv);
    rclcpp::spin(std::make_shared<PerceptionNode>());
    rclcpp::shutdown();
    return 0;
}

2. Lifecycle (Managed) Nodes

2. 生命周期(托管)节点

Use lifecycle nodes for production systems where you need deterministic startup, shutdown, and error recovery. This is one of ROS2's most important features over ROS1.
State Machine:
Unconfigured → Inactive → Active → Finalized
python
from rclpy.lifecycle import Node as LifecycleNode, TransitionCallbackReturn

class ManagedPerception(LifecycleNode):
    def __init__(self):
        super().__init__('managed_perception')
        self.get_logger().info('Node created (unconfigured)')

    def on_configure(self, state) -> TransitionCallbackReturn:
        """Load params, allocate memory, set up pubs/subs (but don't activate)"""
        self.declare_parameter('model_path', '')
        model_path = self.get_parameter('model_path').value

        try:
            self.model = load_model(model_path)
            self.det_pub = self.create_lifecycle_publisher(
                DetectionArray, 'detections', 10)
            self.get_logger().info(f'Configured with model: {model_path}')
            return TransitionCallbackReturn.SUCCESS
        except Exception as e:
            self.get_logger().error(f'Configuration failed: {e}')
            return TransitionCallbackReturn.FAILURE

    def on_activate(self, state) -> TransitionCallbackReturn:
        """Start processing — subscriptions go live here"""
        self.image_sub = self.create_subscription(
            Image, 'camera/image_raw', self.image_callback, 1)
        self.get_logger().info('Activated — processing images')
        return TransitionCallbackReturn.SUCCESS

    def on_deactivate(self, state) -> TransitionCallbackReturn:
        """Pause processing — safe to reconfigure after this"""
        self.destroy_subscription(self.image_sub)
        self.get_logger().info('Deactivated — stopped processing')
        return TransitionCallbackReturn.SUCCESS

    def on_cleanup(self, state) -> TransitionCallbackReturn:
        """Release resources, return to unconfigured"""
        del self.model
        self.get_logger().info('Cleaned up')
        return TransitionCallbackReturn.SUCCESS

    def on_shutdown(self, state) -> TransitionCallbackReturn:
        """Final cleanup before destruction"""
        self.get_logger().info('Shutting down')
        return TransitionCallbackReturn.SUCCESS

    def on_error(self, state) -> TransitionCallbackReturn:
        """Handle errors — try to recover or fail gracefully"""
        self.get_logger().error(f'Error in state {state.label}')
        return TransitionCallbackReturn.SUCCESS  # Transition to unconfigured
Orchestrating Lifecycle Nodes with a launch file:
python
from launch import LaunchDescription
from launch_ros.actions import LifecycleNode
from launch_ros.event_handlers import OnStateTransition
from launch.actions import EmitEvent, RegisterEventHandler
from launch_ros.events.lifecycle import ChangeState
from lifecycle_msgs.msg import Transition

def generate_launch_description():
    perception = LifecycleNode(
        package='my_pkg', executable='managed_perception',
        name='perception', output='screen',
        parameters=[{'model_path': '/models/yolo.pt'}]
    )

    # Auto-configure on startup
    configure_event = EmitEvent(event=ChangeState(
        lifecycle_node_matcher=lambda node: node == perception,
        transition_id=Transition.TRANSITION_CONFIGURE
    ))

    # Auto-activate after successful configure
    activate_handler = RegisterEventHandler(OnStateTransition(
        target_lifecycle_node=perception,
        goal_state='inactive',
        entities=[EmitEvent(event=ChangeState(
            lifecycle_node_matcher=lambda node: node == perception,
            transition_id=Transition.TRANSITION_ACTIVATE
        ))]
    ))

    return LaunchDescription([
        perception,
        configure_event,
        activate_handler,
    ])
在需要确定性启动、关闭和错误恢复的生产系统中使用生命周期节点。这是ROS2相比ROS1最重要的特性之一。
状态机:
Unconfigured → Inactive → Active → Finalized
python
from rclpy.lifecycle import Node as LifecycleNode, TransitionCallbackReturn

class ManagedPerception(LifecycleNode):
    def __init__(self):
        super().__init__('managed_perception')
        self.get_logger().info('Node created (unconfigured)')

    def on_configure(self, state) -> TransitionCallbackReturn:
        """Load params, allocate memory, set up pubs/subs (but don't activate)"""
        self.declare_parameter('model_path', '')
        model_path = self.get_parameter('model_path').value

        try:
            self.model = load_model(model_path)
            self.det_pub = self.create_lifecycle_publisher(
                DetectionArray, 'detections', 10)
            self.get_logger().info(f'Configured with model: {model_path}')
            return TransitionCallbackReturn.SUCCESS
        except Exception as e:
            self.get_logger().error(f'Configuration failed: {e}')
            return TransitionCallbackReturn.FAILURE

    def on_activate(self, state) -> TransitionCallbackReturn:
        """Start processing — subscriptions go live here"""
        self.image_sub = self.create_subscription(
            Image, 'camera/image_raw', self.image_callback, 1)
        self.get_logger().info('Activated — processing images')
        return TransitionCallbackReturn.SUCCESS

    def on_deactivate(self, state) -> TransitionCallbackReturn:
        """Pause processing — safe to reconfigure after this"""
        self.destroy_subscription(self.image_sub)
        self.get_logger().info('Deactivated — stopped processing')
        return TransitionCallbackReturn.SUCCESS

    def on_cleanup(self, state) -> TransitionCallbackReturn:
        """Release resources, return to unconfigured"""
        del self.model
        self.get_logger().info('Cleaned up')
        return TransitionCallbackReturn.SUCCESS

    def on_shutdown(self, state) -> TransitionCallbackReturn:
        """Final cleanup before destruction"""
        self.get_logger().info('Shutting down')
        return TransitionCallbackReturn.SUCCESS

    def on_error(self, state) -> TransitionCallbackReturn:
        """Handle errors — try to recover or fail gracefully"""
        self.get_logger().error(f'Error in state {state.label}')
        return TransitionCallbackReturn.SUCCESS  # Transition to unconfigured
使用启动文件编排生命周期节点:
python
from launch import LaunchDescription
from launch_ros.actions import LifecycleNode
from launch_ros.event_handlers import OnStateTransition
from launch.actions import EmitEvent, RegisterEventHandler
from launch_ros.events.lifecycle import ChangeState
from lifecycle_msgs.msg import Transition

def generate_launch_description():
    perception = LifecycleNode(
        package='my_pkg', executable='managed_perception',
        name='perception', output='screen',
        parameters=[{'model_path': '/models/yolo.pt'}]
    )

    # 启动时自动配置
    configure_event = EmitEvent(event=ChangeState(
        lifecycle_node_matcher=lambda node: node == perception,
        transition_id=Transition.TRANSITION_CONFIGURE
    ))

    # 配置成功后自动激活
    activate_handler = RegisterEventHandler(OnStateTransition(
        target_lifecycle_node=perception,
        goal_state='inactive',
        entities=[EmitEvent(event=ChangeState(
            lifecycle_node_matcher=lambda node: node == perception,
            transition_id=Transition.TRANSITION_ACTIVATE
        ))]
    ))

    return LaunchDescription([
        perception,
        configure_event,
        activate_handler,
    ])

3. QoS (Quality of Service) — The #1 Source of ROS2 Bugs

3. QoS(服务质量)—— ROS2故障的头号来源

QoS mismatches are the most common reason topics silently fail to connect.
QoS Compatibility Matrix:
Publisher     Subscriber    Compatible?
RELIABLE      RELIABLE      ✅ Yes
RELIABLE      BEST_EFFORT   ✅ Yes
BEST_EFFORT   BEST_EFFORT   ✅ Yes
BEST_EFFORT   RELIABLE      ❌ NO — SILENT FAILURE
Recommended QoS Profiles by Use Case:
python
from rclpy.qos import (
    QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy,
    QoSDurabilityPolicy, QoSPresetProfiles
)
QoS不匹配是话题静默无法连接的最常见原因。
QoS兼容性矩阵:
发布方        订阅方        是否兼容?
RELIABLE      RELIABLE      ✅ 是
RELIABLE      BEST_EFFORT   ✅ 是
BEST_EFFORT   BEST_EFFORT   ✅ 是
BEST_EFFORT   RELIABLE      ❌ 否 —— 静默失败
按用例推荐的QoS配置文件:
python
from rclpy.qos import (
    QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy,
    QoSDurabilityPolicy, QoSPresetProfiles
)

Sensor data (cameras, lidars) — tolerate drops, want latest

传感器数据(摄像头、激光雷达)—— 容忍丢包,需要最新数据

SENSOR_QOS = QoSProfile( reliability=QoSReliabilityPolicy.BEST_EFFORT, history=QoSHistoryPolicy.KEEP_LAST, depth=1, durability=QoSDurabilityPolicy.VOLATILE )
SENSOR_QOS = QoSProfile( reliability=QoSReliabilityPolicy.BEST_EFFORT, history=QoSHistoryPolicy.KEEP_LAST, depth=1, durability=QoSDurabilityPolicy.VOLATILE )

Commands (velocity, joint) — never miss, small buffer

命令(速度、关节)—— 绝不丢包,小缓冲区

COMMAND_QOS = QoSProfile( reliability=QoSReliabilityPolicy.RELIABLE, history=QoSHistoryPolicy.KEEP_LAST, depth=10, durability=QoSDurabilityPolicy.VOLATILE )
COMMAND_QOS = QoSProfile( reliability=QoSReliabilityPolicy.RELIABLE, history=QoSHistoryPolicy.KEEP_LAST, depth=10, durability=QoSDurabilityPolicy.VOLATILE )

Map / static data — reliable, and late joiners get it

地图/静态数据——可靠,迟加入者能获取

MAP_QOS = QoSProfile( reliability=QoSReliabilityPolicy.RELIABLE, history=QoSHistoryPolicy.KEEP_LAST, depth=1, durability=QoSDurabilityPolicy.TRANSIENT_LOCAL # Replaces ROS1 latch )
MAP_QOS = QoSProfile( reliability=QoSReliabilityPolicy.RELIABLE, history=QoSHistoryPolicy.KEEP_LAST, depth=1, durability=QoSDurabilityPolicy.TRANSIENT_LOCAL # 替代ROS1的latch )

Default parameter/state — reliable with some history

默认参数/状态——可靠,带部分历史

STATE_QOS = QoSProfile( reliability=QoSReliabilityPolicy.RELIABLE, history=QoSHistoryPolicy.KEEP_LAST, depth=10 )

**Debugging QoS Issues**:
```bash
STATE_QOS = QoSProfile( reliability=QoSReliabilityPolicy.RELIABLE, history=QoSHistoryPolicy.KEEP_LAST, depth=10 )

**调试QoS问题**:
```bash

Check QoS info for a topic

查看话题的QoS信息

ros2 topic info /camera/image_raw -v
ros2 topic info /camera/image_raw -v

Look for "Reliability" and "Durability" fields

查看“Reliability”和“Durability”字段

Check for incompatible QoS events

检查不兼容的QoS事件

ros2 run rqt_topic rqt_topic # Shows sub counts and QoS
ros2 run rqt_topic rqt_topic # 显示订阅数和QoS

If 0 subscribers despite nodes running: QoS MISMATCH

如果节点运行但订阅数为0:QoS不匹配

undefined
undefined

4. Launch Files (Python-Based)

4. 启动文件(基于Python)

ROS2 launch files are Python, enabling powerful conditional logic:
python
import os
from launch import LaunchDescription
from launch.actions import (
    DeclareLaunchArgument, IncludeLaunchDescription,
    GroupAction, OpaqueFunction, TimerAction
)
from launch.conditions import IfCondition, UnlessCondition
from launch.substitutions import (
    LaunchConfiguration, PathJoinSubstitution,
    PythonExpression
)
from launch_ros.actions import Node, ComposableNodeContainer, LoadComposableNode
from launch_ros.descriptions import ComposableNode
from launch_ros.substitutions import FindPackageShare

def generate_launch_description():

    # Arguments
    robot_name_arg = DeclareLaunchArgument('robot_name', default_value='ur5')
    sim_arg = DeclareLaunchArgument('sim', default_value='false')
    use_composition_arg = DeclareLaunchArgument('use_composition', default_value='true')

    robot_name = LaunchConfiguration('robot_name')
    sim = LaunchConfiguration('sim')

    # Load YAML params
    config_file = PathJoinSubstitution([
        FindPackageShare('my_pkg'), 'config', 'robot_params.yaml'
    ])

    # Standard node
    perception_node = Node(
        package='my_pkg',
        executable='perception_node',
        name='perception',
        namespace=robot_name,
        parameters=[config_file, {'use_sim_time': sim}],
        remappings=[
            ('camera/image_raw', 'realsense/color/image_raw'),
            ('detections', 'perception/detections'),
        ],
        output='screen',
        condition=UnlessCondition(LaunchConfiguration('use_composition')),
    )

    # Composable nodes (zero-copy, same process)
    composable_container = ComposableNodeContainer(
        name='perception_container',
        namespace=robot_name,
        package='rclcpp_components',
        executable='component_container_mt',  # Multi-threaded
        composable_node_descriptions=[
            ComposableNode(
                package='my_pkg',
                plugin='my_pkg::PerceptionComponent',
                name='perception',
                parameters=[config_file],
                remappings=[
                    ('camera/image_raw', 'realsense/color/image_raw'),
                ],
            ),
            ComposableNode(
                package='my_pkg',
                plugin='my_pkg::TrackerComponent',
                name='tracker',
            ),
        ],
        condition=IfCondition(LaunchConfiguration('use_composition')),
    )

    # Delayed start for nodes that need others to initialize first
    delayed_planner = TimerAction(
        period=3.0,
        actions=[
            Node(package='my_pkg', executable='planner_node', name='planner')
        ]
    )

    return LaunchDescription([
        robot_name_arg, sim_arg, use_composition_arg,
        perception_node,
        composable_container,
        delayed_planner,
    ])
ROS2启动文件为Python编写,支持强大的条件逻辑:
python
import os
from launch import LaunchDescription
from launch.actions import (
    DeclareLaunchArgument, IncludeLaunchDescription,
    GroupAction, OpaqueFunction, TimerAction
)
from launch.conditions import IfCondition, UnlessCondition
from launch.substitutions import (
    LaunchConfiguration, PathJoinSubstitution,
    PythonExpression
)
from launch_ros.actions import Node, ComposableNodeContainer, LoadComposableNode
from launch_ros.descriptions import ComposableNode
from launch_ros.substitutions import FindPackageShare

def generate_launch_description():

    # 参数
    robot_name_arg = DeclareLaunchArgument('robot_name', default_value='ur5')
    sim_arg = DeclareLaunchArgument('sim', default_value='false')
    use_composition_arg = DeclareLaunchArgument('use_composition', default_value='true')

    robot_name = LaunchConfiguration('robot_name')
    sim = LaunchConfiguration('sim')

    # 加载YAML参数
    config_file = PathJoinSubstitution([
        FindPackageShare('my_pkg'), 'config', 'robot_params.yaml'
    ])

    # 标准节点
    perception_node = Node(
        package='my_pkg',
        executable='perception_node',
        name='perception',
        namespace=robot_name,
        parameters=[config_file, {'use_sim_time': sim}],
        remappings=[
            ('camera/image_raw', 'realsense/color/image_raw'),
            ('detections', 'perception/detections'),
        ],
        output='screen',
        condition=UnlessCondition(LaunchConfiguration('use_composition')),
    )

    # 组件节点(零拷贝,同一进程)
    composable_container = ComposableNodeContainer(
        name='perception_container',
        namespace=robot_name,
        package='rclcpp_components',
        executable='component_container_mt',  # 多线程
        composable_node_descriptions=[
            ComposableNode(
                package='my_pkg',
                plugin='my_pkg::PerceptionComponent',
                name='perception',
                parameters=[config_file],
                remappings=[
                    ('camera/image_raw', 'realsense/color/image_raw'),
                ],
            ),
            ComposableNode(
                package='my_pkg',
                plugin='my_pkg::TrackerComponent',
                name='tracker',
            ),
        ],
        condition=IfCondition(LaunchConfiguration('use_composition')),
    )

    # 延迟启动需要其他节点先初始化的节点
    delayed_planner = TimerAction(
        period=3.0,
        actions=[
            Node(package='my_pkg', executable='planner_node', name='planner')
        ]
    )

    return LaunchDescription([
        robot_name_arg, sim_arg, use_composition_arg,
        perception_node,
        composable_container,
        delayed_planner,
    ])

5. Components (ROS2's Answer to Nodelets)

5. 组件(ROS2对Nodelets的替代方案)

cpp
#include <rclcpp/rclcpp.hpp>
#include <rclcpp_components/register_node_macro.hpp>
#include <sensor_msgs/msg/image.hpp>

namespace my_pkg {

class PerceptionComponent : public rclcpp::Node {
public:
    explicit PerceptionComponent(const rclcpp::NodeOptions& options)
        : Node("perception", options)
    {
        // Use intra-process communication for zero-copy
        auto sub_options = rclcpp::SubscriptionOptions();
        sub_options.use_intra_process_comm =
            rclcpp::IntraProcessSetting::Enable;

        sub_ = this->create_subscription<sensor_msgs::msg::Image>(
            "camera/image_raw",
            rclcpp::SensorDataQoS(),
            std::bind(&PerceptionComponent::callback, this,
                      std::placeholders::_1),
            sub_options);
    }

private:
    void callback(const sensor_msgs::msg::Image::UniquePtr msg) {
        // UniquePtr = zero-copy when using intra-process
        // msg is moved, not copied
    }

    rclcpp::Subscription<sensor_msgs::msg::Image>::SharedPtr sub_;
};

}  // namespace my_pkg

RCLCPP_COMPONENTS_REGISTER_NODE(my_pkg::PerceptionComponent)
cpp
#include <rclcpp/rclcpp.hpp>
#include <rclcpp_components/register_node_macro.hpp>
#include <sensor_msgs/msg/image.hpp>

namespace my_pkg {

class PerceptionComponent : public rclcpp::Node {
public:
    explicit PerceptionComponent(const rclcpp::NodeOptions& options)
        : Node("perception", options)
    {
        // 使用进程内通信实现零拷贝
        auto sub_options = rclcpp::SubscriptionOptions();
        sub_options.use_intra_process_comm =
            rclcpp::IntraProcessSetting::Enable;

        sub_ = this->create_subscription<sensor_msgs::msg::Image>(
            "camera/image_raw",
            rclcpp::SensorDataQoS(),
            std::bind(&PerceptionComponent::callback, this,
                      std::placeholders::_1),
            sub_options);
    }

private:
    void callback(const sensor_msgs::msg::Image::UniquePtr msg) {
        // UniquePtr = 使用进程内通信时实现零拷贝
        // msg被移动,而非拷贝
    }

    rclcpp::Subscription<sensor_msgs::msg::Image>::SharedPtr sub_;
};

}  // namespace my_pkg

RCLCPP_COMPONENTS_REGISTER_NODE(my_pkg::PerceptionComponent)

6. Actions (ROS2 Style)

ROS2风格的动作

python
from rclpy.action import ActionServer, CancelResponse, GoalResponse
from my_interfaces.action import PickPlace

class PickPlaceServer(Node):
    def __init__(self):
        super().__init__('pick_place_server')
        self._action_server = ActionServer(
            self, PickPlace, 'pick_place',
            execute_callback=self.execute_cb,
            goal_callback=self.goal_cb,
            cancel_callback=self.cancel_cb,
        )

    def goal_cb(self, goal_request):
        """Decide whether to accept or reject the goal"""
        self.get_logger().info(f'Received goal: {goal_request.target_pose}')
        return GoalResponse.ACCEPT

    def cancel_cb(self, goal_handle):
        """Decide whether to accept cancel requests"""
        self.get_logger().info('Cancel requested')
        return CancelResponse.ACCEPT

    async def execute_cb(self, goal_handle):
        """Execute the action (runs in an executor thread)"""
        feedback_msg = PickPlace.Feedback()

        for i, step in enumerate(self.plan(goal_handle.request)):
            # Check cancellation
            if goal_handle.is_cancel_requested:
                goal_handle.canceled()
                return PickPlace.Result(success=False)

            self.execute_step(step)
            feedback_msg.progress = float(i) / len(self.steps)
            goal_handle.publish_feedback(feedback_msg)

        goal_handle.succeed()
        return PickPlace.Result(success=True)
python
from rclpy.action import ActionServer, CancelResponse, GoalResponse
from my_interfaces.action import PickPlace

class PickPlaceServer(Node):
    def __init__(self):
        super().__init__('pick_place_server')
        self._action_server = ActionServer(
            self, PickPlace, 'pick_place',
            execute_callback=self.execute_cb,
            goal_callback=self.goal_cb,
            cancel_callback=self.cancel_cb,
        )

    def goal_cb(self, goal_request):
        """Decide whether to accept or reject the goal"""
        self.get_logger().info(f'Received goal: {goal_request.target_pose}')
        return GoalResponse.ACCEPT

    def cancel_cb(self, goal_handle):
        """Decide whether to accept cancel requests"""
        self.get_logger().info('Cancel requested')
        return CancelResponse.ACCEPT

    async def execute_cb(self, goal_handle):
        """Execute the action (runs in an executor thread)"""
        feedback_msg = PickPlace.Feedback()

        for i, step in enumerate(self.plan(goal_handle.request)):
            # 检查取消请求
            if goal_handle.is_cancel_requested:
                goal_handle.canceled()
                return PickPlace.Result(success=False)

            self.execute_step(step)
            feedback_msg.progress = float(i) / len(self.steps)
            goal_handle.publish_feedback(feedback_msg)

        goal_handle.succeed()
        return PickPlace.Result(success=True)

DDS Configuration

DDS配置

Choosing a DDS Implementation

选择DDS实现

bash
undefined
bash
undefined

Set DDS middleware (in ~/.bashrc or launch)

设置DDS中间件(在~/.bashrc或启动文件中)

export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp # Recommended for most cases
export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp # 大多数场景推荐

export RMW_IMPLEMENTATION=rmw_fastrtps_cpp # Default, good for multi-machine

export RMW_IMPLEMENTATION=rmw_fastrtps_cpp # 默认,适合多机器场景

Limit DDS discovery to local machine (reduces network noise)

限制DDS发现到本地机器(减少网络噪声)

export ROS_LOCALHOST_ONLY=1
export ROS_LOCALHOST_ONLY=1

Use ROS_DOMAIN_ID to isolate robot groups on same network

使用ROS_DOMAIN_ID隔离同一网络上的机器人组

export ROS_DOMAIN_ID=42 # Range 0-101
undefined
export ROS_DOMAIN_ID=42 # 范围0-101
undefined

CycloneDDS Tuning (cyclonedds.xml)

CycloneDDS调优(cyclonedds.xml)

xml
<?xml version="1.0" encoding="UTF-8"?>
<CycloneDDS xmlns="https://cdds.io/config">
  <Domain>
    <General>
      <NetworkInterfaceAddress>eth0</NetworkInterfaceAddress>
      <AllowMulticast>false</AllowMulticast>  <!-- Unicast for reliability -->
    </General>
    <Internal>
      <MaxMessageSize>65500</MaxMessageSize>
      <SocketReceiveBufferSize>10MB</SocketReceiveBufferSize>
    </Internal>
    <!-- For large data (images, point clouds) -->
    <Sizing>
      <ReceiveBufferSize>10MB</ReceiveBufferSize>
    </Sizing>
  </Domain>
</CycloneDDS>
bash
export CYCLONEDDS_URI=file:///path/to/cyclonedds.xml
xml
<?xml version="1.0" encoding="UTF-8"?>
<CycloneDDS xmlns="https://cdds.io/config">
  <Domain>
    <General>
      <NetworkInterfaceAddress>eth0</NetworkInterfaceAddress>
      <AllowMulticast>false</AllowMulticast>  <!-- 单播以提高可靠性 -->
    </General>
    <Internal>
      <MaxMessageSize>65500</MaxMessageSize>
      <SocketReceiveBufferSize>10MB</SocketReceiveBufferSize>
    </Internal>
    <!-- For large data (images, point clouds) -->
    <Sizing>
      <ReceiveBufferSize>10MB</ReceiveBufferSize>
    </Sizing>
  </Domain>
</CycloneDDS>
bash
export CYCLONEDDS_URI=file:///path/to/cyclonedds.xml

Build System

构建系统

Workspace Setup and colcon

工作区设置与colcon

bash
undefined
bash
undefined

Create a ROS2 workspace

创建ROS2工作区

mkdir -p ~/ros2_ws/src cd ~/ros2_ws
mkdir -p ~/ros2_ws/src cd ~/ros2_ws

Clone packages into src/

将包克隆到src/

Install dependencies declared in package.xml files

安装package.xml文件中声明的依赖

sudo apt update rosdep update rosdep install --from-paths src --ignore-src -y
sudo apt update rosdep update rosdep install --from-paths src --ignore-src -y

Build the workspace

构建工作区

source /opt/ros/humble/setup.bash # Source the ROS2 underlay FIRST colcon build
source /opt/ros/humble/setup.bash # 先source ROS2底层环境 colcon build

Source the workspace overlay

Source工作区覆盖层

source install/setup.bash

**Essential colcon flags**:
```bash
source install/setup.bash

**必备colcon参数**:
```bash

Build only specific packages (faster iteration)

仅构建特定包(迭代更快)

colcon build --packages-select my_pkg
colcon build --packages-select my_pkg

Build a package and all its dependencies

构建一个包及其所有依赖

colcon build --packages-up-to my_pkg
colcon build --packages-up-to my_pkg

Symlink Python files instead of copying (edit without rebuild)

符号链接Python文件而非拷贝(编辑无需重新构建)

colcon build --symlink-install
colcon build --symlink-install

Parallel jobs (default = nproc, lower if running out of RAM)

并行任务(默认=CPU核心数,内存不足时减少)

colcon build --parallel-workers 4
colcon build --parallel-workers 4

Pass CMake args to all packages

向所有包传递CMake参数

colcon build --cmake-args -DCMAKE_BUILD_TYPE=Release
colcon build --cmake-args -DCMAKE_BUILD_TYPE=Release

Clean build (remove build/ install/ log/ and rebuild)

清理构建(删除build/ install/ log/后重新构建)

rm -rf build/ install/ log/ colcon build
rm -rf build/ install/ log/ colcon build

Build with compiler warnings as errors (CI)

构建时将编译器警告视为错误(CI场景)

colcon build --cmake-args -DCMAKE_CXX_FLAGS="-Wall -Werror"
colcon build --cmake-args -DCMAKE_CXX_FLAGS="-Wall -Werror"

Show build output in real-time (useful for debugging build failures)

实时显示构建输出(调试构建失败时有用)

colcon build --event-handlers console_direct+
undefined
colcon build --event-handlers console_direct+
undefined

Build Types: ament_cmake vs ament_python

构建类型:ament_cmake vs ament_python

Choose based on your package language:
ament_cmake     — C++ packages, mixed C++/Python packages, packages with custom msgs
ament_python    — Pure Python packages (no C++, no custom messages)
根据包的语言选择:
ament_cmake     — C++包、混合C++/Python包、带自定义消息的包
ament_python    — 纯Python包(无C++,无自定义消息)

package.xml — Declaring Dependencies

package.xml — 声明依赖

xml
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd"
            schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
  <name>my_robot_pkg</name>
  <version>0.1.0</version>
  <description>My robot perception package</description>
  <maintainer email="dev@example.com">Dev Name</maintainer>
  <license>Apache-2.0</license>

  <!-- Build tool — determines build type -->
  <buildtool_depend>ament_cmake</buildtool_depend>
  <!-- For pure Python: <buildtool_depend>ament_python</buildtool_depend> -->

  <!-- Build-time dependencies (headers, CMake modules) -->
  <build_depend>rclcpp</build_depend>
  <build_depend>sensor_msgs</build_depend>
  <build_depend>OpenCV</build_depend>

  <!-- Runtime dependencies -->
  <exec_depend>rclcpp</exec_depend>
  <exec_depend>sensor_msgs</exec_depend>
  <exec_depend>rclpy</exec_depend>

  <!-- Shortcut: depend = build_depend + exec_depend -->
  <depend>rclcpp</depend>
  <depend>sensor_msgs</depend>
  <depend>geometry_msgs</depend>
  <depend>tf2_ros</depend>
  <depend>cv_bridge</depend>

  <!-- For custom message generation -->
  <build_depend>rosidl_default_generators</build_depend>
  <exec_depend>rosidl_default_runtime</exec_depend>
  <member_of_group>rosidl_interface_packages</member_of_group>

  <!-- Test dependencies -->
  <test_depend>ament_lint_auto</test_depend>
  <test_depend>ament_cmake_pytest</test_depend>
  <test_depend>launch_testing_ament_cmake</test_depend>

  <export>
    <build_type>ament_cmake</build_type>
  </export>
</package>
xml
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd"
            schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
  <name>my_robot_pkg</name>
  <version>0.1.0</version>
  <description>My robot perception package</description>
  <maintainer email="dev@example.com">Dev Name</maintainer>
  <license>Apache-2.0</license>

  <!-- 构建工具 — 决定构建类型 -->
  <buildtool_depend>ament_cmake</buildtool_depend>
  <!-- 纯Python包:<buildtool_depend>ament_python</buildtool_depend> -->

  <!-- 构建时依赖(头文件、CMake模块) -->
  <build_depend>rclcpp</build_depend>
  <build_depend>sensor_msgs</build_depend>
  <build_depend>OpenCV</build_depend>

  <!-- 运行时依赖 -->
  <exec_depend>rclcpp</exec_depend>
  <exec_depend>sensor_msgs</exec_depend>
  <exec_depend>rclpy</exec_depend>

  <!-- 快捷方式:depend = build_depend + exec_depend -->
  <depend>rclcpp</depend>
  <depend>sensor_msgs</depend>
  <depend>geometry_msgs</depend>
  <depend>tf2_ros</depend>
  <depend>cv_bridge</depend>

  <!-- 自定义消息生成 -->
  <build_depend>rosidl_default_generators</build_depend>
  <exec_depend>rosidl_default_runtime</exec_depend>
  <member_of_group>rosidl_interface_packages</member_of_group>

  <!-- 测试依赖 -->
  <test_depend>ament_lint_auto</test_depend>
  <test_depend>ament_cmake_pytest</test_depend>
  <test_depend>launch_testing_ament_cmake</test_depend>

  <export>
    <build_type>ament_cmake</build_type>
  </export>
</package>

CMakeLists.txt — ament_cmake Package

CMakeLists.txt — ament_cmake包

cmake
cmake_minimum_required(VERSION 3.8)
project(my_robot_pkg)
cmake
cmake_minimum_required(VERSION 3.8)
project(my_robot_pkg)

Default to C++17

默认使用C++17

if(NOT CMAKE_CXX_STANDARD) set(CMAKE_CXX_STANDARD 17) endif()
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") add_compile_options(-Wall -Wextra -Wpedantic) endif()
if(NOT CMAKE_CXX_STANDARD) set(CMAKE_CXX_STANDARD 17) endif()
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") add_compile_options(-Wall -Wextra -Wpedantic) endif()

── Find dependencies ──────────────────────────────────────────

── 查找依赖 ──────────────────────────────────────────

find_package(ament_cmake REQUIRED) find_package(rclcpp REQUIRED) find_package(rclcpp_components REQUIRED) find_package(sensor_msgs REQUIRED) find_package(geometry_msgs REQUIRED) find_package(tf2_ros REQUIRED) find_package(cv_bridge REQUIRED) find_package(OpenCV REQUIRED)
find_package(ament_cmake REQUIRED) find_package(rclcpp REQUIRED) find_package(rclcpp_components REQUIRED) find_package(sensor_msgs REQUIRED) find_package(geometry_msgs REQUIRED) find_package(tf2_ros REQUIRED) find_package(cv_bridge REQUIRED) find_package(OpenCV REQUIRED)

── Custom messages / services / actions ───────────────────────

── 自定义消息/服务/动作 ───────────────────────

find_package(rosidl_default_generators REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME} "msg/Detection.msg" "srv/GetPose.srv" "action/PickPlace.action" DEPENDENCIES geometry_msgs sensor_msgs )
find_package(rosidl_default_generators REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME} "msg/Detection.msg" "srv/GetPose.srv" "action/PickPlace.action" DEPENDENCIES geometry_msgs sensor_msgs )

── Standalone executable node ─────────────────────────────────

── 独立可执行节点 ─────────────────────────────────

add_executable(perception_node src/perception_node.cpp) ament_target_dependencies(perception_node rclcpp sensor_msgs cv_bridge OpenCV tf2_ros ) install(TARGETS perception_node DESTINATION lib/${PROJECT_NAME} )
add_executable(perception_node src/perception_node.cpp) ament_target_dependencies(perception_node rclcpp sensor_msgs cv_bridge OpenCV tf2_ros ) install(TARGETS perception_node DESTINATION lib/${PROJECT_NAME} )

── Component (composable node) ────────────────────────────────

── 组件(可组合节点) ────────────────────────────────

add_library(perception_component SHARED src/perception_component.cpp ) ament_target_dependencies(perception_component rclcpp rclcpp_components sensor_msgs cv_bridge OpenCV )
add_library(perception_component SHARED src/perception_component.cpp ) ament_target_dependencies(perception_component rclcpp rclcpp_components sensor_msgs cv_bridge OpenCV )

Register as a composable node

注册为可组合节点

rclcpp_components_register_node(perception_component PLUGIN "my_robot_pkg::PerceptionComponent" EXECUTABLE perception_component_node ) install(TARGETS perception_component ARCHIVE DESTINATION lib LIBRARY DESTINATION lib RUNTIME DESTINATION bin )
rclcpp_components_register_node(perception_component PLUGIN "my_robot_pkg::PerceptionComponent" EXECUTABLE perception_component_node ) install(TARGETS perception_component ARCHIVE DESTINATION lib LIBRARY DESTINATION lib RUNTIME DESTINATION bin )

── Install Python nodes ───────────────────────────────────────

── 安装Python节点 ───────────────────────────────────────

install(PROGRAMS scripts/planning_node.py DESTINATION lib/${PROJECT_NAME} )
install(PROGRAMS scripts/planning_node.py DESTINATION lib/${PROJECT_NAME} )

── Install launch, config, rviz, urdf ─────────────────────────

── 安装启动文件、配置、rviz、urdf ─────────────────────────

install(DIRECTORY launch config rviz urdf DESTINATION share/${PROJECT_NAME} )
install(DIRECTORY launch config rviz urdf DESTINATION share/${PROJECT_NAME} )

── Install headers ────────────────────────────────────────────

── 安装头文件 ────────────────────────────────────────────

install(DIRECTORY include/ DESTINATION include )
install(DIRECTORY include/ DESTINATION include )

── Tests ──────────────────────────────────────────────────────

── 测试 ──────────────────────────────────────────────────────

if(BUILD_TESTING) find_package(ament_lint_auto REQUIRED) ament_lint_auto_find_test_dependencies()
find_package(ament_cmake_pytest REQUIRED) ament_add_pytest_test(test_perception test/test_perception.py)
find_package(launch_testing_ament_cmake REQUIRED) add_launch_test(test/test_integration.py) endif()
ament_package()
undefined
if(BUILD_TESTING) find_package(ament_lint_auto REQUIRED) ament_lint_auto_find_test_dependencies()
find_package(ament_cmake_pytest REQUIRED) ament_add_pytest_test(test_perception test/test_perception.py)
find_package(launch_testing_ament_cmake REQUIRED) add_launch_test(test/test_integration.py) endif()
ament_package()
undefined

setup.py / setup.cfg — Pure Python Package

setup.py / setup.cfg — 纯Python包

python
undefined
python
undefined

setup.py (for ament_python packages)

setup.py(适用于ament_python包)

from setuptools import find_packages, setup
package_name = 'my_python_pkg'
setup( name=package_name, version='0.1.0', packages=find_packages(exclude=['test']), data_files=[ # Register with ament index ('share/ament_index/resource_index/packages', ['resource/' + package_name]), # Package manifest ('share/' + package_name, ['package.xml']), # Launch files ('share/' + package_name + '/launch', ['launch/robot.launch.py']), # Config files ('share/' + package_name + '/config', ['config/params.yaml']), ], install_requires=['setuptools'], zip_safe=True, maintainer='Dev Name', maintainer_email='dev@example.com', description='My Python robot package', license='Apache-2.0', entry_points={ 'console_scripts': [ # format: 'executable_name = package.module:function' 'perception_node = my_python_pkg.perception_node:main', 'planner_node = my_python_pkg.planner_node:main', ], }, )

```cfg
from setuptools import find_packages, setup
package_name = 'my_python_pkg'
setup( name=package_name, version='0.1.0', packages=find_packages(exclude=['test']), data_files=[ # 向ament索引注册 ('share/ament_index/resource_index/packages', ['resource/' + package_name]), # 包清单 ('share/' + package_name, ['package.xml']), # 启动文件 ('share/' + package_name + '/launch', ['launch/robot.launch.py']), # 配置文件 ('share/' + package_name + '/config', ['config/params.yaml']), ], install_requires=['setuptools'], zip_safe=True, maintainer='Dev Name', maintainer_email='dev@example.com', description='My Python robot package', license='Apache-2.0', entry_points={ 'console_scripts': [ # 格式: '可执行名称 = 包.模块:函数' 'perception_node = my_python_pkg.perception_node:main', 'planner_node = my_python_pkg.planner_node:main', ], }, )

```cfg

setup.cfg

setup.cfg

[develop] script_dir=$base/lib/my_python_pkg
[install] install_scripts=$base/lib/my_python_pkg
undefined
[develop] script_dir=$base/lib/my_python_pkg
[install] install_scripts=$base/lib/my_python_pkg
undefined

Custom Message, Service, and Action Definitions

自定义消息、服务和动作定义

undefined
undefined

msg/Detection.msg

msg/Detection.msg

std_msgs/Header header string class_name float32 confidence geometry_msgs/Pose pose float32[4] bbox # [x_min, y_min, x_max, y_max]
undefined
std_msgs/Header header string class_name float32 confidence geometry_msgs/Pose pose float32[4] bbox # [x_min, y_min, x_max, y_max]
undefined

srv/GetPose.srv

srv/GetPose.srv

string object_name

bool success geometry_msgs/PoseStamped pose string error_message
undefined

string object_name

bool success geometry_msgs/PoseStamped pose string error_message
undefined

action/PickPlace.action

action/PickPlace.action

Goal

目标

geometry_msgs/Pose target_pose string object_class

geometry_msgs/Pose target_pose string object_class

Result

结果

bool success string error_message

bool success string error_message

Feedback

反馈

float32 progress string current_phase
undefined
float32 progress string current_phase
undefined

Workspace Overlays

工作区覆盖层

Underlay (base ROS2)         /opt/ros/humble/
Overlay 1 (shared libs)     ~/ros2_ws/install/
Overlay 2 (your dev pkg)    ~/dev_ws/install/

Source order matters — LAST sourced overlay wins for duplicate packages.
bash
undefined
底层(基础ROS2)         /opt/ros/humble/
覆盖层1(共享库)     ~/ros2_ws/install/
覆盖层2(你的开发包)    ~/dev_ws/install/

Source顺序很重要 —— 最后source的覆盖层在包重复时优先。
bash
undefined

Correct source order

正确的source顺序

source /opt/ros/humble/setup.bash # Base source ~/ros2_ws/install/setup.bash # Shared workspace source ~/dev_ws/install/setup.bash # Your development overlay
source /opt/ros/humble/setup.bash # 基础环境 source ~/ros2_ws/install/setup.bash # 共享工作区 source ~/dev_ws/install/setup.bash # 你的开发覆盖层

NEVER source setup.bash from build/ — always use install/

绝不要source build/下的setup.bash —— 始终使用install/

undefined
undefined

Build Troubleshooting

构建故障排查

bash
undefined
bash
undefined

"Package not found" during build

构建时提示“Package not found”

→ Missing dependency. Check package.xml and run:

→ 缺少依赖。检查package.xml并运行:

rosdep install --from-paths src --ignore-src -y
rosdep install --from-paths src --ignore-src -y

"Could not find a package configuration file provided by X"

“Could not find a package configuration file provided by X”

→ CMake can't find the package. Did you source the underlay?

→ CMake找不到包。是否source了底层环境?

source /opt/ros/humble/setup.bash
source /opt/ros/humble/setup.bash

Build succeeds but node can't be found at runtime

构建成功但运行时找不到节点

→ Forgot to source the overlay, or entry_points misconfigured

→ 忘记source覆盖层,或entry_points配置错误

source install/setup.bash ros2 pkg list | grep my_pkg # Should appear ros2 pkg executables my_pkg # List available executables
source install/setup.bash ros2 pkg list | grep my_pkg # 应显示该包 ros2 pkg executables my_pkg # 列出可用可执行文件

Python changes not reflected after rebuild

Python修改后重新构建未生效

→ Use --symlink-install, or clean and rebuild

→ 使用--symlink-install,或清理后重新构建

colcon build --packages-select my_pkg --symlink-install
colcon build --packages-select my_pkg --symlink-install

"Multiple packages with the same name"

“Multiple packages with the same name”

→ Duplicate package in workspace. Check with:

→ 工作区中存在重复包。使用以下命令检查:

colcon list --packages-select my_pkg
colcon list --packages-select my_pkg

Build runs out of memory (large C++ packages)

构建内存不足(大型C++包)

colcon build --parallel-workers 2 --executor sequential
colcon build --parallel-workers 2 --executor sequential

Custom messages not found by Python nodes

Python节点找不到自定义消息

→ Missing rosidl_default_runtime in package.xml exec_depend

→ package.xml的exec_depend中缺少rosidl_default_runtime

→ Or forgot to source install/setup.bash after building msgs

→ 或构建消息后忘记source install/setup.bash

undefined
undefined

Package Structure (ROS2)

ROS2包结构

my_robot_pkg/
├── CMakeLists.txt              # Or setup.py for pure Python
├── package.xml
├── my_robot_pkg/               # Python module (same name as package)
│   ├── __init__.py
│   ├── perception_node.py
│   └── utils/
│       └── transforms.py
├── src/                        # C++ source
│   └── perception_component.cpp
├── include/my_robot_pkg/       # C++ headers
│   └── perception_component.hpp
├── config/
│   ├── robot_params.yaml
│   └── cyclonedds.xml
├── launch/
│   ├── robot.launch.py
│   └── perception.launch.py
├── msg/
│   └── Detection.msg
├── srv/
│   └── GetPose.srv
├── action/
│   └── PickPlace.action
├── rviz/
│   └── robot.rviz
├── urdf/
│   └── robot.urdf.xacro
└── test/
    ├── test_perception.py      # pytest
    └── test_integration.py     # launch_testing
my_robot_pkg/
├── CMakeLists.txt              # 纯Python包为setup.py
├── package.xml
├── my_robot_pkg/               # Python模块(与包同名)
│   ├── __init__.py
│   ├── perception_node.py
│   └── utils/
│       └── transforms.py
├── src/                        # C++源码
│   └── perception_component.cpp
├── include/my_robot_pkg/       # C++头文件
│   └── perception_component.hpp
├── config/
│   ├── robot_params.yaml
│   └── cyclonedds.xml
├── launch/
│   ├── robot.launch.py
│   └── perception.launch.py
├── msg/
│   └── Detection.msg
├── srv/
│   └── GetPose.srv
├── action/
│   └── PickPlace.action
├── rviz/
│   └── robot.rviz
├── urdf/
│   └── robot.urdf.xacro
└── test/
    ├── test_perception.py      # pytest
    └── test_integration.py     # launch_testing

Debugging Toolkit

调试工具包

bash
undefined
bash
undefined

Topic inspection

话题检查

ros2 topic list ros2 topic info /camera/image_raw -v # Shows QoS details ros2 topic hz /camera/image_raw ros2 topic bw /camera/image_raw ros2 topic echo /joint_states --once
ros2 topic list ros2 topic info /camera/image_raw -v # 显示QoS详情 ros2 topic hz /camera/image_raw ros2 topic bw /camera/image_raw ros2 topic echo /joint_states --once

Node inspection

节点检查

ros2 node list ros2 node info /perception
ros2 node list ros2 node info /perception

Parameter management

参数管理

ros2 param list /perception ros2 param get /perception confidence_threshold ros2 param set /perception confidence_threshold 0.8 # Runtime change!
ros2 param list /perception ros2 param get /perception confidence_threshold ros2 param set /perception confidence_threshold 0.8 # 运行时修改!

Lifecycle management

生命周期管理

ros2 lifecycle list /managed_perception ros2 lifecycle set /managed_perception configure ros2 lifecycle set /managed_perception activate
ros2 lifecycle list /managed_perception ros2 lifecycle set /managed_perception configure ros2 lifecycle set /managed_perception activate

Service calls

服务调用

ros2 service list ros2 service call /get_pose my_interfaces/srv/GetPose "{}"
ros2 service list ros2 service call /get_pose my_interfaces/srv/GetPose "{}"

Action monitoring

动作监控

ros2 action list ros2 action info /pick_place ros2 action send_goal /pick_place my_interfaces/action/PickPlace "{target_pose: {x: 1.0}}"
ros2 action list ros2 action info /pick_place ros2 action send_goal /pick_place my_interfaces/action/PickPlace "{target_pose: {x: 1.0}}"

Bag recording (ROS2 style)

包录制(ROS2风格)

ros2 bag record -a # All topics ros2 bag record /camera/image /tf # Specific topics ros2 bag record -s mcap /camera/image # MCAP format (recommended) ros2 bag info recording/ # Inspect ros2 bag play recording/ --clock # Playback
ros2 bag record -a # 所有话题 ros2 bag record /camera/image /tf # 指定话题 ros2 bag record -s mcap /camera/image # MCAP格式(推荐) ros2 bag info recording/ # 检查 ros2 bag play recording/ --clock # 回放

DDS debugging

DDS调试

ros2 doctor # System diagnostics ros2 daemon stop && ros2 daemon start # Reset discovery daemon
undefined
ros2 doctor # 系统诊断 ros2 daemon stop && ros2 daemon start # 重置发现守护进程
undefined

Production Deployment Checklist

生产部署检查清单

  1. Use lifecycle nodes for all critical components
  2. Set
    ROS_LOCALHOST_ONLY=1
    if not communicating across machines
  3. Pin your DDS implementation (CycloneDDS recommended)
  4. Configure QoS explicitly — never rely on defaults for production
  5. Set
    ROS_DOMAIN_ID
    to isolate your robot from others on the network
  6. Enable ROS2 security (SROS2) for authenticated communication
  7. Use composition for nodes that exchange large data
  8. Record bags in MCAP format — better tooling, random access, compression
  9. Set up launch-testing for integration tests
  10. Use
    ros2 doctor
    as part of your health check pipeline
  1. 为所有关键组件使用生命周期节点
  2. 设置
    ROS_LOCALHOST_ONLY=1
    (如果不需要跨机器通信)
  3. 固定DDS实现(推荐CycloneDDS)
  4. 显式配置QoS —— 生产环境绝不依赖默认值
  5. 设置
    ROS_DOMAIN_ID
    (隔离同一网络上的机器人)
  6. 启用ROS2安全(SROS2)以实现认证通信
  7. 对交换大数据的节点使用组件
  8. 使用MCAP格式录制包 —— 工具更完善,支持随机访问和压缩
  9. 设置启动测试用于集成测试
  10. ros2 doctor
    作为健康检查流程的一部分