wake-word-detection


Wake Word Detection Skill

1. Overview

Risk Level: MEDIUM - Continuous audio monitoring, privacy implications, resource constraints
You are an expert in wake word detection with deep expertise in openWakeWord, keyword spotting, and always-listening systems.
Primary Use Cases:
  • JARVIS activation phrase detection ("Hey JARVIS")
  • Always-listening with minimal resource usage
  • Offline wake word detection (no cloud dependency)

2. Core Principles

  • TDD First - Write tests before implementation code
  • Performance Aware - Optimize for CPU, memory, and latency
  • Privacy Preserving - Never store audio, minimize buffers
  • Accuracy Focused - Minimize false positives/negatives
  • Resource Efficient - Target <5% CPU, <100MB memory

3. Core Responsibilities

3.1 Privacy-First Monitoring

  • Process locally - Never send audio to external services
  • Buffer minimally - Only keep audio needed for detection
  • Discard non-wake - Immediately discard non-wake audio
  • User control - Easy disable/pause functionality
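
The user-control bullet can be sketched as a small gate around any detector that exposes `start()`/`stop()` (the `PrivacyGate` name and API here are illustrative, not part of openWakeWord):

```python
import threading

class PrivacyGate:
    """Illustrative user-control wrapper: pause, resume, and hard-disable."""

    def __init__(self, detector):
        self._detector = detector      # any object with start(callback)/stop()
        self._lock = threading.Lock()
        self.enabled = True
        self.paused = False

    def pause(self):
        # Stopping the detector also drops its audio buffer.
        with self._lock:
            self.paused = True
            self._detector.stop()

    def resume(self, callback):
        # Resuming is a no-op once the user has disabled listening.
        with self._lock:
            if self.enabled:
                self.paused = False
                self._detector.start(callback)

    def disable(self):
        # Hard off switch: resume() does nothing until re-enabled.
        with self._lock:
            self.enabled = False
            self.paused = True
            self._detector.stop()
```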

3.2 Efficiency Requirements

  • Minimal CPU usage (<5% average)
  • Low memory footprint (<100MB)
  • Low latency detection (<500ms)
  • Low false positive rate (<1 per hour)

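A quick way to sanity-check the latency budget is to time the prediction call over 1-second windows (`run_inference` here is a stand-in for whatever predict function your detector uses):

```python
import time
import numpy as np

def measure_latency_ms(run_inference, sample_rate=16000, runs=20):
    """Average wall-clock time per 1-second audio window, in milliseconds."""
    audio = np.random.randn(sample_rate).astype(np.float32)
    run_inference(audio)  # warm-up so one-time model setup is not counted
    start = time.perf_counter()
    for _ in range(runs):
        run_inference(audio)
    return (time.perf_counter() - start) / runs * 1000
```

The result of `measure_latency_ms(model.predict)` can then be compared against the 500 ms target.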

4. Technical Foundation


requirements.txt

openwakeword>=0.6.0
numpy>=1.24.0
sounddevice>=0.4.6
onnxruntime>=1.16.0

---

5. Implementation Workflow (TDD)

Step 1: Write Failing Test First

python

tests/test_wake_word.py

import pytest
import numpy as np
from unittest.mock import Mock, patch


class TestWakeWordDetector:
    """TDD tests for wake word detection."""

    def test_detection_accuracy_threshold(self):
        """Test that detector respects confidence threshold."""
        from wake_word import SecureWakeWordDetector

        detector = SecureWakeWordDetector(threshold=0.7)
        callback = Mock()
        test_audio = np.random.randn(16000).astype(np.float32)

        with patch.object(detector.model, 'predict') as mock_predict:
            # Below threshold - should not trigger
            mock_predict.return_value = {"hey_jarvis": np.array([0.5])}
            detector._test_process(test_audio, callback)
            callback.assert_not_called()

            # Above threshold - should trigger
            mock_predict.return_value = {"hey_jarvis": np.array([0.8])}
            detector._test_process(test_audio, callback)
            callback.assert_called_once()

    def test_buffer_cleared_after_detection(self):
        """Test privacy: buffer cleared immediately after detection."""
        from wake_word import SecureWakeWordDetector

        detector = SecureWakeWordDetector()
        detector.audio_buffer.extend(np.zeros(16000))

        with patch.object(detector.model, 'predict') as mock_predict:
            mock_predict.return_value = {"hey_jarvis": np.array([0.9])}
            detector._process_audio()

        assert len(detector.audio_buffer) == 0, "Buffer must be cleared"

    def test_cpu_usage_under_threshold(self):
        """Test CPU usage stays under 5%."""
        import psutil
        import time
        from wake_word import SecureWakeWordDetector

        detector = SecureWakeWordDetector()
        process = psutil.Process()
        start_time = time.time()

        while time.time() - start_time < 10:
            audio = np.random.randn(1600).astype(np.float32)
            detector.audio_buffer.extend(audio)
            if len(detector.audio_buffer) >= 16000:
                detector._process_audio()

        avg_cpu = process.cpu_percent() / psutil.cpu_count()
        assert avg_cpu < 5, f"CPU usage too high: {avg_cpu}%"

    def test_memory_footprint(self):
        """Test memory usage stays under 100MB."""
        import tracemalloc
        from wake_word import SecureWakeWordDetector

        tracemalloc.start()
        detector = SecureWakeWordDetector()

        for _ in range(600):
            audio = np.random.randn(1600).astype(np.float32)
            detector.audio_buffer.extend(audio)

        current, peak = tracemalloc.get_traced_memory()
        tracemalloc.stop()

        peak_mb = peak / 1024 / 1024
        assert peak_mb < 100, f"Memory too high: {peak_mb}MB"

Step 2: Implement Minimum to Pass

python
from collections import deque

import numpy as np
from openwakeword.model import Model


class SecureWakeWordDetector:
    def __init__(self, threshold=0.5):
        self.threshold = threshold
        self.model = Model(wakeword_models=["hey_jarvis"])
        self.audio_buffer = deque(maxlen=24000)

    def _test_process(self, audio, callback):
        predictions = self.model.predict(audio)
        for model_name, scores in predictions.items():
            if np.max(scores) > self.threshold:
                self.audio_buffer.clear()
                callback(model_name, np.max(scores))
                break

Step 3: Run Full Verification

bash
pytest tests/test_wake_word.py -v
pytest --cov=wake_word --cov-report=term-missing

6. Implementation Patterns

Pattern 1: Secure Wake Word Detector

python
from openwakeword.model import Model
import numpy as np
import sounddevice as sd
from collections import deque
import structlog

logger = structlog.get_logger()

class SecureWakeWordDetector:
    """Privacy-preserving wake word detection."""

    def __init__(self, model_path: str = None, threshold: float = 0.5, sample_rate: int = 16000):
        if model_path:
            self.model = Model(wakeword_models=[model_path])
        else:
            self.model = Model(wakeword_models=["hey_jarvis"])

        self.threshold = threshold
        self.sample_rate = sample_rate
        self.buffer_size = int(sample_rate * 1.5)
        self.audio_buffer = deque(maxlen=self.buffer_size)
        self.is_listening = False
        self.on_wake = None

    def start(self, callback):
        """Start listening for wake word."""
        self.on_wake = callback
        self.is_listening = True

        def audio_callback(indata, frames, time, status):
            if not self.is_listening:
                return
            audio = indata[:, 0] if len(indata.shape) > 1 else indata
            self.audio_buffer.extend(audio)
            if len(self.audio_buffer) >= self.sample_rate:
                self._process_audio()

        self.stream = sd.InputStream(
            samplerate=self.sample_rate, channels=1, dtype=np.float32,
            callback=audio_callback, blocksize=int(self.sample_rate * 0.1)
        )
        self.stream.start()

    def _process_audio(self):
        """Process audio buffer for wake word."""
        audio = np.array(list(self.audio_buffer))
        predictions = self.model.predict(audio)

        for model_name, scores in predictions.items():
            if np.max(scores) > self.threshold:
                self.audio_buffer.clear()  # Privacy: clear immediately
                if self.on_wake:
                    self.on_wake(model_name, np.max(scores))
                break

    def stop(self):
        """Stop listening."""
        self.is_listening = False
        if hasattr(self, 'stream'):
            self.stream.stop()
            self.stream.close()
        self.audio_buffer.clear()

Pattern 2: False Positive Reduction

python
import time

import numpy as np


class RobustDetector:
    """Reduce false positives with confirmation."""

    def __init__(self, detector: SecureWakeWordDetector):
        self.detector = detector
        self.detection_history = []
        self.confirmation_window = 2.0  # seconds
        self.min_confirmations = 2

    def on_potential_wake(self, model: str, confidence: float):
        now = time.time()
        self.detection_history.append({"time": now, "confidence": confidence})
        self.detection_history = [
            d for d in self.detection_history
            if now - d["time"] < self.confirmation_window
        ]

        if len(self.detection_history) >= self.min_confirmations:
            avg_confidence = np.mean([d["confidence"] for d in self.detection_history])
            if avg_confidence > 0.6:
                self.detection_history.clear()
                return True
        return False

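
Wiring the confirmation layer into a wake callback can look like this sketch (`activate` is a hypothetical downstream handler; `robust` is anything with the `on_potential_wake(model, confidence)` method shown above):

```python
def make_wake_handler(robust, activate):
    """Build a callback for a detector's start() that only fires `activate`
    once the confirmation layer has seen enough agreeing detections."""
    def on_wake(model_name, confidence):
        if robust.on_potential_wake(model_name, float(confidence)):
            activate(model_name)
    return on_wake
```

For example, `detector.start(make_wake_handler(robust, activate))` keeps the raw detector unchanged while gating activations.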

7. Performance Patterns

Pattern 1: Model Quantization

python

Good - Use quantized ONNX model

import onnxruntime as ort

class QuantizedDetector:
    def __init__(self, model_path: str):
        sess_options = ort.SessionOptions()
        sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
        self.session = ort.InferenceSession(
            model_path, sess_options, providers=['CPUExecutionProvider']
        )

Bad - Full precision model

class SlowDetector:
    def __init__(self, model_path: str):
        self.session = ort.InferenceSession(model_path)  # No optimization

Pattern 2: Efficient Audio Buffering

python

Good - Pre-allocated numpy buffer with circular indexing

import numpy as np

class EfficientBuffer:
    def __init__(self, size: int):
        self.buffer = np.zeros(size, dtype=np.float32)
        self.write_idx = 0
        self.size = size

    def append(self, audio: np.ndarray):
        n = len(audio)
        end_idx = (self.write_idx + n) % self.size
        if end_idx > self.write_idx:
            self.buffer[self.write_idx:end_idx] = audio
        else:
            self.buffer[self.write_idx:] = audio[:self.size - self.write_idx]
            self.buffer[:end_idx] = audio[self.size - self.write_idx:]
        self.write_idx = end_idx

Bad - Individual appends

class SlowBuffer:
    def append(self, audio: np.ndarray):
        for sample in audio:  # Slow!
            self.buffer.append(sample)

Pattern 3: VAD Preprocessing

python

Good - Skip inference on silence

import webrtcvad

class VADOptimizedDetector:
    def __init__(self):
        self.vad = webrtcvad.Vad(2)  # Aggressiveness 0 (lenient) to 3 (strict)
        self.detector = SecureWakeWordDetector()

    def process(self, audio: np.ndarray):
        # webrtcvad expects 16-bit PCM frames of 10/20/30 ms
        audio_int16 = (audio * 32767).astype(np.int16)
        if not self.vad.is_speech(audio_int16.tobytes(), 16000):
            return None  # Skip expensive inference
        return self.detector._process_audio()

Bad - Always run inference

class WastefulDetector:
    def process(self, audio: np.ndarray):
        return self.detector._process_audio()  # Even on silence

Pattern 4: Batch Inference

python

Good - Process multiple windows in single inference

class BatchDetector:
    def __init__(self, batch_size: int = 4):
        self.batch_size = batch_size
        self.pending_windows = []
        # Assumes a model exposing batch prediction is attached as self.model

    def add_window(self, audio: np.ndarray):
        self.pending_windows.append(audio)
        if len(self.pending_windows) >= self.batch_size:
            batch = np.stack(self.pending_windows)
            results = self.model.predict_batch(batch)
            self.pending_windows.clear()
            return results
        return None

Pattern 5: Memory-Mapped Models

python

Good - Memory-map large model files

import mmap

class MmapModelLoader:
    def __init__(self, model_path: str):
        self.file = open(model_path, 'rb')
        self.mmap = mmap.mmap(self.file.fileno(), 0, access=mmap.ACCESS_READ)

Bad - Load entire model into memory

class EagerModelLoader:
    def __init__(self, model_path: str):
        with open(model_path, 'rb') as f:
            self.model_data = f.read()  # Entire model in RAM

---

8. Security Standards

python
import time


class PrivacyController:
    """Ensure privacy in always-listening system."""

    def __init__(self):
        self.is_enabled = True
        self.last_activity = time.time()

    def check_privacy_mode(self) -> bool:
        if self._is_dnd_enabled():  # e.g. OS do-not-disturb state
            return False
        if time.time() - self.last_activity > 3600:  # 1-hour idle timeout
            return False
        return self.is_enabled

Data minimization

MAX_BUFFER_SECONDS = 2.0

def on_wake_detected():
    audio_buffer.clear()  # Delete immediately

---

9. Common Mistakes

python

BAD - Stores all audio

def on_audio(chunk):
    with open("audio.raw", "ab") as f:
        f.write(chunk)

GOOD - Discard after processing

def on_audio(chunk):
    buffer.extend(chunk)
    process_buffer()

BAD - Large buffer

buffer = deque(maxlen=sample_rate * 60)  # 1 minute!

GOOD - Minimal buffer

buffer = deque(maxlen=int(sample_rate * 1.5))  # 1.5 seconds (maxlen must be int)

---

10. Pre-Implementation Checklist

Phase 1: Before Writing Code

  • Read TDD workflow section completely
  • Set up test file with detection accuracy tests
  • Define threshold and performance targets
  • Identify which performance patterns apply
  • Review privacy requirements

Phase 2: During Implementation

  • Write failing test for each feature first
  • Implement minimal code to pass test
  • Apply performance patterns (VAD, quantization)
  • Buffer size minimal (<2 seconds)
  • Audio cleared after detection

Phase 3: Before Committing

  • All tests pass:
    pytest tests/test_wake_word.py -v
  • Coverage >80%:
    pytest --cov=wake_word
  • False positive rate <1/hour tested
  • CPU usage <5% measured
  • Memory usage <100MB verified
  • Audio never stored to disk
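
The false-positive check in this list can be approximated offline by feeding synthetic background noise through the detector and extrapolating (a rough harness: `detect` is a stand-in returning True when a wake word fires, and real testing should also use recorded room audio):

```python
import numpy as np

def false_positives_per_hour(detect, hours_of_audio=0.01, sample_rate=16000, seed=0):
    """Run `detect` over 1-second noise windows; extrapolate to triggers/hour."""
    rng = np.random.default_rng(seed)
    windows = int(hours_of_audio * 3600)
    triggers = 0
    for _ in range(windows):
        noise = rng.normal(0.0, 0.01, sample_rate).astype(np.float32)
        if detect(noise):
            triggers += 1
    return triggers / hours_of_audio
```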

11. Summary

Your goal is to create wake word detection that is:
  • Private: Audio processed locally, minimal retention
  • Efficient: Low CPU (<5%), low memory (<100MB)
  • Accurate: Low false positive rate (<1/hour)
  • Test-Driven: All features have tests first
Critical Reminders:
  1. Write tests before implementation
  2. Never store audio to disk
  3. Keep buffer minimal (<2 seconds)
  4. Apply performance patterns (VAD, quantization)