robotics-design-patterns
Robotics Design Patterns
When to Use This Skill
- Designing robot software architecture from scratch
- Choosing between behavior trees, FSMs, or hybrid approaches
- Structuring perception → planning → control pipelines
- Implementing safety systems and watchdogs
- Building hardware abstraction layers (HAL)
- Designing for sim-to-real transfer
- Architecting multi-robot / fleet systems
- Making real-time vs. non-real-time tradeoffs
Pattern 1: The Robot Software Stack
Every robot system follows this layered architecture, regardless of complexity:
┌─────────────────────────────────────────────┐
│ APPLICATION LAYER │
│ Mission planning, task allocation, UI │
├─────────────────────────────────────────────┤
│ BEHAVIORAL LAYER │
│ Behavior trees, FSMs, decision-making │
├─────────────────────────────────────────────┤
│ FUNCTIONAL LAYER │
│ Perception, Planning, Control, Estimation │
├─────────────────────────────────────────────┤
│ COMMUNICATION LAYER │
│ ROS2, DDS, shared memory, IPC │
├─────────────────────────────────────────────┤
│ HARDWARE ABSTRACTION LAYER │
│ Drivers, sensor interfaces, actuators │
├─────────────────────────────────────────────┤
│ HARDWARE LAYER │
│ Cameras, LiDARs, motors, grippers, IMUs │
└─────────────────────────────────────────────┘

Design Rule: Information flows UP through perception, decisions flow DOWN through control. Never let the application layer directly command hardware.
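The layering rule can be sketched as plain composition — each layer holds a reference only to the layer directly below it, so a mission-level request descends one level at a time. All class names here are illustrative, not a real framework:

```python
# Minimal sketch of the layering rule: commands flow DOWN one layer at a time.
class HardwareAbstraction:
    """HAL: the only layer allowed to touch device drivers."""
    def __init__(self):
        self.last_command = None
    def send_joint_command(self, positions):
        self.last_command = positions  # a real HAL would call the driver here

class Controller:
    """Functional layer: turns waypoints into low-level commands."""
    def __init__(self, hal):
        self.hal = hal
    def execute(self, waypoint):
        self.hal.send_joint_command(waypoint)

class Behavior:
    """Behavioral layer: decides WHAT to do, not HOW."""
    def __init__(self, controller):
        self.controller = controller
    def tick(self, goal):
        # BT / FSM decision logic would live here
        self.controller.execute(goal)

class Application:
    """Application layer: mission-level requests only — never hardware."""
    def __init__(self, behavior):
        self.behavior = behavior
    def run_mission(self):
        self.behavior.tick(goal=[0.0, 0.5, 1.0])

hal = HardwareAbstraction()
Application(Behavior(Controller(hal))).run_mission()
# hal.last_command is [0.0, 0.5, 1.0]: the command reached hardware only by
# descending through every layer in turn
```

The point of the structure is that swapping the bottom layer (sim vs. real) never touches the layers above it.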
Pattern 2: Behavior Trees (BT)
Behavior trees are the recommended default for robot decision-making. They're modular, reusable, and easier to debug than FSMs for complex behaviors.
Core Node Types
Sequence (→) : Execute children left-to-right, FAIL on first failure
Fallback (?) : Execute children left-to-right, SUCCEED on first success
Parallel (⇉) : Execute all children simultaneously
Decorator : Modify a single child's behavior
Action (leaf) : Execute a robot action
Condition (leaf) : Check a condition (no side effects)

Example: Pick-and-Place BT
→ Sequence
├── → Check
│   ├── Battery OK?
│   └── Obj Found?
├── → Pick
│   ├── Open Gripper
│   ├── Move To Obj
│   └── Close Gripper
└── → Place
    ├── Move To Goal
    ├── Open Gripper
    └── Release

Implementation Pattern
```python
import py_trees
# ROS2 action-client pieces, assumed available in this environment;
# MoveBase stands in for your navigation action type.
from rclpy.action import ActionClient
from action_msgs.msg import GoalStatus

class MoveToTarget(py_trees.behaviour.Behaviour):
    """Action node: Move robot to a target pose"""

    def __init__(self, name, target_key="target_pose"):
        super().__init__(name)
        self.target_key = target_key
        self.action_client = None
        self.goal_handle = None

    def setup(self, **kwargs):
        """Called once when the tree is set up — initialize resources"""
        self.node = kwargs.get('node')  # ROS2 node
        self.action_client = ActionClient(
            self.node, MoveBase, 'move_base')

    def initialise(self):
        """Called when this node first ticks — send the goal"""
        bb = self.blackboard
        target = bb.get(self.target_key)
        self.goal_handle = self.action_client.send_goal(target)
        self.logger.info(f"Moving to {target}")

    def update(self):
        """Called every tick — check progress"""
        if self.goal_handle is None:
            return py_trees.common.Status.FAILURE
        status = self.goal_handle.status
        if status == GoalStatus.STATUS_SUCCEEDED:
            return py_trees.common.Status.SUCCESS
        elif status == GoalStatus.STATUS_ABORTED:
            return py_trees.common.Status.FAILURE
        else:
            return py_trees.common.Status.RUNNING

    def terminate(self, new_status):
        """Called when node exits — cancel if preempted"""
        if new_status == py_trees.common.Status.INVALID:
            if self.goal_handle:
                self.goal_handle.cancel_goal()
                self.logger.info("Movement cancelled")
```

Build the tree
```python
def create_pick_place_tree():
    root = py_trees.composites.Sequence("PickAndPlace", memory=True)

    # Safety checks: a Sequence — every check must pass before anything runs
    safety = py_trees.composites.Sequence("SafetyChecks", memory=False)
    safety.add_children([
        CheckBattery("BatteryOK", threshold=20.0),
        CheckEStop("EStopClear"),
    ])

    pick = py_trees.composites.Sequence("Pick", memory=True)
    pick.add_children([
        DetectObject("FindObject"),
        MoveToTarget("ApproachObject", target_key="object_pose"),
        GripperCommand("CloseGripper", action="close"),
    ])

    place = py_trees.composites.Sequence("Place", memory=True)
    place.add_children([
        MoveToTarget("MoveToPlace", target_key="place_pose"),
        GripperCommand("OpenGripper", action="open"),
    ])

    root.add_children([safety, pick, place])
    return root
```
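The tree above is built entirely from Sequences; the Fallback node type is what adds recovery. Its semantics can be shown with a hand-rolled sketch — this is the control flow Fallback implements, not the py_trees API, and all leaf names are illustrative:

```python
# Hand-rolled Fallback (a.k.a. Selector) semantics.
SUCCESS, FAILURE = "SUCCESS", "FAILURE"

def fallback(children, state):
    """Tick children left-to-right; SUCCEED on the first success."""
    for child in children:
        if child(state) == SUCCESS:
            return SUCCESS
    return FAILURE

def already_grasped(state):   # Condition leaf: read-only check
    return SUCCESS if state["grasped"] else FAILURE

def grasp(state):             # Action leaf: the recovery behavior
    state["grasped"] = True
    return SUCCESS

state = {"grasped": False}
result = fallback([already_grasped, grasp], state)
# result == "SUCCESS": the condition failed, so the recovery action ran
```

If the condition had succeeded, the action would never have ticked — that skip-if-already-done shape is the standard Fallback idiom for retries and recovery.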
Blackboard Pattern
```python
import py_trees
from py_trees.common import Status

# The Blackboard is the shared memory for BT nodes
bb = py_trees.blackboard.Blackboard()

# Perception nodes WRITE to the blackboard
class DetectObject(py_trees.behaviour.Behaviour):
    def update(self):
        detections = self.perception.detect()
        if detections:
            self.blackboard.set("object_pose", detections[0].pose)
            self.blackboard.set("object_class", detections[0].label)
            return Status.SUCCESS
        return Status.FAILURE

# Action nodes READ from the blackboard
class MoveToTarget(py_trees.behaviour.Behaviour):
    def initialise(self):
        target = self.blackboard.get("object_pose")
        self.send_goal(target)
```

Pattern 3: Finite State Machines (FSM)
Use FSMs for simple, well-defined sequential behaviors with clear states. Prefer BTs for anything complex.
```python
from enum import Enum, auto
import smach  # ROS state machine library

class RobotState(Enum):
    IDLE = auto()
    NAVIGATING = auto()
    PICKING = auto()
    PLACING = auto()
    ERROR = auto()
    CHARGING = auto()
```

SMACH implementation
```python
class NavigateState(smach.State):
    def __init__(self):
        smach.State.__init__(self,
            outcomes=['succeeded', 'aborted', 'preempted'],
            input_keys=['target_pose'],
            output_keys=['final_pose'])

    def execute(self, userdata):
        # Navigation logic
        result = navigate_to(userdata.target_pose)
        if result.success:
            userdata.final_pose = result.pose
            return 'succeeded'
        return 'aborted'
```

Build state machine
```python
sm = smach.StateMachine(outcomes=['done', 'failed'])
with sm:
    smach.StateMachine.add('NAVIGATE', NavigateState(),
        transitions={'succeeded': 'PICK', 'aborted': 'ERROR'})
    smach.StateMachine.add('PICK', PickState(),
        transitions={'succeeded': 'PLACE', 'aborted': 'ERROR'})
    smach.StateMachine.add('PLACE', PlaceState(),
        transitions={'succeeded': 'done', 'aborted': 'ERROR'})
    smach.StateMachine.add('ERROR', ErrorRecovery(),
        transitions={'recovered': 'NAVIGATE', 'fatal': 'failed'})
```

**When to use FSM vs BT**:
- FSM: Linear workflows, simple devices, UI states, protocol implementations
- BT: Complex robots, reactive behaviors, many conditional branches, reusable sub-behaviors

Pattern 4: Perception Pipeline
Raw Sensors → Preprocessing → Detection/Estimation → Fusion → World Model

Sensor Fusion Architecture
```python
class SensorFusion:
    """Multi-sensor fusion using a central world model"""

    def __init__(self):
        self.world_model = WorldModel()
        self.filters = {
            'pose': ExtendedKalmanFilter(state_dim=6),
            'objects': MultiObjectTracker(),
        }
        self.last_imu_t = None

    def update_from_camera(self, detections, timestamp):
        """Camera provides object detections with high latency"""
        for det in detections:
            self.filters['objects'].update(
                det, sensor='camera',
                uncertainty=det.confidence,
                timestamp=timestamp
            )

    def update_from_lidar(self, points, timestamp):
        """LiDAR provides precise geometry with lower latency"""
        clusters = self.segment_points(points)
        for cluster in clusters:
            self.filters['objects'].update(
                cluster, sensor='lidar',
                uncertainty=0.02,  # 2cm typical LiDAR accuracy
                timestamp=timestamp
            )

    def update_from_imu(self, imu_data, timestamp):
        """IMU provides high-frequency attitude estimates"""
        if self.last_imu_t is not None:  # skip predict on the first sample
            self.filters['pose'].predict(
                imu_data, dt=timestamp - self.last_imu_t)
        self.last_imu_t = timestamp

    def get_world_state(self):
        """Query the fused world model"""
        return WorldState(
            robot_pose=self.filters['pose'].state,
            objects=self.filters['objects'].get_tracked_objects(),
            confidence=self.filters['objects'].get_confidence_map()
        )
```

The Perception-Action Loop Timing
Camera (30Hz) ─┐
LiDAR  (10Hz) ─┼──→ Fusion (50Hz) ──→ Planner (10Hz) ──→ Controller (100Hz+)
IMU   (200Hz) ─┘

RULE: Controller frequency > Planner frequency > Sensor frequency
This ensures smooth execution despite variable perception latency.
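The frequency rule can be sketched deterministically: over one simulated second, a 100 Hz controller ticks every cycle while a 10 Hz planner only refreshes the plan occasionally. This is a toy model of rate decoupling, not a real scheduler; all names are illustrative:

```python
# The controller never blocks on the planner — it reuses the latest plan.
def simulate(duration_s=1.0, ctrl_hz=100, plan_hz=10):
    latest_plan = None
    commands = []
    for tick in range(int(duration_s * ctrl_hz)):
        t = tick / ctrl_hz
        # Planner produces a new plan only every 1/plan_hz seconds
        if tick % (ctrl_hz // plan_hz) == 0:
            latest_plan = f"plan@{t:.2f}s"
        # Controller runs EVERY tick, using whatever plan is freshest
        commands.append((t, latest_plan))
    return commands

cmds = simulate()
print(len(cmds))                        # 100 control ticks
print(len({plan for _, plan in cmds}))  # only 10 distinct plans consumed
```

Because the controller outruns the planner, every control tick has *some* plan to act on — exactly the smooth execution the rule is after.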
Pattern 5: Hardware Abstraction Layer (HAL)
Never let application code talk directly to hardware. Always go through an abstraction layer.
```python
from abc import ABC, abstractmethod
from enum import Enum, auto

import serial  # pyserial, used by the real-hardware driver

class GripperState(Enum):
    OPEN = auto()
    CLOSED = auto()

class GripperInterface(ABC):
    """Abstract gripper interface — implement for each hardware type"""
    @abstractmethod
    def open(self, width: float = 1.0) -> bool: ...
    @abstractmethod
    def close(self, force: float = 0.5) -> bool: ...
    @abstractmethod
    def get_state(self) -> GripperState: ...
    @abstractmethod
    def get_width(self) -> float: ...

class RobotiqGripper(GripperInterface):
    """Concrete implementation for Robotiq 2F-85"""
    def __init__(self, port='/dev/ttyUSB0'):
        self.serial = serial.Serial(port, 115200)
        # ... Modbus RTU setup

    def close(self, force=0.5):
        cmd = self._build_modbus_cmd(force=int(force * 255))
        self.serial.write(cmd)
        return self._wait_for_completion()

    # open(), get_state(), get_width() implemented analogously

class SimulatedGripper(GripperInterface):
    """Simulation gripper for testing"""
    def __init__(self):
        self.width = 0.085  # 85mm open
        self.state = GripperState.OPEN

    def open(self, width=1.0):
        self.width = 0.085 * width
        self.state = GripperState.OPEN
        return True

    def close(self, force=0.5):
        self.width = 0.0
        self.state = GripperState.CLOSED
        return True

    def get_state(self):
        return self.state

    def get_width(self):
        return self.width
```

Factory pattern for hardware instantiation
```python
def create_gripper(config: dict) -> GripperInterface:
    gripper_type = config.get('type', 'simulated')
    if gripper_type == 'robotiq':
        return RobotiqGripper(port=config['port'])
    elif gripper_type == 'simulated':
        return SimulatedGripper()
    else:
        raise ValueError(f"Unknown gripper type: {gripper_type}")
```

Pattern 6: Safety Systems
The Safety Hierarchy
Level 0: Hardware E-Stop (physical button, cuts power)
Level 1: Safety-rated controller (SIL2/SIL3, hardware watchdog)
Level 2: Software watchdog (monitors heartbeats, enforces limits)
Level 3: Application safety (collision avoidance, workspace limits)

Software Watchdog Pattern
```python
import logging
import threading
import time

logger = logging.getLogger("safety")

class SafetyWatchdog:
    """Monitors system health and triggers safe stop on failures"""

    def __init__(self, timeout_ms=500):
        self.timeout = timeout_ms / 1000.0
        self.heartbeats = {}
        self.lock = threading.Lock()
        self.safe_stop_triggered = False
        # Start monitoring thread
        self.monitor_thread = threading.Thread(
            target=self._monitor_loop, daemon=True)
        self.monitor_thread.start()

    def register_component(self, name: str, critical: bool = True):
        """Register a component that must send heartbeats"""
        with self.lock:
            self.heartbeats[name] = {
                'last_beat': time.monotonic(),
                'critical': critical,
                'alive': True
            }

    def heartbeat(self, name: str):
        """Called by components to signal they're alive"""
        with self.lock:
            if name in self.heartbeats:
                self.heartbeats[name]['last_beat'] = time.monotonic()
                self.heartbeats[name]['alive'] = True

    def _monitor_loop(self):
        while True:
            now = time.monotonic()
            with self.lock:
                for name, info in self.heartbeats.items():
                    elapsed = now - info['last_beat']
                    if elapsed > self.timeout and info['alive']:
                        info['alive'] = False
                        if info['critical']:
                            self._trigger_safe_stop(
                                f"Critical component '{name}' "
                                f"timed out ({elapsed:.1f}s)")
            time.sleep(self.timeout / 4)

    def _trigger_safe_stop(self, reason: str):
        if not self.safe_stop_triggered:
            self.safe_stop_triggered = True
            logger.critical(f"SAFE STOP: {reason}")
            self._execute_safe_stop()

    def _execute_safe_stop(self):
        """Bring robot to a safe state"""
        # 1. Stop all motion (zero velocity command)
        # 2. Engage brakes
        # 3. Publish emergency state to all nodes
        # 4. Log the event
        pass
```

Workspace Limits
```python
from dataclasses import dataclass, field

@dataclass
class SafetyResult:
    """Minimal result container (assumed shape)"""
    safe: bool
    violations: list = field(default_factory=list)

class WorkspaceMonitor:
    """Enforce that robot stays within safe operational bounds"""

    def __init__(self, limits: dict):
        self.joint_limits = limits['joints']          # {joint: (min, max)}
        self.cartesian_bounds = limits['cartesian']   # AABB or convex hull
        self.velocity_limits = limits['velocity']
        self.force_limits = limits['force']

    def check_command(self, command) -> SafetyResult:
        """Validate a command BEFORE sending to hardware"""
        violations = []
        # Joint limit check
        for joint, value in command.joint_positions.items():
            lo, hi = self.joint_limits[joint]
            if not (lo <= value <= hi):
                violations.append(
                    f"Joint {joint}={value:.3f} outside [{lo:.3f}, {hi:.3f}]")
        # Velocity check
        for joint, vel in command.joint_velocities.items():
            if abs(vel) > self.velocity_limits[joint]:
                violations.append(
                    f"Joint {joint} velocity {vel:.3f} exceeds limit")
        if violations:
            return SafetyResult(safe=False, violations=violations)
        return SafetyResult(safe=True)
```

Pattern 7: Sim-to-Real Architecture
┌────────────────────────────────────┐
│         Application Code           │
│  (Same code runs in sim AND real)  │
├──────────────┬─────────────────────┤
│   Sim HAL    │      Real HAL       │
│  (MuJoCo/    │     (Hardware       │
│   Gazebo/    │      drivers)       │
│   Isaac)     │                     │
└──────────────┴─────────────────────┘

Key Principles:
- Application code NEVER knows if it's in sim or real
- Same message types, same topic names, same interfaces
- Use the `use_sim_time` parameter to switch clock sources
- Domain randomization happens INSIDE the sim HAL
- Transfer learning adapters sit at the HAL boundary

Config-driven sim/real switching
```python
class RobotDriver:
    def __init__(self, config):
        if config['mode'] == 'simulation':
            self.arm = SimulatedArm(config['sim'])
            self.camera = SimulatedCamera(config['sim'])
        elif config['mode'] == 'real':
            self.arm = UR5Driver(config['real']['arm_ip'])
            self.camera = RealSenseDriver(config['real']['camera_serial'])
        # Application code uses the same interface regardless
        self.perception = PerceptionPipeline(self.camera)
        self.planner = MotionPlanner(self.arm)
```

Pattern 8: Data Recording Architecture
Critical for learning-based robotics — designed for the ForgeIR ecosystem:
┌─────────────────────────────────────────────┐
│ Event-Based Recorder │
│ Triggers: action boundaries, anomalies, │
│ task completions, operator signals │
├─────────────────────────────────────────────┤
│ Multimodal Data Streams │
│ Camera (30Hz) | Joint State (100Hz) | │
│ Force/Torque (1kHz) | Language Annotations │
├─────────────────────────────────────────────┤
│ Storage Layer │
│ Episode-based structure with metadata │
│ Format: MCAP / Zarr / HDF5 / RLDS │
├─────────────────────────────────────────────┤
│ Quality Assessment │
│ Completeness checks, trajectory validation │
│ Anomaly detection, diversity analysis │
└─────────────────────────────────────────────┘

```python
import time
from uuid import uuid4

class EpisodeRecorder:
    """Records robot episodes with event-based boundaries"""

    def __init__(self, config):
        self.streams = {}
        self.episode_active = False
        self.current_episode = None
        self.storage = StorageBackend(config['format'])  # Zarr, MCAP, etc.

    def register_stream(self, name, msg_type, frequency_hz):
        self.streams[name] = StreamConfig(
            name=name, type=msg_type, freq=frequency_hz)

    def start_episode(self, metadata: dict):
        """Begin recording an episode with metadata"""
        self.current_episode = Episode(
            id=uuid4(),
            start_time=time.monotonic(),
            metadata=metadata,  # task, operator, environment, etc.
            streams={name: [] for name in self.streams}
        )
        self.episode_active = True

    def record_step(self, stream_name, data, timestamp):
        if self.episode_active:
            self.current_episode.streams[stream_name].append(
                DataPoint(data=data, timestamp=timestamp))

    def end_episode(self, outcome: str, annotations: dict = None):
        """Finalize and store the episode"""
        self.episode_active = False
        self.current_episode.end_time = time.monotonic()
        self.current_episode.outcome = outcome
        self.current_episode.annotations = annotations
        # Validate before saving
        quality = self.validate_episode(self.current_episode)
        self.current_episode.quality_score = quality
        self.storage.save(self.current_episode)
        return self.current_episode.id
```

Anti-Patterns to Avoid
1. God Node
Problem: One node does everything — perception, planning, control, logging.
Fix: Single responsibility. One node, one job. Connect via topics.
2. Hardcoded Magic Numbers
Problem: Magic numbers like `if distance < 0.35:` scattered everywhere.
Fix: Parameters with descriptive names, loaded from config files.
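A minimal sketch of the fix — parameter names, units, and the config format (JSON here) are all illustrative:

```python
# Named parameters with defaults, overridable from a config file.
import json

DEFAULTS = {
    "obstacle_stop_distance_m": 0.35,
    "max_linear_velocity_mps": 0.5,
}

def load_params(path=None):
    params = dict(DEFAULTS)
    if path is not None:
        with open(path) as f:
            params.update(json.load(f))  # config overrides defaults
    return params

params = load_params()

def should_stop(distance_m: float) -> bool:
    # The threshold now has a name, a unit, and a single source of truth
    return distance_m < params["obstacle_stop_distance_m"]
```

Tuning the stop distance now means editing one config entry instead of hunting bare `0.35`s through the codebase.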
3. Polling Instead of Events
Problem: Busy-wait polling like `while True: check_sensor(); sleep(0.01)`.
Fix: Use callbacks, subscribers, event-driven architecture.
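The event-driven alternative in miniature — a publish/subscribe topic where the sensor driver pushes readings to callbacks, so nothing spins waiting. Class and field names are illustrative:

```python
# Subscribers register callbacks; the driver publishes when data arrives.
class SensorTopic:
    def __init__(self):
        self._subscribers = []

    def subscribe(self, callback):
        self._subscribers.append(callback)

    def publish(self, reading):
        # Called by the sensor driver only when new data actually exists
        for cb in self._subscribers:
            cb(reading)

received = []
topic = SensorTopic()
topic.subscribe(lambda r: received.append(r))

topic.publish({"range_m": 1.2})
topic.publish({"range_m": 0.3})
# received == [{"range_m": 1.2}, {"range_m": 0.3}] — no polling loop involved
```

This is the same shape as a ROS2 subscription callback, just without the middleware.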
4. No Error Recovery
Problem: Robot stops forever on first error.
Fix: Every action node needs a failure mode. Behavior trees with fallbacks.
5. Sim-Only Code
Problem: Code works perfectly in simulation, crashes on real hardware.
Fix: HAL pattern. Test with hardware-in-the-loop early and often.
6. No Timestamps
Problem: Sensor data without timestamps — impossible to fuse or replay.
Fix: Timestamp EVERYTHING at the source. Use monotonic clocks for control.
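A small sketch of the two-clock convention (field names illustrative): wall-clock for cross-system correlation, monotonic for control timing, because the monotonic clock never jumps when NTP adjusts the system time:

```python
# Timestamp at the source, with both clocks.
import time

def stamp_reading(value):
    return {
        "value": value,
        "t_wall": time.time(),       # for logs / cross-machine correlation
        "t_mono": time.monotonic(),  # for dt computation in control loops
    }

a = stamp_reading(0.42)
b = stamp_reading(0.43)
dt = b["t_mono"] - a["t_mono"]  # always >= 0, even across wall-clock jumps
```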
7. Blocking the Control Loop
Problem: Perception computation blocks the 100Hz control loop.
Fix: Separate processes/threads. Control loop must NEVER be blocked.
8. No Data Logging
Problem: Can't reproduce bugs, can't train models, can't audit behavior.
Fix: Always record. Event-based recording is cheap. Use MCAP format.
Architecture Decision Checklist
When designing a new robot system, answer these questions:
- What's the safety architecture? (E-stop, watchdog, workspace limits)
- What are the real-time requirements? (Control at 100Hz+, perception at 10-30Hz)
- What's the behavioral framework? (BT for complex, FSM for simple)
- How does sim-to-real work? (HAL pattern, same interfaces)
- How is data recorded? (Episode-based, event-triggered, with metadata)
- How are failures handled? (Graceful degradation, recovery behaviors)
- What's the communication middleware? (ROS2 for most cases)
- How is the system deployed? (Docker, snap, direct install)
- How is it tested? (Unit, integration, hardware-in-the-loop, field)
- How is it monitored? (Heartbeats, metrics, dashboards)