sse-streaming

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

sse-streaming

SSE流处理

Handle SSE streaming implementation, debugging, and reconnection for pplx-sdk.
为pplx-sdk处理SSE流的实现、调试与重连操作。

When to use

使用场景

Use this skill when implementing, debugging, or extending SSE streaming functionality for the Perplexity API.
当你需要为Perplexity API实现、调试或扩展SSE流功能时,可使用本技能。

Instructions

操作指南

SSE Protocol Format

SSE协议格式

The Perplexity API uses standard SSE format:
event: query_progress
data: {"status": "searching", "progress": 0.5}

event: answer_chunk
data: {"text": "partial token", "backend_uuid": "uuid-here"}

event: final_response
data: {"text": "complete answer", "cursor": "cursor-value", "backend_uuid": "uuid-here"}

: [end]
Perplexity API采用标准SSE格式:
event: query_progress
data: {"status": "searching", "progress": 0.5}

event: answer_chunk
data: {"text": "partial token", "backend_uuid": "uuid-here"}

event: final_response
data: {"text": "complete answer", "cursor": "cursor-value", "backend_uuid": "uuid-here"}

: [end]

Parsing Rules

解析规则

  1. Read line-by-line from streaming response
  2. Skip lines starting with
    :
    (comments) — except check for
    [end]
    marker
  3. Parse
    event: <type>
    lines → set current event type
  4. Parse
    data: <json>
    lines →
    json.loads()
    into payload
  5. Empty line → emit event (type + accumulated data), reset buffers
  6. Stop on
    [end]
    marker
  1. 从流式响应中逐行读取
  2. 跳过以
    :
    开头的行(注释)——但需检查
    [end]
    标记
  3. 解析
    event: <type>
    行 → 设置当前事件类型
  4. 解析
    data: <json>
    行 → 使用
    json.loads()
    将其转换为负载
  5. 空行 → 触发事件(类型+累积数据),重置缓冲区
  6. 遇到
    [end]
    标记时停止

Event Types

事件类型

EventPurposeKey Fields
query_progress
Search progress
status
,
progress
search_results
Source citations
sources[]
answer_chunk
Partial token
text
final_response
Complete answer
text
,
cursor
,
backend_uuid
related_questions
Follow-up suggestions
questions[]
error
Server error
message
,
code
事件用途关键字段
query_progress
搜索进度
status
,
progress
search_results
来源引用
sources[]
answer_chunk
部分令牌
text
final_response
完整回答
text
,
cursor
,
backend_uuid
related_questions
后续建议问题
questions[]
error
服务器错误
message
,
code

Stream Lifecycle

流生命周期

python
for chunk in transport.stream(query="...", context_uuid="..."):
    if chunk.type == "answer_chunk":
        print(chunk.text, end="", flush=True)  # Incremental display
    elif chunk.type == "final_response":
        entry = Entry(...)  # Build complete entry
        break
python
for chunk in transport.stream(query="...", context_uuid="..."):
    if chunk.type == "answer_chunk":
        print(chunk.text, end="", flush=True)  # Incremental display
    elif chunk.type == "final_response":
        entry = Entry(...)  # Build complete entry
        break

Reconnection with Cursor

使用Cursor重连

When a stream disconnects, resume using the cursor from the last
final_response
:
python
cursor = last_chunk.data.get("cursor")
backend_uuid = last_chunk.backend_uuid

payload["cursor"] = cursor
payload["resume_entry_uuids"] = [backend_uuid]
当流断开连接时,可使用最后一个
final_response
中的cursor恢复流:
python
cursor = last_chunk.data.get("cursor")
backend_uuid = last_chunk.backend_uuid

payload["cursor"] = cursor
payload["resume_entry_uuids"] = [backend_uuid]

Retry with Exponential Backoff

指数退避重试

python
from pplx_sdk.shared.retry import RetryConfig

config = RetryConfig(
    max_retries=3,
    initial_backoff_ms=1000,   # 1s, 2s, 4s
    max_backoff_ms=30000,
    backoff_multiplier=2.0,
    jitter=True,               # ±25% randomization
)
python
from pplx_sdk.shared.retry import RetryConfig

config = RetryConfig(
    max_retries=3,
    initial_backoff_ms=1000,   # 1s, 2s, 4s
    max_backoff_ms=30000,
    backoff_multiplier=2.0,
    jitter=True,               # ±25% randomization
)

Common Pitfalls

常见误区

  • Don't parse non-JSON data lines: Wrap
    json.loads()
    in try/except
  • Handle empty streams: Raise
    StreamingError
    if no events received
  • Buffer multi-line data: Some
    data:
    fields span multiple lines; accumulate until empty line
  • Respect
    [end]
    marker
    : Always check for
    [end]
    in comment lines to stop iteration
  • 不要解析非JSON格式的数据行:将
    json.loads()
    包裹在try/except块中
  • 处理空流:如果未收到任何事件,抛出
    StreamingError
  • 处理多行数据缓冲区:部分
    data:
    字段会跨多行;需累积数据直到遇到空行
  • 遵守
    [end]
    标记
    :始终检查注释行中的
    [end]
    以停止迭代