# Streaming API Patterns

## Overview
When to use this skill:
- Streaming LLM responses (ChatGPT-style interfaces)
- Real-time notifications and updates
- Live data feeds (stock prices, analytics)
- Chat applications
- Progress updates for long-running tasks
- Collaborative editing features
## Core Technologies
### 1. Server-Sent Events (SSE)
Best for: Server-to-client streaming (LLM responses, notifications)
```typescript
// Next.js Route Handler
export async function GET(req: Request) {
  const encoder = new TextEncoder()

  const stream = new ReadableStream({
    async start(controller) {
      // Send data
      controller.enqueue(encoder.encode('data: Hello\n\n'))

      // Keep the connection alive
      const interval = setInterval(() => {
        controller.enqueue(encoder.encode(': keepalive\n\n'))
      }, 30000)

      // Cleanup when the client disconnects
      req.signal.addEventListener('abort', () => {
        clearInterval(interval)
        controller.close()
      })
    }
  })

  return new Response(stream, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
    }
  })
}

// Client
const eventSource = new EventSource('/api/stream')
eventSource.onmessage = (event) => {
  console.log(event.data)
}
```

### 2. WebSockets
Best for: Bidirectional real-time communication (chat, collaboration)
```typescript
// WebSocket server (using the ws package)
import { WebSocketServer, WebSocket } from 'ws'

const wss = new WebSocketServer({ port: 8080 })

wss.on('connection', (ws) => {
  ws.on('message', (data) => {
    // Broadcast to all connected clients
    wss.clients.forEach((client) => {
      if (client.readyState === WebSocket.OPEN) {
        client.send(data)
      }
    })
  })
})

// Client
const ws = new WebSocket('ws://localhost:8080')
ws.onmessage = (event) => console.log(event.data)
ws.send(JSON.stringify({ type: 'message', text: 'Hello' }))
```

### 3. ReadableStream API
Best for: Processing large data streams with backpressure
```typescript
async function* generateData() {
  for (let i = 0; i < 1000; i++) {
    await new Promise(resolve => setTimeout(resolve, 100))
    yield "data-" + i
  }
}

const stream = new ReadableStream({
  async start(controller) {
    for await (const chunk of generateData()) {
      controller.enqueue(new TextEncoder().encode(chunk + '\n'))
    }
    controller.close()
  }
})
```

## LLM Streaming Pattern
```typescript
// Server
import OpenAI from 'openai'

const openai = new OpenAI()

export async function POST(req: Request) {
  const { messages } = await req.json()

  const stream = await openai.chat.completions.create({
    model: 'gpt-5.2',
    messages,
    stream: true
  })

  const encoder = new TextEncoder()
  return new Response(
    new ReadableStream({
      async start(controller) {
        for await (const chunk of stream) {
          const content = chunk.choices[0]?.delta?.content
          if (content) {
            controller.enqueue(encoder.encode("data: " + JSON.stringify({ content }) + "\n\n"))
          }
        }
        controller.enqueue(encoder.encode('data: [DONE]\n\n'))
        controller.close()
      }
    }),
    {
      headers: {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache'
      }
    }
  )
}

// Client
async function streamChat(messages) {
  const response = await fetch('/api/chat', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ messages })
  })

  const reader = response.body.getReader()
  const decoder = new TextDecoder()
  let buffer = '' // Network chunks can split SSE lines, so buffer partial lines

  while (true) {
    const { done, value } = await reader.read()
    if (done) break

    buffer += decoder.decode(value, { stream: true })
    const lines = buffer.split('\n')
    buffer = lines.pop() ?? '' // Keep the (possibly incomplete) last line

    for (const line of lines) {
      if (line.startsWith('data: ')) {
        const data = line.slice(6)
        if (data === '[DONE]') return
        const json = JSON.parse(data)
        console.log(json.content) // Stream token
      }
    }
  }
}
```

## Reconnection Strategy
```typescript
class ReconnectingEventSource {
  private eventSource: EventSource | null = null
  private reconnectDelay = 1000
  private maxReconnectDelay = 30000

  constructor(private url: string, private onMessage: (data: string) => void) {
    this.connect()
  }

  private connect() {
    this.eventSource = new EventSource(this.url)

    this.eventSource.onmessage = (event) => {
      this.reconnectDelay = 1000 // Reset on success
      this.onMessage(event.data)
    }

    this.eventSource.onerror = () => {
      this.eventSource?.close()
      // Exponential backoff
      setTimeout(() => this.connect(), this.reconnectDelay)
      this.reconnectDelay = Math.min(this.reconnectDelay * 2, this.maxReconnectDelay)
    }
  }

  close() {
    this.eventSource?.close()
  }
}
```

## Python Async Generator Cleanup (Best Practice)
CRITICAL: Async generators can leak resources if not properly cleaned up. Python 3.10+ provides `aclosing()` from `contextlib` to guarantee cleanup.

### The Problem
```python
# ❌ DANGEROUS: generator not closed if an exception occurs mid-iteration
async def stream_analysis():
    async for chunk in external_api_stream():  # What if an exception is raised here?
        yield process(chunk)  # Generator may be garbage collected without cleanup

# ❌ ALSO DANGEROUS: calling .aclose() manually is error-prone
gen = stream_analysis()
try:
    async for chunk in gen:
        process(chunk)
finally:
    await gen.aclose()  # Easy to forget, verbose
```
### The Solution: aclosing()

```python
from contextlib import aclosing

# ✅ CORRECT: aclosing() guarantees cleanup
async def stream_analysis():
    async with aclosing(external_api_stream()) as stream:
        async for chunk in stream:
            yield process(chunk)

# ✅ CORRECT: using aclosing() at the consumption site
async def consume_stream():
    async with aclosing(stream_analysis()) as gen:
        async for chunk in gen:
            handle(chunk)
```

### Real-World Pattern: LLM Streaming
```python
from contextlib import aclosing

from langchain_core.runnables import RunnableConfig

async def stream_llm_response(prompt: str, config: RunnableConfig | None = None):
    """Stream LLM tokens with guaranteed cleanup."""
    async with aclosing(llm.astream(prompt, config=config)) as stream:
        async for chunk in stream:
            yield chunk.content

# Consumption with proper cleanup
async def generate_response(user_input: str):
    result_chunks = []
    async with aclosing(stream_llm_response(user_input)) as response:
        async for token in response:
            result_chunks.append(token)
            yield token  # Stream to client
    # Post-processing after the stream completes
    full_response = "".join(result_chunks)
    await log_response(full_response)
```

### Database Connection Pattern
```python
from contextlib import aclosing
from typing import AsyncIterator

from sqlalchemy import Row, select
from sqlalchemy.ext.asyncio import AsyncSession

async def stream_large_query(
    session: AsyncSession,
    batch_size: int = 1000
) -> AsyncIterator[Row]:
    """Stream large query results with automatic connection cleanup."""
    result = await session.execute(
        select(Model).execution_options(stream_results=True)
    )
    async with aclosing(result.scalars()) as stream:
        async for row in stream:
            yield row
```

### When to Use aclosing()
aclosing()| Scenario | Use |
|---|---|
| External API streaming (LLM, HTTP) | ✅ Always |
| Database streaming results | ✅ Always |
| File streaming | ✅ Always |
| Simple in-memory generators | ⚠️ Optional (no cleanup needed) |
Generator with | ✅ Always |
| 场景 | 是否使用 |
|---|---|
| 外部API流式传输(LLM、HTTP) | ✅ 始终 |
| 数据库流式结果 | ✅ 始终 |
| 文件流式传输 | ✅ 始终 |
| 简单内存生成器 | ⚠️ 可选(无需清理) |
带有 | ✅ 始终 |
### Anti-Patterns to Avoid
```python
# ❌ NEVER: consuming without aclosing()
async for chunk in stream_analysis():
    process(chunk)

# ❌ NEVER: manual try/finally (verbose, error-prone)
gen = stream_analysis()
try:
    async for chunk in gen:
        process(chunk)
finally:
    await gen.aclose()

# ❌ NEVER: assuming GC will handle cleanup
gen = stream_analysis()
# ...later gen goes out of scope without being closed
```

### Testing Async Generators
```python
import pytest
from contextlib import aclosing

@pytest.mark.asyncio
async def test_stream_cleanup_on_error():
    """Test that cleanup happens even when an exception is raised."""
    cleanup_called = False

    async def stream_with_cleanup():
        nonlocal cleanup_called
        try:
            yield "data"
            yield "more"
        finally:
            cleanup_called = True

    with pytest.raises(ValueError):
        async with aclosing(stream_with_cleanup()) as gen:
            async for chunk in gen:
                raise ValueError("simulated error")

    assert cleanup_called, "Cleanup must run even on exception"
```

## Best Practices
### SSE
- ✅ Use for one-way server-to-client streaming
- ✅ Implement automatic reconnection
- ✅ Send keepalive messages every 30s
- ✅ Handle browser connection limits (6 per domain)
- ✅ Use HTTP/2 for better performance
### WebSockets
- ✅ Use for bidirectional real-time communication
- ✅ Implement heartbeat/ping-pong
- ✅ Handle reconnection with exponential backoff
- ✅ Validate and sanitize messages
- ✅ Implement message queuing for offline periods
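The heartbeat/ping-pong item above amounts to a small piece of connection-health bookkeeping. A minimal sketch (the `HeartbeatMonitor` name and tick-based design are illustrative, not a library API): the transport wires `sendPing` to a WebSocket ping frame, calls `tick()` from a `setInterval`, and calls `pongReceived()` whenever a pong frame arrives.

```typescript
class HeartbeatMonitor {
  private missed = 0

  constructor(
    private maxMissed: number,
    private sendPing: () => void,
    private onDead: () => void,
  ) {}

  // Call once per heartbeat interval.
  tick() {
    if (this.missed >= this.maxMissed) {
      this.onDead() // Too many unanswered pings: treat the connection as dead
      return
    }
    this.missed++
    this.sendPing()
  }

  // Call whenever a pong arrives from the peer.
  pongReceived() {
    this.missed = 0
  }
}
```

Keeping the timer wiring outside the class makes the reconnect policy (exponential backoff, queuing) easy to test without real sockets.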
### Backpressure
- ✅ Use ReadableStream with proper flow control
- ✅ Monitor buffer sizes
- ✅ Pause production when consumer is slow
- ✅ Implement timeouts for slow consumers
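One way to get the flow control described above is to produce from `pull()` rather than pushing everything in `start()`: the runtime calls `pull()` only when the internal queue drops below the high-water mark, so a slow reader automatically slows the producer. A minimal sketch (the counting source is illustrative):

```typescript
// Pull-based ReadableStream: a chunk is produced only when the consumer asks.
function countingStream(limit: number): ReadableStream<string> {
  let i = 0
  return new ReadableStream<string>({
    pull(controller) {
      // Invoked only while the internal queue is below the high-water mark.
      if (i < limit) {
        controller.enqueue(`data-${i++}`)
      } else {
        controller.close()
      }
    }
  }, { highWaterMark: 1 }) // Keep at most one chunk buffered
}
```

Contrast this with the `start()`-only example earlier, which eagerly enqueues every chunk regardless of how fast the consumer reads.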
### Performance
- ✅ Compress data (gzip/brotli)
- ✅ Batch small messages
- ✅ Use binary formats (MessagePack, Protobuf) for large data
- ✅ Implement client-side buffering
- ✅ Monitor connection count and resource usage
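The "batch small messages" item can be sketched as a tiny accumulator that hands the transport one array per flush (the `MessageBatcher` name and shape are illustrative): it flushes when the batch reaches `maxSize`, and a timer would call `flush()` for time-based flushing.

```typescript
class MessageBatcher<T> {
  private batch: T[] = []

  constructor(
    private maxSize: number,
    private send: (batch: T[]) => void,
  ) {}

  add(message: T) {
    this.batch.push(message)
    if (this.batch.length >= this.maxSize) this.flush()
  }

  // Also call from a setInterval for time-based flushing (not wired here).
  flush() {
    if (this.batch.length === 0) return
    this.send(this.batch)
    this.batch = []
  }
}
```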
## Resources

### Related Skills
- llm-streaming: LLM-specific streaming patterns for token-by-token responses
- api-design-framework: REST API design patterns for streaming endpoints
- caching-strategies: Cache invalidation patterns for real-time data updates
- edge-computing-patterns: Edge function streaming for low-latency delivery
## Key Decisions
| Decision | Choice | Rationale |
|---|---|---|
| Server-to-Client Streaming | SSE | Simple protocol, auto-reconnect, HTTP/2 compatible |
| Bidirectional Communication | WebSockets | Full-duplex, low latency, binary support |
| LLM Token Streaming | ReadableStream + SSE | Backpressure control, standard format |
| Reconnection Strategy | Exponential Backoff | Prevents thundering herd, graceful recovery |
| Async Generator Cleanup | `aclosing()` | Guaranteed resource cleanup on exceptions |
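On the thundering-herd point: plain exponential backoff still synchronizes clients that disconnected at the same moment, so jitter is commonly added. A sketch using "full jitter", where each retry waits a random delay in `[0, min(cap, base * 2^attempt)]` (the function name and defaults are illustrative):

```typescript
function backoffDelay(attempt: number, baseMs = 1000, capMs = 30000): number {
  // Exponential ceiling, capped, then a uniformly random wait below it.
  const ceiling = Math.min(capMs, baseMs * 2 ** attempt)
  return Math.random() * ceiling
}
```

Dropping this into the `ReconnectingEventSource` above would mean replacing the fixed `reconnectDelay` doubling with `setTimeout(() => this.connect(), backoffDelay(this.attempt++))`.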
## Capability Details
### sse
Keywords: sse, server-sent events, event stream, one-way stream
Solves:
- How do I implement SSE?
- Stream data from server to client
- Real-time notifications
### sse-protocol
Keywords: sse protocol, event format, event types, sse headers
Solves:
- SSE protocol fundamentals
- Event format and types
- SSE HTTP headers
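The event format these bullets refer to is line-oriented: optional `event:` and `id:` fields, one or more `data:` lines, and a blank line terminating each event. A small serializer capturing those rules (the function itself is illustrative, not a library API):

```typescript
function formatSSE(data: string, opts: { event?: string; id?: string } = {}): string {
  let out = ''
  if (opts.event) out += `event: ${opts.event}\n`
  if (opts.id) out += `id: ${opts.id}\n`
  // Multi-line payloads must be split across multiple data: lines.
  for (const line of data.split('\n')) {
    out += `data: ${line}\n`
  }
  return out + '\n' // Blank line terminates the event
}
```

The client reassembles multi-line `data:` fields by joining them with `\n`, which is why splitting here is lossless.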
### sse-buffering
Keywords: event buffering, sse race condition, late subscriber, buffer events
Solves:
- How do I buffer SSE events?
- Fix SSE race condition
- Handle late-joining subscribers
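The late-subscriber race can be closed by keeping a bounded buffer of recent events and replaying whatever a new or reconnecting subscriber has not seen. A minimal sketch (the `EventBuffer` class is illustrative): events get monotonically increasing ids, which pairs naturally with SSE's `Last-Event-ID`.

```typescript
class EventBuffer<T> {
  private events: { id: number; data: T }[] = []
  private nextId = 1

  constructor(private capacity: number) {}

  push(data: T): number {
    const id = this.nextId++
    this.events.push({ id, data })
    // Evict the oldest event once over capacity (ring-buffer behavior).
    if (this.events.length > this.capacity) this.events.shift()
    return id
  }

  // Everything the subscriber has not seen yet (0 = from the beginning).
  since(lastSeenId: number): { id: number; data: T }[] {
    return this.events.filter(e => e.id > lastSeenId)
  }
}
```

On connect, the handler replays `buffer.since(lastSeenId)` before switching to live events; events older than the buffer's capacity are simply lost, so size the capacity for the longest outage you want to cover.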
### sse-reconnection
Keywords: sse reconnection, reconnect, last-event-id, retry, exponential backoff
Solves:
- How do I handle SSE reconnection?
- Implement automatic reconnection
- Resume from Last-Event-ID
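On reconnect, the browser's `EventSource` automatically resends the last `id:` it saw in a `Last-Event-ID` request header, so the server can replay what the client missed. A small defensive parser for that header (the helper name and "numeric ids" assumption are ours; the spec allows arbitrary strings):

```typescript
// Absent or non-numeric header → 0, meaning "replay from the beginning".
function parseLastEventId(headerValue: string | null): number {
  const id = Number(headerValue)
  return Number.isFinite(id) && id > 0 ? Math.floor(id) : 0
}
```

In a route handler this would look like `const lastId = parseLastEventId(req.headers.get('Last-Event-ID'))`, followed by replaying buffered events with an id greater than `lastId`.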
### orchestkit-sse
Keywords: orchestkit sse, event broadcaster, workflow events, analysis progress
Solves:
- How does OrchestKit SSE work?
- EventBroadcaster implementation
- Real-world SSE example
### websocket
Keywords: websocket, ws, bidirectional, real-time chat, socket
Solves:
- How do I set up WebSocket server?
- Build a chat application
- Bidirectional real-time communication
### llm-streaming
Keywords: llm stream, chatgpt stream, ai stream, token stream, openai stream
Solves:
- How do I stream LLM responses?
- ChatGPT-style streaming interface
- Stream tokens as they arrive
### backpressure
Keywords: backpressure, flow control, buffer, readable stream, transform stream
Solves:
- Handle slow consumers
- Implement backpressure
- Stream large files efficiently
### reconnection
Keywords: reconnect, connection lost, retry, resilient, heartbeat
Solves:
- Handle connection drops
- Implement automatic reconnection
- Keep-alive and heartbeat