
Streaming API Patterns


Overview


When to use this skill:
  • Streaming LLM responses (ChatGPT-style interfaces)
  • Real-time notifications and updates
  • Live data feeds (stock prices, analytics)
  • Chat applications
  • Progress updates for long-running tasks
  • Collaborative editing features

Core Technologies


1. Server-Sent Events (SSE)

Best for: Server-to-client streaming (LLM responses, notifications)
typescript
// Next.js Route Handler
export async function GET(req: Request) {
  const encoder = new TextEncoder()

  const stream = new ReadableStream({
    async start(controller) {
      // Send data
      controller.enqueue(encoder.encode('data: Hello\n\n'))

      // Keep connection alive
      const interval = setInterval(() => {
        controller.enqueue(encoder.encode(': keepalive\n\n'))
      }, 30000)

      // Cleanup
      req.signal.addEventListener('abort', () => {
        clearInterval(interval)
        controller.close()
      })
    }
  })

  return new Response(stream, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
    }
  })
}

// Client
const eventSource = new EventSource('/api/stream')
eventSource.onmessage = (event) => {
  console.log(event.data)
}
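Hand-assembling `data:` lines is easy to get wrong once events carry ids, types, or multi-line payloads. A small formatting helper keeps frames well-formed (a sketch; the `sseEvent` name is ours, not part of any library):

```typescript
// Format one SSE frame: optional `event:` and `id:` fields, then one
// `data:` line per line of payload, terminated by the required blank line.
function sseEvent(data: string, opts: { event?: string; id?: string } = {}): string {
  const lines: string[] = []
  if (opts.event) lines.push(`event: ${opts.event}`)
  if (opts.id) lines.push(`id: ${opts.id}`)
  for (const line of data.split('\n')) lines.push(`data: ${line}`)
  return lines.join('\n') + '\n\n'
}
```

In the handler above, `controller.enqueue(encoder.encode(sseEvent('Hello')))` replaces the hand-built string; sending an `id:` also lets the browser resume via the `Last-Event-ID` header on reconnect.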

2. WebSockets

Best for: Bidirectional real-time communication (chat, collaboration)
typescript
// WebSocket Server (Next.js with ws)
import { WebSocketServer, WebSocket } from 'ws'

const wss = new WebSocketServer({ port: 8080 })

wss.on('connection', (ws) => {
  ws.on('message', (data) => {
    // Broadcast to all clients
    wss.clients.forEach((client) => {
      if (client.readyState === WebSocket.OPEN) {
        client.send(data)
      }
    })
  })
})

// Client
const ws = new WebSocket('ws://localhost:8080')
ws.onmessage = (event) => console.log(event.data)
ws.send(JSON.stringify({ type: 'message', text: 'Hello' }))

3. ReadableStream API

Best for: Processing large data streams with backpressure
typescript
async function* generateData() {
  for (let i = 0; i < 1000; i++) {
    await new Promise(resolve => setTimeout(resolve, 100))
    yield "data-" + i
  }
}

const stream = new ReadableStream({
  async start(controller) {
    for await (const chunk of generateData()) {
      controller.enqueue(new TextEncoder().encode(chunk + '\n'))
    }
    controller.close()
  }
})
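The `start()` loop above pushes chunks as fast as the generator yields them; for true backpressure, a pull-based source produces only when the consumer has capacity. A minimal sketch (the `countingStream` name and limits are illustrative):

```typescript
// Pull-based ReadableStream: pull() is invoked only while the internal
// queue is below highWaterMark, so a slow reader pauses production.
function countingStream(limit: number): ReadableStream<Uint8Array> {
  const encoder = new TextEncoder()
  let i = 0
  return new ReadableStream(
    {
      pull(controller) {
        if (i < limit) {
          controller.enqueue(encoder.encode(`data-${i++}\n`))
        } else {
          controller.close()
        }
      },
    },
    { highWaterMark: 4 } // buffer at most ~4 chunks ahead of the reader
  )
}
```

Because production happens inside `pull()`, no explicit `desiredSize` checks are needed: the stream machinery stops calling `pull()` when the buffer is full.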

LLM Streaming Pattern


typescript
// Server
import OpenAI from 'openai'

const openai = new OpenAI()

export async function POST(req: Request) {
  const { messages } = await req.json()

  const stream = await openai.chat.completions.create({
    model: 'gpt-5.2',
    messages,
    stream: true
  })

  const encoder = new TextEncoder()

  return new Response(
    new ReadableStream({
      async start(controller) {
        for await (const chunk of stream) {
          const content = chunk.choices[0]?.delta?.content
          if (content) {
            controller.enqueue(encoder.encode("data: " + JSON.stringify({ content }) + "\n\n"))
          }
        }
        controller.enqueue(encoder.encode('data: [DONE]\n\n'))
        controller.close()
      }
    }),
    {
      headers: {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache'
      }
    }
  )
}

// Client
async function streamChat(messages: unknown[]) {
  const response = await fetch('/api/chat', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ messages })
  })

  const reader = response.body!.getReader()
  const decoder = new TextDecoder()

  while (true) {
    const { done, value } = await reader.read()
    if (done) break

    const chunk = decoder.decode(value)
    const lines = chunk.split('\n')

    for (const line of lines) {
      if (line.startsWith('data: ')) {
        const data = line.slice(6)
        if (data === '[DONE]') return

        const json = JSON.parse(data)
        console.log(json.content) // Stream token
      }
    }
  }
}
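One caveat in the client above: each network read is split into lines independently, but an SSE frame can arrive split across two reads, leaving a partial `data:` line that fails to parse (and `JSON.parse` will throw). A small incremental parser that buffers across chunks fixes this (a sketch; `SseParser` is our name, not a standard API):

```typescript
// Incremental SSE parser: keeps partial input in a buffer so a frame
// split between two network reads is still parsed correctly.
class SseParser {
  private buffer = ''

  // Feed one decoded chunk; returns the data payloads of any complete events.
  push(chunk: string): string[] {
    this.buffer += chunk
    const events: string[] = []
    let idx: number
    // A blank line ("\n\n") terminates an event frame.
    while ((idx = this.buffer.indexOf('\n\n')) !== -1) {
      const frame = this.buffer.slice(0, idx)
      this.buffer = this.buffer.slice(idx + 2)
      const data = frame
        .split('\n')
        .filter((line) => line.startsWith('data: '))
        .map((line) => line.slice(6))
        .join('\n')
      if (data) events.push(data)
    }
    return events
  }
}
```

In the reader loop, feed `parser.push(decoder.decode(value, { stream: true }))` (the `stream: true` option similarly protects against multi-byte characters split across chunks).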

Reconnection Strategy


typescript
class ReconnectingEventSource {
  private eventSource: EventSource | null = null
  private reconnectDelay = 1000
  private maxReconnectDelay = 30000

  constructor(private url: string, private onMessage: (data: string) => void) {
    this.connect()
  }

  private connect() {
    this.eventSource = new EventSource(this.url)

    this.eventSource.onmessage = (event) => {
      this.reconnectDelay = 1000 // Reset on success
      this.onMessage(event.data)
    }

    this.eventSource.onerror = () => {
      this.eventSource?.close()

      // Exponential backoff
      setTimeout(() => this.connect(), this.reconnectDelay)
      this.reconnectDelay = Math.min(this.reconnectDelay * 2, this.maxReconnectDelay)
    }
  }

  close() {
    this.eventSource?.close()
  }
}
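Pure exponential backoff makes every client reconnect on the same schedule after a shared outage, recreating the thundering herd. Adding jitter spreads the reconnect storm; a sketch of just the delay calculation (the injectable `random` parameter exists only for testability):

```typescript
// Exponential backoff with full jitter: the cap doubles per attempt up to
// maxMs, and the actual delay is a random fraction of that cap.
function backoffDelay(
  attempt: number,
  baseMs = 1000,
  maxMs = 30000,
  random: () => number = Math.random
): number {
  const cap = Math.min(baseMs * 2 ** attempt, maxMs)
  return Math.floor(random() * cap)
}
```

In the class above, `setTimeout(() => this.connect(), backoffDelay(this.attempts++))` would replace the plain doubling, with `attempts` reset to 0 on a successful message.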

Python Async Generator Cleanup (Best Practice)


CRITICAL: Async generators can leak resources if not properly cleaned up. Python 3.10+ provides aclosing() from contextlib to guarantee cleanup.

The Problem


python
# ❌ DANGEROUS: Generator not closed if exception occurs mid-iteration
async def stream_analysis():
    async for chunk in external_api_stream():  # What if exception here?
        yield process(chunk)
# Generator may be garbage collected without cleanup

# ❌ ALSO DANGEROUS: Using .aclose() manually is error-prone
gen = stream_analysis()
try:
    async for chunk in gen:
        process(chunk)
finally:
    await gen.aclose()  # Easy to forget, verbose

The Solution: aclosing()

python
from contextlib import aclosing

# ✅ CORRECT: aclosing() guarantees cleanup
async def stream_analysis():
    async with aclosing(external_api_stream()) as stream:
        async for chunk in stream:
            yield process(chunk)

# ✅ CORRECT: Using aclosing() at the consumption site
async def consume_stream():
    async with aclosing(stream_analysis()) as gen:
        async for chunk in gen:
            handle(chunk)

Real-World Pattern: LLM Streaming


python
from contextlib import aclosing
from langchain_core.runnables import RunnableConfig

async def stream_llm_response(prompt: str, config: RunnableConfig | None = None):
    """Stream LLM tokens with guaranteed cleanup."""
    async with aclosing(llm.astream(prompt, config=config)) as stream:
        async for chunk in stream:
            yield chunk.content

Consumption with proper cleanup

python
async def generate_response(user_input: str):
    result_chunks = []
    async with aclosing(stream_llm_response(user_input)) as response:
        async for token in response:
            result_chunks.append(token)
            yield token  # Stream to client

    # Post-processing after stream completes
    full_response = "".join(result_chunks)
    await log_response(full_response)

Database Connection Pattern


python
from contextlib import aclosing
from typing import AsyncIterator

from sqlalchemy import Row, select
from sqlalchemy.ext.asyncio import AsyncSession

async def stream_large_query(
    session: AsyncSession,
    batch_size: int = 1000
) -> AsyncIterator[Row]:
    """Stream large query results with automatic connection cleanup."""
    # session.stream() yields an async result; yield_per wires batch_size through
    result = await session.stream(
        select(Model).execution_options(stream_results=True, yield_per=batch_size)
    )

    async with aclosing(result.scalars()) as stream:
        async for row in stream:
            yield row

When to Use aclosing()

  • External API streaming (LLM, HTTP): Always
  • Database streaming results: Always
  • File streaming: Always
  • Simple in-memory generators: ⚠️ Optional (no cleanup needed)
  • Generator with try/finally cleanup: Always

Anti-Patterns to Avoid


python
# ❌ NEVER: Consuming without aclosing
async for chunk in stream_analysis():
    process(chunk)

# ❌ NEVER: Manual try/finally (verbose, error-prone)
gen = stream_analysis()
try:
    async for chunk in gen:
        process(chunk)
finally:
    await gen.aclose()

# ❌ NEVER: Assuming GC will handle cleanup
gen = stream_analysis()
# ... later gen goes out of scope without close

Testing Async Generators


python
import pytest
from contextlib import aclosing

@pytest.mark.asyncio
async def test_stream_cleanup_on_error():
    """Test that cleanup happens even when exception raised."""
    cleanup_called = False

    async def stream_with_cleanup():
        nonlocal cleanup_called
        try:
            yield "data"
            yield "more"
        finally:
            cleanup_called = True

    with pytest.raises(ValueError):
        async with aclosing(stream_with_cleanup()) as gen:
            async for chunk in gen:
                raise ValueError("simulated error")

    assert cleanup_called, "Cleanup must run even on exception"

Best Practices


SSE

  • ✅ Use for one-way server-to-client streaming
  • ✅ Implement automatic reconnection
  • ✅ Send keepalive messages every 30s
  • ✅ Handle browser connection limits (6 per domain)
  • ✅ Use HTTP/2 for better performance

WebSockets

  • ✅ Use for bidirectional real-time communication
  • ✅ Implement heartbeat/ping-pong
  • ✅ Handle reconnection with exponential backoff
  • ✅ Validate and sanitize messages
  • ✅ Implement message queuing for offline periods
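The heartbeat bullet boils down to bookkeeping: ping on an interval, and treat the connection as dead when no pong arrives within a grace period. A sketch of that state (the `Heartbeat` class is illustrative, not a `ws` API):

```typescript
// Track pong freshness: a connection is considered dead if no pong has
// arrived within two ping intervals.
class Heartbeat {
  private lastPongAt: number

  constructor(private intervalMs: number, startAt: number = Date.now()) {
    this.lastPongAt = startAt
  }

  onPong(at: number = Date.now()): void {
    this.lastPongAt = at
  }

  isAlive(now: number = Date.now()): boolean {
    return now - this.lastPongAt <= 2 * this.intervalMs
  }
}
```

With the `ws` library, wire `ws.on('pong', () => hb.onPong())` on connection, send `ws.ping()` on a `setInterval`, and call `ws.terminate()` whenever `!hb.isAlive()`.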

Backpressure


  • ✅ Use ReadableStream with proper flow control
  • ✅ Monitor buffer sizes
  • ✅ Pause production when consumer is slow
  • ✅ Implement timeouts for slow consumers

Performance


  • ✅ Compress data (gzip/brotli)
  • ✅ Batch small messages
  • ✅ Use binary formats (MessagePack, Protobuf) for large data
  • ✅ Implement client-side buffering
  • ✅ Monitor connection count and resource usage
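"Batch small messages" can be as simple as greedy packing under a byte budget before writing to the wire. A sketch (`batchMessages` is our name; real systems usually also flush on a timer so a quiet stream does not hold messages back):

```typescript
// Greedy batching: pack consecutive messages into batches whose combined
// byte length stays within maxBytes. A single oversized message still
// gets a batch of its own rather than being dropped.
function batchMessages(messages: string[], maxBytes: number): string[][] {
  const encoder = new TextEncoder()
  const batches: string[][] = []
  let current: string[] = []
  let size = 0
  for (const msg of messages) {
    const len = encoder.encode(msg).length
    // Start a new batch when adding msg would exceed the budget.
    if (current.length > 0 && size + len > maxBytes) {
      batches.push(current)
      current = []
      size = 0
    }
    current.push(msg)
    size += len
  }
  if (current.length > 0) batches.push(current)
  return batches
}
```

Each batch can then be sent as one frame (e.g. `ws.send(JSON.stringify(batch))`), trading a little latency for far fewer per-message round trips.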

Resources


Related Skills


  • llm-streaming - LLM-specific streaming patterns for token-by-token responses
  • api-design-framework - REST API design patterns for streaming endpoints
  • caching-strategies - Cache invalidation patterns for real-time data updates
  • edge-computing-patterns - Edge function streaming for low-latency delivery

Key Decisions


  • Server-to-Client Streaming: SSE (simple protocol, auto-reconnect, HTTP/2 compatible)
  • Bidirectional Communication: WebSockets (full-duplex, low latency, binary support)
  • LLM Token Streaming: ReadableStream + SSE (backpressure control, standard format)
  • Reconnection Strategy: Exponential Backoff (prevents thundering herd, graceful recovery)
  • Async Generator Cleanup: aclosing() (guaranteed resource cleanup on exceptions)

Capability Details


sse

Keywords: sse, server-sent events, event stream, one-way stream
Solves:
  • How do I implement SSE?
  • Stream data from server to client
  • Real-time notifications

sse-protocol

Keywords: sse protocol, event format, event types, sse headers
Solves:
  • SSE protocol fundamentals
  • Event format and types
  • SSE HTTP headers

sse-buffering

Keywords: event buffering, sse race condition, late subscriber, buffer events
Solves:
  • How do I buffer SSE events?
  • Fix SSE race condition
  • Handle late-joining subscribers

sse-reconnection

Keywords: sse reconnection, reconnect, last-event-id, retry, exponential backoff
Solves:
  • How do I handle SSE reconnection?
  • Implement automatic reconnection
  • Resume from Last-Event-ID

orchestkit-sse

Keywords: orchestkit sse, event broadcaster, workflow events, analysis progress
Solves:
  • How does OrchestKit SSE work?
  • EventBroadcaster implementation
  • Real-world SSE example

websocket

Keywords: websocket, ws, bidirectional, real-time chat, socket
Solves:
  • How do I set up a WebSocket server?
  • Build a chat application
  • Bidirectional real-time communication

llm-streaming

Keywords: llm stream, chatgpt stream, ai stream, token stream, openai stream
Solves:
  • How do I stream LLM responses?
  • ChatGPT-style streaming interface
  • Stream tokens as they arrive

backpressure

Keywords: backpressure, flow control, buffer, readable stream, transform stream
Solves:
  • Handle slow consumers
  • Implement backpressure
  • Stream large files efficiently

reconnection

Keywords: reconnect, connection lost, retry, resilient, heartbeat
Solves:
  • Handle connection drops
  • Implement automatic reconnection
  • Keep-alive and heartbeat