# Claude API Streaming (SSE)
Server-Sent Events (SSE) streaming for Claude API with support for text, tool use, and extended thinking. Activate for real-time responses, stream handling, and progressive output.
npx skill4agent add lobbi-docs/claude streaming

Event flow:

message_start
→ content_block_start
→ content_block_delta (repeated)
→ content_block_stop
→ (more blocks...)
→ message_delta
→ message_stop

import anthropic
client = anthropic.Anthropic()
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=1024,
messages=[{"role": "user", "content": "Write a short story."}]
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=1024,
messages=[{"role": "user", "content": "Hello"}]
) as stream:
for event in stream:
if event.type == "content_block_delta":
if event.delta.type == "text_delta":
print(event.delta.text, end="")
elif event.delta.type == "input_json_delta":
# Tool input (accumulate, don't parse yet!)
tool_input_buffer += event.delta.partial_json
elif event.type == "content_block_stop":
# Now safe to parse tool input
if tool_input_buffer:
tool_input = json.loads(tool_input_buffer)import Anthropic from '@anthropic-ai/sdk';
const client = new Anthropic();
const stream = client.messages.stream({
model: 'claude-sonnet-4-20250514',
max_tokens: 1024,
messages: [{ role: 'user', content: 'Write a story.' }]
});
for await (const event of stream) {
if (event.type === 'content_block_delta' &&
event.delta.type === 'text_delta') {
process.stdout.write(event.delta.text);
}
}
const finalMessage = await stream.finalMessage();| Event | When | Data |
|---|---|---|
| `message_start` | Beginning | Message metadata |
| `content_block_start` | Block begins | Block type, index |
| `content_block_delta` | Content chunk | Delta content |
| `content_block_stop` | Block ends | - |
| `message_delta` | Message update | Stop reason, usage |
| `message_stop` | Complete | - |
| Delta Type | Content | When |
|---|---|---|
| `text_delta` | Text content | Text blocks |
| `input_json_delta` | Tool input | Tool use blocks |
| `thinking_delta` | Extended thinking | Thinking blocks |
| `signature_delta` | Thinking signature | End of thinking block |
# WRONG - Will fail on partial JSON!
for event in stream:
    if event.delta.type == "input_json_delta":
        tool_input = json.loads(event.delta.partial_json)  # FAILS!

# CORRECT - Accumulate then parse
tool_json_buffer = ""
for event in stream:
    # Check event.type BEFORE touching event.delta: content_block_stop
    # events have no `delta` attribute.
    if event.type == "content_block_delta" and event.delta.type == "input_json_delta":
        tool_json_buffer += event.delta.partial_json
    elif event.type == "content_block_stop":
        if tool_json_buffer:
            tool_input = json.loads(tool_json_buffer)  # Safe now!
tool_json_buffer = ""

import json


def stream_with_tools(client, messages, tools):
    """Stream a tool-enabled conversation, yielding text and parsed tool calls.

    Args:
        client: an Anthropic client instance.
        messages: conversation messages to send.
        tools: tool definitions to expose to the model.

    Yields:
        {"type": "text", "content": str} for each text delta, and
        {"type": "tool_call", "id": str, "name": str, "input": dict}
        once a tool_use block's input JSON is complete.
    """
    current_block = None    # block announced by the last content_block_start
    tool_input_buffer = ""  # accumulates partial_json fragments for that block
    with client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=4096,
        messages=messages,
        tools=tools
    ) as stream:
        for event in stream:
            if event.type == "content_block_start":
                current_block = event.content_block
                tool_input_buffer = ""
            elif event.type == "content_block_delta":
                if event.delta.type == "text_delta":
                    yield {"type": "text", "content": event.delta.text}
                elif event.delta.type == "input_json_delta":
                    # Never parse partial_json directly; accumulate until stop.
                    tool_input_buffer += event.delta.partial_json
            elif event.type == "content_block_stop":
                # Guard against a stop without a prior start; a tool call with
                # no input deltas parses as an empty object.
                if current_block is not None and current_block.type == "tool_use":
                    yield {
                        "type": "tool_call",
                        "id": current_block.id,
                        "name": current_block.name,
                        "input": json.loads(tool_input_buffer or "{}")
                    }

thinking_content = ""
signature = ""

# Extended thinking: thinking_delta carries the reasoning text,
# signature_delta carries the cryptographic signature for the thinking block.
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=16000,
    thinking={"type": "enabled", "budget_tokens": 10000},
    messages=[{"role": "user", "content": "Solve this complex problem..."}]
) as stream:
    for event in stream:
        if event.type == "content_block_delta":
            if event.delta.type == "thinking_delta":
                thinking_content += event.delta.thinking
                # Optionally display thinking in UI
            elif event.delta.type == "signature_delta":
                signature = event.delta.signature
            elif event.delta.type == "text_delta":
                print(event.delta.text, end="")

import time
# Status codes worth retrying: overloaded (529), rate-limited (429), transient 5xx.
RETRIABLE_ERRORS = [529, 429, 500, 502, 503]


def stream_with_retry(client, **kwargs):
    """Stream a message, retrying retriable API errors with exponential backoff.

    Args:
        client: an Anthropic client instance (requires `import anthropic`, `import time`).
        **kwargs: passed through to client.messages.stream().

    Yields:
        Streaming events from the underlying stream.

    Raises:
        anthropic.APIStatusError: on non-retriable status or retry exhaustion.

    NOTE(review): a retry restarts the stream from the beginning, so a consumer
    may see events it already received -- confirm downstream handles duplicates.
    """
    max_retries = 3
    base_delay = 1
    for attempt in range(max_retries):
        try:
            with client.messages.stream(**kwargs) as stream:
                for event in stream:
                    yield event
            return  # completed cleanly -- no retry needed
        except anthropic.APIStatusError as e:
            if e.status_code in RETRIABLE_ERRORS and attempt < max_retries - 1:
                # Exponential backoff: 1s, 2s, 4s, ...
                time.sleep(base_delay * (2 ** attempt))
            else:
                raise

# CRITICAL: Check for error events even at HTTP 200!
# Error events can arrive mid-stream even though the HTTP response was 200.
for event in stream:
    if event.type == "error":
        if event.error.type == "overloaded_error":
            # Retry with backoff
            pass

import httpx
# Proper timeout configuration
http_client = httpx.Client(
timeout=httpx.Timeout(
connect=10.0, # Connection timeout
read=120.0, # Read timeout (long for streaming!)
write=30.0, # Write timeout
pool=30.0 # Pool timeout
)
)
client = anthropic.Anthropic(http_client=http_client)http_client = httpx.Client(
limits=httpx.Limits(
max_keepalive_connections=20,
max_connections=100,
keepalive_expiry=30.0
)
)async def stream_to_ui(websocket, prompt):
"""Stream Claude response to WebSocket client"""
async with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=4096,
messages=[{"role": "user", "content": prompt}]
) as stream:
async for text in stream.text_stream:
await websocket.send_json({
"type": "chunk",
"content": text
})
await websocket.send_json({
"type": "complete",
"usage": stream.get_final_message().usage
})