Streaming Skill
Implement real-time streaming responses from Claude API using Server-Sent Events (SSE).
When to Use This Skill
- Real-time user interfaces
- Long-running generations
- Progressive output display
- Tool use with streaming
- Extended thinking visualization
SSE Event Flow
message_start
→ content_block_start
→ content_block_delta (repeated)
→ content_block_stop
→ (more blocks...)
→ message_delta
→ message_stop
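The lifecycle can be exercised without a network call. The sketch below uses hypothetical dict-shaped events (the SDK emits typed objects, not dicts) to show how text accumulates across the flow:

```python
def accumulate_text(events):
    """Reassemble the final message text by following the event flow."""
    blocks = {}
    for event in events:
        if event["type"] == "content_block_start":
            blocks[event["index"]] = ""
        elif event["type"] == "content_block_delta":
            blocks[event["index"]] += event["delta"]["text"]
    return "".join(blocks[i] for i in sorted(blocks))

events = [
    {"type": "message_start"},
    {"type": "content_block_start", "index": 0},
    {"type": "content_block_delta", "index": 0, "delta": {"text": "Hel"}},
    {"type": "content_block_delta", "index": 0, "delta": {"text": "lo"}},
    {"type": "content_block_stop", "index": 0},
    {"type": "message_delta"},
    {"type": "message_stop"},
]
print(accumulate_text(events))  # Hello
```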
Core Implementation
Basic Text Streaming (Python)
```python
import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Write a short story."}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
```
Event-Based Streaming
```python
import json

tool_input_buffer = ""

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello"}]
) as stream:
    for event in stream:
        if event.type == "content_block_delta":
            if event.delta.type == "text_delta":
                print(event.delta.text, end="")
            elif event.delta.type == "input_json_delta":
                # Tool input (accumulate, don't parse yet!)
                tool_input_buffer += event.delta.partial_json
        elif event.type == "content_block_stop":
            # Now safe to parse tool input
            if tool_input_buffer:
                tool_input = json.loads(tool_input_buffer)
                tool_input_buffer = ""
```
TypeScript Streaming
```typescript
import Anthropic from '@anthropic-ai/sdk';

const client = new Anthropic();

const stream = client.messages.stream({
  model: 'claude-sonnet-4-20250514',
  max_tokens: 1024,
  messages: [{ role: 'user', content: 'Write a story.' }]
});

for await (const event of stream) {
  if (event.type === 'content_block_delta' &&
      event.delta.type === 'text_delta') {
    process.stdout.write(event.delta.text);
  }
}

const finalMessage = await stream.finalMessage();
```
Event Types Reference
| Event | When | Data |
|---|---|---|
| message_start | Beginning | Message metadata |
| content_block_start | Block begins | Block type, index |
| content_block_delta | Content chunk | Delta content |
| content_block_stop | Block ends | - |
| message_delta | Message update | Stop reason, usage |
| message_stop | Complete | - |
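On the wire, each of these arrives as a Server-Sent Events frame: an `event:` line naming the event and a `data:` line carrying JSON. A minimal frame parser, independent of the SDK (simplified sketch: it assumes one `data:` line per frame, separated by blank lines):

```python
import json

def parse_sse(raw):
    """Parse raw SSE text into (event_name, data_dict) pairs."""
    frames = []
    for block in raw.strip().split("\n\n"):
        name, data = None, None
        for line in block.splitlines():
            if line.startswith("event:"):
                name = line[len("event:"):].strip()
            elif line.startswith("data:"):
                data = json.loads(line[len("data:"):].strip())
        frames.append((name, data))
    return frames

raw = (
    "event: message_start\n"
    'data: {"type": "message_start"}\n'
    "\n"
    "event: content_block_delta\n"
    'data: {"type": "content_block_delta", "delta": {"type": "text_delta", "text": "Hi"}}\n'
)
print([name for name, _ in parse_sse(raw)])  # ['message_start', 'content_block_delta']
```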
Delta Types
| Delta Type | Content | When |
|---|---|---|
| text_delta | Text content | Text blocks |
| input_json_delta | Tool input | Tool use blocks |
| thinking_delta | Extended thinking | Thinking blocks |
| signature_delta | Thinking signature | End of thinking block |
Tool Use Streaming
Critical Rule: Never Parse JSON Mid-Stream!
```python
# WRONG - will fail on partial JSON!
for event in stream:
    if event.delta.type == "input_json_delta":
        tool_input = json.loads(event.delta.partial_json)  # FAILS!
```

```python
# CORRECT - accumulate, then parse
tool_json_buffer = ""
for event in stream:
    if event.delta.type == "input_json_delta":
        tool_json_buffer += event.delta.partial_json
    elif event.type == "content_block_stop":
        if tool_json_buffer:
            tool_input = json.loads(tool_json_buffer)  # Safe now!
            tool_json_buffer = ""
```
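Why accumulation matters is easy to demonstrate with synthetic partial_json chunks:

```python
import json

# Hypothetical chunks as they might arrive across input_json_delta events
chunks = ['{"city": "Par', 'is", "units": ', '"celsius"}']

# Any individual chunk is invalid JSON:
try:
    json.loads(chunks[0])
except json.JSONDecodeError:
    print("partial chunk fails to parse")

# The accumulated buffer parses cleanly:
buffer = "".join(chunks)
print(json.loads(buffer))  # {'city': 'Paris', 'units': 'celsius'}
```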
Complete Tool Streaming Pattern
```python
import json

def stream_with_tools(client, messages, tools):
    current_block = None
    tool_input_buffer = ""
    with client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=4096,
        messages=messages,
        tools=tools
    ) as stream:
        for event in stream:
            if event.type == "content_block_start":
                current_block = event.content_block
                tool_input_buffer = ""
            elif event.type == "content_block_delta":
                if event.delta.type == "text_delta":
                    yield {"type": "text", "content": event.delta.text}
                elif event.delta.type == "input_json_delta":
                    tool_input_buffer += event.delta.partial_json
            elif event.type == "content_block_stop":
                if current_block.type == "tool_use":
                    yield {
                        "type": "tool_call",
                        "id": current_block.id,
                        "name": current_block.name,
                        "input": json.loads(tool_input_buffer)
                    }
```
Extended Thinking Streaming
```python
thinking_content = ""
signature = ""

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=16000,
    thinking={"type": "enabled", "budget_tokens": 10000},
    messages=[{"role": "user", "content": "Solve this complex problem..."}]
) as stream:
    for event in stream:
        if event.type == "content_block_delta":
            if event.delta.type == "thinking_delta":
                thinking_content += event.delta.thinking
                # Optionally display thinking in UI
            elif event.delta.type == "signature_delta":
                signature = event.delta.signature
            elif event.delta.type == "text_delta":
                print(event.delta.text, end="")
```
Error Handling
Retriable Errors
```python
import time

import anthropic

RETRIABLE_ERRORS = [529, 429, 500, 502, 503]

def stream_with_retry(client, **kwargs):
    max_retries = 3
    base_delay = 1
    for attempt in range(max_retries):
        try:
            with client.messages.stream(**kwargs) as stream:
                for event in stream:
                    yield event
            return
        except anthropic.APIStatusError as e:
            if e.status_code in RETRIABLE_ERRORS and attempt < max_retries - 1:
                delay = base_delay * (2 ** attempt)
                time.sleep(delay)
            else:
                raise
```
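With max_retries = 3 and base_delay = 1, the pattern sleeps 1 s, then 2 s, before giving up. A hypothetical helper that computes the same schedule and optionally adds random jitter (a common production refinement, not part of the pattern above):

```python
import random

def backoff_delays(max_retries=3, base_delay=1, jitter=False):
    """Return the delays slept between attempts: base_delay * 2**attempt."""
    delays = []
    for attempt in range(max_retries - 1):  # no sleep after the final attempt
        delay = base_delay * (2 ** attempt)
        if jitter:
            # Spread concurrent clients' retries apart
            delay += random.uniform(0, base_delay)
        delays.append(delay)
    return delays

print(backoff_delays())  # [1, 2]
```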
Silent Overloaded Errors
```python
# CRITICAL: check for error events even at HTTP 200!
for event in stream:
    if event.type == "error":
        if event.error.type == "overloaded_error":
            # Retry with backoff
            pass
```
Connection Management
Keep-Alive Configuration
```python
import anthropic
import httpx

# Proper timeout configuration
http_client = httpx.Client(
    timeout=httpx.Timeout(
        connect=10.0,  # Connection timeout
        read=120.0,    # Read timeout (long for streaming!)
        write=30.0,    # Write timeout
        pool=30.0      # Pool timeout
    )
)
client = anthropic.Anthropic(http_client=http_client)
```
Connection Pooling
```python
import httpx

http_client = httpx.Client(
    limits=httpx.Limits(
        max_keepalive_connections=20,
        max_connections=100,
        keepalive_expiry=30.0
    )
)
```
Best Practices
DO:
- Set read timeout >= 60 seconds
- Accumulate tool JSON, parse after block_stop
- Handle error events even at HTTP 200
- Use exponential backoff for retries
DON'T:
- Parse partial JSON during streaming
- Use short timeouts
- Ignore overloaded_error events
- Leave connections idle >5 minutes
UI Integration Pattern
```python
import anthropic

client = anthropic.AsyncAnthropic()

async def stream_to_ui(websocket, prompt):
    """Stream Claude response to WebSocket client"""
    async with client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=4096,
        messages=[{"role": "user", "content": prompt}]
    ) as stream:
        async for text in stream.text_stream:
            await websocket.send_json({
                "type": "chunk",
                "content": text
            })
        final_message = await stream.get_final_message()
    await websocket.send_json({
        "type": "complete",
        "usage": final_message.usage.model_dump()
    })
```
See Also
- [[llm-integration]] - API basics
- [[tool-use]] - Tool calling
- [[extended-thinking]] - Deep reasoning