session-compression
AI Session Compression Techniques
Summary
Compress long AI conversations to fit context windows while preserving critical information.
Session compression enables production AI applications to manage multi-turn conversations efficiently by reducing token usage by 70-95% through summarization, embedding-based retrieval, and intelligent context management. Achieve 3-20x compression ratios with minimal performance degradation.
Key Benefits:
- Cost Reduction: 80-90% token cost savings through hierarchical memory
- Performance: 2x faster responses with compressed context
- Scalability: Handle conversations exceeding 1M tokens
- Quality: Preserve critical information with <2% accuracy loss
When to Use
Use session compression when:
- Multi-turn conversations approach context window limits (>50% capacity)
- Long-running chat sessions (customer support, tutoring, code assistants)
- Token costs become significant (high-volume applications)
- Response latency increases due to large context
- Managing conversation history across multiple sessions
Don't use when:
- Short conversations (<10 turns) fitting easily in context
- Every detail must be preserved verbatim (legal, compliance)
- Single-turn or stateless interactions
- Context window usage is <30%
Ideal scenarios:
- Chatbots with 50+ turn conversations
- AI code assistants tracking long development sessions
- Customer support with multi-session ticket history
- Educational tutors with student progress tracking
- Multi-day collaborative AI workflows
Quick Start
Basic Setup with LangChain
python
from langchain.memory import ConversationSummaryBufferMemory
from langchain_anthropic import ChatAnthropic

# Initialize Claude client
llm = ChatAnthropic(
    model="claude-3-5-sonnet-20241022",
    api_key="your-api-key"
)

# Setup memory with automatic summarization
memory = ConversationSummaryBufferMemory(
    llm=llm,
    max_token_limit=2000,  # Summarize when exceeding this
    return_messages=True
)

# Add conversation turns
memory.save_context(
    {"input": "What's session compression?"},
    {"output": "Session compression reduces conversation token usage..."}
)

# Retrieve compressed context
context = memory.load_memory_variables({})
Progressive Compression Pattern
python
from anthropic import Anthropic

client = Anthropic(api_key="your-api-key")

class ProgressiveCompressor:
    def __init__(self, thresholds=(0.70, 0.85, 0.95)):
        self.thresholds = thresholds
        self.messages = []
        self.max_tokens = 200000  # Claude context window

    def add_message(self, role: str, content: str):
        self.messages.append({"role": role, "content": content})
        # Check if compression needed
        current_usage = self._estimate_tokens()
        usage_ratio = current_usage / self.max_tokens
        if usage_ratio >= self.thresholds[0]:
            self._compress(level=self._get_compression_level(usage_ratio))

    def _estimate_tokens(self):
        return sum(len(m["content"]) // 4 for m in self.messages)

    def _get_compression_level(self, ratio):
        for i, threshold in enumerate(self.thresholds):
            if ratio < threshold:
                return i
        return len(self.thresholds)

    def _compress(self, level: int):
        """Apply compression based on severity level."""
        if level == 1:  # 70% threshold: light compression
            self._remove_redundant_messages()
        elif level == 2:  # 85% threshold: medium compression
            self._summarize_old_messages(keep_recent=10)
        else:  # 95% threshold: aggressive compression
            self._summarize_old_messages(keep_recent=5)

    def _remove_redundant_messages(self):
        """Remove duplicate or low-value messages."""
        # Implementation: use semantic deduplication
        pass

    def _summarize_old_messages(self, keep_recent: int):
        """Summarize older messages, keep recent ones verbatim."""
        if len(self.messages) <= keep_recent:
            return
        # Messages to summarize
        to_summarize = self.messages[:-keep_recent]
        recent = self.messages[-keep_recent:]
        # Generate summary
        conversation_text = "\n\n".join(
            f"{m['role'].upper()}: {m['content']}" for m in to_summarize
        )
        response = client.messages.create(
            model="claude-3-5-haiku-20241022",
            max_tokens=500,
            messages=[{
                "role": "user",
                "content": f"Summarize this conversation:\n\n{conversation_text}"
            }]
        )
        # Replace old messages with summary
        summary = {
            "role": "system",
            "content": f"[Summary]\n{response.content[0].text}"
        }
        self.messages = [summary] + recent
Usage
compressor = ProgressiveCompressor()
for i in range(100):
    compressor.add_message("user", f"Message {i}")
    compressor.add_message("assistant", f"Response {i}")
Using Anthropic Prompt Caching (90% Cost Reduction)
python
from anthropic import Anthropic

client = Anthropic(api_key="your-api-key")

# Build context with cache control
messages = [
    {
        "role": "user",
        "content": [
            {
                "type": "text",
                "text": "Long conversation context here...",
                "cache_control": {"type": "ephemeral"}  # Cache this
            }
        ]
    },
    {
        "role": "assistant",
        "content": "Previous response..."
    },
    {
        "role": "user",
        "content": "New question"  # Not cached, changes frequently
    }
]

response = client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    messages=messages
)
Cache hit reduces costs by 90% for cached content
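To see where a roughly 90% figure can come from, here is a back-of-envelope input-cost helper. The multipliers are assumptions based on published Anthropic prompt-caching pricing (cache writes ~1.25x the base input price, cache reads ~0.1x); the function name and structure are ours, not part of any SDK.

```python
def caching_cost_usd(context_tokens: int, turns: int,
                     base_price_per_mtok: float = 3.00,
                     write_multiplier: float = 1.25,
                     read_multiplier: float = 0.10) -> dict:
    """Compare input cost for a stable context resent every turn,
    with and without prompt caching (assumed pricing multipliers)."""
    per_tok = base_price_per_mtok / 1_000_000
    without = context_tokens * turns * per_tok
    # One cache write on the first turn, cache reads on the rest
    with_cache = context_tokens * per_tok * (
        write_multiplier + read_multiplier * (turns - 1)
    )
    return {
        "without_cache": without,
        "with_cache": with_cache,
        "savings_pct": 100 * (1 - with_cache / without),
    }

# 100K cached tokens over a 50-turn session
print(caching_cost_usd(context_tokens=100_000, turns=50))
```

Savings approach the 90% read discount as the turn count grows, since the one-time write surcharge is amortized.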
---
Core Concepts
Context Windows and Token Limits
Context window: Maximum tokens an LLM can process in a single request (input + output).
Current limits (2025):
- Claude 3.5 Sonnet: 200K tokens (~150K words, ~600 pages)
- GPT-4 Turbo: 128K tokens (~96K words, ~384 pages)
- Gemini 1.5 Pro: 2M tokens (~1.5M words, ~6000 pages)
Token estimation:
- English: ~4 characters per token
- Code: ~3 characters per token
- Rule of thumb: 1 token ≈ 0.75 words
Why compression matters:
- Cost: Claude Sonnet costs $3/$15 per 1M input/output tokens
- Latency: Larger contexts increase processing time
- Quality: Excessive context can dilute attention on relevant information
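The estimation heuristics above can be wrapped in a small helper. The 4-chars-per-token and 3-chars-per-token figures are the rules of thumb quoted above, not exact tokenizer output, and the function names are ours:

```python
def estimate_tokens(text: str, is_code: bool = False) -> int:
    """Rough token estimate: ~4 chars/token for English prose, ~3 for code."""
    chars_per_token = 3 if is_code else 4
    return max(1, len(text) // chars_per_token)

def remaining_budget(used_tokens: int, window: int = 200_000) -> int:
    """Headroom left in a context window (defaults to Claude's 200K)."""
    return window - used_tokens

print(estimate_tokens("x" * 400))   # ~100 tokens of prose
print(remaining_budget(140_000))    # 60000 tokens of headroom at 70% usage
```

For billing-accurate counts, use the provider's tokenizer or token-counting endpoint; the heuristic is only for deciding when to trigger compression.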
Compression Ratios
Compression ratio = Original tokens / Compressed tokens
Industry benchmarks:
- Extractive summarization: 2-3x
- Abstractive summarization: 5-10x
- Hierarchical summarization: 20x+
- LLMLingua (prompt compression): 20x with 1.5% accuracy loss
- KVzip (KV cache compression): 3-4x with 2x speed improvement
Target ratios by use case:
- Customer support: 5-7x (preserve details)
- General chat: 8-12x (balance quality/efficiency)
- Code assistants: 3-5x (preserve technical accuracy)
- Long documents: 15-20x (extract key insights)
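The ratio formula and the per-use-case targets above translate directly into a small checker (the helper names and the dictionary encoding of the target bands are ours):

```python
def compression_ratio(original_tokens: int, compressed_tokens: int) -> float:
    """Compression ratio = original tokens / compressed tokens."""
    return original_tokens / compressed_tokens

# Target ratio bands per use case, mirroring the list above
TARGETS = {
    "customer_support": (5, 7),
    "general_chat": (8, 12),
    "code_assistant": (3, 5),
    "long_documents": (15, 20),
}

def within_target(use_case: str, ratio: float) -> bool:
    lo, hi = TARGETS[use_case]
    return lo <= ratio <= hi

print(compression_ratio(12_000, 1_500))      # 8.0
print(within_target("general_chat", 8.0))    # True
```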
Progressive Compression Thresholds
Industry standard pattern:
Context Usage   Action                   Technique
─────────────────────────────────────────────────────────
0-70%           No compression           Store verbatim
70-85%          Light compression        Remove redundancy
85-95%          Medium compression       Summarize old messages
95-100%         Aggressive compression   Hierarchical + RAG

Implementation guidelines:
- 70% threshold: Remove duplicate/redundant messages, semantic deduplication
- 85% threshold: Summarize messages older than 20 turns, keep recent 10-15
- 95% threshold: Multi-level hierarchical summarization + vector store archival
- Emergency (100%): Drop least important messages, aggressive summarization
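The threshold table maps directly onto a small dispatch function; this is a sketch of the pattern, with action strings and the function name chosen by us:

```python
def compression_action(usage_ratio: float) -> str:
    """Map context-window usage (0.0-1.0) to the action from the table."""
    if usage_ratio < 0.70:
        return "store verbatim"
    elif usage_ratio < 0.85:
        return "remove redundancy"
    elif usage_ratio < 0.95:
        return "summarize old messages"
    else:
        return "hierarchical + RAG"

print(compression_action(0.50))  # store verbatim
print(compression_action(0.90))  # summarize old messages
```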
Compression Techniques
1. Summarization Techniques
1.1 Extractive Summarization
Selects key sentences/phrases without modification.
Pros: No hallucination, fast, deterministic
Cons: Limited compression (2-3x), may feel disjointed
Best for: Legal/compliance, short-term compression
python
from sklearn.feature_extraction.text import TfidfVectorizer
import numpy as np

def extractive_compress(messages: list, compression_ratio: float = 0.3):
    """Extract most important messages using TF-IDF scoring."""
    texts = [msg['content'] for msg in messages]
    # Calculate TF-IDF scores
    vectorizer = TfidfVectorizer()
    tfidf_matrix = vectorizer.fit_transform(texts)
    scores = np.array(tfidf_matrix.sum(axis=1)).flatten()
    # Select top messages, restoring chronological order
    n_keep = max(1, int(len(messages) * compression_ratio))
    top_indices = sorted(np.argsort(scores)[-n_keep:])
    return [messages[i] for i in top_indices]
1.2 Abstractive Summarization
Uses LLMs to semantically condense conversation history.
Pros: Higher compression (5-10x), coherent, synthesizes information
Cons: Risk of hallucination, higher cost, less deterministic
Best for: General chat, customer support, multi-session continuity
python
from anthropic import Anthropic

def abstractive_compress(messages: list, client: Anthropic):
    """Generate semantic summary using Claude."""
    conversation_text = "\n\n".join(
        f"{msg['role'].upper()}: {msg['content']}" for msg in messages
    )
    response = client.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=500,
        messages=[{
            "role": "user",
            "content": f"""Summarize this conversation, preserving:
1. Key decisions made
2. Important context and facts
3. Unresolved questions
4. Action items

Conversation:
{conversation_text}

Summary (aim for 1/5 the original length):"""
        }]
    )
    return {
        "role": "assistant",
        "content": f"[Summary]\n{response.content[0].text}"
    }
1.3 Hierarchical Summarization (Multi-Level)
Creates summaries of summaries in a tree structure.
Pros: Extreme compression (20x+), handles 1M+ token conversations
Cons: Complex implementation, multiple LLM calls, information loss accumulates
Best for: Long-running conversations, multi-session applications
Architecture:
Level 0 (Raw): [Msg1][Msg2][Msg3][Msg4][Msg5][Msg6][Msg7][Msg8]
Level 1 (Chunk): [Summary1-2] [Summary3-4] [Summary5-6] [Summary7-8]
Level 2 (Group): [Summary1-4] [Summary5-8]
Level 3 (Session): [Overall Session Summary]
python
from anthropic import Anthropic
from typing import List, Dict

class HierarchicalMemory:
    def __init__(self, client: Anthropic, chunk_size: int = 10):
        self.client = client
        self.chunk_size = chunk_size
        self.levels: List[List[Dict]] = [[]]  # Level 0 = raw messages

    def add_message(self, message: Dict):
        """Add message and trigger summarization if needed."""
        self.levels[0].append(message)
        if len(self.levels[0]) >= self.chunk_size * 2:
            self._summarize_level(0)

    def _summarize_level(self, level: int):
        """Summarize a level into the next higher level."""
        messages = self.levels[level]
        # Ensure next level exists
        while len(self.levels) <= level + 1:
            self.levels.append([])
        # Summarize first chunk
        chunk = messages[:self.chunk_size]
        summary = self._generate_summary(chunk, level)
        # Move to next level
        self.levels[level + 1].append(summary)
        self.levels[level] = messages[self.chunk_size:]
        # Recursively check if next level needs summarization
        if len(self.levels[level + 1]) >= self.chunk_size * 2:
            self._summarize_level(level + 1)

    def _generate_summary(self, messages: List[Dict], level: int) -> Dict:
        """Generate summary for a chunk."""
        conversation_text = "\n\n".join(
            f"{msg['role'].upper()}: {msg['content']}" for msg in messages
        )
        response = self.client.messages.create(
            model="claude-3-5-haiku-20241022",
            max_tokens=300,
            messages=[{
                "role": "user",
                "content": f"Summarize this Level {level} conversation chunk:\n\n{conversation_text}"
            }]
        )
        return {
            "role": "system",
            "content": f"[L{level + 1} Summary] {response.content[0].text}",
            "level": level + 1
        }

    def get_context(self, max_tokens: int = 4000) -> List[Dict]:
        """Retrieve context within token budget."""
        context = []
        token_count = 0
        # Prioritize recent raw messages (up to 60% of the budget)
        for msg in reversed(self.levels[0]):
            msg_tokens = len(msg['content']) // 4
            if token_count + msg_tokens > max_tokens * 0.6:
                break
            context.insert(0, msg)
            token_count += msg_tokens
        # Add summaries from higher levels with the remaining budget
        for level in range(1, len(self.levels)):
            for summary in self.levels[level]:
                summary_tokens = len(summary['content']) // 4
                if token_count + summary_tokens > max_tokens:
                    break
                context.insert(0, summary)
                token_count += summary_tokens
        return context

Academic reference: "Recursively Summarizing Enables Long-Term Dialogue Memory in Large Language Models" (arXiv:2308.15022)
1.4 Rolling Summarization (Continuous)
Continuously compresses conversation with sliding window.
Pros: Low latency, predictable token usage, simple
Cons: Early details over-compressed, no information recovery
Best for: Real-time chat, streaming conversations
python
from anthropic import Anthropic

class RollingMemory:
    def __init__(self, client: Anthropic, window_size: int = 10, compress_threshold: int = 15):
        self.client = client
        self.window_size = window_size
        self.compress_threshold = compress_threshold
        self.rolling_summary = None
        self.recent_messages = []

    def add_message(self, message: dict):
        self.recent_messages.append(message)
        if len(self.recent_messages) >= self.compress_threshold:
            self._compress()

    def _compress(self):
        """Compress older messages into the rolling summary."""
        messages_to_compress = self.recent_messages[:-self.window_size]
        parts = []
        if self.rolling_summary:
            parts.append(f"Existing summary:\n{self.rolling_summary}")
        parts.append("\nNew messages:\n" + "\n\n".join(
            f"{msg['role']}: {msg['content']}" for msg in messages_to_compress
        ))
        response = self.client.messages.create(
            model="claude-3-5-haiku-20241022",
            max_tokens=400,
            messages=[{
                "role": "user",
                "content": "\n".join(parts) + "\n\nUpdate the summary:"
            }]
        )
        self.rolling_summary = response.content[0].text
        self.recent_messages = self.recent_messages[-self.window_size:]

    def get_context(self):
        context = []
        if self.rolling_summary:
            context.append({
                "role": "system",
                "content": f"[Summary]\n{self.rolling_summary}"
            })
        context.extend(self.recent_messages)
        return context
2. Embedding-Based Approaches
2.1 RAG (Retrieval-Augmented Generation)
Store full conversation in vector database, retrieve only relevant chunks.
Pros: Extremely scalable, no information loss, high relevance
Cons: Requires vector DB infrastructure, retrieval latency
Best for: Knowledge bases, customer support with large history
python
from anthropic import Anthropic
from openai import OpenAI
import chromadb

class RAGMemory:
    def __init__(self, anthropic_client: Anthropic, openai_client: OpenAI):
        self.anthropic = anthropic_client
        self.openai = openai_client
        # Initialize vector store
        self.chroma = chromadb.Client()
        self.collection = self.chroma.create_collection(
            name="conversation",
            metadata={"hnsw:space": "cosine"}
        )
        self.recent_messages = []
        self.recent_window = 5
        self.message_counter = 0

    def add_message(self, message: dict):
        """Add to recent memory; archive overflow to the vector store."""
        self.recent_messages.append(message)
        if len(self.recent_messages) > self.recent_window:
            old_msg = self.recent_messages.pop(0)
            self._store_in_vectordb(old_msg)

    def _store_in_vectordb(self, message: dict):
        """Archive a message to the vector database."""
        # Generate embedding
        response = self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=message['content']
        )
        self.collection.add(
            embeddings=[response.data[0].embedding],
            documents=[message['content']],
            metadatas=[{"role": message['role']}],
            ids=[f"msg_{self.message_counter}"]
        )
        self.message_counter += 1

    def retrieve_context(self, query: str, max_tokens: int = 4000):
        """Retrieve relevant context using RAG."""
        context = []
        token_count = 0
        # 1. Recent messages (short-term memory)
        for msg in self.recent_messages:
            context.append(msg)
            token_count += len(msg['content']) // 4
        # 2. Retrieve relevant historical context
        if token_count < max_tokens:
            query_embedding = self.openai.embeddings.create(
                model="text-embedding-3-small",
                input=query
            )
            n_results = min(10, (max_tokens - token_count) // 100)
            results = self.collection.query(
                query_embeddings=[query_embedding.data[0].embedding],
                n_results=n_results
            )
            for i, doc in enumerate(results['documents'][0]):
                if token_count + len(doc) // 4 > max_tokens:
                    break
                metadata = results['metadatas'][0][i]
                context.insert(0, {
                    "role": metadata['role'],
                    "content": f"[Retrieved] {doc}"
                })
                token_count += len(doc) // 4
        return context

Vector database options:
- ChromaDB: Embedded, easy local development
- Pinecone: Managed, 50ms p95 latency
- Weaviate: Open-source, hybrid search
- Qdrant: High performance, payload filtering
2.2 Vector Search and Clustering
Group similar messages into clusters, represent with centroids.
Pros: Reduces redundancy, identifies themes, multi-topic handling
Cons: Requires sufficient data, may lose nuances
Best for: Multi-topic conversations, meeting summaries
python
from sklearn.cluster import KMeans
from openai import OpenAI
import numpy as np

class ClusteredMemory:
    def __init__(self, openai_client: OpenAI, n_clusters: int = 5):
        self.client = openai_client
        self.n_clusters = n_clusters
        self.messages = []
        self.embeddings = []

    def add_messages(self, messages: list):
        for msg in messages:
            self.messages.append(msg)
            response = self.client.embeddings.create(
                model="text-embedding-3-small",
                input=msg['content']
            )
            self.embeddings.append(response.data[0].embedding)

    def compress_by_clustering(self):
        """Cluster messages and return one representative per cluster."""
        if len(self.messages) < self.n_clusters:
            return self.messages
        embeddings_array = np.array(self.embeddings)
        kmeans = KMeans(n_clusters=self.n_clusters, random_state=42)
        labels = kmeans.fit_predict(embeddings_array)
        # Select the message closest to each centroid
        compressed = []
        for cluster_id in range(self.n_clusters):
            cluster_indices = np.where(labels == cluster_id)[0]
            centroid = kmeans.cluster_centers_[cluster_id]
            cluster_embeddings = embeddings_array[cluster_indices]
            distances = np.linalg.norm(cluster_embeddings - centroid, axis=1)
            closest_idx = cluster_indices[np.argmin(distances)]
            compressed.append({
                **self.messages[closest_idx],
                "cluster_id": int(cluster_id),
                "cluster_size": len(cluster_indices)
            })
        return compressed
2.3 Semantic Deduplication
Remove semantically similar messages that convey redundant information.
Pros: Reduces redundancy without losing unique content
Cons: Requires threshold tuning, O(n²) complexity
Best for: FAQ systems, repetitive conversations
python
from openai import OpenAI
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

class SemanticDeduplicator:
    def __init__(self, openai_client: OpenAI, similarity_threshold: float = 0.85):
        self.client = openai_client
        self.threshold = similarity_threshold

    def deduplicate(self, messages: list):
        """Remove semantically similar messages."""
        if len(messages) <= 1:
            return messages
        # Generate embeddings
        embeddings = []
        for msg in messages:
            response = self.client.embeddings.create(
                model="text-embedding-3-small",
                input=msg['content']
            )
            embeddings.append(response.data[0].embedding)
        embeddings_array = np.array(embeddings)
        similarity_matrix = cosine_similarity(embeddings_array)
        # Keep a message only if it is not too similar to any kept message
        keep_indices = []
        for i in range(len(messages)):
            is_unique = True
            for j in keep_indices:
                if similarity_matrix[i][j] > self.threshold:
                    is_unique = False
                    break
            if is_unique:
                keep_indices.append(i)
        return [messages[i] for i in keep_indices]
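The greedy thresholding step can be sanity-checked without any embedding API, using made-up vectors and a pure-NumPy cosine computation (this toy function stands in for the class above; its name and the test vectors are ours):

```python
import numpy as np

def dedup_by_cosine(vectors: list, threshold: float = 0.85) -> list:
    """Greedy dedup: keep index i only if its cosine similarity to every
    already-kept vector is at or below the threshold."""
    arr = np.array(vectors, dtype=float)
    unit = arr / np.linalg.norm(arr, axis=1, keepdims=True)
    sim = unit @ unit.T  # pairwise cosine similarity matrix
    keep = []
    for i in range(len(arr)):
        if all(sim[i][j] <= threshold for j in keep):
            keep.append(i)
    return keep

# Two near-duplicate directions and one orthogonal one
print(dedup_by_cosine([[1.0, 0.0], [0.99, 0.01], [0.0, 1.0]]))  # [0, 2]
```

The first kept message always survives, so ordering matters: put higher-priority messages first if you want duplicates resolved in their favor.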
3. Token-Efficient Strategies
3.1 Message Prioritization
Assign importance scores and retain only high-priority content.
**Pros:** Retains most important information, flexible criteria
**Cons:** Scoring is heuristic-based, may break flow
**Best for:** Mixed-importance conversations, filtering noise
```python
import re

class MessagePrioritizer:
    def score_message(self, msg: dict, index: int, total: int) -> float:
        """Calculate composite importance score."""
        scores = []
        # Length score (longer = more info)
        scores.append(min(len(msg['content']) / 500, 1.0))
        # Question score
        if msg['role'] == 'user':
            scores.append(min(msg['content'].count('?') * 0.5, 1.0))
        # Entity score (capitalized words)
        entities = len(re.findall(r'\b[A-Z][a-z]+', msg['content']))
        scores.append(min(entities / 10, 1.0))
        # Recency score (linear decay)
        scores.append(index / max(total - 1, 1))
        # Role score
        scores.append(0.6 if msg['role'] == 'user' else 0.4)
        return sum(scores) / len(scores)

    def prioritize(self, messages: list, target_count: int):
        """Select top N messages by priority."""
        scored = [
            (msg, self.score_message(msg, i, len(messages)), i)
            for i, msg in enumerate(messages)
        ]
        scored.sort(key=lambda x: x[1], reverse=True)
        top_messages = scored[:target_count]
        top_messages.sort(key=lambda x: x[2])  # Restore chronological order
        return [msg for msg, score, idx in top_messages]
```
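The select-then-reorder step is the part that is easy to get wrong: after taking the top-N by score, the survivors must go back into chronological order or the conversation reads out of sequence. This standalone sketch isolates just that step, with toy messages and pre-assigned scores:

```python
def top_n_in_order(messages, scores, n):
    """Select the n highest-scoring messages, then restore their
    original chronological order (the same pattern prioritize() uses)."""
    # Rank indices by score, keep the best n
    ranked = sorted(range(len(messages)), key=lambda i: scores[i], reverse=True)[:n]
    # Re-sort the kept indices so output stays chronological
    return [messages[i] for i in sorted(ranked)]

msgs = ["greeting", "key question", "filler", "decision"]
scores = [0.2, 0.9, 0.1, 0.8]
print(top_n_in_order(msgs, scores, 2))  # → ['key question', 'decision']
```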
3.2 Delta Compression
Store only changes between consecutive messages.
**Pros:** Highly efficient for incremental changes
**Cons:** Reconstruction overhead, not suitable for all content
**Best for:** Code assistants with incremental edits
```python
import difflib

class DeltaCompressor:
    def __init__(self):
        self.base_messages = []
        self.deltas = []

    def add_message(self, message: dict):
        if not self.base_messages:
            self.base_messages.append(message)
            return
        last_msg = self.base_messages[-1]
        if last_msg['role'] == message['role']:
            # Calculate a line-level delta against the previous message.
            # ndiff (rather than unified_diff) is used so the delta can be
            # reversed later with difflib.restore.
            diff = list(difflib.ndiff(
                last_msg['content'].splitlines(),
                message['content'].splitlines()
            ))
            if len('\n'.join(diff)) < len(message['content']) * 0.7:
                # Store as delta only if it actually saves space
                self.deltas.append({
                    'base_index': len(self.base_messages) - 1,
                    'delta': diff,
                    'role': message['role']
                })
                return
        # Store as new base message
        self.base_messages.append(message)

    def reconstruct(self):
        """Reconstruct full conversation from bases + deltas."""
        messages = self.base_messages.copy()
        for delta_info in self.deltas:
            # difflib.restore rebuilds the second sequence from an ndiff delta
            reconstructed = '\n'.join(difflib.restore(delta_info['delta'], 2))
            messages.append({
                'role': delta_info['role'],
                'content': reconstructed
            })
        return messages
```
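The reconstruction side is covered by the standard library: `difflib.restore` can rebuild the newer text from an `ndiff`-style delta, which is what makes storing only the delta safe. A minimal round-trip sketch:

```python
import difflib

old = "def add(a, b):\n    return a + b"
new = "def add(a, b):\n    # sum two values\n    return a + b"

# Store only the line-level delta between the two messages...
delta = list(difflib.ndiff(old.splitlines(), new.splitlines()))

# ...and rebuild the newer message from it later
# (restore(delta, 2) recovers the second of the two compared sequences)
rebuilt = "\n".join(difflib.restore(delta, 2))
assert rebuilt == new
```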
4. LangChain Memory Types
4.1 ConversationSummaryMemory
Automatically summarizes conversation as it progresses.
```python
from langchain.memory import ConversationSummaryMemory
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")
memory = ConversationSummaryMemory(llm=llm)

# Add conversation
memory.save_context(
    {"input": "Hi, I'm working on a Python project"},
    {"output": "Great! How can I help with your Python project?"}
)

# Get summary
summary = memory.load_memory_variables({})
print(summary['history'])
```
**Pros:** Automatic summarization, simple API
**Cons:** Every turn triggers an LLM call
**Best for:** Medium conversations (20-50 turns)
4.2 ConversationSummaryBufferMemory
Hybrid: Recent messages verbatim, older summarized.
```python
from langchain.memory import ConversationSummaryBufferMemory
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-3-5-haiku-20241022")
memory = ConversationSummaryBufferMemory(
    llm=llm,
    max_token_limit=2000,  # Summarize when exceeding
    return_messages=True
)

# Add conversation
for i in range(50):
    memory.save_context(
        {"input": f"Question {i}"},
        {"output": f"Answer {i}"}
    )

# Automatically keeps recent messages + summary of old
context = memory.load_memory_variables({})
```
**Pros:** Best balance of detail and compression
**Cons:** Requires token limit tuning
**Best for:** Most production applications
4.3 ConversationTokenBufferMemory
Maintains fixed token budget, drops oldest when exceeded.
```python
from langchain.memory import ConversationTokenBufferMemory
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")
memory = ConversationTokenBufferMemory(
    llm=llm,
    max_token_limit=2000
)

# Simple FIFO eviction when the token limit is exceeded
```
**Pros:** Predictable token usage, simple
**Cons:** Loses old information completely
**Best for:** Real-time chat with strict limits
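The FIFO policy itself takes only a few lines to model outside LangChain. This sketch assumes the same rough 4-characters-per-token estimate used elsewhere in this document; a production version would use a real tokenizer:

```python
from collections import deque

def trim_to_budget(messages, max_tokens, estimate=lambda m: len(m) // 4):
    """Drop the oldest messages until the estimated token total fits the
    budget (the same FIFO policy ConversationTokenBufferMemory applies)."""
    buf = deque(messages)
    while buf and sum(estimate(m) for m in buf) > max_tokens:
        buf.popleft()  # evict the oldest message first
    return list(buf)

history = ["a" * 400, "b" * 400, "c" * 400]  # ~100 estimated tokens each
print(trim_to_budget(history, max_tokens=220))  # keeps only the two newest
```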
4.4 VectorStoreRetrieverMemory
Stores all messages in vector database, retrieves relevant ones.
```python
from langchain.memory import VectorStoreRetrieverMemory
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings()
vectorstore = Chroma(embedding_function=embeddings)
memory = VectorStoreRetrieverMemory(
    retriever=vectorstore.as_retriever(search_kwargs={"k": 5})
)

# Automatically retrieves the most relevant context
```
**Pros:** Infinite conversation length, semantic retrieval
**Cons:** Requires vector DB, retrieval overhead
**Best for:** Long-running conversations, knowledge bases
5. Anthropic-Specific Patterns
5.1 Prompt Caching (90% Cost Reduction)
Cache static context to reduce token costs.
```python
from anthropic import Anthropic

client = Anthropic(api_key="your-api-key")

# Long conversation context
conversation_history = [
    {"role": "user", "content": "Message 1"},
    {"role": "assistant", "content": "Response 1"},
    # ... many more messages
]

# Mark context for caching
messages = []
for i, msg in enumerate(conversation_history[:-1]):
    content = msg['content']
    # Add cache control to the last context message
    if i == len(conversation_history) - 2:
        messages.append({
            "role": msg['role'],
            "content": [
                {
                    "type": "text",
                    "text": content,
                    "cache_control": {"type": "ephemeral"}
                }
            ]
        })
    else:
        messages.append(msg)

# Add new user message (not cached)
messages.append(conversation_history[-1])
response = client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    messages=messages
)

# Subsequent calls with the same cached context cost 90% less
```
**Cache TTL:** 5 minutes
**Savings:** 90% cost reduction for cached tokens
**Limits:** Max 4 cache breakpoints per request
**Best practices:**
- Cache conversation history, not the current query
- Update the cache when context changes significantly
- Combine with summarization for maximum efficiency
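To see why caching pays for itself, blend the write and read rates over a session. This sketch assumes Anthropic's published multipliers for ephemeral caching (cache writes at roughly 1.25x the base input price, cache reads at roughly 0.1x); verify current pricing before relying on exact figures:

```python
def effective_input_cost(tokens, base_per_mtok, hit_rate,
                         write_mult=1.25, read_mult=0.10):
    """Blended input cost for `tokens` cached tokens at a given cache hit
    rate. Multipliers are assumptions based on published ephemeral-cache
    pricing: misses pay the write premium, hits pay the discounted read rate."""
    per_tok = base_per_mtok / 1_000_000
    write_cost = tokens * (1 - hit_rate) * per_tok * write_mult
    read_cost = tokens * hit_rate * per_tok * read_mult
    return write_cost + read_cost

# 100K cached context tokens at $3/MTok with a 90% cache hit rate
print(round(effective_input_cost(100_000, 3.0, 0.9), 4))
```

At a 90% hit rate the blended cost is a fraction of the uncached $0.30, which is where the large end-to-end savings quoted below come from.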
5.2 Extended Thinking for Compression Planning
Use extended thinking to plan optimal compression strategy.
```python
from anthropic import Anthropic

client = Anthropic(api_key="your-api-key")

# Assumes conversation_text, current_tokens, target_tokens and
# compression_ratio are already defined for the session being analyzed
response = client.messages.create(
    model="claude-3-7-sonnet-20250219",
    max_tokens=16000,
    thinking={
        "type": "enabled",
        "budget_tokens": 10000
    },
    messages=[{
        "role": "user",
        "content": f"""Analyze this conversation and recommend compression:
{conversation_text}
Current token count: {current_tokens}
Target: {target_tokens}
Required compression: {compression_ratio}x
Recommend optimal strategy."""
    }]
)

# Access thinking process
thinking_content = [
    block for block in response.content
    if block.type == "thinking"
]

# Get compression recommendation
recommendation = response.content[-1].text
```
---
Production Patterns
Checkpointing and Persistence
Save compression state for recovery and resume.
```python
import json
import time
from pathlib import Path

class PersistentMemory:
    def __init__(self, checkpoint_dir: str = "./checkpoints"):
        self.checkpoint_dir = Path(checkpoint_dir)
        self.checkpoint_dir.mkdir(exist_ok=True)
        self.memory = []
        self.summary = None

    def save_checkpoint(self, session_id: str):
        """Save current memory state."""
        checkpoint = {
            'messages': self.memory,
            'summary': self.summary,
            'timestamp': time.time()
        }
        checkpoint_file = self.checkpoint_dir / f"{session_id}.json"
        with open(checkpoint_file, 'w') as f:
            json.dump(checkpoint, f, indent=2)

    def load_checkpoint(self, session_id: str):
        """Load memory state from checkpoint."""
        checkpoint_file = self.checkpoint_dir / f"{session_id}.json"
        if checkpoint_file.exists():
            with open(checkpoint_file, 'r') as f:
                checkpoint = json.load(f)
            self.memory = checkpoint['messages']
            self.summary = checkpoint.get('summary')
            return True
        return False

    def auto_checkpoint(self, session_id: str, interval: int = 10):
        """Automatically save every N messages."""
        if len(self.memory) % interval == 0:
            self.save_checkpoint(session_id)
```
Resume Workflows
Continue conversations across sessions.
```python
from anthropic import Anthropic
from pathlib import Path
import json
import time

class ResumableConversation:
    def __init__(self, client: Anthropic, session_id: str):
        self.client = client
        self.session_id = session_id
        Path('sessions').mkdir(exist_ok=True)  # Ensure the save directory exists
        self.memory = self._load_or_create()

    def _load_or_create(self):
        """Load existing session or create new."""
        try:
            with open(f'sessions/{self.session_id}.json', 'r') as f:
                return json.load(f)
        except FileNotFoundError:
            return {
                'messages': [],
                'summary': None,
                'created_at': time.time()
            }

    def add_turn(self, user_message: str):
        """Add user message and get response."""
        self.memory['messages'].append({
            'role': 'user',
            'content': user_message
        })
        # Build context (with compression); it already ends with
        # the user message appended above, so don't add it twice
        system_prompt, context = self._build_context()
        kwargs = {'system': system_prompt} if system_prompt else {}
        response = self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1024,
            messages=context,
            **kwargs
        )
        # Save response
        assistant_message = response.content[0].text
        self.memory['messages'].append({
            'role': 'assistant',
            'content': assistant_message
        })
        # Compress if needed
        if len(self.memory['messages']) > 20:
            self._compress()
        # Save state
        self._save()
        return assistant_message

    def _build_context(self):
        """Build context with compression."""
        # The summary travels in the system prompt: the Anthropic
        # messages list accepts only 'user' and 'assistant' roles
        system_prompt = None
        if self.memory['summary']:
            system_prompt = f"[Previous conversation summary]\n{self.memory['summary']}"
        # Recent messages verbatim (includes the latest user turn)
        return system_prompt, self.memory['messages'][-10:]

    def _compress(self):
        """Compress older messages."""
        if len(self.memory['messages']) < 15:
            return
        # Messages to summarize
        to_summarize = self.memory['messages'][:-10]
        # Generate summary
        conversation_text = "\n\n".join([
            f"{msg['role']}: {msg['content']}"
            for msg in to_summarize
        ])
        response = self.client.messages.create(
            model="claude-3-5-haiku-20241022",
            max_tokens=500,
            messages=[{
                'role': 'user',
                'content': f"Summarize this conversation:\n\n{conversation_text}"
            }]
        )
        # Update memory
        self.memory['summary'] = response.content[0].text
        self.memory['messages'] = self.memory['messages'][-10:]

    def _save(self):
        """Save session to disk."""
        with open(f'sessions/{self.session_id}.json', 'w') as f:
            json.dump(self.memory, f, indent=2)
```
Usage
```python
client = Anthropic(api_key="your-api-key")
conversation = ResumableConversation(client, session_id="user123_session1")

# Continue across multiple sessions
response1 = conversation.add_turn("What's Python?")

# ... later session
response2 = conversation.add_turn("Show me an example")  # Remembers context
```
Hybrid Approaches (Best Practice)
Combine multiple techniques for optimal results.
```python
from anthropic import Anthropic
from openai import OpenAI
import chromadb

class HybridMemorySystem:
    """
    Combines:
    - Rolling summarization (short-term compression)
    - RAG retrieval (long-term memory)
    - Prompt caching (cost optimization)
    - Progressive compression (adaptive behavior)
    """
    def __init__(self, anthropic_client: Anthropic, openai_client: OpenAI):
        self.anthropic = anthropic_client
        self.openai = openai_client
        # Recent messages (verbatim)
        self.recent_messages = []
        self.recent_window = 10
        # Rolling summary
        self.rolling_summary = None
        # Vector store (long-term); get_or_create avoids a collision
        # when the collection already exists
        self.chroma = chromadb.Client()
        self.collection = self.chroma.get_or_create_collection(name="memory")
        self.message_counter = 0
        # Compression thresholds (fraction of context window used)
        self.thresholds = {
            'light': 0.70,   # Start basic compression
            'medium': 0.85,  # Aggressive summarization
            'heavy': 0.95    # Emergency measures
        }

    def add_message(self, message: dict):
        """Add message with intelligent compression."""
        self.recent_messages.append(message)
        # Check compression needs
        usage_ratio = self._estimate_usage()
        if usage_ratio >= self.thresholds['heavy']:
            self._emergency_compress()
        elif usage_ratio >= self.thresholds['medium']:
            self._medium_compress()
        elif usage_ratio >= self.thresholds['light']:
            self._light_compress()

    def _light_compress(self):
        """Remove redundancy, archive to vector store."""
        if len(self.recent_messages) > self.recent_window * 1.5:
            # Archive oldest to vector store
            to_archive = self.recent_messages[:5]
            for msg in to_archive:
                self._archive_to_vectorstore(msg)
            self.recent_messages = self.recent_messages[5:]

    def _medium_compress(self):
        """Generate rolling summary, aggressive archival."""
        if len(self.recent_messages) > self.recent_window:
            # Summarize older messages
            to_summarize = self.recent_messages[:-self.recent_window]
            summary_text = "\n\n".join([
                f"{msg['role']}: {msg['content']}"
                for msg in to_summarize
            ])
            if self.rolling_summary:
                summary_text = f"Existing: {self.rolling_summary}\n\nNew: {summary_text}"
            response = self.anthropic.messages.create(
                model="claude-3-5-haiku-20241022",
                max_tokens=400,
                messages=[{
                    'role': 'user',
                    'content': f"Update summary:\n{summary_text}"
                }]
            )
            self.rolling_summary = response.content[0].text
            # Archive all summarized messages
            for msg in to_summarize:
                self._archive_to_vectorstore(msg)
            self.recent_messages = self.recent_messages[-self.recent_window:]

    def _emergency_compress(self):
        """Extreme compression for near-limit situations."""
        # Keep only the 5 most recent messages
        to_archive = self.recent_messages[:-5]
        for msg in to_archive:
            self._archive_to_vectorstore(msg)
        self.recent_messages = self.recent_messages[-5:]
        # Compress summary further if needed
        if self.rolling_summary and len(self.rolling_summary) > 1000:
            response = self.anthropic.messages.create(
                model="claude-3-5-haiku-20241022",
                max_tokens=200,
                messages=[{
                    'role': 'user',
                    'content': f"Create ultra-concise summary:\n{self.rolling_summary}"
                }]
            )
            self.rolling_summary = response.content[0].text

    def _archive_to_vectorstore(self, message: dict):
        """Store in vector database for retrieval."""
        embedding_response = self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=message['content']
        )
        self.collection.add(
            embeddings=[embedding_response.data[0].embedding],
            documents=[message['content']],
            metadatas=[{'role': message['role']}],
            ids=[f"msg_{self.message_counter}"]
        )
        self.message_counter += 1

    def get_context(self, current_query: str, max_tokens: int = 8000):
        """Build optimal context for current query."""
        context = []
        token_count = 0
        # 1. Add rolling summary (if it exists) as a cacheable user turn;
        #    the Anthropic messages API accepts only user/assistant roles
        if self.rolling_summary:
            summary_msg = {
                'role': 'user',
                'content': [
                    {
                        'type': 'text',
                        'text': f"[Conversation Summary]\n{self.rolling_summary}",
                        'cache_control': {'type': 'ephemeral'}  # Cache it
                    }
                ]
            }
            context.append(summary_msg)
            token_count += len(self.rolling_summary) // 4
        # 2. Retrieve relevant historical context (RAG)
        if token_count < max_tokens * 0.3:
            query_embedding = self.openai.embeddings.create(
                model="text-embedding-3-small",
                input=current_query
            )
            results = self.collection.query(
                query_embeddings=[query_embedding.data[0].embedding],
                n_results=5
            )
            for i, doc in enumerate(results['documents'][0]):
                if token_count + len(doc) // 4 > max_tokens * 0.3:
                    break
                metadata = results['metadatas'][0][i]
                context.append({
                    'role': metadata['role'],
                    'content': f"[Retrieved] {doc}"
                })
                token_count += len(doc) // 4
        # 3. Add recent messages verbatim
        for msg in self.recent_messages:
            if token_count + len(msg['content']) // 4 > max_tokens * 0.8:
                break
            context.append(msg)
            token_count += len(msg['content']) // 4
        return context

    def _estimate_usage(self):
        """Estimate context window usage (~4 chars/token heuristic)."""
        total_tokens = 0
        if self.rolling_summary:
            total_tokens += len(self.rolling_summary) // 4
        for msg in self.recent_messages:
            total_tokens += len(msg['content']) // 4
        return total_tokens / 200000  # Claude Sonnet context window
```
Usage
```python
anthropic_client = Anthropic(api_key="your-anthropic-key")
openai_client = OpenAI(api_key="your-openai-key")
memory = HybridMemorySystem(anthropic_client, openai_client)

# Add messages over time
for i in range(1000):
    memory.add_message({
        'role': 'user' if i % 2 == 0 else 'assistant',
        'content': f"Message {i} with some content..."
    })

# Retrieve optimized context
current_query = "What did we discuss about pricing?"
context = memory.get_context(current_query)

# Use with Claude
response = anthropic_client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    messages=context + [{
        'role': 'user',
        'content': current_query
    }]
)
```
---
Performance Benchmarks
Compression Efficiency
| Technique | Compression Ratio | Quality Loss | Latency | Cost Impact |
|---|---|---|---|---|
| Extractive | 2-3x | <1% | <10ms | None |
| Abstractive | 5-10x | 2-5% | 1-2s | +$0.001/turn |
| Hierarchical | 20x+ | 5-8% | 2-5s | +$0.003/turn |
| LLMLingua | 20x | 1.5% | 500ms | None |
| RAG | Variable | <1% | 100-300ms | +$0.0005/turn |
| Prompt Caching | N/A | 0% | 0ms | -90% |
Token Savings by Use Case
Customer Support (50-turn conversation):
- No compression: ~8,000 tokens/request
- Rolling summary: ~2,000 tokens/request (75% reduction)
- Hybrid (RAG + summary): ~1,500 tokens/request (81% reduction)
Code Assistant (100-turn session):
- No compression: ~25,000 tokens/request
- Hierarchical: ~5,000 tokens/request (80% reduction)
- Hybrid + caching: ~1,000 tokens/request effective (96% cost reduction)
Educational Tutor (multi-session):
- No compression: Would exceed context window
- RAG + summarization: ~3,000 tokens/request
- Infinite session length enabled
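The reduction percentages above follow directly from the per-request token counts, which makes them easy to re-derive for your own workloads:

```python
def reduction(before, after):
    """Percent token reduction between uncompressed and compressed
    per-request sizes, rounded as in the use-case figures above."""
    return round((1 - after / before) * 100)

print(reduction(8000, 2000))   # rolling summary → 75
print(reduction(8000, 1500))   # hybrid (RAG + summary) → 81
print(reduction(25000, 5000))  # hierarchical → 80
```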
Cost Analysis
Example: Claude Sonnet pricing ($3 input, $15 output per 1M tokens)

1,000 conversations, 50 turns each:

- No compression:
  - Avg 8K tokens/request × 50K requests = 400M tokens
  - Cost: $1,200
- With rolling summarization:
  - Avg 2K tokens/request × 50K requests = 100M tokens
  - Summarization overhead: +10M tokens
  - Cost: $330 (72% savings)
- With hybrid system + caching:
  - First turn: 2K tokens (no cache)
  - Subsequent: 200 tokens effective (90% cache hit)
  - Total: ~15M tokens effective
  - Cost: $45 (96% savings)
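The first two figures above can be checked with a small input-cost helper (`cost_usd` is an illustrative name; rates and token volumes come from the example):

```python
def cost_usd(tokens_per_request: int, requests: int,
             overhead_tokens: int = 0, input_rate: float = 3.0) -> float:
    """Input-token cost at the example's $3 per 1M input tokens."""
    total_tokens = tokens_per_request * requests + overhead_tokens
    return total_tokens / 1_000_000 * input_rate

# 1,000 conversations x 50 turns = 50K requests
print(cost_usd(8_000, 50_000))              # no compression
print(cost_usd(2_000, 50_000, 10_000_000))  # rolling summary + overhead
```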
Tool Recommendations
Memory Management Tools
Mem0 (Recommended for Production)
Best for: Hybrid memory systems with minimal code
```python
from mem0 import MemoryClient

client = MemoryClient(api_key="your-mem0-key")

# Automatically handles compression, summarization, RAG
memory = client.create_memory(
    user_id="user123",
    messages=[
        {"role": "user", "content": "I'm working on a Python project"},
        {"role": "assistant", "content": "Great! What kind of project?"}
    ]
)
```
```python
# Retrieve relevant context
context = client.get_memory(
    user_id="user123",
    query="What programming language am I using?"
)
```

**Features:**
- Automatic hierarchical summarization
- Built-in RAG retrieval
- Multi-user session management
- Analytics dashboard

**Pricing:** $0.40/1K memory operations
Zep
Best for: Low-latency production deployments
```python
from zep_python import ZepClient

client = ZepClient(api_key="your-zep-key")

# Add to session
client.memory.add_memory(
    session_id="session123",
    messages=[
        {"role": "user", "content": "Hello"},
        {"role": "assistant", "content": "Hi there!"}
    ]
)
```
```python
# Auto-summarized retrieval
memory = client.memory.get_memory(session_id="session123")
```

**Features:**
- <100ms retrieval latency
- Automatic fact extraction
- Entity recognition
- Session management

**Pricing:** Open-source (self-hosted) or $0.50/1K operations (cloud)
ChromaDB
Best for: Self-hosted vector storage
```python
import chromadb

client = chromadb.Client()
collection = client.create_collection("conversations")

# Store embeddings
collection.add(
    documents=["Message content"],
    embeddings=[[0.1, 0.2, ...]],
    ids=["msg1"]
)
```
```python
# Retrieve
results = collection.query(
    query_embeddings=[[0.1, 0.2, ...]],
    n_results=5
)
```

**Features:**
- Fully open-source
- Embedded or client-server
- Fast local development

**Pricing:** Free (self-hosted)
LangChain
Best for: Rapid prototyping and experimentation
```python
from langchain.memory import ConversationSummaryBufferMemory
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")
memory = ConversationSummaryBufferMemory(llm=llm, max_token_limit=2000)
```

**Features:**
- Multiple memory types
- Framework integration
- Extensive documentation

**Pricing:** Free (uses your LLM API costs)
Compression Libraries
LLMLingua
Best for: Extreme compression with minimal quality loss
```python
from llmlingua import PromptCompressor

compressor = PromptCompressor()
compressed = compressor.compress_prompt(
    context="Long conversation history...",
    instruction="Current user query",
    target_token=500
)
# Achieves 20x compression with 1.5% accuracy loss
```
**Features:**
- 20x compression ratios
- <2% quality degradation
- Fast inference (<500ms)
**Pricing:** Free (open-source)
---
Use Cases and Patterns
Chatbot (Customer Support)
Requirements:
- Multi-turn conversations (50-100 turns)
- Preserve customer context
- Fast response times
- Cost-efficient
Recommended approach:
- ConversationSummaryBufferMemory (LangChain)
- 70% threshold: Semantic deduplication
- 85% threshold: Rolling summarization
- Prompt caching for frequent patterns
Implementation:
```python
from langchain.memory import ConversationSummaryBufferMemory
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-3-5-haiku-20241022")
memory = ConversationSummaryBufferMemory(
    llm=llm,
    max_token_limit=2000,
    return_messages=True
)

# Add customer conversation
for turn in customer_conversation:
    memory.save_context(
        {"input": turn['customer_message']},
        {"output": turn['agent_response']}
    )
```
```python
# Retrieve compressed context
context = memory.load_memory_variables({})
```

Code Assistant
Requirements:
- Long development sessions (100+ turns)
- Preserve technical details
- Handle large code blocks
- Track incremental changes
Recommended approach:
- Hierarchical summarization for overall context
- RAG retrieval for specific code references
- Delta compression for iterative edits
- Prompt caching for system prompts
Implementation:
```python
from anthropic import Anthropic
from openai import OpenAI

client = Anthropic(api_key="your-api-key")
openai_client = OpenAI(api_key="your-openai-key")

class CodeAssistantMemory:
    def __init__(self):
        self.hierarchy = HierarchicalMemory(client, chunk_size=15)
        self.rag = RAGMemory(anthropic_client=client, openai_client=openai_client)
        self.deltas = DeltaCompressor()

    def add_interaction(self, code_change: dict):
        # Store in hierarchy
        self.hierarchy.add_message({
            'role': 'user',
            'content': code_change['description']
        })
        # Store in RAG for retrieval
        self.rag.add_message(code_change)
        # Store as delta if incremental
        if code_change.get('is_incremental'):
            self.deltas.add_message(code_change)

    def get_context(self, current_query: str):
        # Combine hierarchical summary + RAG retrieval
        summary_context = self.hierarchy.get_context(max_tokens=2000)
        rag_context = self.rag.retrieve_context(current_query, max_tokens=2000)
        return summary_context + rag_context
```

Educational Tutor
Requirements:
- Multi-session tracking
- Student progress persistence
- Personalized context retrieval
- Long-term knowledge retention
Recommended approach:
- VectorStoreRetrieverMemory for multi-session
- Fact extraction for student knowledge
- Progressive compression across sessions
- Resumable conversations
Implementation:
```python
from langchain.memory import VectorStoreRetrieverMemory
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

class TutorMemory:
    def __init__(self, student_id: str):
        self.student_id = student_id
        # Vector store for all sessions
        embeddings = OpenAIEmbeddings()
        vectorstore = Chroma(
            collection_name=f"student_{student_id}",
            embedding_function=embeddings
        )
        self.memory = VectorStoreRetrieverMemory(
            retriever=vectorstore.as_retriever(search_kwargs={"k": 10})
        )

    def add_lesson_content(self, lesson: dict):
        """Add lesson interaction to student memory."""
        self.memory.save_context(
            {"input": lesson['topic']},
            {"output": lesson['explanation']}
        )

    def get_student_context(self, current_topic: str):
        """Retrieve relevant past lessons for current topic."""
        return self.memory.load_memory_variables({
            "prompt": current_topic
        })
```

Best Practices
1. Choose the Right Technique for Your Use Case
- Short conversations (<20 turns): No compression needed
- Medium conversations (20-50 turns): ConversationSummaryBufferMemory
- Long conversations (50-100 turns): Hierarchical or rolling summarization
- Very long (100+ turns): Hybrid (RAG + summarization + caching)
- Multi-session: VectorStoreRetrieverMemory or Mem0
2. Implement Progressive Compression
Don't compress aggressively from the start. Use thresholds:
- 0-70%: Store verbatim
- 70-85%: Light compression (deduplication)
- 85-95%: Medium compression (summarization)
- 95-100%: Aggressive compression (hierarchical)
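As a sketch, the threshold scheme above maps naturally to a small dispatch function (the `compression_stage` name and stage labels are illustrative):

```python
def compression_stage(used_tokens: int, window_tokens: int) -> str:
    """Map context-window utilization to a progressive compression stage."""
    utilization = used_tokens / window_tokens
    if utilization < 0.70:
        return "verbatim"
    elif utilization < 0.85:
        return "deduplicate"    # light compression
    elif utilization < 0.95:
        return "summarize"      # medium compression
    return "hierarchical"       # aggressive compression

print(compression_stage(60_000, 200_000))
print(compression_stage(180_000, 200_000))
```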
3. Combine Techniques
Single-technique approaches are suboptimal. Best production systems use:
- Rolling summarization (short-term)
- RAG retrieval (long-term)
- Prompt caching (cost optimization)
- Semantic deduplication (redundancy removal)
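Of these, semantic deduplication is the simplest to sketch: drop a message when its embedding is nearly identical to one already kept. This minimal version assumes embeddings are precomputed elsewhere; `cosine`, `dedupe`, and the 0.95 threshold are illustrative choices:

```python
import math

def cosine(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(x * x for x in b))
    return dot / (na * nb)

def dedupe(messages, embeddings, threshold=0.95):
    """Keep a message only if no earlier kept message is near-identical."""
    kept, kept_vecs = [], []
    for msg, vec in zip(messages, embeddings):
        if all(cosine(vec, kv) < threshold for kv in kept_vecs):
            kept.append(msg)
            kept_vecs.append(vec)
    return kept

msgs = ["ship the fix", "ship the fix now", "unrelated note"]
vecs = [[1.0, 0.0], [0.99, 0.14], [0.0, 1.0]]  # toy embeddings
print(dedupe(msgs, vecs))  # near-duplicate second message is dropped
```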
4. Monitor Quality Metrics
Track compression impact:
- Response relevance score
- Information retention rate
- User satisfaction metrics
- Token usage reduction
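Information retention is the easiest of these to automate. One rough proxy, sketched here with an illustrative `retention_rate` helper, checks what fraction of hand-labeled key facts survive in the compressed context:

```python
def retention_rate(key_facts: list, compressed_context: str) -> float:
    """Fraction of must-keep facts still present after compression (0-1)."""
    ctx = compressed_context.lower()
    present = sum(1 for fact in key_facts if fact.lower() in ctx)
    return present / len(key_facts) if key_facts else 1.0

facts = ["order #1234", "refund requested", "premium tier"]
ctx = "Customer on premium tier; refund requested for order #1234."
print(retention_rate(facts, ctx))  # -> 1.0
```

Substring matching is crude; in practice an embedding-similarity or LLM-judge check catches paraphrased facts as well.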
5. Use Prompt Caching Strategically
Cache stable content:
- Conversation summaries
- System prompts
- Knowledge base context
- User profiles
Don't cache frequently changing content:
- Current user query
- Real-time data
- Session-specific state
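With Anthropic's prompt caching, stable blocks are marked with `cache_control: {"type": "ephemeral"}` while the current query is left unmarked. This sketch only builds the request payload (the `build_request` helper is hypothetical); sending it via `client.messages.create(**req)` is left out:

```python
def build_request(system_prompt: str, summary: str, query: str) -> dict:
    """Request payload with stable blocks cached, current query uncached."""
    return {
        "model": "claude-3-5-sonnet-20241022",
        "max_tokens": 1024,
        "system": [
            {"type": "text", "text": system_prompt,
             "cache_control": {"type": "ephemeral"}},
            {"type": "text", "text": f"Conversation summary:\n{summary}",
             "cache_control": {"type": "ephemeral"}},
        ],
        # Session-specific state stays out of the cached blocks
        "messages": [{"role": "user", "content": query}],
    }

req = build_request("You are a support agent.",
                    "User is debugging a Python app.",
                    "Why does my import fail?")
print(len(req["system"]))  # -> 2 cached system blocks
```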
6. Implement Checkpointing
Save compression state for:
- Recovery from failures
- Multi-session continuity
- Analytics and debugging
- A/B testing different strategies
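A checkpoint can be as simple as a JSON snapshot of the summary, the recent-message buffer, and a turn counter, written atomically so a crash never leaves a half-written file. A minimal sketch (file layout and helper names are illustrative):

```python
import json
import os
import tempfile
import time

def save_checkpoint(path: str, summary: str, recent: list, turn: int) -> None:
    """Persist compression state atomically: write temp file, then rename."""
    state = {"summary": summary, "recent": recent,
             "turn": turn, "saved_at": time.time()}
    tmp = path + ".tmp"
    with open(tmp, "w") as f:
        json.dump(state, f)
    os.replace(tmp, path)  # atomic on POSIX and Windows

def load_checkpoint(path: str) -> dict:
    with open(path) as f:
        return json.load(f)

path = os.path.join(tempfile.mkdtemp(), "session.json")
save_checkpoint(path, "User is building a CLI.", ["latest message"], turn=42)
print(load_checkpoint(path)["turn"])  # -> 42
```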
7. Tune Compression Parameters
Test and optimize:
- Summary token limits
- Compression thresholds
- Retrieval result counts
- Cache TTLs
- Chunk sizes for hierarchical summarization
8. Handle Edge Cases
Plan for:
- Very long messages (split or compress individually)
- Code blocks (preserve formatting)
- Multi-language content
- Rapidly changing context
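For the first edge case, one safe approach is to split on sentence boundaries so no chunk ever ends mid-sentence. A rough sketch (the `split_message` helper and naive period-based splitting are illustrative; real text needs a proper sentence tokenizer):

```python
def split_message(text: str, max_len: int = 200) -> list:
    """Split a long message into chunks on sentence boundaries."""
    sentences = [s.strip() + "." for s in text.split(".") if s.strip()]
    chunks, current = [], ""
    for s in sentences:
        # Start a new chunk when adding this sentence would overflow
        if current and len(current) + len(s) + 1 > max_len:
            chunks.append(current)
            current = s
        else:
            current = f"{current} {s}".strip()
    if current:
        chunks.append(current)
    return chunks

long_text = "First point. " * 30
print(len(split_message(long_text, max_len=100)))
```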
Troubleshooting
Problem: Summary loses critical information
Solutions:
- Lower compression ratio (less aggressive)
- Implement importance scoring to preserve key messages
- Use extractive summarization for critical sections
- Increase summary token budget
Problem: Retrieval returns irrelevant context
Solutions:
- Improve embedding model quality
- Add metadata filtering (timestamps, topics)
- Adjust similarity threshold
- Use hybrid search (semantic + keyword)
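The hybrid-search fix can be sketched as a weighted blend of a semantic score (from an embedding model, assumed precomputed here) and a simple keyword-overlap score; `hybrid_rank` and the `alpha` weight are illustrative:

```python
def keyword_score(query: str, doc: str) -> float:
    """Fraction of query words that appear in the document."""
    q, d = set(query.lower().split()), set(doc.lower().split())
    return len(q & d) / len(q) if q else 0.0

def hybrid_rank(query, docs, semantic_scores, alpha=0.5):
    """Blend semantic similarity with keyword overlap; best match first."""
    scored = [
        (alpha * sem + (1 - alpha) * keyword_score(query, doc), doc)
        for doc, sem in zip(docs, semantic_scores)
    ]
    return [doc for _, doc in sorted(scored, reverse=True)]

docs = ["reset your password", "update billing info"]
# assume the semantic scores came from an embedding model
print(hybrid_rank("password reset", docs, [0.4, 0.6]))
```

Keyword overlap rescues exact-term matches (IDs, error codes, names) that embeddings sometimes rank poorly.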
Problem: High latency from compression
Solutions:
- Compress asynchronously (background tasks)
- Use faster models for summarization (Haiku instead of Sonnet)
- Cache summaries more aggressively
- Reduce compression frequency
Problem: Conversations still exceeding context window
Solutions:
- Implement hierarchical compression
- Archive to vector database more aggressively
- Use more aggressive compression ratios
- Consider switching to model with larger context window
Problem: High costs despite compression
Solutions:
- Implement prompt caching
- Use cheaper models for summarization (Haiku)
- Batch summarization operations
- Reduce summarization frequency
Problem: Lost conversation continuity
Solutions:
- Increase recent message window
- Include summary in every request
- Use more descriptive summaries
- Implement session resumption with context injection
Advanced Topics
Streaming Compression
Compress in real-time as conversation progresses:
```python
import asyncio

async def streaming_compress(conversation_stream):
    """Compress while streaming responses."""
    compressor = ProgressiveCompressor()
    async for message in conversation_stream:
        compressor.add_message(message)
        # Compression happens asynchronously
        if compressor.should_compress():
            asyncio.create_task(compressor.compress_async())
    return compressor.get_context()
```

Multi-User Session Management
Handle concurrent conversations with shared context:
```python
import time

class MultiUserMemory:
    def __init__(self):
        self.user_sessions = {}

    def get_or_create_session(self, user_id: str):
        if user_id not in self.user_sessions:
            self.user_sessions[user_id] = HybridMemorySystem(...)
        return self.user_sessions[user_id]

    def cleanup_inactive_sessions(self, timeout: int = 3600):
        """Remove sessions inactive for > timeout seconds."""
        current_time = time.time()
        inactive = [
            user_id for user_id, session in self.user_sessions.items()
            if current_time - session.last_activity > timeout
        ]
        for user_id in inactive:
            self._archive_session(user_id)
            del self.user_sessions[user_id]
```

Custom Importance Scoring
Train ML models to score message importance:
```python
from transformers import pipeline

class MLImportanceScorer:
    def __init__(self):
        # Use pre-trained classifier or fine-tune on your data
        self.classifier = pipeline(
            "text-classification",
            model="your-importance-model"
        )

    def score(self, message: dict) -> float:
        """Score message importance (0-1)."""
        result = self.classifier(message['content'])
        return result[0]['score']
```

Context Window Utilization Optimization
Maximize information density within token budget:
```python
def optimize_context_allocation(max_tokens: int) -> dict:
    """
    Optimal allocation (empirically tested):
    - 20% summary
    - 50% recent messages
    - 30% retrieved context
    """
    return {
        'summary': int(max_tokens * 0.20),
        'recent': int(max_tokens * 0.50),
        'retrieval': int(max_tokens * 0.30)
    }
```

Future Directions
Emerging Techniques (2025+)
1. Infinite Attention Mechanisms
- Models with >10M token context windows (Gemini 1.5, future Claude)
- Reduces need for compression but doesn't eliminate cost concerns
2. Learned Compression Models
- Neural networks trained to compress conversation optimally
- Maintain semantic meaning while minimizing tokens
- Examples: LLMLingua v2, PromptCompressor
3. Multimodal Session Compression
- Compress conversations with images, audio, video
- Maintain cross-modal context relationships
4. Federated Memory Systems
- Distributed compression across multiple memory stores
- Privacy-preserving compression for sensitive conversations
5. Adaptive Compression Strategies
- RL-based systems that learn optimal compression per user/domain
- Dynamic threshold adjustment based on conversation importance
References
Academic Papers
- "Recursively Summarizing Enables Long-Term Dialogue Memory in Large Language Models" (arXiv:2308.15022)
- "LLMLingua: Compressing Prompts for Accelerated Inference" (arXiv:2310.05736)
- "Lost in the Middle: How Language Models Use Long Contexts" (arXiv:2307.03172)