sse-streaming
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
Chinesesse-streaming
SSE流处理
Handle SSE streaming implementation, debugging, and reconnection for pplx-sdk.
为pplx-sdk处理SSE流的实现、调试与重连操作。
When to use
使用场景
Use this skill when implementing, debugging, or extending SSE streaming functionality for the Perplexity API.
当你需要为Perplexity API实现、调试或扩展SSE流功能时,可使用本技能。
Instructions
操作指南
SSE Protocol Format
SSE协议格式
The Perplexity API uses standard SSE format:
event: query_progress
data: {"status": "searching", "progress": 0.5}
event: answer_chunk
data: {"text": "partial token", "backend_uuid": "uuid-here"}
event: final_response
data: {"text": "complete answer", "cursor": "cursor-value", "backend_uuid": "uuid-here"}
: [end]Perplexity API采用标准SSE格式:
event: query_progress
data: {"status": "searching", "progress": 0.5}
event: answer_chunk
data: {"text": "partial token", "backend_uuid": "uuid-here"}
event: final_response
data: {"text": "complete answer", "cursor": "cursor-value", "backend_uuid": "uuid-here"}
: [end]Parsing Rules
解析规则
- Read line-by-line from streaming response
- Skip lines starting with (comments) — except check for
:marker[end] - Parse lines → set current event type
event: <type> - Parse lines →
data: <json>into payloadjson.loads() - Empty line → emit event (type + accumulated data), reset buffers
- Stop on marker
[end]
- 从流式响应中逐行读取
- 跳过以开头的行(注释)——但需检查
:标记[end] - 解析行 → 设置当前事件类型
event: <type> - 解析行 → 使用
data: <json>将其转换为负载json.loads() - 空行 → 触发事件(类型+累积数据),重置缓冲区
- 遇到标记时停止
[end]
Event Types
事件类型
| Event | Purpose | Key Fields |
|---|---|---|
| Search progress | |
| Source citations | |
| Partial token | |
| Complete answer | |
| Follow-up suggestions | |
| Server error | |
| 事件 | 用途 | 关键字段 |
|---|---|---|
| 搜索进度 | |
| 来源引用 | |
| 部分令牌 | |
| 完整回答 | |
| 后续建议问题 | |
| 服务器错误 | |
Stream Lifecycle
流生命周期
python
for chunk in transport.stream(query="...", context_uuid="..."):
if chunk.type == "answer_chunk":
print(chunk.text, end="", flush=True) # Incremental display
elif chunk.type == "final_response":
entry = Entry(...) # Build complete entry
breakpython
for chunk in transport.stream(query="...", context_uuid="..."):
if chunk.type == "answer_chunk":
print(chunk.text, end="", flush=True) # Incremental display
elif chunk.type == "final_response":
entry = Entry(...) # Build complete entry
breakReconnection with Cursor
使用Cursor重连
When a stream disconnects, resume using the cursor from the last :
final_responsepython
cursor = last_chunk.data.get("cursor")
backend_uuid = last_chunk.backend_uuid
payload["cursor"] = cursor
payload["resume_entry_uuids"] = [backend_uuid]当流断开连接时,可使用最后一个中的cursor恢复流:
final_responsepython
cursor = last_chunk.data.get("cursor")
backend_uuid = last_chunk.backend_uuid
payload["cursor"] = cursor
payload["resume_entry_uuids"] = [backend_uuid]Retry with Exponential Backoff
指数退避重试
python
from pplx_sdk.shared.retry import RetryConfig
config = RetryConfig(
max_retries=3,
initial_backoff_ms=1000, # 1s, 2s, 4s
max_backoff_ms=30000,
backoff_multiplier=2.0,
jitter=True, # ±25% randomization
)python
from pplx_sdk.shared.retry import RetryConfig
config = RetryConfig(
max_retries=3,
initial_backoff_ms=1000, # 1s, 2s, 4s
max_backoff_ms=30000,
backoff_multiplier=2.0,
jitter=True, # ±25% randomization
)Common Pitfalls
常见误区
- Don't parse non-JSON data lines: Wrap in try/except
json.loads() - Handle empty streams: Raise if no events received
StreamingError - Buffer multi-line data: Some fields span multiple lines; accumulate until empty line
data: - Respect marker: Always check for
[end]in comment lines to stop iteration[end]
- 不要解析非JSON格式的数据行:将包裹在try/except块中
json.loads() - 处理空流:如果未收到任何事件,抛出
StreamingError - 处理多行数据缓冲区:部分字段会跨多行;需累积数据直到遇到空行
data: - 遵守标记:始终检查注释行中的
[end]以停止迭代[end]