voice-ai-engine-development

Voice AI Engine Development


Overview


This skill guides you through building production-ready voice AI engines with real-time conversation capabilities. Voice AI engines enable natural, bidirectional conversations between users and AI agents through streaming audio processing, speech-to-text transcription, LLM-powered responses, and text-to-speech synthesis.
The core architecture uses an async queue-based worker pipeline where each component runs independently and communicates via asyncio.Queue objects, enabling concurrent processing, interrupt handling, and real-time streaming at every stage.
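To make the queue-based pipeline concrete, here is a minimal, self-contained sketch (not from any specific framework; the stage names are illustrative) of two workers chained with asyncio.Queue objects, using None as an end-of-stream sentinel:

```python
import asyncio

async def transcriber(audio_q, text_q):
    # Stage 1: consume "audio" chunks, produce "transcriptions"
    while True:
        chunk = await audio_q.get()
        if chunk is None:          # end-of-stream sentinel
            await text_q.put(None)
            return
        await text_q.put(f"text({chunk})")

async def agent(text_q, reply_q):
    # Stage 2: consume transcriptions, produce responses
    while True:
        text = await text_q.get()
        if text is None:
            await reply_q.put(None)
            return
        await reply_q.put(f"reply({text})")

async def run_pipeline(chunks):
    audio_q, text_q, reply_q = asyncio.Queue(), asyncio.Queue(), asyncio.Queue()
    tasks = [asyncio.create_task(transcriber(audio_q, text_q)),
             asyncio.create_task(agent(text_q, reply_q))]
    for chunk in chunks + [None]:
        await audio_q.put(chunk)
    replies = []
    while (item := await reply_q.get()) is not None:
        replies.append(item)
    await asyncio.gather(*tasks)
    return replies

print(asyncio.run(run_pipeline(["a1", "a2"])))
# ['reply(text(a1))', 'reply(text(a2))']
```

Each stage only sees its own queues, so stages can be swapped or extended (e.g. inserting a synthesizer stage) without touching the others.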

When to Use This Skill


Use this skill when:
  • Building real-time voice conversation systems
  • Implementing voice assistants or chatbots
  • Creating voice-enabled customer service agents
  • Developing voice AI applications with interrupt capabilities
  • Integrating multiple transcription, LLM, or TTS providers
  • Working with streaming audio processing pipelines
  • The user mentions Vocode, voice engines, or conversational AI

Core Architecture Principles


The Worker Pipeline Pattern


Every voice AI engine follows this pipeline:
Audio In → Transcriber → Agent → Synthesizer → Audio Out
           (Worker 1)   (Worker 2)  (Worker 3)
Key Benefits:
  • Decoupling: Workers only know about their input/output queues
  • Concurrency: All workers run simultaneously via asyncio
  • Backpressure: Queues automatically handle rate differences
  • Interruptibility: Everything can be stopped mid-stream

Base Worker Pattern


Every worker follows this pattern:
python
class BaseWorker:
    def __init__(self, input_queue, output_queue):
        self.input_queue = input_queue   # asyncio.Queue to consume from
        self.output_queue = output_queue # asyncio.Queue to produce to
        self.active = False
    
    def start(self):
        """Start the worker's processing loop"""
        self.active = True
        asyncio.create_task(self._run_loop())
    
    async def _run_loop(self):
        """Main processing loop - runs forever until terminated"""
        while self.active:
            item = await self.input_queue.get()  # Block until item arrives
            await self.process(item)              # Process the item
    
    async def process(self, item):
        """Override this - does the actual work"""
        raise NotImplementedError
    
    def terminate(self):
        """Stop the worker"""
        self.active = False
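As a usage sketch, a concrete worker only needs to override process(). The example below repeats the base class so it runs standalone (the UppercaseWorker is a toy stand-in, not a real component):

```python
import asyncio

class BaseWorker:
    def __init__(self, input_queue, output_queue):
        self.input_queue = input_queue
        self.output_queue = output_queue
        self.active = False

    def start(self):
        self.active = True
        self._task = asyncio.create_task(self._run_loop())

    async def _run_loop(self):
        while self.active:
            item = await self.input_queue.get()
            await self.process(item)

    async def process(self, item):
        raise NotImplementedError

    def terminate(self):
        self.active = False

class UppercaseWorker(BaseWorker):
    """Toy worker: 'transcribes' by upper-casing strings."""
    async def process(self, item):
        self.output_queue.put_nowait(item.upper())

async def demo():
    in_q, out_q = asyncio.Queue(), asyncio.Queue()
    worker = UppercaseWorker(in_q, out_q)
    worker.start()
    in_q.put_nowait("hello")
    result = await out_q.get()
    worker.terminate()
    return result

print(asyncio.run(demo()))  # HELLO
```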

Component Implementation Guide


1. Transcriber (Audio → Text)


Purpose: Converts incoming audio chunks to text transcriptions
Interface Requirements:
python
class BaseTranscriber:
    def __init__(self, transcriber_config):
        self.input_queue = asyncio.Queue()   # Audio chunks (bytes)
        self.output_queue = asyncio.Queue()  # Transcriptions
        self.is_muted = False
    
    def send_audio(self, chunk: bytes):
        """Client calls this to send audio"""
        if not self.is_muted:
            self.input_queue.put_nowait(chunk)
        else:
            # Send silence instead (prevents echo during bot speech)
            self.input_queue.put_nowait(self.create_silent_chunk(len(chunk)))
    
    def mute(self):
        """Called when bot starts speaking (prevents echo)"""
        self.is_muted = True
    
    def unmute(self):
        """Called when bot stops speaking"""
        self.is_muted = False
Output Format:
python
class Transcription:
    message: str          # "Hello, how are you?"
    confidence: float     # 0.95
    is_final: bool        # True = complete sentence, False = partial
    is_interrupt: bool    # Set by TranscriptionsWorker
Supported Providers:
  • Deepgram - Fast, accurate, streaming
  • AssemblyAI - High accuracy, good for accents
  • Azure Speech - Enterprise-grade
  • Google Cloud Speech - Multi-language support
Critical Implementation Details:
  • Use WebSocket for bidirectional streaming
  • Run sender and receiver tasks concurrently with asyncio.gather()
  • Mute transcriber when bot speaks to prevent echo/feedback loops
  • Handle both final and partial transcriptions
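The create_silent_chunk helper referenced above is not defined in this guide; for LINEAR16 PCM it can plausibly be all-zero bytes. The stub below (illustrative only) shows the mute path feeding silence so downstream timing stays intact:

```python
import asyncio

class StubTranscriber:
    """Minimal stand-in for BaseTranscriber's mute behavior."""
    def __init__(self):
        self.input_queue = asyncio.Queue()
        self.is_muted = False

    @staticmethod
    def create_silent_chunk(num_bytes: int) -> bytes:
        # LINEAR16 PCM silence: every 16-bit sample is zero
        return b"\x00" * num_bytes

    def send_audio(self, chunk: bytes):
        if not self.is_muted:
            self.input_queue.put_nowait(chunk)
        else:
            # Keep the stream's chunk cadence, but feed silence
            self.input_queue.put_nowait(self.create_silent_chunk(len(chunk)))

t = StubTranscriber()
t.send_audio(b"\x01\x02")
t.is_muted = True            # bot starts speaking
t.send_audio(b"\x03\x04")
print(t.input_queue.get_nowait())  # b'\x01\x02'
print(t.input_queue.get_nowait())  # b'\x00\x00'
```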

2. Agent (Text → Response)


Purpose: Processes user input and generates conversational responses
Interface Requirements:
python
class BaseAgent:
    def __init__(self, agent_config):
        self.input_queue = asyncio.Queue()   # TranscriptionAgentInput
        self.output_queue = asyncio.Queue()  # AgentResponse
        self.transcript = None               # Conversation history
    
    async def generate_response(self, human_input, is_interrupt, conversation_id):
        """Override this - returns AsyncGenerator of responses"""
        raise NotImplementedError
Why Streaming Responses?
  • Lower latency: Start speaking as soon as first sentence is ready
  • Better interrupts: Can stop mid-response
  • Sentence-by-sentence: More natural conversation flow
Supported Providers:
  • OpenAI (GPT-4, GPT-3.5) - High quality, fast
  • Google Gemini - Multimodal, cost-effective
  • Anthropic Claude - Long context, nuanced responses
Critical Implementation Details:
  • Maintain conversation history in a Transcript object
  • Stream responses using an AsyncGenerator
  • IMPORTANT: Buffer entire LLM response before yielding to synthesizer (prevents audio jumping)
  • Handle interrupts by canceling current generation task
  • Update conversation history with partial messages on interrupt
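A minimal generate_response sketch following the buffer-then-yield guidance above. The fake_llm_stream stand-in is hypothetical; a real agent would iterate over a streaming LLM client instead:

```python
import asyncio

async def fake_llm_stream(prompt):
    # Stand-in for a streaming LLM client
    for token in ["The weather ", "looks ", "nice."]:
        await asyncio.sleep(0)   # simulate chunks arriving over the network
        yield token

async def generate_response(human_input):
    # Accumulate the full LLM response, then yield it once
    full_response = ""
    async for chunk in fake_llm_stream(human_input):
        full_response += chunk
    yield full_response

async def demo():
    return [r async for r in generate_response("how's the weather?")]

print(asyncio.run(demo()))  # ['The weather looks nice.']
```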

3. Synthesizer (Text → Audio)


Purpose: Converts agent text responses to speech audio
Interface Requirements:
python
class BaseSynthesizer:
    async def create_speech(self, message: BaseMessage, chunk_size: int) -> SynthesisResult:
        """
        Returns a SynthesisResult containing:
        - chunk_generator: AsyncGenerator that yields audio chunks
        - get_message_up_to: Function to get partial text (for interrupts)
        """
        raise NotImplementedError
SynthesisResult Structure:
python
class SynthesisResult:
    chunk_generator: AsyncGenerator[ChunkResult, None]
    get_message_up_to: Callable[[float], str]  # seconds → partial text
    
    class ChunkResult:
        chunk: bytes          # Raw PCM audio
        is_last_chunk: bool
Supported Providers:
  • ElevenLabs - Most natural voices, streaming
  • Azure TTS - Enterprise-grade, many languages
  • Google Cloud TTS - Cost-effective, good quality
  • Amazon Polly - AWS integration
  • Play.ht - Voice cloning
Critical Implementation Details:
  • Stream audio chunks as they're generated
  • Convert audio to LINEAR16 PCM format (16kHz sample rate)
  • Implement get_message_up_to() for interrupt handling
  • Handle audio format conversion (MP3 → PCM)
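One way to implement get_message_up_to is a linear approximation over the synthesized text. This sketch is an assumption, not the canonical implementation; it maps elapsed seconds to a prefix of the message:

```python
def make_get_message_up_to(message: str, total_seconds: float):
    # Approximate: assume speech moves through the text at a constant rate
    def get_message_up_to(seconds: float) -> str:
        if seconds >= total_seconds:
            return message
        fraction = max(seconds, 0.0) / total_seconds
        return message[: int(len(message) * fraction)]
    return get_message_up_to

f = make_get_message_up_to("I think the weather will be nice", 4.0)
print(f(2.0))  # 'I think the weat' (half the text)
print(f(9.0))  # full message
```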

4. Output Device (Audio → Client)


Purpose: Sends synthesized audio back to the client
CRITICAL: Rate Limiting for Interrupts
python
async def send_speech_to_output(self, message, synthesis_result,
                                stop_event, seconds_per_chunk):
    chunk_idx = 0
    async for chunk_result in synthesis_result.chunk_generator:
        # Check for interrupt
        if stop_event.is_set():
            logger.debug(f"Interrupted after {chunk_idx} chunks")
            message_sent = synthesis_result.get_message_up_to(
                chunk_idx * seconds_per_chunk
            )
            return message_sent, True  # cut_off = True
        
        start_time = time.time()
        
        # Send chunk to output device
        self.output_device.consume_nonblocking(chunk_result.chunk)
        
        # CRITICAL: Wait for chunk to play before sending next one
        # This is what makes interrupts work!
        speech_length = seconds_per_chunk
        processing_time = time.time() - start_time
        await asyncio.sleep(max(speech_length - processing_time, 0))
        
        chunk_idx += 1
    
    return message, False  # cut_off = False
Why Rate Limiting? Without rate limiting, all audio chunks would be sent immediately, which would:
  • Buffer entire message on client side
  • Make interrupts impossible (all audio already sent)
  • Cause timing issues
By sending one chunk every N seconds:
  • Real-time playback is maintained
  • Interrupts can stop mid-sentence
  • Natural conversation flow is preserved
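The seconds_per_chunk value used above can be derived from the audio format. For LINEAR16 at 16 kHz (2 bytes per sample), a small helper:

```python
def seconds_per_chunk(chunk_size_bytes: int,
                      sampling_rate: int = 16000,
                      bytes_per_sample: int = 2) -> float:
    # Duration of one raw PCM chunk:
    # bytes / (samples-per-second * bytes-per-sample)
    return chunk_size_bytes / (sampling_rate * bytes_per_sample)

print(seconds_per_chunk(32000))  # 1.0 -> a 32,000-byte chunk is one second of audio
print(seconds_per_chunk(3200))   # 0.1
```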

The Interrupt System


The interrupt system is critical for natural conversations.

How Interrupts Work


Scenario: Bot is saying "I think the weather will be nice today and tomorrow and—" when user interrupts with "Stop".
Step 1: User starts speaking
python
# TranscriptionsWorker detects new transcription while bot is speaking
async def process(self, transcription):
    if not self.conversation.is_human_speaking:
        # Bot was speaking! Broadcast interrupt to all in-flight events
        interrupted = self.conversation.broadcast_interrupt()
        transcription.is_interrupt = interrupted

Step 2: broadcast_interrupt() stops everything
python
def broadcast_interrupt(self):
    num_interrupts = 0
    # Interrupt all queued events
    while True:
        try:
            interruptible_event = self.interruptible_events.get_nowait()
            if interruptible_event.interrupt():  # Sets interruption_event
                num_interrupts += 1
        except queue.Empty:
            break
    
    # Cancel current tasks
    self.agent.cancel_current_task()              # Stop generating text
    self.agent_responses_worker.cancel_current_task()  # Stop synthesizing
    return num_interrupts > 0
Step 3: SynthesisResultsWorker detects interrupt
python
async def send_speech_to_output(self, synthesis_result, stop_event, ...):
    async for chunk_result in synthesis_result.chunk_generator:
        # Check stop_event (this is the interruption_event)
        if stop_event.is_set():
            logger.debug("Interrupted! Stopping speech.")
            # Calculate what was actually spoken
            seconds_spoken = chunk_idx * seconds_per_chunk
            partial_message = synthesis_result.get_message_up_to(seconds_spoken)
            # e.g., "I think the weather will be nice today"
            return partial_message, True  # cut_off = True
Step 4: Agent updates history
python
if cut_off:
    # Update conversation history with partial message
    self.agent.update_last_bot_message_on_cut_off(message_sent)
    # History now shows:
    # Bot: "I think the weather will be nice today" (incomplete)

InterruptibleEvent Pattern


Every event in the pipeline is wrapped in an InterruptibleEvent:
python
class InterruptibleEvent:
    def __init__(self, payload, is_interruptible=True):
        self.payload = payload
        self.is_interruptible = is_interruptible
        self.interruption_event = threading.Event()  # Initially not set
        self.interrupted = False
    
    def interrupt(self) -> bool:
        """Interrupt this event"""
        if not self.is_interruptible:
            return False
        if not self.interrupted:
            self.interruption_event.set()  # Signal to stop!
            self.interrupted = True
            return True
        return False
    
    def is_interrupted(self) -> bool:
        return self.interruption_event.is_set()
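Putting the two pieces together: broadcast_interrupt drains the event registry and interrupts whatever it can. A compact, runnable sketch (thread-safe queue.Queue stands in for the conversation's event registry):

```python
import queue
import threading

class InterruptibleEvent:
    def __init__(self, payload, is_interruptible=True):
        self.payload = payload
        self.is_interruptible = is_interruptible
        self.interruption_event = threading.Event()
        self.interrupted = False

    def interrupt(self) -> bool:
        if not self.is_interruptible or self.interrupted:
            return False
        self.interruption_event.set()  # signal any loop checking this event
        self.interrupted = True
        return True

def broadcast_interrupt(events: queue.Queue) -> bool:
    num_interrupts = 0
    while True:
        try:
            event = events.get_nowait()
        except queue.Empty:
            break
        if event.interrupt():
            num_interrupts += 1
    return num_interrupts > 0

events = queue.Queue()
events.put(InterruptibleEvent("bot speech"))
events.put(InterruptibleEvent("goodbye message", is_interruptible=False))
print(broadcast_interrupt(events))  # True: the speech event was interrupted
print(broadcast_interrupt(events))  # False: queue is now empty
```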

Multi-Provider Factory Pattern


Support multiple providers with a factory pattern:
python
class VoiceHandler:
    """Multi-provider factory for voice components"""
    
    def create_transcriber(self, agent_config: Dict):
        """Create transcriber based on transcriberProvider"""
        provider = agent_config.get("transcriberProvider", "deepgram")
        
        if provider == "deepgram":
            return self._create_deepgram_transcriber(agent_config)
        elif provider == "assemblyai":
            return self._create_assemblyai_transcriber(agent_config)
        elif provider == "azure":
            return self._create_azure_transcriber(agent_config)
        elif provider == "google":
            return self._create_google_transcriber(agent_config)
        else:
            raise ValueError(f"Unknown transcriber provider: {provider}")
    
    def create_agent(self, agent_config: Dict):
        """Create LLM agent based on llmProvider"""
        provider = agent_config.get("llmProvider", "openai")
        
        if provider == "openai":
            return self._create_openai_agent(agent_config)
        elif provider == "gemini":
            return self._create_gemini_agent(agent_config)
        else:
            raise ValueError(f"Unknown LLM provider: {provider}")
    
    def create_synthesizer(self, agent_config: Dict):
        """Create voice synthesizer based on voiceProvider"""
        provider = agent_config.get("voiceProvider", "elevenlabs")
        
        if provider == "elevenlabs":
            return self._create_elevenlabs_synthesizer(agent_config)
        elif provider == "azure":
            return self._create_azure_synthesizer(agent_config)
        elif provider == "google":
            return self._create_google_synthesizer(agent_config)
        elif provider == "polly":
            return self._create_polly_synthesizer(agent_config)
        elif provider == "playht":
            return self._create_playht_synthesizer(agent_config)
        else:
            raise ValueError(f"Unknown voice provider: {provider}")
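An alternative to the long if/elif chains is a table-driven registry. This variant is not from the source, but it keeps provider lookup data-driven and makes adding a provider a one-line change (the lambda constructors are stubbed as strings for demonstration):

```python
from typing import Callable, Dict

def make_factory(registry: Dict[str, Callable], config_key: str, default: str) -> Callable:
    def create(agent_config: Dict):
        provider = agent_config.get(config_key, default)
        if provider not in registry:
            raise ValueError(f"Unknown provider for {config_key}: {provider}")
        return registry[provider](agent_config)
    return create

# Hypothetical constructors, stubbed as strings
create_agent = make_factory(
    {
        "openai": lambda cfg: "OpenAIAgent",
        "gemini": lambda cfg: "GeminiAgent",
    },
    config_key="llmProvider",
    default="openai",
)

print(create_agent({"llmProvider": "gemini"}))  # GeminiAgent
print(create_agent({}))                         # OpenAIAgent (default)
```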

WebSocket Integration


Voice AI engines typically use WebSocket for bidirectional audio streaming:
python
@app.websocket("/conversation")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    
    # Create voice components
    voice_handler = VoiceHandler()
    transcriber = voice_handler.create_transcriber(agent_config)
    agent = voice_handler.create_agent(agent_config)
    synthesizer = voice_handler.create_synthesizer(agent_config)
    
    # Create output device
    output_device = WebsocketOutputDevice(
        ws=websocket,
        sampling_rate=16000,
        audio_encoding=AudioEncoding.LINEAR16
    )
    
    # Create conversation orchestrator
    conversation = StreamingConversation(
        output_device=output_device,
        transcriber=transcriber,
        agent=agent,
        synthesizer=synthesizer
    )
    
    # Start all workers
    await conversation.start()
    
    try:
        # Receive audio from client
        async for message in websocket.iter_bytes():
            conversation.receive_audio(message)
    except WebSocketDisconnect:
        logger.info("Client disconnected")
    finally:
        await conversation.terminate()
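The WebsocketOutputDevice used above is not defined in this guide; its likely shape is a thin wrapper that forwards PCM chunks without blocking the speech loop. A sketch under that assumption, with a fake websocket as a test double:

```python
import asyncio

class WebsocketOutputDevice:
    """Assumed shape of the output device: forwards PCM chunks to a websocket."""
    def __init__(self, ws, sampling_rate=16000):
        self.ws = ws
        self.sampling_rate = sampling_rate

    def consume_nonblocking(self, chunk: bytes):
        # Schedule the send so the speech loop is never blocked on I/O
        asyncio.get_running_loop().create_task(self.ws.send_bytes(chunk))

class FakeWebSocket:
    """Test double capturing what would go over the wire."""
    def __init__(self):
        self.sent = []

    async def send_bytes(self, data: bytes):
        self.sent.append(data)

async def demo():
    ws = FakeWebSocket()
    device = WebsocketOutputDevice(ws)
    device.consume_nonblocking(b"\x00\x01")
    await asyncio.sleep(0)  # give the scheduled send a chance to run
    return ws.sent

print(asyncio.run(demo()))  # [b'\x00\x01']
```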

Common Pitfalls and Solutions


1. Audio Jumping/Cutting Off


Problem: Bot's audio jumps or cuts off mid-response.
Cause: Sending text to synthesizer in small chunks causes multiple TTS calls.
Solution: Buffer the entire LLM response before sending to synthesizer:
python
# ❌ Bad: yields sentence-by-sentence, triggering multiple TTS calls
async for sentence in llm_stream:
    yield GeneratedResponse(message=BaseMessage(text=sentence))

# ✅ Good: buffer the entire response, then yield once
full_response = ""
async for chunk in llm_stream:
    full_response += chunk
yield GeneratedResponse(message=BaseMessage(text=full_response))

2. Echo/Feedback Loop


Problem: Bot hears itself speaking and responds to its own audio.
Cause: Transcriber not muted during bot speech.
Solution: Mute transcriber when bot starts speaking:
python
# Before sending audio to output
self.transcriber.mute()

# After audio playback complete
self.transcriber.unmute()

3. Interrupts Not Working


Problem: User can't interrupt bot mid-sentence.
Cause: All audio chunks sent at once instead of rate-limited.
Solution: Rate-limit audio chunks to match real-time playback:
python
async for chunk in synthesis_result.chunk_generator:
    start_time = time.time()
    
    # Send chunk
    output_device.consume_nonblocking(chunk)
    
    # Wait for chunk duration before sending next
    processing_time = time.time() - start_time
    await asyncio.sleep(max(seconds_per_chunk - processing_time, 0))

4. Memory Leaks from Unclosed Streams


Problem: Memory usage grows over time.
Cause: WebSocket connections or API streams not properly closed.
Solution: Always use context managers and cleanup:
python
try:
    async with websockets.connect(url) as ws:
        # Use websocket
        pass
finally:
    # Cleanup
    await conversation.terminate()
    await transcriber.terminate()

Production Considerations


1. Error Handling


python
async def _run_loop(self):
    while self.active:
        try:
            item = await self.input_queue.get()
            await self.process(item)
        except Exception as e:
            logger.error(f"Worker error: {e}", exc_info=True)
            # Don't crash the worker, continue processing

2. Graceful Shutdown


python
async def terminate(self):
    """Gracefully shut down all workers"""
    self.active = False
    
    # Stop all workers
    self.transcriber.terminate()
    self.agent.terminate()
    self.synthesizer.terminate()
    
    # Wait for queues to drain
    await asyncio.sleep(0.5)
    
    # Close connections
    if self.websocket:
        await self.websocket.close()

3. Monitoring and Logging


python
# Log key events
logger.info(f"🎤 [TRANSCRIBER] Received: '{transcription.message}'")
logger.info(f"🤖 [AGENT] Generating response...")
logger.info(f"🔊 [SYNTHESIZER] Synthesizing {len(text)} characters")
logger.info(f"⚠️ [INTERRUPT] User interrupted bot")

# Track metrics
metrics.increment("transcriptions.count")
metrics.timing("agent.response_time", duration)
metrics.gauge("active_conversations", count)

4. Rate Limiting and Quotas


python
# Implement rate limiting for API calls
from aiolimiter import AsyncLimiter

rate_limiter = AsyncLimiter(max_rate=10, time_period=1)  # 10 calls/second

async def call_api(self, data):
    async with rate_limiter:
        return await self.client.post(data)

Key Design Patterns


1. Producer-Consumer with Queues


python
# Producer
async def producer(queue):
    while True:
        item = await generate_item()
        queue.put_nowait(item)

# Consumer
async def consumer(queue):
    while True:
        item = await queue.get()
        await process_item(item)

2. Streaming Generators


Instead of returning complete results:
python
# ❌ Bad: wait for entire response
async def generate_response(prompt):
    response = await openai.complete(prompt)  # 5 seconds
    return response

# ✅ Good: stream chunks as they arrive
async def generate_response(prompt):
    async for chunk in openai.complete(prompt, stream=True):
        yield chunk  # yields after 0.1s, 0.2s, etc.

3. Conversation State Management


Maintain conversation history for context:
python
class Transcript:
    def __init__(self):
        # Per-instance list; a class-level "= []" default would be shared across instances
        self.event_logs: List[Message] = []

    def add_human_message(self, text):
        self.event_logs.append(Message(sender=Sender.HUMAN, text=text))

    def add_bot_message(self, text):
        self.event_logs.append(Message(sender=Sender.BOT, text=text))

    def to_openai_messages(self):
        return [
            {"role": "user" if msg.sender == Sender.HUMAN else "assistant",
             "content": msg.text}
            for msg in self.event_logs
        ]
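A self-contained version of the transcript class, with the `Sender` and `Message` types (assumed by the snippet above) filled in as a hypothetical enum and dataclass:

```python
from dataclasses import dataclass
from enum import Enum
from typing import List

class Sender(Enum):
    HUMAN = "human"
    BOT = "bot"

@dataclass
class Message:
    sender: Sender
    text: str

class Transcript:
    def __init__(self):
        # Per-instance list; a class-level default would be shared across instances
        self.event_logs: List[Message] = []

    def add_human_message(self, text):
        self.event_logs.append(Message(sender=Sender.HUMAN, text=text))

    def add_bot_message(self, text):
        self.event_logs.append(Message(sender=Sender.BOT, text=text))

    def to_openai_messages(self):
        return [
            {"role": "user" if msg.sender == Sender.HUMAN else "assistant",
             "content": msg.text}
            for msg in self.event_logs
        ]

t = Transcript()
t.add_human_message("Hi")
t.add_bot_message("Hello! How can I help?")
messages = t.to_openai_messages()
# → [{'role': 'user', 'content': 'Hi'}, {'role': 'assistant', 'content': 'Hello! How can I help?'}]
```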

Testing Strategies


1. Unit Test Workers in Isolation


python
async def test_transcriber():
    transcriber = DeepgramTranscriber(config)
    
    # Mock audio input
    audio_chunk = b'\x00\x01\x02...'
    transcriber.send_audio(audio_chunk)
    
    # Check output
    transcription = await transcriber.output_queue.get()
    assert transcription.message == "expected text"
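The test above assumes a live `DeepgramTranscriber`; for offline unit tests, a fake with the same queue interface (a hypothetical `FakeTranscriber`, not part of any library) lets you exercise the worker contract without network access:

```python
import asyncio

class FakeTranscriber:
    """Fake with the same send_audio/output_queue contract as a real transcriber worker."""
    def __init__(self, canned_text):
        self.canned_text = canned_text
        self.output_queue = asyncio.Queue()

    def send_audio(self, chunk):
        # A real transcriber streams audio to an API; the fake just emits canned text
        self.output_queue.put_nowait(self.canned_text)

async def test_transcriber_contract():
    transcriber = FakeTranscriber("expected text")
    transcriber.send_audio(b"\x00\x01\x02")
    # Always bound queue reads in tests so a broken worker fails fast instead of hanging
    transcription = await asyncio.wait_for(transcriber.output_queue.get(), timeout=1)
    assert transcription == "expected text"
    return transcription

result = asyncio.run(test_transcriber_contract())
```

Because workers only know about their queues, downstream components can be tested against the fake exactly as they would run against the real provider.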

2. Integration Test Pipeline


python
async def test_full_pipeline():
    # Create all components
    conversation = create_test_conversation()
    
    # Send test audio
    conversation.receive_audio(test_audio_chunk)
    
    # Wait for response
    response = await wait_for_audio_output(timeout=5)
    
    assert response is not None

3. Test Interrupts


python
async def test_interrupt():
    conversation = create_test_conversation()
    
    # Start bot speaking
    await conversation.agent.generate_response("Tell me a long story")
    
    # Interrupt mid-response
    await asyncio.sleep(1)  # Let it speak for 1 second
    conversation.broadcast_interrupt()
    
    # Verify partial message in transcript
    last_message = conversation.transcript.event_logs[-1]
    assert last_message.text != full_expected_message
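The behavior this test checks can be sketched with plain task cancellation: a "speaking" task is cancelled mid-stream, and only the chunks emitted so far end up in the transcript. The chunk texts and the `[interrupted]` marker are illustrative choices, not part of any framework:

```python
import asyncio

async def speak(transcript, chunks, delay=0.05):
    """Append one chunk at a time, as a synthesizer would play audio."""
    try:
        for chunk in chunks:
            transcript.append(chunk)
            await asyncio.sleep(delay)
    except asyncio.CancelledError:
        transcript.append("[interrupted]")  # record that the utterance was cut short
        raise

async def main():
    transcript = []
    full_message = ["Once", "upon", "a", "time", "there", "was"]
    task = asyncio.create_task(speak(transcript, full_message))
    await asyncio.sleep(0.12)  # let it "speak" briefly, then interrupt
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return transcript, full_message

transcript, full_message = asyncio.run(main())
# transcript holds only a prefix of full_message, plus the interrupt marker
```

Updating the transcript with the partial utterance matters: the LLM's context must reflect what the user actually heard, not what the bot intended to say.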

Implementation Workflow


When implementing a voice AI engine:
  1. Start with Base Workers: Implement the base worker pattern first
  2. Add Transcriber: Choose a provider and implement streaming transcription
  3. Add Agent: Implement LLM integration with streaming responses
  4. Add Synthesizer: Implement TTS with audio streaming
  5. Connect Pipeline: Wire all workers together with queues
  6. Add Interrupts: Implement the interrupt system
  7. Add WebSocket: Create WebSocket endpoint for client communication
  8. Test Components: Unit test each worker in isolation
  9. Test Integration: Test the full pipeline end-to-end
  10. Add Error Handling: Implement robust error handling and logging
  11. Optimize: Add rate limiting, monitoring, and performance optimizations
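The wiring in step 5 can be sketched end to end with stub workers: each stage reads from its input queue and writes to the next, so swapping a stub for a real provider never touches the other stages. All three stage bodies here are stand-ins for real STT/LLM/TTS calls:

```python
import asyncio

async def transcriber(audio_q, text_q):
    while (chunk := await audio_q.get()) is not None:
        await text_q.put(f"text({chunk})")      # stand-in for speech-to-text
    await text_q.put(None)                      # forward the end-of-stream sentinel

async def agent(text_q, reply_q):
    while (text := await text_q.get()) is not None:
        await reply_q.put(f"reply({text})")     # stand-in for the LLM
    await reply_q.put(None)

async def synthesizer(reply_q, audio_out):
    while (reply := await reply_q.get()) is not None:
        audio_out.append(f"audio({reply})")     # stand-in for text-to-speech

async def main():
    audio_q, text_q, reply_q = asyncio.Queue(), asyncio.Queue(), asyncio.Queue()
    audio_out = []
    workers = [
        transcriber(audio_q, text_q),
        agent(text_q, reply_q),
        synthesizer(reply_q, audio_out),
    ]
    for chunk in ["c1", "c2"]:
        await audio_q.put(chunk)
    await audio_q.put(None)  # sentinel propagates down the whole pipeline
    await asyncio.gather(*workers)
    return audio_out

out = asyncio.run(main())
# → ['audio(reply(text(c1)))', 'audio(reply(text(c2)))']
```

Because every connection is a queue, adding interrupts later amounts to draining these queues and cancelling the in-flight worker tasks.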

Related Skills


  • @websocket-patterns
    - For WebSocket implementation details
  • @async-python
    - For asyncio and async patterns
  • @streaming-apis
    - For streaming API integration
  • @audio-processing
    - For audio format conversion and processing
  • @systematic-debugging
    - For debugging complex async pipelines

Resources


Libraries:
  • asyncio
    - Async programming
  • websockets
    - WebSocket client/server
  • FastAPI
    - WebSocket server framework
  • pydub
    - Audio manipulation
  • numpy
    - Audio data processing
API Providers:
  • Transcription: Deepgram, AssemblyAI, Azure Speech, Google Cloud Speech
  • LLM: OpenAI, Google Gemini, Anthropic Claude
  • TTS: ElevenLabs, Azure TTS, Google Cloud TTS, Amazon Polly, Play.ht

Summary


Building a voice AI engine requires:
  • ✅ Async worker pipeline for concurrent processing
  • ✅ Queue-based communication between components
  • ✅ Streaming at every stage (transcription, LLM, synthesis)
  • ✅ Interrupt system for natural conversations
  • ✅ Rate limiting for real-time audio playback
  • ✅ Multi-provider support for flexibility
  • ✅ Proper error handling and graceful shutdown
The key insight: Everything must stream and everything must be interruptible for natural, real-time conversations.