robotics-design-patterns


Robotics Design Patterns


When to Use This Skill


  • Designing robot software architecture from scratch
  • Choosing between behavior trees, FSMs, or hybrid approaches
  • Structuring perception → planning → control pipelines
  • Implementing safety systems and watchdogs
  • Building hardware abstraction layers (HAL)
  • Designing for sim-to-real transfer
  • Architecting multi-robot / fleet systems
  • Making real-time vs. non-real-time tradeoffs

Pattern 1: The Robot Software Stack


Every robot system follows this layered architecture, regardless of complexity:
┌─────────────────────────────────────────────┐
│               APPLICATION LAYER              │
│    Mission planning, task allocation, UI     │
├─────────────────────────────────────────────┤
│              BEHAVIORAL LAYER                │
│  Behavior trees, FSMs, decision-making       │
├─────────────────────────────────────────────┤
│             FUNCTIONAL LAYER                 │
│  Perception, Planning, Control, Estimation   │
├─────────────────────────────────────────────┤
│           COMMUNICATION LAYER                │
│     ROS2, DDS, shared memory, IPC            │
├─────────────────────────────────────────────┤
│          HARDWARE ABSTRACTION LAYER          │
│    Drivers, sensor interfaces, actuators     │
├─────────────────────────────────────────────┤
│              HARDWARE LAYER                  │
│    Cameras, LiDARs, motors, grippers, IMUs   │
└─────────────────────────────────────────────┘
Design Rule: Information flows UP through perception, decisions flow DOWN through control. Never let the application layer directly command hardware.
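The flow rule can be sketched by giving each layer a reference only to the layer directly below it, so the application physically cannot reach the hardware. All class and method names in this sketch are hypothetical, not from any framework:

```python
# Minimal sketch of the layering rule: each layer only calls the layer
# directly below it. Names are illustrative.

class HardwareLayer:
    def set_motor_velocity(self, v):
        return f"motor={v}"

class HAL:
    def __init__(self, hw):
        self._hw = hw
    def drive(self, v):
        v = max(-1.0, min(1.0, v))  # clamp to a safe range before hardware
        return self._hw.set_motor_velocity(v)

class FunctionalLayer:
    def __init__(self, hal):
        self._hal = hal
    def follow_path(self, speeds):
        return [self._hal.drive(v) for v in speeds]

class BehavioralLayer:
    def __init__(self, functional):
        self._functional = functional
    def execute(self, task):
        return self._functional.follow_path(task["speeds"])

class ApplicationLayer:
    def __init__(self, behavioral):
        self._behavioral = behavioral   # no handle to the HAL or hardware
    def run_mission(self, task):
        return self._behavioral.execute(task)

app = ApplicationLayer(BehavioralLayer(FunctionalLayer(HAL(HardwareLayer()))))
result = app.run_mission({"speeds": [0.5, 2.0]})  # 2.0 is clamped in the HAL
```

Because the application holds only the behavioral layer, an out-of-range command is caught at the HAL boundary rather than reaching the motors.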

Pattern 2: Behavior Trees (BT)


Behavior trees are the recommended default for robot decision-making. They're modular, reusable, and easier to debug than FSMs for complex behaviors.

Core Node Types


Sequence (→)     : Execute children left-to-right, FAIL on first failure
Fallback (?)     : Execute children left-to-right, SUCCEED on first success
Parallel (⇉)     : Execute all children simultaneously
Decorator        : Modify a single child's behavior
Action (leaf)    : Execute a robot action
Condition (leaf) : Check a condition (no side effects)
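The Sequence and Fallback semantics can be sketched in plain Python without a BT library (simplified: the RUNNING status is omitted, and these helpers are illustrative rather than any library's API):

```python
# Simplified tick semantics for Sequence and Fallback composites.
SUCCESS, FAILURE = "SUCCESS", "FAILURE"

def sequence(children):
    """FAIL on first failure; SUCCEED only if every child succeeds."""
    def tick():
        for child in children:
            if child() == FAILURE:
                return FAILURE
        return SUCCESS
    return tick

def fallback(children):
    """SUCCEED on first success; FAIL only if every child fails."""
    def tick():
        for child in children:
            if child() == SUCCESS:
                return SUCCESS
        return FAILURE
    return tick

battery_ok = lambda: SUCCESS
obj_found = lambda: FAILURE
ask_operator = lambda: SUCCESS

# The Sequence fails as soon as obj_found fails...
checks = sequence([battery_ok, obj_found])
# ...but a Fallback above it can recover by trying an alternative.
robust = fallback([checks, ask_operator])
```

This composability (wrap a failing subtree in a Fallback with a recovery action) is the main reason BTs scale better than FSMs.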

Example: Pick-and-Place BT


Sequence (PickAndPlace)
├── Sequence (Check)
│   ├── Battery OK?
│   └── Object Found?
├── Sequence (Pick)
│   ├── Open Gripper
│   ├── Move To Object
│   └── Close Gripper
└── Sequence (Place)
    ├── Move To Goal
    ├── Open Gripper
    └── Release

Implementation Pattern


```python
import py_trees
# Assumed ROS2 imports for the action client and status codes used below;
# MoveBase stands in for whatever navigation action type your stack provides.
from rclpy.action import ActionClient
from action_msgs.msg import GoalStatus

class MoveToTarget(py_trees.behaviour.Behaviour):
    """Action node: Move robot to a target pose"""

    def __init__(self, name, target_key="target_pose"):
        super().__init__(name)
        self.target_key = target_key
        self.action_client = None
        self.goal_handle = None  # set when a goal is sent

    def setup(self, **kwargs):
        """Called once when tree is set up — initialize resources"""
        self.node = kwargs.get('node')  # ROS2 node
        self.action_client = ActionClient(
            self.node, MoveBase, 'move_base')

    def initialise(self):
        """Called when this node first ticks — send the goal"""
        bb = self.blackboard
        target = bb.get(self.target_key)
        self.goal_handle = self.action_client.send_goal(target)
        self.logger.info(f"Moving to {target}")

    def update(self):
        """Called every tick — check progress"""
        if self.goal_handle is None:
            return py_trees.common.Status.FAILURE

        status = self.goal_handle.status
        if status == GoalStatus.STATUS_SUCCEEDED:
            return py_trees.common.Status.SUCCESS
        elif status == GoalStatus.STATUS_ABORTED:
            return py_trees.common.Status.FAILURE
        else:
            return py_trees.common.Status.RUNNING

    def terminate(self, new_status):
        """Called when node exits — cancel if preempted"""
        if new_status == py_trees.common.Status.INVALID:
            if self.goal_handle:
                self.goal_handle.cancel_goal()
                self.logger.info("Movement cancelled")
```

Build the tree


```python
def create_pick_place_tree():
    root = py_trees.composites.Sequence("PickAndPlace", memory=True)

    # Safety checks (Sequence: abort if any check fails)
    safety = py_trees.composites.Sequence("SafetyChecks", memory=False)
    safety.add_children([
        CheckBattery("BatteryOK", threshold=20.0),
        CheckEStop("EStopClear"),
    ])

    pick = py_trees.composites.Sequence("Pick", memory=True)
    pick.add_children([
        DetectObject("FindObject"),
        MoveToTarget("ApproachObject", target_key="object_pose"),
        GripperCommand("CloseGripper", action="close"),
    ])

    place = py_trees.composites.Sequence("Place", memory=True)
    place.add_children([
        MoveToTarget("MoveToPlace", target_key="place_pose"),
        GripperCommand("OpenGripper", action="open"),
    ])

    root.add_children([safety, pick, place])
    return root
```

Blackboard Pattern


```python
import py_trees
from py_trees.common import Status

# The Blackboard is the shared memory for BT nodes
bb = py_trees.blackboard.Blackboard()

# Perception nodes WRITE to the blackboard
class DetectObject(py_trees.behaviour.Behaviour):
    def update(self):
        detections = self.perception.detect()
        if detections:
            self.blackboard.set("object_pose", detections[0].pose)
            self.blackboard.set("object_class", detections[0].label)
            return Status.SUCCESS
        return Status.FAILURE

# Action nodes READ from the blackboard
class MoveToTarget(py_trees.behaviour.Behaviour):
    def initialise(self):
        target = self.blackboard.get("object_pose")
        self.send_goal(target)
```

Pattern 3: Finite State Machines (FSM)


Use FSMs for simple, well-defined sequential behaviors with clear states. Prefer BTs for anything complex.
```python
from enum import Enum, auto
import smach  # ROS state machine library

class RobotState(Enum):
    IDLE = auto()
    NAVIGATING = auto()
    PICKING = auto()
    PLACING = auto()
    ERROR = auto()
    CHARGING = auto()
```

SMACH implementation


```python
class NavigateState(smach.State):
    def __init__(self):
        smach.State.__init__(
            self,
            outcomes=['succeeded', 'aborted', 'preempted'],
            input_keys=['target_pose'],
            output_keys=['final_pose'])

    def execute(self, userdata):
        # Navigation logic
        result = navigate_to(userdata.target_pose)
        if result.success:
            userdata.final_pose = result.pose
            return 'succeeded'
        return 'aborted'
```

Build state machine


```python
sm = smach.StateMachine(outcomes=['done', 'failed'])
with sm:
    smach.StateMachine.add('NAVIGATE', NavigateState(),
                           transitions={'succeeded': 'PICK', 'aborted': 'ERROR'})
    smach.StateMachine.add('PICK', PickState(),
                           transitions={'succeeded': 'PLACE', 'aborted': 'ERROR'})
    smach.StateMachine.add('PLACE', PlaceState(),
                           transitions={'succeeded': 'done', 'aborted': 'ERROR'})
    smach.StateMachine.add('ERROR', ErrorRecovery(),
                           transitions={'recovered': 'NAVIGATE', 'fatal': 'failed'})
```

**When to use FSM vs BT**:
- FSM: Linear workflows, simple devices, UI states, protocol implementations
- BT: Complex robots, reactive behaviors, many conditional branches, reusable sub-behaviors
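The essence of an FSM is just a transition table, which needs no framework at all. A minimal sketch (state and outcome names are illustrative, mirroring the SMACH example above):

```python
# Table-driven FSM: (state, outcome) -> next state.
TRANSITIONS = {
    ('NAVIGATE', 'succeeded'): 'PICK',
    ('NAVIGATE', 'aborted'):   'ERROR',
    ('PICK',     'succeeded'): 'PLACE',
    ('PICK',     'aborted'):   'ERROR',
    ('PLACE',    'succeeded'): 'DONE',
    ('PLACE',    'aborted'):   'ERROR',
    ('ERROR',    'recovered'): 'NAVIGATE',
    ('ERROR',    'fatal'):     'FAILED',
}

def run_fsm(outcomes, start='NAVIGATE'):
    """Drive the machine with a scripted list of outcomes (for testing)."""
    state, trace = start, [start]
    for outcome in outcomes:
        state = TRANSITIONS[(state, outcome)]
        trace.append(state)
        if state in ('DONE', 'FAILED'):
            break
    return trace

# A run with one recoverable failure at the PICK step:
trace = run_fsm(['succeeded', 'aborted', 'recovered',
                 'succeeded', 'succeeded', 'succeeded'])
```

The table makes every legal transition explicit, which is exactly why FSMs become unwieldy once the number of conditional branches grows.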

Pattern 4: Perception Pipeline


Raw Sensors → Preprocessing → Detection/Estimation → Fusion → World Model
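The Fusion stage can be illustrated with textbook inverse-variance weighting of two scalar measurements (a toy 1-D example, not the full EKF used below):

```python
def fuse(z1, var1, z2, var2):
    """Inverse-variance weighted fusion of two scalar measurements.
    The fused estimate is at least as certain as either input."""
    w1 = var2 / (var1 + var2)   # trust sensor 1 more when sensor 2 is noisy
    w2 = var1 / (var1 + var2)
    z = w1 * z1 + w2 * z2
    var = (var1 * var2) / (var1 + var2)
    return z, var

# LiDAR range (precise) fused with camera depth (noisy), both in metres:
z, var = fuse(2.00, 0.02**2, 2.30, 0.20**2)
```

With a 2 cm LiDAR and a 20 cm camera, the fused estimate lands almost on the LiDAR reading, and the fused variance drops below the LiDAR's alone; Kalman filters apply this same weighting recursively over time.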

Sensor Fusion Architecture


```python
class SensorFusion:
    """Multi-sensor fusion using a central world model"""

    def __init__(self):
        self.world_model = WorldModel()
        self.filters = {
            'pose': ExtendedKalmanFilter(state_dim=6),
            'objects': MultiObjectTracker(),
        }
        self.last_imu_t = None  # set on the first IMU update

    def update_from_camera(self, detections, timestamp):
        """Camera provides object detections with high latency"""
        for det in detections:
            self.filters['objects'].update(
                det, sensor='camera',
                uncertainty=det.confidence,
                timestamp=timestamp
            )

    def update_from_lidar(self, points, timestamp):
        """LiDAR provides precise geometry with lower latency"""
        clusters = self.segment_points(points)
        for cluster in clusters:
            self.filters['objects'].update(
                cluster, sensor='lidar',
                uncertainty=0.02,  # 2cm typical LiDAR accuracy
                timestamp=timestamp
            )

    def update_from_imu(self, imu_data, timestamp):
        """IMU provides high-frequency attitude estimates"""
        if self.last_imu_t is not None:
            self.filters['pose'].predict(imu_data, dt=timestamp - self.last_imu_t)
        self.last_imu_t = timestamp

    def get_world_state(self):
        """Query the fused world model"""
        return WorldState(
            robot_pose=self.filters['pose'].state,
            objects=self.filters['objects'].get_tracked_objects(),
            confidence=self.filters['objects'].get_confidence_map()
        )
```

The Perception-Action Loop Timing


Camera (30Hz)  ─┐
LiDAR (10Hz)   ─┼──→ Fusion (50Hz) ──→ Planner (10Hz) ──→ Controller (100Hz+)
IMU (200Hz)    ─┘

RULE: Controller frequency > Planner frequency (inner control loops must run faster
      than the outer planning loop). This keeps execution smooth despite variable
      sensor rates and perception latency.
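One common way to honour this hierarchy in a single process is a rate-divided loop: the controller runs on every tick while the planner runs only on every Nth tick. A deterministic sketch (constants are illustrative):

```python
CONTROL_HZ = 100      # controller tick rate
PLAN_DIVIDER = 10     # planner runs every 10th tick -> 10 Hz

control_steps = 0
plan_steps = 0
latest_plan = None

for tick in range(CONTROL_HZ):        # simulate one second of run time
    if tick % PLAN_DIVIDER == 0:
        latest_plan = f"plan@{tick}"  # slow path: replan from the world model
        plan_steps += 1
    # fast path: track the most recent plan on every control tick
    control_steps += 1
```

The controller always has *some* plan to track, so a late replan degrades accuracy gracefully instead of stalling motion.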

Pattern 5: Hardware Abstraction Layer (HAL)


Never let application code talk directly to hardware. Always go through an abstraction layer.
```python
import serial  # pyserial, used by the real hardware driver
from abc import ABC, abstractmethod
from enum import Enum, auto

class GripperState(Enum):
    OPEN = auto()
    CLOSED = auto()
    MOVING = auto()

class GripperInterface(ABC):
    """Abstract gripper interface — implement for each hardware type"""

    @abstractmethod
    def open(self, width: float = 1.0) -> bool: ...

    @abstractmethod
    def close(self, force: float = 0.5) -> bool: ...

    @abstractmethod
    def get_state(self) -> GripperState: ...

    @abstractmethod
    def get_width(self) -> float: ...


class RobotiqGripper(GripperInterface):
    """Concrete implementation for Robotiq 2F-85"""
    def __init__(self, port='/dev/ttyUSB0'):
        self.serial = serial.Serial(port, 115200)
        # ... Modbus RTU setup

    def close(self, force=0.5):
        cmd = self._build_modbus_cmd(force=int(force * 255))
        self.serial.write(cmd)
        return self._wait_for_completion()


class SimulatedGripper(GripperInterface):
    """Simulation gripper for testing"""
    def __init__(self):
        self.width = 0.085  # 85mm open
        self.state = GripperState.OPEN

    def close(self, force=0.5):
        self.width = 0.0
        self.state = GripperState.CLOSED
        return True
```

Factory pattern for hardware instantiation


```python
def create_gripper(config: dict) -> GripperInterface:
    gripper_type = config.get('type', 'simulated')
    if gripper_type == 'robotiq':
        return RobotiqGripper(port=config['port'])
    elif gripper_type == 'simulated':
        return SimulatedGripper()
    else:
        raise ValueError(f"Unknown gripper type: {gripper_type}")
```

Pattern 6: Safety Systems


The Safety Hierarchy


Level 0: Hardware E-Stop (physical button, cuts power)
Level 1: Safety-rated controller (SIL2/SIL3, hardware watchdog)
Level 2: Software watchdog (monitors heartbeats, enforces limits)
Level 3: Application safety (collision avoidance, workspace limits)
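The Level-2 check ultimately reduces to one comparison per component: has more time elapsed since the last heartbeat than the timeout allows? A thread-free sketch of that core test (names are hypothetical):

```python
def stale_components(heartbeats, now, timeout_s=0.5):
    """Return components whose last heartbeat is older than timeout_s.
    heartbeats maps component name -> last heartbeat time in seconds."""
    return sorted(name for name, last in heartbeats.items()
                  if now - last > timeout_s)

beats = {'controller': 10.0, 'perception': 9.2, 'logger': 9.9}
stale = stale_components(beats, now=10.1)   # perception missed its deadline
```

Keeping this predicate pure and deterministic makes the threaded watchdog below easy to unit-test: the monitor thread just evaluates it periodically and escalates.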

Software Watchdog Pattern


```python
import logging
import threading
import time

logger = logging.getLogger(__name__)

class SafetyWatchdog:
    """Monitors system health and triggers safe stop on failures"""

    def __init__(self, timeout_ms=500):
        self.timeout = timeout_ms / 1000.0
        self.heartbeats = {}
        self.lock = threading.Lock()
        self.safe_stop_triggered = False

        # Start monitoring thread
        self.monitor_thread = threading.Thread(
            target=self._monitor_loop, daemon=True)
        self.monitor_thread.start()

    def register_component(self, name: str, critical: bool = True):
        """Register a component that must send heartbeats"""
        with self.lock:
            self.heartbeats[name] = {
                'last_beat': time.monotonic(),
                'critical': critical,
                'alive': True
            }

    def heartbeat(self, name: str):
        """Called by components to signal they're alive"""
        with self.lock:
            if name in self.heartbeats:
                self.heartbeats[name]['last_beat'] = time.monotonic()
                self.heartbeats[name]['alive'] = True

    def _monitor_loop(self):
        while True:
            now = time.monotonic()
            with self.lock:
                for name, info in self.heartbeats.items():
                    elapsed = now - info['last_beat']
                    if elapsed > self.timeout and info['alive']:
                        info['alive'] = False
                        if info['critical']:
                            self._trigger_safe_stop(
                                f"Critical component '{name}' "
                                f"timed out ({elapsed:.1f}s)")
            time.sleep(self.timeout / 4)

    def _trigger_safe_stop(self, reason: str):
        if not self.safe_stop_triggered:
            self.safe_stop_triggered = True
            logger.critical(f"SAFE STOP: {reason}")
            self._execute_safe_stop()

    def _execute_safe_stop(self):
        """Bring robot to a safe state"""
        # 1. Stop all motion (zero velocity command)
        # 2. Engage brakes
        # 3. Publish emergency state to all nodes
        # 4. Log the event
        pass
```

Workspace Limits


```python
from dataclasses import dataclass, field

@dataclass
class SafetyResult:
    safe: bool
    violations: list = field(default_factory=list)

class WorkspaceMonitor:
    """Enforce that robot stays within safe operational bounds"""

    def __init__(self, limits: dict):
        self.joint_limits = limits['joints']    # {joint: (min, max)}
        self.cartesian_bounds = limits['cartesian']  # AABB or convex hull
        self.velocity_limits = limits['velocity']
        self.force_limits = limits['force']

    def check_command(self, command) -> SafetyResult:
        """Validate a command BEFORE sending to hardware"""
        violations = []

        # Joint limit check
        for joint, value in command.joint_positions.items():
            lo, hi = self.joint_limits[joint]
            if not (lo <= value <= hi):
                violations.append(
                    f"Joint {joint}={value:.3f} outside [{lo:.3f}, {hi:.3f}]")

        # Velocity check
        for joint, vel in command.joint_velocities.items():
            if abs(vel) > self.velocity_limits[joint]:
                violations.append(
                    f"Joint {joint} velocity {vel:.3f} exceeds limit")

        if violations:
            return SafetyResult(safe=False, violations=violations)
        return SafetyResult(safe=True)
```

Pattern 7: Sim-to-Real Architecture


┌────────────────────────────────────┐
│         Application Code           │
│  (Same code runs in sim AND real)  │
├──────────────┬─────────────────────┤
│   Sim HAL    │     Real HAL        │
│  (MuJoCo/    │  (Hardware          │
│   Gazebo/    │   drivers)          │
│   Isaac)     │                     │
└──────────────┴─────────────────────┘
Key Principles:
  1. Application code NEVER knows if it's in sim or real
  2. Same message types, same topic names, same interfaces
  3. Use the `use_sim_time` parameter to switch clock sources
  4. Domain randomization happens INSIDE the sim HAL
  5. Transfer learning adapters sit at the HAL boundary
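Principle 4 can be made concrete with a small sketch: the sim HAL draws sensor noise and latency from ranges per call instead of using fixed values, so a policy trained against it cannot overfit to one noise profile. The ranges and names below are illustrative, not calibrated to any real camera:

```python
import random

def randomized_camera_frame(true_depth_m, rng):
    """Domain randomization inside a sim HAL: draw noise parameters
    from ranges rather than using a single fixed sensor model."""
    noise_std = rng.uniform(0.005, 0.05)    # metres of depth noise
    latency_ms = rng.uniform(10.0, 60.0)    # simulated sensor latency
    depth = true_depth_m + rng.gauss(0.0, noise_std)
    return {'depth': depth, 'latency_ms': latency_ms}

rng = random.Random(0)          # seeded for reproducible episodes
frame = randomized_camera_frame(1.5, rng)
```

Because the randomization lives behind the HAL interface, the application code above it is untouched when switching to real hardware.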

Config-driven sim/real switching


```python
class RobotDriver:
    def __init__(self, config):
        if config['mode'] == 'simulation':
            self.arm = SimulatedArm(config['sim'])
            self.camera = SimulatedCamera(config['sim'])
        elif config['mode'] == 'real':
            self.arm = UR5Driver(config['real']['arm_ip'])
            self.camera = RealSenseDriver(config['real']['camera_serial'])

        # Application code uses the same interface regardless
        self.perception = PerceptionPipeline(self.camera)
        self.planner = MotionPlanner(self.arm)
```

Pattern 8: Data Recording Architecture


Critical for learning-based robotics — designed for the ForgeIR ecosystem:
┌─────────────────────────────────────────────┐
│              Event-Based Recorder            │
│  Triggers: action boundaries, anomalies,     │
│  task completions, operator signals           │
├─────────────────────────────────────────────┤
│           Multimodal Data Streams            │
│  Camera (30Hz) | Joint State (100Hz) |       │
│  Force/Torque (1kHz) | Language Annotations  │
├─────────────────────────────────────────────┤
│            Storage Layer                     │
│  Episode-based structure with metadata       │
│  Format: MCAP / Zarr / HDF5 / RLDS          │
├─────────────────────────────────────────────┤
│           Quality Assessment                 │
│  Completeness checks, trajectory validation  │
│  Anomaly detection, diversity analysis       │
└─────────────────────────────────────────────┘
```python
import time
from uuid import uuid4

# StorageBackend, StreamConfig, Episode, DataPoint are illustrative helper
# types; swap in your storage and message classes.

class EpisodeRecorder:
    """Records robot episodes with event-based boundaries."""

    def __init__(self, config):
        self.streams = {}
        self.episode_active = False
        self.current_episode = None
        self.storage = StorageBackend(config['format'])  # Zarr, MCAP, etc.

    def register_stream(self, name, msg_type, frequency_hz):
        self.streams[name] = StreamConfig(
            name=name, type=msg_type, freq=frequency_hz)

    def start_episode(self, metadata: dict):
        """Begin recording an episode with metadata."""
        self.current_episode = Episode(
            id=uuid4(),
            start_time=time.monotonic(),
            metadata=metadata,  # task, operator, environment, etc.
            streams={name: [] for name in self.streams},
        )
        self.episode_active = True

    def record_step(self, stream_name, data, timestamp):
        if self.episode_active:
            self.current_episode.streams[stream_name].append(
                DataPoint(data=data, timestamp=timestamp))

    def end_episode(self, outcome: str, annotations: dict | None = None):
        """Finalize, validate, and store the episode."""
        self.episode_active = False
        self.current_episode.end_time = time.monotonic()
        self.current_episode.outcome = outcome
        self.current_episode.annotations = annotations or {}

        # Validate before saving
        quality = self.validate_episode(self.current_episode)
        self.current_episode.quality_score = quality

        self.storage.save(self.current_episode)
        return self.current_episode.id

    def validate_episode(self, episode) -> float:
        """Minimal quality check: fraction of registered streams with data.
        A real implementation would add trajectory validation and anomaly
        detection (see the Quality Assessment layer above)."""
        if not self.streams:
            return 0.0
        recorded = sum(1 for pts in episode.streams.values() if pts)
        return recorded / len(self.streams)
```
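The "event-based" triggers in the diagram can be made concrete with a small boundary-detection helper that decides when to call `start_episode` and `end_episode`. A minimal sketch; the gripper, force, and idle signals and their thresholds are illustrative assumptions, not part of the recorder above:

```python
class EpisodeTrigger:
    """Decides when to start/end an episode from runtime signals."""

    def __init__(self, force_threshold=20.0, idle_timeout_s=2.0):
        self.force_threshold = force_threshold  # N: contact marks an action boundary
        self.idle_timeout_s = idle_timeout_s    # no motion for this long ends the episode
        self.last_motion_time = None

    def should_start(self, gripper_commanded: bool, force_n: float) -> bool:
        # Start on an operator gripper command or on detected contact
        return gripper_commanded or force_n > self.force_threshold

    def should_end(self, joint_speed: float, now: float) -> bool:
        # End after the arm has been idle for idle_timeout_s
        if joint_speed > 1e-3:
            self.last_motion_time = now
            return False
        if self.last_motion_time is None:
            self.last_motion_time = now
            return False
        return (now - self.last_motion_time) > self.idle_timeout_s
```

The caller feeds it each control tick and brackets the episode accordingly; anomaly or task-completion triggers slot in the same way.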
对基于学习的机器人系统至关重要——为ForgeIR生态设计:

Anti-Patterns to Avoid

需要避免的反模式

1. God Node

1. 上帝节点

Problem: One node does everything — perception, planning, control, logging. Fix: Single responsibility. One node, one job. Connect via topics.
问题:单个节点负责所有工作——感知、规划、控制、日志。 解决方案:遵循单一职责原则,一个节点只承担一项任务,通过主题连接节点。

2. Hardcoded Magic Numbers

2. 硬编码魔法数字

Problem: thresholds like `if distance < 0.35:` scattered everywhere. Fix: Parameters with descriptive names, loaded from config files.
问题:`if distance < 0.35:` 这类逻辑散落在代码各处。 解决方案:使用带描述性名称的参数,统一从配置文件加载。
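One way to replace scattered thresholds is a named, documented parameter set. A sketch; the `SafetyParams` fields and the ramp logic are illustrative, and in practice the values would be loaded from a config file:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class SafetyParams:
    """Named, documented limits instead of bare magic numbers."""
    min_obstacle_distance_m: float = 0.35  # slow down inside this radius
    stop_distance_m: float = 0.15          # hard stop inside this radius

def speed_scale(distance_m: float, p: SafetyParams) -> float:
    """Scale commanded speed by proximity, replacing a bare `< 0.35` check."""
    if distance_m <= p.stop_distance_m:
        return 0.0
    if distance_m >= p.min_obstacle_distance_m:
        return 1.0
    # Linear ramp between the stop and slow-down radii
    return (distance_m - p.stop_distance_m) / (
        p.min_obstacle_distance_m - p.stop_distance_m)
```

Because every limit has a name and a unit, reviewers and config files can reason about it, and tuning never requires a code change.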

3. Polling Instead of Events

3. 轮询代替事件驱动

Problem: polling loops like `while True: check_sensor(); sleep(0.01)`. Fix: Use callbacks, subscribers, event-driven architecture.
问题:使用 `while True: check_sensor(); sleep(0.01)` 这类轮询逻辑。 解决方案:使用回调、订阅者模式,采用事件驱动架构。
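A minimal sketch of the event-driven alternative; this toy `EventBus` stands in for a real middleware subscription (e.g. a ROS 2 subscriber) and is not a production implementation:

```python
from collections import defaultdict

class EventBus:
    """Minimal pub/sub: callbacks fire when data arrives, no polling loop."""

    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, topic: str, callback):
        self._subscribers[topic].append(callback)

    def publish(self, topic: str, msg):
        for cb in self._subscribers[topic]:
            cb(msg)

# The handler runs only when a reading actually arrives
readings = []
bus = EventBus()
bus.subscribe("range_sensor", lambda msg: readings.append(msg))
bus.publish("range_sensor", 0.42)
```

The CPU cost is zero between messages, and latency is bounded by delivery rather than by the polling period.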

4. No Error Recovery

4. 无错误恢复机制

Problem: Robot stops forever on first error. Fix: Every action node needs a failure mode. Behavior trees with fallbacks.
问题:机器人遇到第一个错误就永久停止运行。 解决方案:每个动作节点都需要配置失败处理逻辑,行为树搭配fallback节点使用。
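The fallback idea can be sketched in a few lines; the recovery actions below are hypothetical placeholders, not a full behavior-tree library:

```python
SUCCESS, FAILURE = "success", "failure"

class Fallback:
    """Behavior-tree fallback: try children in order until one succeeds."""

    def __init__(self, *children):
        self.children = children

    def tick(self):
        for child in self.children:
            if child() == SUCCESS:
                return SUCCESS
        return FAILURE

# Hypothetical recovery chain: grasp, else re-approach and grasp again
def grasp():
    return FAILURE               # primary action fails...

def reapproach_and_grasp():
    return SUCCESS               # ...so the recovery child runs

recover = Fallback(grasp, reapproach_and_grasp)
```

Every action gets at least one sibling that handles its failure; the tree only reports FAILURE upward once all recovery options are exhausted.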

5. Sim-Only Code

5. 仅支持仿真的代码

Problem: Code works perfectly in simulation, crashes on real hardware. Fix: HAL pattern. Test with hardware-in-the-loop early and often.
问题:代码在仿真环境中完美运行,在真实硬件上直接崩溃。 解决方案:使用HAL模式,尽早且频繁地进行硬件在环测试。
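A minimal HAL sketch, using a gripper as the example device; the class and method names are illustrative:

```python
from abc import ABC, abstractmethod

class GripperHAL(ABC):
    """Same interface for sim and hardware: upper layers never know which."""

    @abstractmethod
    def close(self) -> bool: ...

class SimGripper(GripperHAL):
    def close(self) -> bool:
        return True  # the physics engine would report grasp success here

class RealGripper(GripperHAL):
    def __init__(self, port: str):
        self.port = port  # e.g. serial device for the driver

    def close(self) -> bool:
        # Real driver call goes here; identical signature to the sim version
        raise NotImplementedError("hardware driver not wired up in this sketch")

def pick(gripper: GripperHAL) -> bool:
    """Task code depends only on the interface."""
    return gripper.close()
```

Sim-to-real then means swapping one constructor, not rewriting task code, and hardware-in-the-loop tests exercise the exact code path that runs in simulation.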

6. No Timestamps

6. 缺失时间戳

Problem: Sensor data without timestamps — impossible to fuse or replay. Fix: Timestamp EVERYTHING at the source. Use monotonic clocks for control.
问题:传感器数据没有携带时间戳,无法进行融合或回放。 解决方案:所有数据在生成源头都打上时间戳,控制链路使用单调时钟。
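A sketch of stamping at the source, assuming Python's standard clocks; the `StampedReading` type is illustrative:

```python
import time
from dataclasses import dataclass

@dataclass
class StampedReading:
    """Timestamp attached where the data is produced, not where it is consumed."""
    value: float
    t_mono: float   # monotonic clock: for control and latency measurement
    t_wall: float   # wall clock: for logging and cross-machine correlation

def read_sensor(raw_value: float) -> StampedReading:
    return StampedReading(
        value=raw_value,
        t_mono=time.monotonic(),  # never jumps backwards (NTP, DST)
        t_wall=time.time(),
    )

a = read_sensor(1.0)
b = read_sensor(2.0)
latency = b.t_mono - a.t_mono  # safe: monotonic differences are non-negative
```

Carrying both clocks is cheap and makes the data usable for fusion, replay, and latency audits; control logic should only ever subtract the monotonic stamps.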

7. Blocking the Control Loop

7. 阻塞控制循环

Problem: Perception computation blocks the 100Hz control loop. Fix: Separate processes/threads. Control loop must NEVER be blocked.
问题:感知计算阻塞了100Hz的控制循环。 解决方案:使用独立进程/线程隔离计算任务,控制循环绝对不能被阻塞。
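One common way to keep the control loop unblocked is a latest-value mailbox updated by a perception thread. A minimal sketch; the names and the 0.05 s "inference" delay are illustrative:

```python
import threading
import time

class LatestValue:
    """Lock-protected mailbox: control reads the newest perception result
    without ever waiting for perception to finish."""

    def __init__(self, initial):
        self._lock = threading.Lock()
        self._value = initial

    def set(self, value):
        with self._lock:
            self._value = value

    def get(self):
        with self._lock:
            return self._value

obstacle_dist = LatestValue(initial=float("inf"))

def perception_worker():
    time.sleep(0.05)           # stand-in for heavy inference
    obstacle_dist.set(0.8)     # publish only the result

t = threading.Thread(target=perception_worker)
t.start()
# The control loop keeps running at full rate, reading whatever is newest
cmd = 1.0 if obstacle_dist.get() > 0.5 else 0.0
t.join()
```

The control side pays only a brief lock acquisition per tick, never the cost of perception; stale-data handling (e.g. timestamps on the mailbox value) layers on top.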

8. No Data Logging

8. 无数据日志

Problem: Can't reproduce bugs, can't train models, can't audit behavior. Fix: Always record. Event-based recording is cheap. Use MCAP format.
问题:无法复现bug,无法训练模型,无法审计机器人行为。 解决方案:始终开启数据记录,事件驱动的记录成本极低,推荐使用MCAP格式。
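Event-based recording can be sketched as an always-on ring buffer that is persisted only when a trigger fires. This toy `EventBuffer` is illustrative; a production system would write the clips out as MCAP files:

```python
from collections import deque

class EventBuffer:
    """Always-on ring buffer: cheap continuous recording, persisted only
    when an event fires (anomaly, task boundary, operator flag)."""

    def __init__(self, pre_event_samples=100):
        self.ring = deque(maxlen=pre_event_samples)
        self.saved_clips = []

    def record(self, sample):
        self.ring.append(sample)  # O(1); old samples fall off the back

    def on_event(self, label: str):
        # Persist the context leading up to the event
        self.saved_clips.append((label, list(self.ring)))

buf = EventBuffer(pre_event_samples=3)
for i in range(5):
    buf.record(i)
buf.on_event("anomaly")
```

Steady-state cost is one buffer append per sample, yet every anomaly arrives on disk with the seconds of context that preceded it.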

Architecture Decision Checklist

架构决策检查清单

When designing a new robot system, answer these questions:
  1. What's the safety architecture? (E-stop, watchdog, workspace limits)
  2. What are the real-time requirements? (Control at 100Hz+, perception at 10-30Hz)
  3. What's the behavioral framework? (BT for complex, FSM for simple)
  4. How does sim-to-real work? (HAL pattern, same interfaces)
  5. How is data recorded? (Episode-based, event-triggered, with metadata)
  6. How are failures handled? (Graceful degradation, recovery behaviors)
  7. What's the communication middleware? (ROS2 for most cases)
  8. How is the system deployed? (Docker, snap, direct install)
  9. How is it tested? (Unit, integration, hardware-in-the-loop, field)
  10. How is it monitored? (Heartbeats, metrics, dashboards)
设计新的机器人系统时,请回答以下问题:
  1. 安全架构是什么样的?(急停、看门狗、工作区限制)
  2. 实时要求是什么?(控制频率100Hz以上,感知频率10-30Hz)
  3. 使用什么行为框架?(复杂场景用BT,简单场景用FSM)
  4. Sim-to-real如何实现?(HAL模式,统一接口)
  5. 数据如何记录?(基于episode,事件触发,携带元数据)
  6. 故障如何处理?(优雅降级、恢复行为)
  7. 使用什么通信中间件?(大多数场景使用ROS2)
  8. 系统如何部署?(Docker、snap、直接安装)
  9. 如何进行测试?(单元测试、集成测试、硬件在环测试、现场测试)
  10. 如何监控系统运行?(心跳、指标、可视化看板)
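Checklist items 1 and 10 meet in the watchdog pattern: each subsystem periodically pets the watchdog, and a stale heartbeat triggers a safe stop. A minimal sketch; the timeout and subsystem names are illustrative:

```python
import time

class Watchdog:
    """Tracks per-subsystem heartbeats; stale ones should trigger a safe stop."""

    def __init__(self, timeout_s: float):
        self.timeout_s = timeout_s
        self.last_beat = {}

    def beat(self, name: str, now: float = None):
        self.last_beat[name] = time.monotonic() if now is None else now

    def stale(self, now: float = None) -> list:
        now = time.monotonic() if now is None else now
        return [n for n, t in self.last_beat.items()
                if now - t > self.timeout_s]

wd = Watchdog(timeout_s=0.5)
wd.beat("perception", now=0.0)
wd.beat("control", now=0.3)
# At t=0.6, perception has been silent for 0.6 s > 0.5 s
faulted = wd.stale(now=0.6)
```

A supervisor polls `stale()` in its own loop and escalates (slow down, then E-stop) when any safety-relevant subsystem stops beating; note the monotonic clock, per anti-pattern 6.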