Wake Word Detection Skill
1. Overview
Risk Level: MEDIUM - Continuous audio monitoring, privacy implications, resource constraints
You are an expert in wake word detection with deep expertise in openWakeWord, keyword spotting, and always-listening systems.
Primary Use Cases:
- JARVIS activation phrase detection ("Hey JARVIS")
- Always-listening with minimal resource usage
- Offline wake word detection (no cloud dependency)
2. Core Principles
- TDD First - Write tests before implementation code
- Performance Aware - Optimize for CPU, memory, and latency
- Privacy Preserving - Never store audio, minimize buffers
- Accuracy Focused - Minimize false positives/negatives
- Resource Efficient - Target <5% CPU, <100MB memory
3. Core Responsibilities
3.1 Privacy-First Monitoring
- Process locally - Never send audio to external services
- Buffer minimally - Only keep audio needed for detection
- Discard non-wake - Immediately discard non-wake audio
- User control - Easy disable/pause functionality
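The "user control" bullet above can be sketched as a small thread-safe gate that the detector's audio callback consults before processing. `PauseGate` is a hypothetical helper name for illustration, not part of openWakeWord:

```python
import threading
import time

# Hypothetical sketch of the "easy disable/pause" requirement: a gate the
# audio callback checks before touching any audio.
class PauseGate:
    def __init__(self):
        self._lock = threading.Lock()
        self._enabled = True
        self._paused_until = 0.0

    def pause(self, seconds: float):
        """Temporarily stop processing (e.g. during a sensitive call)."""
        with self._lock:
            self._paused_until = time.time() + seconds

    def disable(self):
        with self._lock:
            self._enabled = False

    def enable(self):
        with self._lock:
            self._enabled = True
            self._paused_until = 0.0

    def allows_processing(self) -> bool:
        with self._lock:
            return self._enabled and time.time() >= self._paused_until
```

The audio callback would return early (and clear its buffer) whenever `allows_processing()` is False.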
3.2 Efficiency Requirements
- Minimal CPU usage (<5% average)
- Low memory footprint (<100MB)
- Low latency detection (<500ms)
- Low false positive rate (<1 per hour)
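One way to keep the latency target honest is to measure it directly: the sketch below times a stand-in for the model call over repeated one-second windows and reports the 95th percentile in milliseconds. `fake_inference` and `measure_latency_ms` are illustrative names, not a real API:

```python
import time

import numpy as np

def fake_inference(audio: np.ndarray) -> float:
    # Stand-in for the real model.predict() call
    return float(np.max(np.abs(audio)))

def measure_latency_ms(n_runs: int = 50, sample_rate: int = 16000) -> float:
    """Return the 95th-percentile per-window processing time in ms."""
    timings = []
    for _ in range(n_runs):
        window = np.random.randn(sample_rate).astype(np.float32)
        start = time.perf_counter()
        fake_inference(window)
        timings.append((time.perf_counter() - start) * 1000.0)
    return float(np.percentile(timings, 95))
```

In CI, the same harness wrapped around the real detection path would assert `measure_latency_ms() < 500`.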
4. Technical Foundation
requirements.txt

```text
openwakeword>=0.6.0
numpy>=1.24.0
sounddevice>=0.4.6
onnxruntime>=1.16.0
```

---
5. Implementation Workflow (TDD)
Step 1: Write Failing Test First
tests/test_wake_word.py

```python
import numpy as np
import pytest
from unittest.mock import Mock, patch


class TestWakeWordDetector:
    """TDD tests for wake word detection."""

    def test_detection_accuracy_threshold(self):
        """Test that the detector respects the confidence threshold."""
        from wake_word import SecureWakeWordDetector

        detector = SecureWakeWordDetector(threshold=0.7)
        callback = Mock()
        test_audio = np.random.randn(16000).astype(np.float32)
        with patch.object(detector.model, 'predict') as mock_predict:
            # Below threshold - should not trigger
            mock_predict.return_value = {"hey_jarvis": np.array([0.5])}
            detector._test_process(test_audio, callback)
            callback.assert_not_called()
            # Above threshold - should trigger
            mock_predict.return_value = {"hey_jarvis": np.array([0.8])}
            detector._test_process(test_audio, callback)
            callback.assert_called_once()

    def test_buffer_cleared_after_detection(self):
        """Test privacy: buffer is cleared immediately after detection."""
        from wake_word import SecureWakeWordDetector

        detector = SecureWakeWordDetector()
        detector.audio_buffer.extend(np.zeros(16000))
        with patch.object(detector.model, 'predict') as mock_predict:
            mock_predict.return_value = {"hey_jarvis": np.array([0.9])}
            detector._process_audio()
        assert len(detector.audio_buffer) == 0, "Buffer must be cleared"

    def test_cpu_usage_under_threshold(self):
        """Test CPU usage stays under 5%."""
        import time

        import psutil

        from wake_word import SecureWakeWordDetector

        detector = SecureWakeWordDetector()
        process = psutil.Process()
        process.cpu_percent()  # First call primes the measurement window
        start_time = time.time()
        while time.time() - start_time < 10:
            audio = np.random.randn(1600).astype(np.float32)
            detector.audio_buffer.extend(audio)
            if len(detector.audio_buffer) >= 16000:
                detector._process_audio()
        avg_cpu = process.cpu_percent() / psutil.cpu_count()
        assert avg_cpu < 5, f"CPU usage too high: {avg_cpu}%"

    def test_memory_footprint(self):
        """Test memory usage stays under 100MB."""
        import tracemalloc

        from wake_word import SecureWakeWordDetector

        tracemalloc.start()
        detector = SecureWakeWordDetector()
        for _ in range(600):
            audio = np.random.randn(1600).astype(np.float32)
            detector.audio_buffer.extend(audio)
        current, peak = tracemalloc.get_traced_memory()
        tracemalloc.stop()
        peak_mb = peak / 1024 / 1024
        assert peak_mb < 100, f"Memory too high: {peak_mb}MB"
```

Step 2: Implement Minimum to Pass
```python
# wake_word.py
from collections import deque

import numpy as np
from openwakeword.model import Model


class SecureWakeWordDetector:
    def __init__(self, threshold=0.5):
        self.threshold = threshold
        self.model = Model(wakeword_models=["hey_jarvis"])
        self.audio_buffer = deque(maxlen=24000)  # 1.5 s at 16 kHz

    def _test_process(self, audio, callback):
        predictions = self.model.predict(audio)
        for model_name, scores in predictions.items():
            if np.max(scores) > self.threshold:
                self.audio_buffer.clear()
                callback(model_name, np.max(scores))
                break
```

Step 3: Run Full Verification
```bash
pytest tests/test_wake_word.py -v
pytest --cov=wake_word --cov-report=term-missing
```

6. Implementation Patterns
Pattern 1: Secure Wake Word Detector
```python
from collections import deque

import numpy as np
import sounddevice as sd
import structlog
from openwakeword.model import Model

logger = structlog.get_logger()


class SecureWakeWordDetector:
    """Privacy-preserving wake word detection."""

    def __init__(self, model_path: str = None, threshold: float = 0.5,
                 sample_rate: int = 16000):
        if model_path:
            self.model = Model(wakeword_models=[model_path])
        else:
            self.model = Model(wakeword_models=["hey_jarvis"])
        self.threshold = threshold
        self.sample_rate = sample_rate
        self.buffer_size = int(sample_rate * 1.5)  # At most 1.5 s retained
        self.audio_buffer = deque(maxlen=self.buffer_size)
        self.is_listening = False
        self.on_wake = None

    def start(self, callback):
        """Start listening for wake word."""
        self.on_wake = callback
        self.is_listening = True

        def audio_callback(indata, frames, time, status):
            if not self.is_listening:
                return
            audio = indata[:, 0] if len(indata.shape) > 1 else indata
            self.audio_buffer.extend(audio)
            if len(self.audio_buffer) >= self.sample_rate:
                self._process_audio()

        self.stream = sd.InputStream(
            samplerate=self.sample_rate, channels=1, dtype=np.float32,
            callback=audio_callback, blocksize=int(self.sample_rate * 0.1)
        )
        self.stream.start()

    def _process_audio(self):
        """Process audio buffer for wake word."""
        audio = np.array(list(self.audio_buffer))
        predictions = self.model.predict(audio)
        for model_name, scores in predictions.items():
            if np.max(scores) > self.threshold:
                self.audio_buffer.clear()  # Privacy: clear immediately
                if self.on_wake:
                    self.on_wake(model_name, np.max(scores))
                break

    def stop(self):
        """Stop listening."""
        self.is_listening = False
        if hasattr(self, 'stream'):
            self.stream.stop()
            self.stream.close()
        self.audio_buffer.clear()
```

Pattern 2: False Positive Reduction
```python
import time

import numpy as np


class RobustDetector:
    """Reduce false positives by requiring confirmation."""

    def __init__(self, detector: SecureWakeWordDetector):
        self.detector = detector
        self.detection_history = []
        self.confirmation_window = 2.0  # seconds
        self.min_confirmations = 2

    def on_potential_wake(self, model: str, confidence: float) -> bool:
        now = time.time()
        self.detection_history.append({"time": now, "confidence": confidence})
        # Keep only detections inside the confirmation window
        self.detection_history = [
            d for d in self.detection_history
            if now - d["time"] < self.confirmation_window
        ]
        if len(self.detection_history) >= self.min_confirmations:
            avg_confidence = np.mean(
                [d["confidence"] for d in self.detection_history]
            )
            if avg_confidence > 0.6:
                self.detection_history.clear()
                return True
        return False
```

7. Performance Patterns
Pattern 1: Model Quantization
```python
import onnxruntime as ort

# Good - Quantized ONNX model with graph optimization enabled
class QuantizedDetector:
    def __init__(self, model_path: str):
        sess_options = ort.SessionOptions()
        sess_options.graph_optimization_level = \
            ort.GraphOptimizationLevel.ORT_ENABLE_ALL
        self.session = ort.InferenceSession(
            model_path, sess_options, providers=['CPUExecutionProvider']
        )


# Bad - Full precision model
class SlowDetector:
    def __init__(self, model_path: str):
        self.session = ort.InferenceSession(model_path)  # No optimization
```

Pattern 2: Efficient Audio Buffering
```python
import numpy as np

# Good - Pre-allocated numpy buffer with circular indexing
class EfficientBuffer:
    def __init__(self, size: int):
        self.buffer = np.zeros(size, dtype=np.float32)
        self.write_idx = 0
        self.size = size

    def append(self, audio: np.ndarray):
        n = len(audio)
        end_idx = (self.write_idx + n) % self.size
        if end_idx > self.write_idx:
            self.buffer[self.write_idx:end_idx] = audio
        else:
            # Wrap around the end of the buffer
            self.buffer[self.write_idx:] = audio[:self.size - self.write_idx]
            self.buffer[:end_idx] = audio[self.size - self.write_idx:]
        self.write_idx = end_idx


# Bad - Individual appends
class SlowBuffer:
    def append(self, audio: np.ndarray):
        for sample in audio:  # Slow!
            self.buffer.append(sample)
```

Pattern 3: VAD Preprocessing
```python
import numpy as np
import webrtcvad

# Good - Skip inference on silence
class VADOptimizedDetector:
    def __init__(self):
        self.vad = webrtcvad.Vad(2)  # Aggressiveness 0 (lenient) to 3 (strict)
        self.detector = SecureWakeWordDetector()

    def process(self, audio: np.ndarray):
        # Note: webrtcvad expects 16-bit PCM in 10/20/30 ms frames
        audio_int16 = (audio * 32767).astype(np.int16)
        if not self.vad.is_speech(audio_int16.tobytes(), 16000):
            return None  # Skip expensive inference
        return self.detector._process_audio()


# Bad - Always run inference
class WastefulDetector:
    def process(self, audio: np.ndarray):
        return self.detector._process_audio()  # Even on silence
```

Pattern 4: Batch Inference
```python
import numpy as np

# Good - Process multiple windows in a single inference call
class BatchDetector:
    def __init__(self, batch_size: int = 4):
        self.batch_size = batch_size
        self.pending_windows = []

    def add_window(self, audio: np.ndarray):
        self.pending_windows.append(audio)
        if len(self.pending_windows) >= self.batch_size:
            batch = np.stack(self.pending_windows)
            results = self.model.predict_batch(batch)
            self.pending_windows.clear()
            return results
        return None
```

Pattern 5: Memory-Mapped Models
```python
import mmap

# Good - Memory-map large model files
class MmapModelLoader:
    def __init__(self, model_path: str):
        self.file = open(model_path, 'rb')
        self.mmap = mmap.mmap(self.file.fileno(), 0, access=mmap.ACCESS_READ)


# Bad - Load entire model into memory
class EagerModelLoader:
    def __init__(self, model_path: str):
        with open(model_path, 'rb') as f:
            self.model_data = f.read()  # Entire model in RAM
```

---
8. Security Standards
```python
import time


class PrivacyController:
    """Ensure privacy in an always-listening system."""

    def __init__(self):
        self.is_enabled = True
        self.last_activity = time.time()

    def check_privacy_mode(self) -> bool:
        if self._is_dnd_enabled():  # Platform-specific do-not-disturb check
            return False
        if time.time() - self.last_activity > 3600:  # Auto-pause after 1 h idle
            return False
        return self.is_enabled


# Data minimization
MAX_BUFFER_SECONDS = 2.0

def on_wake_detected():
    audio_buffer.clear()  # Delete immediately
```

---
9. Common Mistakes
```python
# BAD - Stores all audio
def on_audio(chunk):
    with open("audio.raw", "ab") as f:
        f.write(chunk)


# GOOD - Discard after processing
def on_audio(chunk):
    buffer.extend(chunk)
    process_buffer()


# BAD - Large buffer
buffer = deque(maxlen=sample_rate * 60)  # 1 minute!

# GOOD - Minimal buffer (maxlen must be an int)
buffer = deque(maxlen=int(sample_rate * 1.5))  # 1.5 seconds
```

---
10. Pre-Implementation Checklist
Phase 1: Before Writing Code
- Read TDD workflow section completely
- Set up test file with detection accuracy tests
- Define threshold and performance targets
- Identify which performance patterns apply
- Review privacy requirements
Phase 2: During Implementation
- Write failing test for each feature first
- Implement minimal code to pass test
- Apply performance patterns (VAD, quantization)
- Buffer size minimal (<2 seconds)
- Audio cleared after detection
Phase 3: Before Committing
- All tests pass: `pytest tests/test_wake_word.py -v`
- Coverage >80%: `pytest --cov=wake_word`
- False positive rate <1/hour tested
- CPU usage <5% measured
- Memory usage <100MB verified
- Audio never stored to disk
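The false-positive check in the list above can be approximated offline: feed hours of non-wake audio (here, synthetic noise) through the detection decision and count triggers per hour. `false_positives_per_hour` and its `detect` callable are illustrative names, a sketch rather than the real detector:

```python
import numpy as np

def false_positives_per_hour(detect, hours: float = 1.0,
                             sample_rate: int = 16000,
                             window_s: float = 1.0) -> float:
    """Count wake triggers on non-wake audio, normalized per hour."""
    rng = np.random.default_rng(0)
    n_windows = int(hours * 3600 / window_s)
    triggers = 0
    for _ in range(n_windows):
        # Synthetic noise stands in for a recorded negative corpus
        noise = rng.standard_normal(
            int(sample_rate * window_s)
        ).astype(np.float32)
        if detect(noise):
            triggers += 1
    return triggers / hours
```

In practice, `detect` would wrap the real model's thresholded score, and the audio should come from a held-out corpus of real non-wake speech rather than noise.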
11. Summary
Your goal is to create wake word detection that is:
- Private: Audio processed locally, minimal retention
- Efficient: Low CPU (<5%), low memory (<100MB)
- Accurate: Low false positive rate (<1/hour)
- Test-Driven: All features have tests first
Critical Reminders:
- Write tests before implementation
- Never store audio to disk
- Keep buffer minimal (<2 seconds)
- Apply performance patterns (VAD, quantization)