
Streaming Skill


Implement real-time streaming responses from Claude API using Server-Sent Events (SSE).

When to Use This Skill


  • Real-time user interfaces
  • Long-running generations
  • Progressive output display
  • Tool use with streaming
  • Extended thinking visualization

SSE Event Flow


```
message_start
    → content_block_start
        → content_block_delta (repeated)
    → content_block_stop
    → (more blocks...)
→ message_delta
→ message_stop
```
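
This ordering can be exercised without calling the API by replaying a recorded sequence of event-type names. The helper below is an illustrative sketch (not part of the SDK) that checks the invariants implied by the diagram: the stream is bracketed by `message_start`/`message_stop`, and every delta falls inside an open content block.

```python
# Event-type names as they appear in the SSE flow above.
EVENT_FLOW = [
    "message_start",
    "content_block_start",
    "content_block_delta",
    "content_block_delta",
    "content_block_stop",
    "message_delta",
    "message_stop",
]

def is_valid_flow(events):
    """Validate the nesting rules of the SSE event flow."""
    if not events or events[0] != "message_start" or events[-1] != "message_stop":
        return False
    open_block = False
    for e in events[1:-1]:
        if e == "content_block_start":
            if open_block:          # blocks never nest
                return False
            open_block = True
        elif e == "content_block_stop":
            if not open_block:      # stop without a matching start
                return False
            open_block = False
        elif e == "content_block_delta" and not open_block:
            return False            # delta outside any block
    return not open_block           # every block must be closed
```

A consumer test suite can feed recorded or synthetic sequences through this check before asserting on content.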

Core Implementation


Basic Text Streaming (Python)


```python
import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Write a short story."}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
```

Event-Based Streaming


```python
import json

tool_input_buffer = ""

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello"}]
) as stream:
    for event in stream:
        if event.type == "content_block_delta":
            if event.delta.type == "text_delta":
                print(event.delta.text, end="")
            elif event.delta.type == "input_json_delta":
                # Tool input (accumulate, don't parse yet!)
                tool_input_buffer += event.delta.partial_json
        elif event.type == "content_block_stop":
            # Now safe to parse tool input
            if tool_input_buffer:
                tool_input = json.loads(tool_input_buffer)
                tool_input_buffer = ""
```

TypeScript Streaming


```typescript
import Anthropic from '@anthropic-ai/sdk';

const client = new Anthropic();

const stream = client.messages.stream({
    model: 'claude-sonnet-4-20250514',
    max_tokens: 1024,
    messages: [{ role: 'user', content: 'Write a story.' }]
});

for await (const event of stream) {
    if (event.type === 'content_block_delta' &&
        event.delta.type === 'text_delta') {
        process.stdout.write(event.delta.text);
    }
}

const finalMessage = await stream.finalMessage();
```

Event Types Reference


| Event | When | Data |
|---|---|---|
| `message_start` | Beginning | Message metadata |
| `content_block_start` | Block begins | Block type, index |
| `content_block_delta` | Content chunk | Delta content |
| `content_block_stop` | Block ends | - |
| `message_delta` | Message update | Stop reason, usage |
| `message_stop` | Complete | - |
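
One common way to consume this event set is a handler table keyed by event type. The sketch below uses stand-in `SimpleNamespace` objects rather than SDK types, purely to illustrate the dispatch pattern:

```python
from types import SimpleNamespace

def handle_stream(events):
    """Route events by type via a handler table; return collected text."""
    chunks = []

    def on_delta(event):
        # Per the table above, content arrives in content_block_delta events
        if event.delta.type == "text_delta":
            chunks.append(event.delta.text)

    handlers = {"content_block_delta": on_delta}
    for event in events:
        handler = handlers.get(event.type)
        if handler:
            handler(event)
    return "".join(chunks)

# Stand-in events mimicking the shapes described in the table
fake_events = [
    SimpleNamespace(type="message_start"),
    SimpleNamespace(type="content_block_start"),
    SimpleNamespace(type="content_block_delta",
                    delta=SimpleNamespace(type="text_delta", text="Hel")),
    SimpleNamespace(type="content_block_delta",
                    delta=SimpleNamespace(type="text_delta", text="lo")),
    SimpleNamespace(type="content_block_stop"),
    SimpleNamespace(type="message_stop"),
]
```

Unknown event types fall through the `handlers.get` lookup harmlessly, which also keeps the consumer forward-compatible with event types added later.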

Delta Types


| Delta Type | Content | When |
|---|---|---|
| `text_delta` | `.text` | Text content |
| `input_json_delta` | `.partial_json` | Tool input |
| `thinking_delta` | `.thinking` | Extended thinking |
| `signature_delta` | `.signature` | Thinking signature |

Tool Use Streaming


Critical Rule: Never Parse JSON Mid-Stream!


```python
# WRONG - Will fail on partial JSON!
for event in stream:
    if event.delta.type == "input_json_delta":
        tool_input = json.loads(event.delta.partial_json)  # FAILS!
```

```python
# CORRECT - Accumulate, then parse
tool_json_buffer = ""
for event in stream:
    if event.delta.type == "input_json_delta":
        tool_json_buffer += event.delta.partial_json
    elif event.type == "content_block_stop":
        if tool_json_buffer:
            tool_input = json.loads(tool_json_buffer)  # Safe now!
            tool_json_buffer = ""
```
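
The rule is easy to verify offline with hand-made JSON fragments (the split points below are illustrative; real `partial_json` chunks can break anywhere):

```python
import json

# Simulated partial_json fragments as they might arrive
# across several input_json_delta events.
fragments = ['{"loca', 'tion": "San ', 'Francisco", "unit": "celsius"}']

# Parsing any single fragment mid-stream fails:
try:
    json.loads(fragments[0])
    mid_stream_ok = True
except json.JSONDecodeError:
    mid_stream_ok = False

# Accumulating first, then parsing at content_block_stop, succeeds:
buffer = "".join(fragments)
tool_input = json.loads(buffer)
```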

Complete Tool Streaming Pattern


```python
import json

def stream_with_tools(client, messages, tools):
    current_block = None
    tool_input_buffer = ""

    with client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=4096,
        messages=messages,
        tools=tools
    ) as stream:
        for event in stream:
            if event.type == "content_block_start":
                current_block = event.content_block
                tool_input_buffer = ""

            elif event.type == "content_block_delta":
                if event.delta.type == "text_delta":
                    yield {"type": "text", "content": event.delta.text}
                elif event.delta.type == "input_json_delta":
                    tool_input_buffer += event.delta.partial_json

            elif event.type == "content_block_stop":
                if current_block.type == "tool_use":
                    yield {
                        "type": "tool_call",
                        "id": current_block.id,
                        "name": current_block.name,
                        "input": json.loads(tool_input_buffer)
                    }
```

Extended Thinking Streaming


```python
thinking_content = ""
signature = ""

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=16000,
    thinking={"type": "enabled", "budget_tokens": 10000},
    messages=[{"role": "user", "content": "Solve this complex problem..."}]
) as stream:
    for event in stream:
        if event.type == "content_block_delta":
            if event.delta.type == "thinking_delta":
                thinking_content += event.delta.thinking
                # Optionally display thinking in UI
            elif event.delta.type == "signature_delta":
                signature = event.delta.signature
            elif event.delta.type == "text_delta":
                print(event.delta.text, end="")
```
python
thinking_content = ""
signature = ""

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=16000,
    thinking={"type": "enabled", "budget_tokens": 10000},
    messages=[{"role": "user", "content": "Solve this complex problem..."}]
) as stream:
    for event in stream:
        if event.type == "content_block_delta":
            if event.delta.type == "thinking_delta":
                thinking_content += event.delta.thinking
                # Optionally display thinking in UI
            elif event.delta.type == "signature_delta":
                signature = event.delta.signature
            elif event.delta.type == "text_delta":
                print(event.delta.text, end="")

Error Handling


Retriable Errors


```python
import time

import anthropic

RETRIABLE_ERRORS = [529, 429, 500, 502, 503]

def stream_with_retry(client, **kwargs):
    max_retries = 3
    base_delay = 1

    for attempt in range(max_retries):
        try:
            with client.messages.stream(**kwargs) as stream:
                for event in stream:
                    yield event
            return
        except anthropic.APIStatusError as e:
            if e.status_code in RETRIABLE_ERRORS and attempt < max_retries - 1:
                delay = base_delay * (2 ** attempt)  # 1s, 2s, 4s
                time.sleep(delay)
            else:
                raise
```

Note that a retry restarts the stream from the beginning, so callers should discard any events already consumed from a failed attempt.
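
As a quick sanity check, the backoff schedule used above (base delay of 1 second, doubling per attempt over three attempts) can be computed in isolation:

```python
def backoff_delays(max_retries=3, base_delay=1):
    """Exponential backoff delays matching the retry loop above."""
    return [base_delay * (2 ** attempt) for attempt in range(max_retries)]
```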

Silent Overloaded Errors


```python
# CRITICAL: Check for error events even at HTTP 200!
for event in stream:
    if event.type == "error":
        if event.error.type == "overloaded_error":
            # Retry with backoff
            pass
```

Connection Management


Keep-Alive Configuration


```python
import anthropic
import httpx

# Proper timeout configuration
http_client = httpx.Client(
    timeout=httpx.Timeout(
        connect=10.0,  # Connection timeout
        read=120.0,    # Read timeout (long for streaming!)
        write=30.0,    # Write timeout
        pool=30.0      # Pool timeout
    )
)
client = anthropic.Anthropic(http_client=http_client)
```

Connection Pooling


```python
http_client = httpx.Client(
    limits=httpx.Limits(
        max_keepalive_connections=20,
        max_connections=100,
        keepalive_expiry=30.0
    )
)
```

Best Practices


DO:


  • Set read timeout >= 60 seconds
  • Accumulate tool JSON, parse after block_stop
  • Handle error events even at HTTP 200
  • Use exponential backoff for retries

DON'T:


  • Parse partial JSON during streaming
  • Use short timeouts
  • Ignore overloaded_error events
  • Leave connections idle >5 minutes

UI Integration Pattern


```python
import anthropic

client = anthropic.AsyncAnthropic()  # async client for use inside coroutines

async def stream_to_ui(websocket, prompt):
    """Stream Claude response to a WebSocket client."""
    async with client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=4096,
        messages=[{"role": "user", "content": prompt}]
    ) as stream:
        async for text in stream.text_stream:
            await websocket.send_json({
                "type": "chunk",
                "content": text
            })

        final_message = await stream.get_final_message()
        await websocket.send_json({
            "type": "complete",
            "usage": final_message.usage.model_dump()
        })
```

See Also


  • [[llm-integration]] - API basics
  • [[tool-use]] - Tool calling
  • [[extended-thinking]] - Deep reasoning