
AI Session Compression Techniques


Summary


Compress long AI conversations to fit context windows while preserving critical information.
Session compression enables production AI applications to manage multi-turn conversations efficiently by reducing token usage by 70-95% through summarization, embedding-based retrieval, and intelligent context management. Achieve 3-20x compression ratios with minimal performance degradation.
Key Benefits:
  • Cost Reduction: 80-90% token cost savings through hierarchical memory
  • Performance: 2x faster responses with compressed context
  • Scalability: Handle conversations exceeding 1M tokens
  • Quality: Preserve critical information with <2% accuracy loss

When to Use


Use session compression when:
  • Multi-turn conversations approach context window limits (>50% capacity)
  • Long-running chat sessions (customer support, tutoring, code assistants)
  • Token costs become significant (high-volume applications)
  • Response latency increases due to large context
  • Managing conversation history across multiple sessions
Don't use when:
  • Short conversations (<10 turns) fitting easily in context
  • Every detail must be preserved verbatim (legal, compliance)
  • Single-turn or stateless interactions
  • Context window usage is <30%
Ideal scenarios:
  • Chatbots with 50+ turn conversations
  • AI code assistants tracking long development sessions
  • Customer support with multi-session ticket history
  • Educational tutors with student progress tracking
  • Multi-day collaborative AI workflows
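The decision criteria above can be folded into a small helper (an illustrative sketch; the function name and cutoffs simply encode the numbers listed in this section, not hard rules):

```python
def should_compress(turns: int, context_usage: float,
                    verbatim_required: bool = False) -> bool:
    """Heuristic: is session compression likely worthwhile?

    turns             -- conversation turns so far
    context_usage     -- fraction of the context window in use (0.0-1.0)
    verbatim_required -- True for legal/compliance settings
    """
    if verbatim_required:
        return False  # every detail must be preserved verbatim
    if turns < 10 or context_usage < 0.30:
        return False  # short or lightly loaded sessions fit as-is
    return context_usage > 0.50  # approaching the context window limit
```

A 60-turn support session at 65% context usage would compress; a 5-turn chat would not.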

Quick Start


Basic Setup with LangChain


python
from langchain.memory import ConversationSummaryBufferMemory
from langchain_anthropic import ChatAnthropic
from anthropic import Anthropic

Initialize Claude client


llm = ChatAnthropic(
    model="claude-3-5-sonnet-20241022",
    api_key="your-api-key"
)

Setup memory with automatic summarization


memory = ConversationSummaryBufferMemory(
    llm=llm,
    max_token_limit=2000,  # Summarize when exceeding this
    return_messages=True
)

Add conversation turns


memory.save_context(
    {"input": "What's session compression?"},
    {"output": "Session compression reduces conversation token usage..."}
)

Retrieve compressed context


context = memory.load_memory_variables({})

Progressive Compression Pattern


python
from anthropic import Anthropic

client = Anthropic(api_key="your-api-key")

class ProgressiveCompressor:
    def __init__(self, thresholds=(0.70, 0.85, 0.95)):  # tuple: avoids a shared mutable default
        self.thresholds = list(thresholds)
        self.messages = []
        self.max_tokens = 200000  # Claude context window

    def add_message(self, role: str, content: str):
        self.messages.append({"role": role, "content": content})

        # Check if compression needed
        current_usage = self._estimate_tokens()
        usage_ratio = current_usage / self.max_tokens

        if usage_ratio >= self.thresholds[0]:
            self._compress(level=self._get_compression_level(usage_ratio))

    def _estimate_tokens(self):
        return sum(len(m["content"]) // 4 for m in self.messages)

    def _get_compression_level(self, ratio):
        for i, threshold in enumerate(self.thresholds):
            if ratio < threshold:
                return i
        return len(self.thresholds)

    def _compress(self, level: int):
        """Apply compression based on severity level."""
        if level == 1:  # 70% threshold: Light compression
            self._remove_redundant_messages()
        elif level == 2:  # 85% threshold: Medium compression
            self._summarize_old_messages(keep_recent=10)
        else:  # 95% threshold: Aggressive compression
            self._summarize_old_messages(keep_recent=5)

    def _remove_redundant_messages(self):
        """Remove duplicate or low-value messages (exact-match dedup shown;
        a production version might use embedding-based semantic dedup)."""
        seen = set()
        deduped = []
        for m in self.messages:
            key = (m["role"], m["content"].strip())
            if key not in seen:
                seen.add(key)
                deduped.append(m)
        self.messages = deduped

    def _summarize_old_messages(self, keep_recent: int):
        """Summarize older messages, keep recent ones verbatim."""
        if len(self.messages) <= keep_recent:
            return

        # Messages to summarize
        to_summarize = self.messages[:-keep_recent]
        recent = self.messages[-keep_recent:]

        # Generate summary
        conversation_text = "\n\n".join([
            f"{m['role'].upper()}: {m['content']}"
            for m in to_summarize
        ])

        response = client.messages.create(
            model="claude-3-5-haiku-20241022",
            max_tokens=500,
            messages=[{
                "role": "user",
                "content": f"Summarize this conversation:\n\n{conversation_text}"
            }]
        )

        # Replace old messages with the summary, tagged as an assistant turn
        # (the Messages API does not accept "system" as a message role)
        summary = {
            "role": "assistant",
            "content": f"[Summary]\n{response.content[0].text}"
        }
        self.messages = [summary] + recent

Usage


compressor = ProgressiveCompressor()
for i in range(100):
    compressor.add_message("user", f"Message {i}")
    compressor.add_message("assistant", f"Response {i}")

Using Anthropic Prompt Caching (90% Cost Reduction)


python
from anthropic import Anthropic

client = Anthropic(api_key="your-api-key")

Build context with cache control


messages = [
    {
        "role": "user",
        "content": [
            {
                "type": "text",
                "text": "Long conversation context here...",
                "cache_control": {"type": "ephemeral"}  # Cache this
            }
        ]
    },
    {"role": "assistant", "content": "Previous response..."},
    {"role": "user", "content": "New question"}  # Not cached, changes frequently
]

response = client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    messages=messages
)

Cache hit reduces costs by 90% for cached content

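To see where the 90% figure comes from, here is a back-of-envelope input-cost comparison (a sketch: the $3/MTok base rate and ~10% cache-read rate reflect published Claude Sonnet pricing at the time of writing; cache writes, which cost slightly more than base, are ignored for simplicity):

```python
def input_cost_usd(total_tokens: int, cached_tokens: int = 0,
                   base_rate: float = 3.00, cache_read_rate: float = 0.30) -> float:
    """Estimate input cost in USD; rates are per million tokens.
    Cached tokens are billed at the cache-read rate (~10% of base)."""
    uncached = total_tokens - cached_tokens
    return (uncached * base_rate + cached_tokens * cache_read_rate) / 1_000_000
```

A 100K-token context costs about $0.30 uncached versus $0.03 on a full cache hit.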


---


Core Concepts


Context Windows and Token Limits


Context window: Maximum tokens an LLM can process in a single request (input + output).
Current limits (2025):
  • Claude 3.5 Sonnet: 200K tokens (~150K words, ~600 pages)
  • GPT-4 Turbo: 128K tokens (~96K words, ~384 pages)
  • Gemini 1.5 Pro: 2M tokens (~1.5M words, ~6000 pages)
Token estimation:
  • English: ~4 characters per token
  • Code: ~3 characters per token
  • Rule of thumb: 1 token ≈ 0.75 words
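These rules of thumb translate directly into a rough character-based estimator (a heuristic only; use the provider's tokenizer or token-counting API when billing-accurate counts are needed):

```python
def estimate_tokens(text: str, is_code: bool = False) -> int:
    """Rough token count: ~4 characters/token for English prose, ~3 for code."""
    chars_per_token = 3 if is_code else 4
    return max(1, len(text) // chars_per_token)

def estimate_tokens_from_words(word_count: int) -> int:
    """Rule of thumb: 1 token ≈ 0.75 words."""
    return round(word_count / 0.75)
```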
Why compression matters:
  • Cost: Claude Sonnet costs $3/$15 per 1M input/output tokens
  • Latency: Larger contexts increase processing time
  • Quality: Excessive context can dilute attention on relevant information

Compression Ratios


Compression ratio = Original tokens / Compressed tokens
Industry benchmarks:
  • Extractive summarization: 2-3x
  • Abstractive summarization: 5-10x
  • Hierarchical summarization: 20x+
  • LLMLingua (prompt compression): 20x with 1.5% accuracy loss
  • KVzip (KV cache compression): 3-4x with 2x speed improvement
Target ratios by use case:
  • Customer support: 5-7x (preserve details)
  • General chat: 8-12x (balance quality/efficiency)
  • Code assistants: 3-5x (preserve technical accuracy)
  • Long documents: 15-20x (extract key insights)
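Tracking the ratio on real sessions is a one-liner; the sketch below also encodes the per-use-case targets listed above (the dictionary keys are illustrative names, not a standard API):

```python
def compression_ratio(original_tokens: int, compressed_tokens: int) -> float:
    """Compression ratio = original tokens / compressed tokens (higher is better)."""
    if compressed_tokens <= 0:
        raise ValueError("compressed_tokens must be positive")
    return original_tokens / compressed_tokens

# Target ranges from this section
TARGET_RATIOS = {
    "customer_support": (5, 7),
    "general_chat": (8, 12),
    "code_assistant": (3, 5),
    "long_documents": (15, 20),
}

def meets_target(use_case: str, ratio: float) -> bool:
    low, high = TARGET_RATIOS[use_case]
    return low <= ratio <= high
```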

Progressive Compression Thresholds


Industry standard pattern:
Context Usage    Action                     Technique
─────────────────────────────────────────────────────────
0-70%           No compression             Store verbatim
70-85%          Light compression          Remove redundancy
85-95%          Medium compression         Summarize old messages
95-100%         Aggressive compression     Hierarchical + RAG
Implementation guidelines:
  • 70% threshold: Remove duplicate/redundant messages, semantic deduplication
  • 85% threshold: Summarize messages older than 20 turns, keep recent 10-15
  • 95% threshold: Multi-level hierarchical summarization + vector store archival
  • Emergency (100%): Drop least important messages, aggressive summarization
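The table maps directly onto a small dispatch function (a sketch; the tier names are illustrative):

```python
def compression_action(usage_ratio: float) -> str:
    """Map context-window usage (0.0-1.0) to the compression tier above."""
    if usage_ratio < 0.70:
        return "store_verbatim"
    if usage_ratio < 0.85:
        return "remove_redundancy"       # light: semantic deduplication
    if usage_ratio < 0.95:
        return "summarize_old_messages"  # medium: keep recent 10-15 turns
    return "hierarchical_plus_rag"       # aggressive: multi-level summaries + archival
```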


Compression Techniques


1. Summarization Techniques


1.1 Extractive Summarization


Selects key sentences/phrases without modification.
Pros: No hallucination, fast, deterministic
Cons: Limited compression (2-3x), may feel disjointed
Best for: Legal/compliance, short-term compression
python
from sklearn.feature_extraction.text import TfidfVectorizer
import numpy as np

def extractive_compress(messages: list, compression_ratio: float = 0.3):
    """Extract most important messages using TF-IDF scoring."""
    texts = [msg['content'] for msg in messages]

    # Calculate TF-IDF scores
    vectorizer = TfidfVectorizer()
    tfidf_matrix = vectorizer.fit_transform(texts)
    scores = np.array(tfidf_matrix.sum(axis=1)).flatten()

    # Select top messages
    n_keep = max(1, int(len(messages) * compression_ratio))
    top_indices = sorted(np.argsort(scores)[-n_keep:])

    return [messages[i] for i in top_indices]

1.2 Abstractive Summarization


Uses LLMs to semantically condense conversation history.
Pros: Higher compression (5-10x), coherent, synthesizes information
Cons: Risk of hallucination, higher cost, less deterministic
Best for: General chat, customer support, multi-session continuity
python
from anthropic import Anthropic

def abstractive_compress(messages: list, client: Anthropic):
    """Generate semantic summary using Claude."""
    conversation_text = "\n\n".join([
        f"{msg['role'].upper()}: {msg['content']}"
        for msg in messages
    ])

    response = client.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=500,
        messages=[{
            "role": "user",
            "content": f"""Summarize this conversation, preserving:
1. Key decisions made
2. Important context and facts
3. Unresolved questions
4. Action items

Conversation:
{conversation_text}

Summary (aim for 1/5 the original length):"""
        }]
    )

    return {
        "role": "assistant",
        "content": f"[Summary]\n{response.content[0].text}"
    }

1.3 Hierarchical Summarization (Multi-Level)


Creates summaries of summaries in a tree structure.
Pros: Extreme compression (20x+), handles 1M+ token conversations
Cons: Complex implementation, multiple LLM calls, information loss accumulates
Best for: Long-running conversations, multi-session applications
Architecture:
Level 0 (Raw):    [Msg1][Msg2][Msg3][Msg4][Msg5][Msg6][Msg7][Msg8]
Level 1 (Chunk):  [Summary1-2]  [Summary3-4]  [Summary5-6]  [Summary7-8]
Level 2 (Group):  [Summary1-4]              [Summary5-8]
Level 3 (Session): [Overall Session Summary]
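What makes this pattern scale is that tree depth grows logarithmically with conversation length; a rough calculator (assuming each level condenses chunk_size items into one summary):

```python
import math

def summary_tree_depth(n_messages: int, chunk_size: int = 10) -> int:
    """Approximate number of summary levels needed above the raw messages."""
    if n_messages <= chunk_size:
        return 1
    return math.ceil(math.log(n_messages, chunk_size))
```

With chunk_size=10, a 1,000-message session needs only about three summary levels.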
python
from anthropic import Anthropic
from typing import List, Dict

class HierarchicalMemory:
    def __init__(self, client: Anthropic, chunk_size: int = 10):
        self.client = client
        self.chunk_size = chunk_size
        self.levels: List[List[Dict]] = [[]]  # Level 0 = raw messages

    def add_message(self, message: Dict):
        """Add message and trigger summarization if needed."""
        self.levels[0].append(message)

        if len(self.levels[0]) >= self.chunk_size * 2:
            self._summarize_level(0)

    def _summarize_level(self, level: int):
        """Summarize a level into the next higher level."""
        messages = self.levels[level]

        # Ensure next level exists
        while len(self.levels) <= level + 1:
            self.levels.append([])

        # Summarize first chunk
        chunk = messages[:self.chunk_size]
        summary = self._generate_summary(chunk, level)

        # Move to next level
        self.levels[level + 1].append(summary)
        self.levels[level] = messages[self.chunk_size:]

        # Recursively check if next level needs summarization
        if len(self.levels[level + 1]) >= self.chunk_size * 2:
            self._summarize_level(level + 1)

    def _generate_summary(self, messages: List[Dict], level: int) -> Dict:
        """Generate summary for a chunk."""
        conversation_text = "\n\n".join([
            f"{msg['role'].upper()}: {msg['content']}"
            for msg in messages
        ])

        response = self.client.messages.create(
            model="claude-3-5-haiku-20241022",
            max_tokens=300,
            messages=[{
                "role": "user",
                "content": f"Summarize this Level {level} conversation chunk:\n\n{conversation_text}"
            }]
        )

        return {
            # Tag summaries as assistant turns: the Messages API only
            # accepts "user"/"assistant" roles inside `messages`
            "role": "assistant",
            "content": f"[L{level+1} Summary] {response.content[0].text}",
            "level": level + 1
        }

    def get_context(self, max_tokens: int = 4000) -> List[Dict]:
        """Retrieve context within token budget."""
        context = []
        token_count = 0

        # Prioritize recent raw messages
        for msg in reversed(self.levels[0]):
            msg_tokens = len(msg['content']) // 4
            if token_count + msg_tokens > max_tokens * 0.6:
                break
            context.insert(0, msg)
            token_count += msg_tokens

        # Add summaries from higher levels
        for level in range(1, len(self.levels)):
            # reversed() so repeated insert(0, ...) preserves within-level order
            for summary in reversed(self.levels[level]):
                summary_tokens = len(summary['content']) // 4
                if token_count + summary_tokens > max_tokens:
                    break
                context.insert(0, summary)
                token_count += summary_tokens

        return context
Academic reference: "Recursively Summarizing Enables Long-Term Dialogue Memory in Large Language Models" (arXiv:2308.15022)

1.4 Rolling Summarization (Continuous)


Continuously compresses conversation with sliding window.
Pros: Low latency, predictable token usage, simple
Cons: Early details over-compressed, no information recovery
Best for: Real-time chat, streaming conversations
python
from anthropic import Anthropic

class RollingMemory:
    def __init__(self, client: Anthropic, window_size: int = 10, compress_threshold: int = 15):
        self.client = client
        self.window_size = window_size
        self.compress_threshold = compress_threshold
        self.rolling_summary = None
        self.recent_messages = []

    def add_message(self, message: dict):
        self.recent_messages.append(message)

        if len(self.recent_messages) >= self.compress_threshold:
            self._compress()

    def _compress(self):
        """Compress older messages into rolling summary."""
        messages_to_compress = self.recent_messages[:-self.window_size]

        parts = []
        if self.rolling_summary:
            parts.append(f"Existing summary:\n{self.rolling_summary}")

        parts.append("\nNew messages:\n" + "\n\n".join([
            f"{msg['role']}: {msg['content']}"
            for msg in messages_to_compress
        ]))

        response = self.client.messages.create(
            model="claude-3-5-haiku-20241022",
            max_tokens=400,
            messages=[{
                "role": "user",
                "content": "\n".join(parts) + "\n\nUpdate the summary:"
            }]
        )

        self.rolling_summary = response.content[0].text
        self.recent_messages = self.recent_messages[-self.window_size:]

    def get_context(self):
        context = []
        if self.rolling_summary:
            # "system" is not a valid message role in the Messages API;
            # tag the summary as an assistant turn (or pass it via the
            # top-level `system` parameter instead)
            context.append({
                "role": "assistant",
                "content": f"[Summary]\n{self.rolling_summary}"
            })
        context.extend(self.recent_messages)
        return context

2. Embedding-Based Approaches


2.1 RAG (Retrieval-Augmented Generation)


Store full conversation in vector database, retrieve only relevant chunks.
Pros: Extremely scalable, no information loss, high relevance
Cons: Requires vector DB infrastructure, retrieval latency
Best for: Knowledge bases, customer support with large history
python
from anthropic import Anthropic
from openai import OpenAI
import chromadb

class RAGMemory:
    def __init__(self, anthropic_client: Anthropic, openai_client: OpenAI):
        self.anthropic = anthropic_client
        self.openai = openai_client

        # Initialize vector store
        self.chroma = chromadb.Client()
        self.collection = self.chroma.create_collection(
            name="conversation",
            metadata={"hnsw:space": "cosine"}
        )

        self.recent_messages = []
        self.recent_window = 5
        self.message_counter = 0

    def add_message(self, message: dict):
        """Add to recent memory and vector store."""
        self.recent_messages.append(message)

        if len(self.recent_messages) > self.recent_window:
            old_msg = self.recent_messages.pop(0)
            self._store_in_vectordb(old_msg)

    def _store_in_vectordb(self, message: dict):
        """Archive to vector database."""
        # Generate embedding
        response = self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=message['content']
        )

        self.collection.add(
            embeddings=[response.data[0].embedding],
            documents=[message['content']],
            metadatas=[{"role": message['role']}],
            ids=[f"msg_{self.message_counter}"]
        )
        self.message_counter += 1

    def retrieve_context(self, query: str, max_tokens: int = 4000):
        """Retrieve relevant context using RAG."""
        context = []
        token_count = 0

        # 1. Recent messages (short-term memory)
        for msg in self.recent_messages:
            context.append(msg)
            token_count += len(msg['content']) // 4

        # 2. Retrieve relevant historical context
        if token_count < max_tokens:
            query_embedding = self.openai.embeddings.create(
                model="text-embedding-3-small",
                input=query
            )

            n_results = max(1, min(10, (max_tokens - token_count) // 100))  # never query with n_results=0
            results = self.collection.query(
                query_embeddings=[query_embedding.data[0].embedding],
                n_results=n_results
            )

            for i, doc in enumerate(results['documents'][0]):
                if token_count + len(doc) // 4 > max_tokens:
                    break

                metadata = results['metadatas'][0][i]
                context.insert(0, {
                    "role": metadata['role'],
                    "content": f"[Retrieved] {doc}"
                })
                token_count += len(doc) // 4

        return context
Vector database options:
  • ChromaDB: Embedded, easy local development
  • Pinecone: Managed, 50ms p95 latency
  • Weaviate: Open-source, hybrid search
  • Qdrant: High performance, payload filtering
将完整对话存储在向量数据库中,仅检索相关块。
优点: 可扩展性极强,无信息损失,相关性高 缺点: 需要向量数据库基础设施,存在检索延迟 最佳场景: 知识库,包含大量历史的客户支持
python
from anthropic import Anthropic
from openai import OpenAI
import chromadb

class RAGMemory:
    def __init__(self, anthropic_client: Anthropic, openai_client: OpenAI):
        self.anthropic = anthropic_client
        self.openai = openai_client

        # 初始化向量存储
        self.chroma = chromadb.Client()
        self.collection = self.chroma.create_collection(
            name="conversation",
            metadata={"hnsw:space": "cosine"}
        )

        self.recent_messages = []
        self.recent_window = 5
        self.message_counter = 0

    def add_message(self, message: dict):
        """添加到近期内存和向量存储。"""
        self.recent_messages.append(message)

        if len(self.recent_messages) > self.recent_window:
            old_msg = self.recent_messages.pop(0)
            self._store_in_vectordb(old_msg)

    def _store_in_vectordb(self, message: dict):
        """归档到向量数据库。"""
        # 生成嵌入
        response = self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=message['content']
        )

        self.collection.add(
            embeddings=[response.data[0].embedding],
            documents=[message['content']],
            metadatas=[{"role": message['role']}],
            ids=[f"msg_{self.message_counter}"]
        )
        self.message_counter += 1

    def retrieve_context(self, query: str, max_tokens: int = 4000):
        """使用RAG检索相关上下文。"""
        context = []
        token_count = 0

        # 1. 近期消息(短期内存)
        for msg in self.recent_messages:
            context.append(msg)
            token_count += len(msg['content']) // 4

        # 2. 检索相关历史上下文
        if token_count < max_tokens:
            query_embedding = self.openai.embeddings.create(
                model="text-embedding-3-small",
                input=query
            )

            n_results = min(10, (max_tokens - token_count) // 100)
            results = self.collection.query(
                query_embeddings=[query_embedding.data[0].embedding],
                n_results=n_results
            )

            for i, doc in enumerate(results['documents'][0]):
                if token_count + len(doc) // 4 > max_tokens:
                    break

                metadata = results['metadatas'][0][i]
                context.insert(0, {
                    "role": metadata['role'],
                    "content": f"[检索内容] {doc}"
                })
                token_count += len(doc) // 4

        return context
向量数据库选项:
  • ChromaDB: 嵌入式,易于本地开发
  • Pinecone: 托管式,p95延迟50ms
  • Weaviate: 开源,混合搜索
  • Qdrant: 高性能,负载过滤
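The `// 4` divisions in `retrieve_context` budget tokens with a rough ~4-characters-per-token rule of thumb for English text; a minimal sketch of that heuristic (`estimate_tokens` is a hypothetical helper, not part of any library — swap in a real tokenizer when accuracy matters):

```python
def estimate_tokens(text: str) -> int:
    """~4 characters per token: good enough for window budgeting,
    not for billing. Returns at least 1 to avoid zero-size budgets."""
    return max(1, len(text) // 4)

print(estimate_tokens("hello world, this is a test!"))  # 28 chars -> 7
```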

2.2 Vector Search and Clustering

2.2 向量搜索与聚类

Group similar messages into clusters, represent with centroids.
**Pros:** Reduces redundancy, identifies themes, multi-topic handling
**Cons:** Requires sufficient data, may lose nuances
**Best for:** Multi-topic conversations, meeting summaries
python
from sklearn.cluster import KMeans
from openai import OpenAI
import numpy as np

class ClusteredMemory:
    def __init__(self, openai_client: OpenAI, n_clusters: int = 5):
        self.client = openai_client
        self.n_clusters = n_clusters
        self.messages = []
        self.embeddings = []

    def add_messages(self, messages: list):
        for msg in messages:
            self.messages.append(msg)

            response = self.client.embeddings.create(
                model="text-embedding-3-small",
                input=msg['content']
            )
            self.embeddings.append(response.data[0].embedding)

    def compress_by_clustering(self):
        """Cluster messages and return representatives."""
        if len(self.messages) < self.n_clusters:
            return self.messages

        embeddings_array = np.array(self.embeddings)
        kmeans = KMeans(n_clusters=self.n_clusters, random_state=42)
        labels = kmeans.fit_predict(embeddings_array)

        # Select message closest to each centroid
        compressed = []
        for cluster_id in range(self.n_clusters):
            cluster_indices = np.where(labels == cluster_id)[0]
            centroid = kmeans.cluster_centers_[cluster_id]
            cluster_embeddings = embeddings_array[cluster_indices]
            distances = np.linalg.norm(cluster_embeddings - centroid, axis=1)
            closest_idx = cluster_indices[np.argmin(distances)]

            compressed.append({
                **self.messages[closest_idx],
                "cluster_id": int(cluster_id),
                "cluster_size": len(cluster_indices)
            })

        return compressed
将相似消息分组为集群,用质心表示。
**优点:** 减少冗余,识别主题,处理多主题对话
**缺点:** 需要足够的数据,可能丢失细节
**最佳场景:** 多主题对话,会议摘要
python
from sklearn.cluster import KMeans
from openai import OpenAI
import numpy as np

class ClusteredMemory:
    def __init__(self, openai_client: OpenAI, n_clusters: int = 5):
        self.client = openai_client
        self.n_clusters = n_clusters
        self.messages = []
        self.embeddings = []

    def add_messages(self, messages: list):
        for msg in messages:
            self.messages.append(msg)

            response = self.client.embeddings.create(
                model="text-embedding-3-small",
                input=msg['content']
            )
            self.embeddings.append(response.data[0].embedding)

    def compress_by_clustering(self):
        """聚类消息并返回代表内容。"""
        if len(self.messages) < self.n_clusters:
            return self.messages

        embeddings_array = np.array(self.embeddings)
        kmeans = KMeans(n_clusters=self.n_clusters, random_state=42)
        labels = kmeans.fit_predict(embeddings_array)

        # 选择每个质心最近的消息
        compressed = []
        for cluster_id in range(self.n_clusters):
            cluster_indices = np.where(labels == cluster_id)[0]
            centroid = kmeans.cluster_centers_[cluster_id]
            cluster_embeddings = embeddings_array[cluster_indices]
            distances = np.linalg.norm(cluster_embeddings - centroid, axis=1)
            closest_idx = cluster_indices[np.argmin(distances)]

            compressed.append({
                **self.messages[closest_idx],
                "cluster_id": int(cluster_id),
                "cluster_size": len(cluster_indices)
            })

        return compressed
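The representative-selection step can be exercised without an embeddings API; a minimal numpy-only sketch where toy 2-D vectors stand in for embeddings (`closest_to_centroid` is a hypothetical helper):

```python
import numpy as np

def closest_to_centroid(vectors: np.ndarray) -> int:
    """Index of the vector nearest the cluster mean -- the same
    representative-selection step compress_by_clustering performs."""
    centroid = vectors.mean(axis=0)
    distances = np.linalg.norm(vectors - centroid, axis=1)
    return int(np.argmin(distances))

cluster = np.array([[0.0, 0.0], [1.0, 0.0], [0.0, 1.0], [0.4, 0.4]])
print(closest_to_centroid(cluster))  # 3: [0.4, 0.4] lies nearest the mean
```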

2.3 Semantic Deduplication

2.3 语义去重

Remove semantically similar messages that convey redundant information.
**Pros:** Reduces redundancy without losing unique content
**Cons:** Requires threshold tuning, O(n²) complexity
**Best for:** FAQ systems, repetitive conversations
python
from openai import OpenAI
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

class SemanticDeduplicator:
    def __init__(self, openai_client: OpenAI, similarity_threshold: float = 0.85):
        self.client = openai_client
        self.threshold = similarity_threshold

    def deduplicate(self, messages: list):
        """Remove semantically similar messages."""
        if len(messages) <= 1:
            return messages

        # Generate embeddings
        embeddings = []
        for msg in messages:
            response = self.client.embeddings.create(
                model="text-embedding-3-small",
                input=msg['content']
            )
            embeddings.append(response.data[0].embedding)

        embeddings_array = np.array(embeddings)
        similarity_matrix = cosine_similarity(embeddings_array)

        # Mark unique messages
        keep_indices = []
        for i in range(len(messages)):
            is_unique = True
            for j in keep_indices:
                if similarity_matrix[i][j] > self.threshold:
                    is_unique = False
                    break

            if is_unique:
                keep_indices.append(i)

        return [messages[i] for i in keep_indices]
移除语义相似、传达冗余信息的消息。
**优点:** 在不丢失唯一内容的情况下减少冗余
**缺点:** 需要调优阈值,时间复杂度O(n²)
**最佳场景:** FAQ系统,重复性对话
python
from openai import OpenAI
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

class SemanticDeduplicator:
    def __init__(self, openai_client: OpenAI, similarity_threshold: float = 0.85):
        self.client = openai_client
        self.threshold = similarity_threshold

    def deduplicate(self, messages: list):
        """移除语义相似的消息。"""
        if len(messages) <= 1:
            return messages

        # 生成嵌入
        embeddings = []
        for msg in messages:
            response = self.client.embeddings.create(
                model="text-embedding-3-small",
                input=msg['content']
            )
            embeddings.append(response.data[0].embedding)

        embeddings_array = np.array(embeddings)
        similarity_matrix = cosine_similarity(embeddings_array)

        # 标记唯一消息
        keep_indices = []
        for i in range(len(messages)):
            is_unique = True
            for j in keep_indices:
                if similarity_matrix[i][j] > self.threshold:
                    is_unique = False
                    break

            if is_unique:
                keep_indices.append(i)

        return [messages[i] for i in keep_indices]
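The greedy keep-loop can likewise be tested without embedding calls; a self-contained sketch of the same O(n²) pass over toy vectors (`dedupe_indices` is a hypothetical name):

```python
import numpy as np

def dedupe_indices(vectors, threshold: float = 0.85):
    """Greedy pass mirroring SemanticDeduplicator: keep a vector only
    if its cosine similarity to every kept vector stays <= threshold."""
    unit = [np.asarray(v, dtype=float) for v in vectors]
    unit = [v / np.linalg.norm(v) for v in unit]
    keep = []
    for i, v in enumerate(unit):
        if all(float(v @ unit[j]) <= threshold for j in keep):
            keep.append(i)
    return keep

# The two near-parallel vectors collapse to one; the orthogonal one survives
print(dedupe_indices([[1, 0], [0.99, 0.01], [0, 1]]))  # [0, 2]
```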

3. Token-Efficient Strategies

3. 令牌高效策略

3.1 Message Prioritization

3.1 消息优先级

Assign importance scores and retain only high-priority content.
**Pros:** Retains most important information, flexible criteria
**Cons:** Scoring is heuristic-based, may break flow
**Best for:** Mixed-importance conversations, filtering noise
python
import re

class MessagePrioritizer:
    def score_message(self, msg: dict, index: int, total: int) -> float:
        """Calculate composite importance score."""
        scores = []

        # Length score (longer = more info)
        scores.append(min(len(msg['content']) / 500, 1.0))

        # Question score
        if msg['role'] == 'user':
            scores.append(min(msg['content'].count('?') * 0.5, 1.0))

        # Entity score (capitalized words)
        entities = len(re.findall(r'\b[A-Z][a-z]+', msg['content']))
        scores.append(min(entities / 10, 1.0))

        # Recency score (linear decay)
        scores.append(index / max(total - 1, 1))

        # Role score
        scores.append(0.6 if msg['role'] == 'user' else 0.4)

        return sum(scores) / len(scores)

    def prioritize(self, messages: list, target_count: int):
        """Select top N messages by priority."""
        scored = [
            (msg, self.score_message(msg, i, len(messages)), i)
            for i, msg in enumerate(messages)
        ]

        scored.sort(key=lambda x: x[1], reverse=True)
        top_messages = scored[:target_count]
        top_messages.sort(key=lambda x: x[2])  # Restore chronological order

        return [msg for msg, score, idx in top_messages]
分配重要性分数,仅保留高优先级内容。
**优点:** 保留最重要的信息,标准灵活
**缺点:** 评分基于启发式,可能破坏对话流
**最佳场景:** 混合重要性的对话,过滤噪音
python
import re

class MessagePrioritizer:
    def score_message(self, msg: dict, index: int, total: int) -> float:
        """计算综合重要性分数。"""
        scores = []

        # 长度分数(越长=信息越多)
        scores.append(min(len(msg['content']) / 500, 1.0))

        # 问题分数
        if msg['role'] == 'user':
            scores.append(min(msg['content'].count('?') * 0.5, 1.0))

        # 实体分数(大写单词)
        entities = len(re.findall(r'\b[A-Z][a-z]+', msg['content']))
        scores.append(min(entities / 10, 1.0))

        # 新鲜度分数(线性衰减)
        scores.append(index / max(total - 1, 1))

        # 角色分数
        scores.append(0.6 if msg['role'] == 'user' else 0.4)

        return sum(scores) / len(scores)

    def prioritize(self, messages: list, target_count: int):
        """按优先级选择前N条消息。"""
        scored = [
            (msg, self.score_message(msg, i, len(messages)), i)
            for i, msg in enumerate(messages)
        ]

        scored.sort(key=lambda x: x[1], reverse=True)
        top_messages = scored[:target_count]
        top_messages.sort(key=lambda x: x[2])  # 恢复时间顺序

        return [msg for msg, score, idx in top_messages]

3.2 Delta Compression

3.2 增量压缩

Store only changes between consecutive messages.
**Pros:** Highly efficient for incremental changes
**Cons:** Reconstruction overhead, not suitable for all content
**Best for:** Code assistants with incremental edits
python
import difflib

class DeltaCompressor:
    def __init__(self):
        self.base_messages = []
        self.deltas = []

    def add_message(self, message: dict):
        if not self.base_messages:
            self.base_messages.append(message)
            return

        # Compare against the most recent base message
        last_msg = self.base_messages[-1]

        if last_msg['role'] == message['role']:
            # Calculate delta (ndiff deltas can be replayed with difflib.restore)
            diff = list(difflib.ndiff(
                last_msg['content'].splitlines(),
                message['content'].splitlines()
            ))

            if len('\n'.join(diff)) < len(message['content']) * 0.7:
                # Store as delta only if it actually compresses
                self.deltas.append({
                    'base_index': len(self.base_messages) - 1,
                    'delta': diff,
                    'role': message['role']
                })
                return

        # Store as new base message
        self.base_messages.append(message)

    def reconstruct(self):
        """Reconstruct full conversation from bases + deltas."""
        messages = self.base_messages.copy()

        for delta_info in self.deltas:
            # restore(delta, 2) rebuilds the "after" side of an ndiff
            reconstructed = '\n'.join(difflib.restore(delta_info['delta'], 2))
            messages.append({
                'role': delta_info['role'],
                'content': reconstructed
            })

        return messages
仅存储连续消息之间的变化。
**优点:** 对增量变化的效率极高
**缺点:** 存在重构开销,不适用于所有内容
**最佳场景:** 包含增量编辑的代码助手
python
import difflib

class DeltaCompressor:
    def __init__(self):
        self.base_messages = []
        self.deltas = []

    def add_message(self, message: dict):
        if not self.base_messages:
            self.base_messages.append(message)
            return

        # 与最近一条基础消息比较
        last_msg = self.base_messages[-1]

        if last_msg['role'] == message['role']:
            # 计算增量(ndiff 增量可用 difflib.restore 重放)
            diff = list(difflib.ndiff(
                last_msg['content'].splitlines(),
                message['content'].splitlines()
            ))

            if len('\n'.join(diff)) < len(message['content']) * 0.7:
                # 仅在确实实现压缩时存储为增量
                self.deltas.append({
                    'base_index': len(self.base_messages) - 1,
                    'delta': diff,
                    'role': message['role']
                })
                return

        # 存储为新的基础消息
        self.base_messages.append(message)

    def reconstruct(self):
        """从基础消息+增量重构完整对话。"""
        messages = self.base_messages.copy()

        for delta_info in self.deltas:
            # restore(delta, 2) 重建 ndiff 的"后"侧内容
            reconstructed = '\n'.join(difflib.restore(delta_info['delta'], 2))
            messages.append({
                'role': delta_info['role'],
                'content': reconstructed
            })

        return messages
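The standard library has no applier for `unified_diff` output, but `difflib.ndiff` deltas are reversible via `difflib.restore` — exactly the store/replay pair an exact `reconstruct` needs. A self-contained round-trip check:

```python
import difflib

before = "def add(a, b):\n    return a + b"
after = "def add(a, b):\n    # sum two values\n    return a + b"

# ndiff deltas are reversible: restore(delta, 2) rebuilds the "after"
# sequence, restore(delta, 1) the "before" sequence
delta = list(difflib.ndiff(before.splitlines(), after.splitlines()))
rebuilt = "\n".join(difflib.restore(delta, 2))
assert rebuilt == after
print("round-trip ok")
```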

4. LangChain Memory Types

4. LangChain内存类型

4.1 ConversationSummaryMemory

4.1 ConversationSummaryMemory

Automatically summarizes conversation as it progresses.
python
from langchain.memory import ConversationSummaryMemory
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")

memory = ConversationSummaryMemory(llm=llm)
随着对话推进自动生成摘要。
python
from langchain.memory import ConversationSummaryMemory
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")

memory = ConversationSummaryMemory(llm=llm)

Add conversation

添加对话

memory.save_context(
    {"input": "Hi, I'm working on a Python project"},
    {"output": "Great! How can I help with your Python project?"}
)
memory.save_context(
    {"input": "你好,我正在做一个Python项目"},
    {"output": "太棒了!我能怎么帮你处理这个Python项目?"}
)

Get summary

获取摘要

summary = memory.load_memory_variables({})
print(summary['history'])

**Pros:** Automatic summarization, simple API
**Cons:** Every turn triggers LLM call
**Best for:** Medium conversations (20-50 turns)
summary = memory.load_memory_variables({})
print(summary['history'])

**优点:** 自动摘要,API简单
**缺点:** 每一轮都会触发LLM调用
**最佳场景:** 中等长度对话(20-50轮)
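The prompt such a memory maintains can be sketched in plain Python; this illustrates the progressive-summary idea only and is not LangChain's actual internal prompt (`build_summary_prompt` is a hypothetical helper):

```python
def build_summary_prompt(prior_summary, new_turns) -> str:
    """Fold new conversation turns into a running-summary prompt:
    first call summarizes from scratch, later calls update in place."""
    lines = "\n".join(f"{t['role']}: {t['content']}" for t in new_turns)
    if prior_summary:
        return (f"Current summary:\n{prior_summary}\n\n"
                f"New lines of conversation:\n{lines}\n\n"
                "Produce an updated summary.")
    return f"Summarize this conversation:\n{lines}"

prompt = build_summary_prompt("User is building a Python project.",
                              [{"role": "user", "content": "How do I add tests?"}])
```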

4.2 ConversationSummaryBufferMemory

4.2 ConversationSummaryBufferMemory

Hybrid: Recent messages verbatim, older summarized.
python
from langchain.memory import ConversationSummaryBufferMemory
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-3-5-haiku-20241022")

memory = ConversationSummaryBufferMemory(
    llm=llm,
    max_token_limit=2000,  # Summarize when exceeding
    return_messages=True
)
混合模式:近期消息原文存储,旧消息摘要存储。
python
from langchain.memory import ConversationSummaryBufferMemory
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-3-5-haiku-20241022")

memory = ConversationSummaryBufferMemory(
    llm=llm,
    max_token_limit=2000,  # 超过此限制时摘要
    return_messages=True
)

Add conversation

添加对话

for i in range(50):
    memory.save_context(
        {"input": f"Question {i}"},
        {"output": f"Answer {i}"}
    )
for i in range(50):
    memory.save_context(
        {"input": f"问题 {i}"},
        {"output": f"答案 {i}"}
    )

Automatically keeps recent messages + summary of old

自动保留近期消息+旧消息摘要

context = memory.load_memory_variables({})

**Pros:** Best balance of detail and compression
**Cons:** Requires token limit tuning
**Best for:** Most production applications
context = memory.load_memory_variables({})

**优点:** 在细节和压缩之间实现最佳平衡
**缺点:** 需要调优令牌限制
**最佳场景:** 大多数生产应用
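One plausible way to tune `max_token_limit` is to work backward from the model's context window, reserving the reply budget plus safety headroom for system prompts and tokenizer drift. A sketch under illustrative assumptions (`pick_token_limit` is a hypothetical helper; 200K is Claude 3.5 Sonnet's context window):

```python
def pick_token_limit(context_window: int, response_budget: int,
                     headroom: float = 0.2) -> int:
    """Reserve the reply budget, then keep a safety margin: the
    remainder is what the summary buffer may occupy."""
    return int((context_window - response_budget) * (1 - headroom))

print(pick_token_limit(200_000, 4_096))  # 156723
```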

4.3 ConversationTokenBufferMemory

4.3 ConversationTokenBufferMemory

Maintains fixed token budget, drops oldest when exceeded.
python
from langchain.memory import ConversationTokenBufferMemory
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")

memory = ConversationTokenBufferMemory(
    llm=llm,
    max_token_limit=2000
)
维持固定令牌预算,超过时丢弃最旧的消息。
python
from langchain.memory import ConversationTokenBufferMemory
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")

memory = ConversationTokenBufferMemory(
    llm=llm,
    max_token_limit=2000
)

Simple FIFO when token limit exceeded

令牌限制超过时采用简单的FIFO策略


**Pros:** Predictable token usage, simple
**Cons:** Loses old information completely
**Best for:** Real-time chat with strict limits

**优点:** 令牌使用可预测,实现简单
**缺点:** 完全丢失旧信息
**最佳场景:** 有严格限制的实时聊天
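The FIFO policy can be sketched without LangChain; a minimal stand-in (`TokenBudgetBuffer` is a hypothetical class using the ~4-chars-per-token estimate from earlier):

```python
class TokenBudgetBuffer:
    """Evict oldest messages once the estimated token total
    exceeds a fixed budget -- a simple FIFO under a token cap."""

    def __init__(self, max_tokens: int = 2000):
        self.max_tokens = max_tokens
        self.messages = []

    def _tokens(self, msg) -> int:
        return max(1, len(msg["content"]) // 4)

    def add(self, msg):
        self.messages.append(msg)
        # Drop from the front (oldest) until we fit the budget again
        while sum(self._tokens(m) for m in self.messages) > self.max_tokens:
            self.messages.pop(0)

buf = TokenBudgetBuffer(max_tokens=10)
for text in ["a" * 20, "b" * 20, "c" * 20]:  # ~5 tokens each
    buf.add({"role": "user", "content": text})
print(len(buf.messages))  # 2: the oldest ("a") message was evicted
```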

4.4 VectorStoreRetrieverMemory

4.4 VectorStoreRetrieverMemory

Stores all messages in vector database, retrieves relevant ones.
python
from langchain.memory import VectorStoreRetrieverMemory
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings()
vectorstore = Chroma(embedding_function=embeddings)

memory = VectorStoreRetrieverMemory(
    retriever=vectorstore.as_retriever(search_kwargs={"k": 5})
)
将所有消息存储在向量数据库中,检索相关内容。
python
from langchain.memory import VectorStoreRetrieverMemory
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings()
vectorstore = Chroma(embedding_function=embeddings)

memory = VectorStoreRetrieverMemory(
    retriever=vectorstore.as_retriever(search_kwargs={"k": 5})
)

Automatically retrieves most relevant context

自动检索最相关的上下文


**Pros:** Infinite conversation length, semantic retrieval
**Cons:** Requires vector DB, retrieval overhead
**Best for:** Long-running conversations, knowledge bases

**优点:** 对话长度无限制,语义检索
**缺点:** 需要向量数据库,存在检索开销
**最佳场景:** 长期运行的对话,知识库
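Under the hood, retrieval reduces to ranking stored vectors by cosine similarity to the query; a numpy sketch of that core (`top_k_indices` is a hypothetical helper — production stores use approximate indexes like HNSW instead of this brute-force scan):

```python
import numpy as np

def top_k_indices(query, docs, k: int = 5):
    """Rank docs by cosine similarity to the query, return best k indices."""
    q = np.asarray(query, dtype=float)
    d = np.asarray(docs, dtype=float)
    q = q / np.linalg.norm(q)
    d = d / np.linalg.norm(d, axis=1, keepdims=True)
    return [int(i) for i in np.argsort(-(d @ q))[:k]]

print(top_k_indices([1, 0], [[1, 0], [0, 1], [0.9, 0.1]], k=2))  # [0, 2]
```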

5. Anthropic-Specific Patterns

5. Anthropic特定模式

5.1 Prompt Caching (90% Cost Reduction)

5.1 Prompt Caching(降低90%成本)

Cache static context to reduce token costs.
python
from anthropic import Anthropic

client = Anthropic(api_key="your-api-key")
缓存静态上下文以减少令牌成本。
python
from anthropic import Anthropic

client = Anthropic(api_key="your-api-key")

Long conversation context

长对话上下文

conversation_history = [
    {"role": "user", "content": "Message 1"},
    {"role": "assistant", "content": "Response 1"},
    # ... many more messages
]
conversation_history = [
    {"role": "user", "content": "Message 1"},
    {"role": "assistant", "content": "Response 1"},
    # ...更多消息
]

Mark context for caching

标记上下文以进行缓存

messages = []
for i, msg in enumerate(conversation_history[:-1]):
    content = msg['content']

    # Add cache control to the last context message
    if i == len(conversation_history) - 2:
        messages.append({
            "role": msg['role'],
            "content": [
                {
                    "type": "text",
                    "text": content,
                    "cache_control": {"type": "ephemeral"}
                }
            ]
        })
    else:
        messages.append(msg)
messages = []
for i, msg in enumerate(conversation_history[:-1]):
    content = msg['content']

    # 为最后一条上下文消息添加缓存控制
    if i == len(conversation_history) - 2:
        messages.append({
            "role": msg['role'],
            "content": [
                {
                    "type": "text",
                    "text": content,
                    "cache_control": {"type": "ephemeral"}
                }
            ]
        })
    else:
        messages.append(msg)

Add new user message (not cached)

添加新的用户消息(不缓存)

messages.append(conversation_history[-1])

response = client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    messages=messages
)
messages.append(conversation_history[-1])

response = client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    messages=messages
)

Subsequent calls with same cached context cost 90% less

后续使用相同缓存上下文的调用成本降低90%


**Cache TTL:** 5 minutes
**Savings:** 90% cost reduction for cached tokens
**Limits:** Max 4 cache breakpoints per request
**Best practices:**
- Cache conversation history, not current query
- Update cache when context changes significantly
- Combine with summarization for maximum efficiency

**缓存TTL:** 5分钟
**成本节省:** 缓存令牌的成本降低90%
**限制:** 每个请求最多4个缓存断点
**最佳实践:**
- 缓存对话历史,而非当前查询
- 上下文发生显著变化时更新缓存
- 与摘要结合以实现最大效率
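The savings are easy to estimate from the published multipliers (cache reads bill at roughly 10% of the base input rate, with a one-time ~25% surcharge on the write turn); a back-of-envelope sketch, assuming Claude 3.5 Sonnet's $3/MTok input price (`input_cost` is a hypothetical helper):

```python
def input_cost(input_tokens: int, cached_tokens: int,
               price_per_mtok: float = 3.00) -> float:
    """Estimated input cost for one call where `cached_tokens` of the
    prompt hit the cache. Cache reads bill at ~10% of the base input
    rate; the one-time write surcharge is ignored for simplicity."""
    uncached = input_tokens - cached_tokens
    return (uncached + 0.10 * cached_tokens) * price_per_mtok / 1_000_000

full = input_cost(100_000, 0)         # 0.30 USD
cached = input_cost(100_000, 90_000)  # 0.057 USD -> ~81% cheaper on this call
```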

5.2 Extended Thinking for Compression Planning

5.2 扩展思考用于压缩规划

Use extended thinking to plan optimal compression strategy.
python
from anthropic import Anthropic

client = Anthropic(api_key="your-api-key")

response = client.messages.create(
    model="claude-3-7-sonnet-20250219",
    max_tokens=16000,
    thinking={
        "type": "enabled",
        "budget_tokens": 10000
    },
    messages=[{
        "role": "user",
        "content": f"""Analyze this conversation and recommend compression:

{conversation_text}

Current token count: {current_tokens}
Target: {target_tokens}
Required compression: {compression_ratio}x

Recommend optimal strategy."""
    }]
)
使用扩展思考来规划最优压缩策略。
python
from anthropic import Anthropic

client = Anthropic(api_key="your-api-key")

response = client.messages.create(
    model="claude-3-7-sonnet-20250219",
    max_tokens=16000,
    thinking={
        "type": "enabled",
        "budget_tokens": 10000
    },
    messages=[{
        "role": "user",
        "content": f"""分析这段对话并推荐压缩策略:

{conversation_text}

当前令牌数:{current_tokens}
目标:{target_tokens}
所需压缩比:{compression_ratio}x

推荐最优策略。"""
    }]
)

Access thinking process

访问思考过程

thinking_content = [ block for block in response.content if block.type == "thinking" ]
thinking_content = [ block for block in response.content if block.type == "thinking" ]

Get compression recommendation

获取压缩建议

recommendation = response.content[-1].text

---
recommendation = response.content[-1].text

---

Production Patterns

生产模式

Checkpointing and Persistence

检查点与持久化

Save compression state for recovery and session resumption.
python
import json
import time
from pathlib import Path

class PersistentMemory:
    def __init__(self, checkpoint_dir: str = "./checkpoints"):
        self.checkpoint_dir = Path(checkpoint_dir)
        self.checkpoint_dir.mkdir(exist_ok=True)
        self.memory = []
        self.summary = None

    def save_checkpoint(self, session_id: str):
        """Save current memory state."""
        checkpoint = {
            'messages': self.memory,
            'summary': self.summary,
            'timestamp': time.time()
        }

        checkpoint_file = self.checkpoint_dir / f"{session_id}.json"
        with open(checkpoint_file, 'w') as f:
            json.dump(checkpoint, f, indent=2)

    def load_checkpoint(self, session_id: str):
        """Load memory state from checkpoint."""
        checkpoint_file = self.checkpoint_dir / f"{session_id}.json"

        if checkpoint_file.exists():
            with open(checkpoint_file, 'r') as f:
                checkpoint = json.load(f)

            self.memory = checkpoint['messages']
            self.summary = checkpoint.get('summary')
            return True

        return False

    def auto_checkpoint(self, session_id: str, interval: int = 10):
        """Automatically save every N messages."""
        if len(self.memory) % interval == 0:
            self.save_checkpoint(session_id)
保存压缩状态,以便恢复和继续会话。
python
import json
import time
from pathlib import Path

class PersistentMemory:
    def __init__(self, checkpoint_dir: str = "./checkpoints"):
        self.checkpoint_dir = Path(checkpoint_dir)
        self.checkpoint_dir.mkdir(exist_ok=True)
        self.memory = []
        self.summary = None

    def save_checkpoint(self, session_id: str):
        """保存当前内存状态。"""
        checkpoint = {
            'messages': self.memory,
            'summary': self.summary,
            'timestamp': time.time()
        }

        checkpoint_file = self.checkpoint_dir / f"{session_id}.json"
        with open(checkpoint_file, 'w') as f:
            json.dump(checkpoint, f, indent=2)

    def load_checkpoint(self, session_id: str):
        """从检查点加载内存状态。"""
        checkpoint_file = self.checkpoint_dir / f"{session_id}.json"

        if checkpoint_file.exists():
            with open(checkpoint_file, 'r') as f:
                checkpoint = json.load(f)

            self.memory = checkpoint['messages']
            self.summary = checkpoint.get('summary')
            return True

        return False

    def auto_checkpoint(self, session_id: str, interval: int = 10):
        """每添加N条消息自动保存。"""
        if len(self.memory) % interval == 0:
            self.save_checkpoint(session_id)

Resume Workflows

恢复工作流

Continue conversations across sessions.
python
from anthropic import Anthropic
import json
import time

class ResumableConversation:
    def __init__(self, client: Anthropic, session_id: str):
        self.client = client
        self.session_id = session_id
        self.memory = self._load_or_create()

    def _load_or_create(self):
        """Load existing session or create new."""
        try:
            with open(f'sessions/{self.session_id}.json', 'r') as f:
                return json.load(f)
        except FileNotFoundError:
            return {
                'messages': [],
                'summary': None,
                'created_at': time.time()
            }

    def add_turn(self, user_message: str):
        """Add user message and get response."""
        # Add user message
        self.memory['messages'].append({
            'role': 'user',
            'content': user_message
        })

        # Build context (with compression); recent messages already
        # include the new user message, so it is not appended again
        system_text, context = self._build_context()

        # Get response (the summary travels in the top-level system parameter)
        kwargs = {'system': system_text} if system_text else {}
        response = self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1024,
            messages=context,
            **kwargs
        )

        # Save response
        assistant_message = response.content[0].text
        self.memory['messages'].append({
            'role': 'assistant',
            'content': assistant_message
        })

        # Compress if needed
        if len(self.memory['messages']) > 20:
            self._compress()

        # Save state
        self._save()

        return assistant_message

    def _build_context(self):
        """Build (system_text, messages) with compression applied."""
        system_text = None

        # The Messages API takes system text as a separate parameter,
        # not a message role, so surface the rolling summary that way
        if self.memory['summary']:
            system_text = f"[Previous conversation summary]\n{self.memory['summary']}"

        # Recent messages (already include the new user message)
        messages = self.memory['messages'][-10:]

        return system_text, messages

    def _compress(self):
        """Compress older messages."""
        if len(self.memory['messages']) < 15:
            return

        # Messages to summarize
        to_summarize = self.memory['messages'][:-10]

        # Generate summary
        conversation_text = "\n\n".join([
            f"{msg['role']}: {msg['content']}"
            for msg in to_summarize
        ])

        response = self.client.messages.create(
            model="claude-3-5-haiku-20241022",
            max_tokens=500,
            messages=[{
                'role': 'user',
                'content': f"Summarize this conversation:\n\n{conversation_text}"
            }]
        )

        # Update memory
        self.memory['summary'] = response.content[0].text
        self.memory['messages'] = self.memory['messages'][-10:]

    def _save(self):
        """Save session to disk."""
        with open(f'sessions/{self.session_id}.json', 'w') as f:
            json.dump(self.memory, f, indent=2)
跨会话继续对话。
python
from anthropic import Anthropic
import json
import time

class ResumableConversation:
    def __init__(self, client: Anthropic, session_id: str):
        self.client = client
        self.session_id = session_id
        self.memory = self._load_or_create()

    def _load_or_create(self):
        """加载现有会话或创建新会话。"""
        try:
            with open(f'sessions/{self.session_id}.json', 'r') as f:
                return json.load(f)
        except FileNotFoundError:
            return {
                'messages': [],
                'summary': None,
                'created_at': time.time()
            }

    def add_turn(self, user_message: str):
        """添加用户消息并获取响应。"""
        # 添加用户消息
        self.memory['messages'].append({
            'role': 'user',
            'content': user_message
        })

        # 构建上下文(带压缩);近期消息已包含新的用户消息,
        # 因此不再重复追加
        system_text, context = self._build_context()

        # 获取响应(摘要通过顶层 system 参数传入)
        kwargs = {'system': system_text} if system_text else {}
        response = self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1024,
            messages=context,
            **kwargs
        )

        # 保存响应
        assistant_message = response.content[0].text
        self.memory['messages'].append({
            'role': 'assistant',
            'content': assistant_message
        })

        # 必要时压缩
        if len(self.memory['messages']) > 20:
            self._compress()

        # 保存状态
        self._save()

        return assistant_message

    def _build_context(self):
        """构建 (system_text, messages) 形式的压缩上下文。"""
        system_text = None

        # Messages API 通过独立的 system 参数接收系统文本,
        # 而非 system 角色的消息,因此以此方式传入滚动摘要
        if self.memory['summary']:
            system_text = f"[之前对话的摘要]\n{self.memory['summary']}"

        # 近期消息(已包含新的用户消息)
        messages = self.memory['messages'][-10:]

        return system_text, messages

    def _compress(self):
        """压缩旧消息。"""
        if len(self.memory['messages']) < 15:
            return

        # 待摘要的消息
        to_summarize = self.memory['messages'][:-10]

        # 生成摘要
        conversation_text = "\n\n".join([
            f"{msg['role']}: {msg['content']}"
            for msg in to_summarize
        ])

        response = self.client.messages.create(
            model="claude-3-5-haiku-20241022",
            max_tokens=500,
            messages=[{
                'role': 'user',
                'content': f"请总结这段对话:\n\n{conversation_text}"
            }]
        )

        # 更新内存
        self.memory['summary'] = response.content[0].text
        self.memory['messages'] = self.memory['messages'][-10:]

    def _save(self):
        """将会话保存到磁盘。"""
        with open(f'sessions/{self.session_id}.json', 'w') as f:
            json.dump(self.memory, f, indent=2)

Usage

使用示例

client = Anthropic(api_key="your-api-key")
conversation = ResumableConversation(client, session_id="user123_session1")
client = Anthropic(api_key="your-api-key")
conversation = ResumableConversation(client, session_id="user123_session1")

Continue across multiple sessions

跨多个会话继续

response1 = conversation.add_turn("What's Python?")
response1 = conversation.add_turn("什么是Python?")

... later session

...之后的会话

response2 = conversation.add_turn("Show me an example") # Remembers context
response2 = conversation.add_turn("给我看一个示例") # 会记住上下文

Hybrid Approaches (Best Practice)

混合方法(最佳实践)

Combine multiple techniques for optimal results.
python
from anthropic import Anthropic
from openai import OpenAI
import chromadb

class HybridMemorySystem:
    """
    Combines:
    - Rolling summarization (short-term compression)
    - RAG retrieval (long-term memory)
    - Prompt caching (cost optimization)
    - Progressive compression (adaptive behavior)
    """

    def __init__(self, anthropic_client: Anthropic, openai_client: OpenAI):
        self.anthropic = anthropic_client
        self.openai = openai_client

        # Recent messages (verbatim)
        self.recent_messages = []
        self.recent_window = 10

        # Rolling summary
        self.rolling_summary = None

        # Vector store (long-term)
        self.chroma = chromadb.Client()
        self.collection = self.chroma.create_collection(name="memory")
        self.message_counter = 0

        # Compression thresholds
        self.thresholds = {
            'light': 0.70,    # Start basic compression
            'medium': 0.85,   # Aggressive summarization
            'heavy': 0.95     # Emergency measures
        }

    def add_message(self, message: dict):
        """Add message with intelligent compression."""
        self.recent_messages.append(message)

        # Check compression needs
        usage_ratio = self._estimate_usage()

        if usage_ratio >= self.thresholds['heavy']:
            self._emergency_compress()
        elif usage_ratio >= self.thresholds['medium']:
            self._medium_compress()
        elif usage_ratio >= self.thresholds['light']:
            self._light_compress()

    def _light_compress(self):
        """Remove redundancy, archive to vector store."""
        if len(self.recent_messages) > self.recent_window * 1.5:
            # Archive oldest to vector store
            to_archive = self.recent_messages[:5]
            for msg in to_archive:
                self._archive_to_vectorstore(msg)

            self.recent_messages = self.recent_messages[5:]

    def _medium_compress(self):
        """Generate rolling summary, aggressive archival."""
        if len(self.recent_messages) > self.recent_window:
            # Summarize older messages
            to_summarize = self.recent_messages[:-self.recent_window]

            summary_text = "\n\n".join([
                f"{msg['role']}: {msg['content']}"
                for msg in to_summarize
            ])

            if self.rolling_summary:
                summary_text = f"Existing: {self.rolling_summary}\n\nNew: {summary_text}"

            response = self.anthropic.messages.create(
                model="claude-3-5-haiku-20241022",
                max_tokens=400,
                messages=[{
                    'role': 'user',
                    'content': f"Update summary:\n{summary_text}"
                }]
            )

            self.rolling_summary = response.content[0].text

            # Archive all summarized messages
            for msg in to_summarize:
                self._archive_to_vectorstore(msg)

            self.recent_messages = self.recent_messages[-self.recent_window:]

    def _emergency_compress(self):
        """Extreme compression for near-limit situations."""
        # Keep only 5 most recent messages
        to_archive = self.recent_messages[:-5]
        for msg in to_archive:
            self._archive_to_vectorstore(msg)

        self.recent_messages = self.recent_messages[-5:]

        # Compress summary further if needed
        if self.rolling_summary and len(self.rolling_summary) > 1000:
            response = self.anthropic.messages.create(
                model="claude-3-5-haiku-20241022",
                max_tokens=200,
                messages=[{
                    'role': 'user',
                    'content': f"Create ultra-concise summary:\n{self.rolling_summary}"
                }]
            )
            self.rolling_summary = response.content[0].text

    def _archive_to_vectorstore(self, message: dict):
        """Store in vector database for retrieval."""
        embedding_response = self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=message['content']
        )

        self.collection.add(
            embeddings=[embedding_response.data[0].embedding],
            documents=[message['content']],
            metadatas=[{'role': message['role']}],
            ids=[f"msg_{self.message_counter}"]
        )
        self.message_counter += 1

    def get_context(self, current_query: str, max_tokens: int = 8000):
        """Build optimal context for current query."""
        context = []
        token_count = 0

        # 1. Add rolling summary (if exists)
        if self.rolling_summary:
            summary_msg = {
                'role': 'system',
                'content': [
                    {
                        'type': 'text',
                        'text': f"[Conversation Summary]\n{self.rolling_summary}",
                        'cache_control': {'type': 'ephemeral'}  # Cache it
                    }
                ]
            }
            context.append(summary_msg)
            token_count += len(self.rolling_summary) // 4

        # 2. Retrieve relevant historical context (RAG)
        if token_count < max_tokens * 0.3:
            query_embedding = self.openai.embeddings.create(
                model="text-embedding-3-small",
                input=current_query
            )

            results = self.collection.query(
                query_embeddings=[query_embedding.data[0].embedding],
                n_results=5
            )

            for i, doc in enumerate(results['documents'][0]):
                if token_count + len(doc) // 4 > max_tokens * 0.3:
                    break

                metadata = results['metadatas'][0][i]
                context.append({
                    'role': metadata['role'],
                    'content': f"[Retrieved] {doc}"
                })
                token_count += len(doc) // 4

        # 3. Add recent messages verbatim
        for msg in self.recent_messages:
            if token_count + len(msg['content']) // 4 > max_tokens * 0.8:
                break
            context.append(msg)
            token_count += len(msg['content']) // 4

        return context

    def _estimate_usage(self):
        """Estimate current context window usage."""
        total_tokens = 0

        if self.rolling_summary:
            total_tokens += len(self.rolling_summary) // 4

        for msg in self.recent_messages:
            total_tokens += len(msg['content']) // 4

        return total_tokens / 200000  # Claude Sonnet context window
结合多种技术以获得最佳结果。
python
from anthropic import Anthropic
from openai import OpenAI
import chromadb

class HybridMemorySystem:
    """
    结合了:
    - 滚动摘要(短期压缩)
    - RAG检索(长期内存)
    - Prompt缓存(成本优化)
    - 渐进式压缩(自适应行为)
    """

    def __init__(self, anthropic_client: Anthropic, openai_client: OpenAI):
        self.anthropic = anthropic_client
        self.openai = openai_client

        # 近期消息(原文)
        self.recent_messages = []
        self.recent_window = 10

        # 滚动摘要
        self.rolling_summary = None

        # 向量存储(长期)
        self.chroma = chromadb.Client()
        self.collection = self.chroma.create_collection(name="memory")
        self.message_counter = 0

        # 压缩阈值
        self.thresholds = {
            'light': 0.70,    # 开始基础压缩
            'medium': 0.85,   # 深度摘要
            'heavy': 0.95     # 紧急措施
        }

    def add_message(self, message: dict):
        """添加消息并进行智能压缩。"""
        self.recent_messages.append(message)

        # 检查压缩需求
        usage_ratio = self._estimate_usage()

        if usage_ratio >= self.thresholds['heavy']:
            self._emergency_compress()
        elif usage_ratio >= self.thresholds['medium']:
            self._medium_compress()
        elif usage_ratio >= self.thresholds['light']:
            self._light_compress()

    def _light_compress(self):
        """移除冗余内容,归档到向量存储。"""
        if len(self.recent_messages) > self.recent_window * 1.5:
            # 将最旧的消息归档到向量存储
            to_archive = self.recent_messages[:5]
            for msg in to_archive:
                self._archive_to_vectorstore(msg)

            self.recent_messages = self.recent_messages[5:]

    def _medium_compress(self):
        """生成滚动摘要,深度归档。"""
        if len(self.recent_messages) > self.recent_window:
            # 摘要旧消息
            to_summarize = self.recent_messages[:-self.recent_window]

            summary_text = "\n\n".join([
                f"{msg['role']}: {msg['content']}"
                for msg in to_summarize
            ])

            if self.rolling_summary:
                summary_text = f"现有摘要:{self.rolling_summary}\n\n新内容:{summary_text}"

            response = self.anthropic.messages.create(
                model="claude-3-5-haiku-20241022",
                max_tokens=400,
                messages=[{
                    'role': 'user',
                    'content': f"更新摘要:\n{summary_text}"
                }]
            )

            self.rolling_summary = response.content[0].text

            # 归档所有已摘要的消息
            for msg in to_summarize:
                self._archive_to_vectorstore(msg)

            self.recent_messages = self.recent_messages[-self.recent_window:]

    def _emergency_compress(self):
        """针对接近限制的情况进行极端压缩。"""
        # 仅保留最近5条消息
        to_archive = self.recent_messages[:-5]
        for msg in to_archive:
            self._archive_to_vectorstore(msg)

        self.recent_messages = self.recent_messages[-5:]

        # 如果需要,进一步压缩摘要
        if self.rolling_summary and len(self.rolling_summary) > 1000:
            response = self.anthropic.messages.create(
                model="claude-3-5-haiku-20241022",
                max_tokens=200,
                messages=[{
                    'role': 'user',
                    'content': f"创建超简洁摘要:\n{self.rolling_summary}"
                }]
            )
            self.rolling_summary = response.content[0].text

    def _archive_to_vectorstore(self, message: dict):
        """存储到向量数据库以进行检索。"""
        embedding_response = self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=message['content']
        )

        self.collection.add(
            embeddings=[embedding_response.data[0].embedding],
            documents=[message['content']],
            metadatas=[{'role': message['role']}],
            ids=[f"msg_{self.message_counter}"]
        )
        self.message_counter += 1

    def get_context(self, current_query: str, max_tokens: int = 8000):
        """为当前查询构建最优上下文。"""
        context = []
        token_count = 0

        # 1. 添加滚动摘要(如果存在)
        if self.rolling_summary:
            summary_msg = {
                'role': 'system',
                'content': [
                    {
                        'type': 'text',
                        'text': f"[对话摘要]\n{self.rolling_summary}",
                        'cache_control': {'type': 'ephemeral'}  # 缓存
                    }
                ]
            }
            context.append(summary_msg)
            token_count += len(self.rolling_summary) // 4

        # 2. 检索相关历史上下文(RAG)
        if token_count < max_tokens * 0.3:
            query_embedding = self.openai.embeddings.create(
                model="text-embedding-3-small",
                input=current_query
            )

            results = self.collection.query(
                query_embeddings=[query_embedding.data[0].embedding],
                n_results=5
            )

            for i, doc in enumerate(results['documents'][0]):
                if token_count + len(doc) // 4 > max_tokens * 0.3:
                    break

                metadata = results['metadatas'][0][i]
                context.append({
                    'role': metadata['role'],
                    'content': f"[检索内容] {doc}"
                })
                token_count += len(doc) // 4

        # 3. 原文添加近期消息
        for msg in self.recent_messages:
            if token_count + len(msg['content']) // 4 > max_tokens * 0.8:
                break
            context.append(msg)
            token_count += len(msg['content']) // 4

        return context

    def _estimate_usage(self):
        """估算当前上下文窗口使用率。"""
        total_tokens = 0

        if self.rolling_summary:
            total_tokens += len(self.rolling_summary) // 4

        for msg in self.recent_messages:
            total_tokens += len(msg['content']) // 4

        return total_tokens / 200000  # Claude Sonnet上下文窗口

Usage

使用示例

anthropic_client = Anthropic(api_key="your-anthropic-key")
openai_client = OpenAI(api_key="your-openai-key")
memory = HybridMemorySystem(anthropic_client, openai_client)

Add messages over time

随时间添加消息

for i in range(1000):
    memory.add_message({
        'role': 'user' if i % 2 == 0 else 'assistant',
        'content': f"Message {i} with some content..."
    })

Retrieve optimized context

检索优化后的上下文

current_query = "What did we discuss about pricing?"
context = memory.get_context(current_query)
current_query = "我们讨论过哪些关于定价的内容?"
context = memory.get_context(current_query)

Use with Claude

与Claude一起使用

response = anthropic_client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    messages=context + [{
        'role': 'user',
        'content': current_query
    }]
)

---

Performance Benchmarks

性能基准

Compression Efficiency

压缩效率

| Technique | Compression Ratio | Quality Loss | Latency | Cost Impact |
|---|---|---|---|---|
| Extractive | 2-3x | <1% | <10ms | None |
| Abstractive | 5-10x | 2-5% | 1-2s | +$0.001/turn |
| Hierarchical | 20x+ | 5-8% | 2-5s | +$0.003/turn |
| LLMLingua | 20x | 1.5% | 500ms | None |
| RAG | Variable | <1% | 100-300ms | +$0.0005/turn |
| Prompt Caching | N/A | 0% | 0ms | -90% |
| 技术 | 压缩比 | 质量损失 | 延迟 | 成本影响 |
|---|---|---|---|---|
| 抽取式摘要 | 2-3倍 | <1% | <10ms | 无 |
| 生成式摘要 | 5-10倍 | 2-5% | 1-2s | +$0.001/轮 |
| 分层摘要 | 20倍以上 | 5-8% | 2-5s | +$0.003/轮 |
| LLMLingua | 20倍 | 1.5% | 500ms | 无 |
| RAG | 可变 | <1% | 100-300ms | +$0.0005/轮 |
| Prompt Caching | N/A | 0% | 0ms | -90% |

Token Savings by Use Case

按场景划分的令牌节省

Customer Support (50-turn conversation):
  • No compression: ~8,000 tokens/request
  • Rolling summary: ~2,000 tokens/request (75% reduction)
  • Hybrid (RAG + summary): ~1,500 tokens/request (81% reduction)
Code Assistant (100-turn session):
  • No compression: ~25,000 tokens/request
  • Hierarchical: ~5,000 tokens/request (80% reduction)
  • Hybrid + caching: ~1,000 tokens/request effective (96% cost reduction)
Educational Tutor (multi-session):
  • No compression: Would exceed context window
  • RAG + summarization: ~3,000 tokens/request
  • Infinite session length enabled
客户支持(50轮对话):
  • 无压缩:~8000令牌/请求
  • 滚动摘要:~2000令牌/请求(减少75%)
  • 混合(RAG+摘要):~1500令牌/请求(减少81%)
代码助手(100轮会话):
  • 无压缩:~25000令牌/请求
  • 分层摘要:~5000令牌/请求(减少80%)
  • 混合+缓存:实际~1000令牌/请求(成本减少96%)
教育辅导(多会话):
  • 无压缩:会超出上下文窗口
  • RAG+摘要:~3000令牌/请求
  • 支持无限会话长度

Cost Analysis

成本分析

Example: Claude Sonnet pricing ($3 input, $15 output per 1M tokens)
1,000 conversations, 50 turns each:
  • No compression:
    • Avg 8K tokens/request × 50K requests = 400M tokens
    • Cost: $1,200
  • With rolling summarization:
    • Avg 2K tokens/request × 50K requests = 100M tokens
    • Summarization overhead: +10M tokens
    • Cost: $330 (72% savings)
  • With hybrid system + caching:
    • First turn: 2K tokens (no cache)
    • Subsequent: 200 tokens effective (90% cache hit)
    • Total: ~15M tokens effective
    • Cost: $45 (96% savings)

示例:Claude Sonnet定价(每100万令牌输入3美元/输出15美元)
1000次对话,每次50轮:
  • 无压缩:
    • 平均8K令牌/请求 × 5万请求 = 4亿令牌
    • 成本:1200美元
  • 使用滚动摘要:
    • 平均2K令牌/请求 × 5万请求 = 1亿令牌
    • 摘要开销:+1000万令牌
    • 成本:330美元(节省72%)
  • 使用混合系统+缓存:
    • 第一轮:2K令牌(无缓存)
    • 后续轮次:实际200令牌(90%缓存命中)
    • 总计:实际~1500万令牌
    • 成本:45美元(节省96%)
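The input-token arithmetic above can be checked with a short helper; the 8K/2K token figures and the 50,000-request workload are the hypothetical example from this section, and only input-token cost is modeled:

```python
def input_cost(tokens: int, usd_per_million: float = 3.0) -> float:
    """Input-token cost at Claude Sonnet's $3 per 1M input tokens."""
    return tokens / 1_000_000 * usd_per_million

requests = 1_000 * 50  # 1,000 conversations x 50 turns

baseline = input_cost(8_000 * requests)                 # no compression: 400M tokens
summarized = input_cost(2_000 * requests + 10_000_000)  # rolling summary + 10M overhead
```

`baseline` comes out to $1,200 and `summarized` to $330, matching the figures above.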

Tool Recommendations

工具推荐

Memory Management Tools

内存管理工具

Mem0 (Recommended for Production)

Mem0(生产环境推荐)

Best for: Hybrid memory systems with minimal code
python
from mem0 import MemoryClient

client = MemoryClient(api_key="your-mem0-key")
最佳场景: 混合内存系统,代码量极少
python
from mem0 import MemoryClient

client = MemoryClient(api_key="your-mem0-key")

Automatically handles compression, summarization, RAG

自动处理压缩、摘要、RAG

memory = client.create_memory(
    user_id="user123",
    messages=[
        {"role": "user", "content": "I'm working on a Python project"},
        {"role": "assistant", "content": "Great! What kind of project?"}
    ]
)
memory = client.create_memory(
    user_id="user123",
    messages=[
        {"role": "user", "content": "我正在做一个Python项目"},
        {"role": "assistant", "content": "太棒了!是什么类型的项目?"}
    ]
)

Retrieve relevant context

检索相关上下文

context = client.get_memory(
    user_id="user123",
    query="What programming language am I using?"
)

**Features:**
- Automatic hierarchical summarization
- Built-in RAG retrieval
- Multi-user session management
- Analytics dashboard

**Pricing:** $0.40/1K memory operations
context = client.get_memory(
    user_id="user123",
    query="我在使用什么编程语言?"
)

**特性:**
- 自动分层摘要
- 内置RAG检索
- 多用户会话管理
- 分析仪表板

**定价:** 每1000次内存操作0.40美元

Zep

Zep

Best for: Low-latency production deployments
python
from zep_python import ZepClient

client = ZepClient(api_key="your-zep-key")
最佳场景: 低延迟生产部署
python
from zep_python import ZepClient

client = ZepClient(api_key="your-zep-key")

Add to session

添加到会话

client.memory.add_memory(
    session_id="session123",
    messages=[
        {"role": "user", "content": "Hello"},
        {"role": "assistant", "content": "Hi there!"}
    ]
)
client.memory.add_memory(
    session_id="session123",
    messages=[
        {"role": "user", "content": "你好"},
        {"role": "assistant", "content": "你好!"}
    ]
)

Auto-summarized retrieval

自动摘要检索

memory = client.memory.get_memory(session_id="session123")

**Features:**
- <100ms retrieval latency
- Automatic fact extraction
- Entity recognition
- Session management

**Pricing:** Open-source (self-hosted) or $0.50/1K operations (cloud)
memory = client.memory.get_memory(session_id="session123")

**特性:**
- <100ms检索延迟
- 自动事实提取
- 实体识别
- 会话管理

**定价:** 开源(自托管)或每1000次操作0.50美元(云服务)

ChromaDB

ChromaDB

Best for: Self-hosted vector storage
python
import chromadb

client = chromadb.Client()
collection = client.create_collection("conversations")
最佳场景: 自托管向量存储
python
import chromadb

client = chromadb.Client()
collection = client.create_collection("conversations")

Store embeddings

存储嵌入

collection.add(
    documents=["Message content"],
    embeddings=[[0.1, 0.2, ...]],
    ids=["msg1"]
)

Retrieve

检索

results = collection.query(
    query_embeddings=[[0.1, 0.2, ...]],
    n_results=5
)

**Features:**
- Fully open-source
- Embedded or client-server
- Fast local development

**Pricing:** Free (self-hosted)
results = collection.query(
    query_embeddings=[[0.1, 0.2, ...]],
    n_results=5
)

**特性:**
- 完全开源
- 嵌入式或客户端-服务器模式
- 快速本地开发

**定价:** 免费(自托管)

LangChain

LangChain

Best for: Rapid prototyping and experimentation
python
from langchain.memory import ConversationSummaryBufferMemory
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")
memory = ConversationSummaryBufferMemory(llm=llm, max_token_limit=2000)
Features:
  • Multiple memory types
  • Framework integration
  • Extensive documentation
Pricing: Free (you pay only your own LLM API usage)
最佳场景: 快速原型开发和实验
python
from langchain.memory import ConversationSummaryBufferMemory
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")
memory = ConversationSummaryBufferMemory(llm=llm, max_token_limit=2000)
特性:
  • 多种内存类型
  • 框架集成
  • 丰富的文档
定价: 免费(仅产生您自己的LLM API费用)

Compression Libraries

压缩库

LLMLingua

LLMLingua

Best for: Extreme compression with minimal quality loss
python
from llmlingua import PromptCompressor

compressor = PromptCompressor()

compressed = compressor.compress_prompt(
    context="Long conversation history...",
    instruction="Current user query",
    target_token=500
)
最佳场景: 极端压缩,质量损失极小
python
from llmlingua import PromptCompressor

compressor = PromptCompressor()

compressed = compressor.compress_prompt(
    context="长对话历史...",
    instruction="当前用户查询",
    target_token=500
)

Achieves 20x compression with 1.5% accuracy loss

实现20倍压缩,准确率损失1.5%


**Features:**
- 20x compression ratios
- <2% quality degradation
- Fast inference (<500ms)

**Pricing:** Free (open-source)

---

**特性:**
- 20倍压缩比
- <2%质量下降
- 快速推理(<500ms)

**定价:** 免费(开源)

---

Use Cases and Patterns

用例与模式

Chatbot (Customer Support)

聊天机器人(客户支持)

Requirements:
  • Multi-turn conversations (50-100 turns)
  • Preserve customer context
  • Fast response times
  • Cost-efficient
Recommended approach:
  • ConversationSummaryBufferMemory (LangChain)
  • 70% threshold: Semantic deduplication
  • 85% threshold: Rolling summarization
  • Prompt caching for frequent patterns
Implementation:
python
from langchain.memory import ConversationSummaryBufferMemory
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-3-5-haiku-20241022")

memory = ConversationSummaryBufferMemory(
    llm=llm,
    max_token_limit=2000,
    return_messages=True
)
需求:
  • 多轮对话(50-100轮)
  • 保留客户上下文
  • 快速响应
  • 成本高效
推荐方法:
  • ConversationSummaryBufferMemory(LangChain)
  • 70%阈值:语义去重
  • 85%阈值:滚动摘要
  • 对频繁模式使用Prompt缓存
实现:
python
from langchain.memory import ConversationSummaryBufferMemory
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-3-5-haiku-20241022")

memory = ConversationSummaryBufferMemory(
    llm=llm,
    max_token_limit=2000,
    return_messages=True
)

Add customer conversation

添加客户对话

for turn in customer_conversation:
    memory.save_context(
        {"input": turn['customer_message']},
        {"output": turn['agent_response']}
    )

Retrieve compressed context

检索压缩后的上下文

context = memory.load_memory_variables({})

Code Assistant

代码助手

Requirements:
  • Long development sessions (100+ turns)
  • Preserve technical details
  • Handle large code blocks
  • Track incremental changes
Recommended approach:
  • Hierarchical summarization for overall context
  • RAG retrieval for specific code references
  • Delta compression for iterative edits
  • Prompt caching for system prompts
Implementation:
python
from anthropic import Anthropic
from openai import OpenAI

client = Anthropic(api_key="your-api-key")
openai_client = OpenAI(api_key="your-openai-key")

class CodeAssistantMemory:
    def __init__(self):
        self.hierarchy = HierarchicalMemory(client, chunk_size=15)
        self.rag = RAGMemory(anthropic_client=client, openai_client=openai_client)
        self.deltas = DeltaCompressor()

    def add_interaction(self, code_change: dict):
        # Store in hierarchy
        self.hierarchy.add_message({
            'role': 'user',
            'content': code_change['description']
        })

        # Store in RAG for retrieval
        self.rag.add_message(code_change)

        # Store as delta if incremental
        if code_change.get('is_incremental'):
            self.deltas.add_message(code_change)

    def get_context(self, current_query: str):
        # Combine hierarchical summary + RAG retrieval
        summary_context = self.hierarchy.get_context(max_tokens=2000)
        rag_context = self.rag.retrieve_context(current_query, max_tokens=2000)

        return summary_context + rag_context
需求:
  • 长期开发会话(100+轮)
  • 保留技术细节
  • 处理大代码块
  • 跟踪增量变化
推荐方法:
  • 分层摘要用于整体上下文
  • RAG检索用于特定代码引用
  • 增量压缩用于迭代编辑
  • 对系统提示使用Prompt缓存
实现:
python
from anthropic import Anthropic
from openai import OpenAI

client = Anthropic(api_key="your-api-key")
openai_client = OpenAI(api_key="your-openai-key")

class CodeAssistantMemory:
    def __init__(self):
        self.hierarchy = HierarchicalMemory(client, chunk_size=15)
        self.rag = RAGMemory(anthropic_client=client, openai_client=openai_client)
        self.deltas = DeltaCompressor()

    def add_interaction(self, code_change: dict):
        # 存储到分层内存
        self.hierarchy.add_message({
            'role': 'user',
            'content': code_change['description']
        })

        # 存储到RAG以进行检索
        self.rag.add_message(code_change)

        # 如果是增量变化则存储为增量
        if code_change.get('is_incremental'):
            self.deltas.add_message(code_change)

    def get_context(self, current_query: str):
        # 结合分层摘要+RAG检索
        summary_context = self.hierarchy.get_context(max_tokens=2000)
        rag_context = self.rag.retrieve_context(current_query, max_tokens=2000)

        return summary_context + rag_context
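`DeltaCompressor` is referenced above but not defined in this guide. One possible sketch, built on the standard library's `difflib`, stores only the unified diff between successive versions of a file; the class name and `add_message` interface mirror the usage above, but the internals are assumptions:

```python
import difflib

class DeltaCompressor:
    """Sketch: keep one full base version plus unified diffs for each change."""

    def __init__(self):
        self.base = ""       # last full version seen
        self.deltas = []     # unified diffs, oldest first

    def add_message(self, change: dict):
        new = change['content']
        diff = difflib.unified_diff(
            self.base.splitlines(), new.splitlines(),
            lineterm="", n=1
        )
        self.deltas.append("\n".join(diff))
        self.base = new
```

Replaying `deltas` in order reconstructs the full history while storing far fewer tokens than repeated full copies.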

Educational Tutor

教育辅导

Requirements:
  • Multi-session tracking
  • Student progress persistence
  • Personalized context retrieval
  • Long-term knowledge retention
Recommended approach:
  • VectorStoreRetrieverMemory for multi-session
  • Fact extraction for student knowledge
  • Progressive compression across sessions
  • Resumable conversations
Implementation:
python
from langchain.memory import VectorStoreRetrieverMemory
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

class TutorMemory:
    def __init__(self, student_id: str):
        self.student_id = student_id

        # Vector store for all sessions
        embeddings = OpenAIEmbeddings()
        vectorstore = Chroma(
            collection_name=f"student_{student_id}",
            embedding_function=embeddings
        )

        self.memory = VectorStoreRetrieverMemory(
            retriever=vectorstore.as_retriever(search_kwargs={"k": 10})
        )

    def add_lesson_content(self, lesson: dict):
        """Add lesson interaction to student memory."""
        self.memory.save_context(
            {"input": lesson['topic']},
            {"output": lesson['explanation']}
        )

    def get_student_context(self, current_topic: str):
        """Retrieve relevant past lessons for current topic."""
        return self.memory.load_memory_variables({
            "prompt": current_topic
        })

需求:
  • 多会话跟踪
  • 学生进度持久化
  • 个性化上下文检索
  • 长期知识保留
推荐方法:
  • VectorStoreRetrieverMemory用于多会话
  • 事实提取用于学生知识
  • 跨会话的渐进式压缩
  • 可恢复对话
实现:
python
from langchain.memory import VectorStoreRetrieverMemory
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

class TutorMemory:
    def __init__(self, student_id: str):
        self.student_id = student_id

        # 所有会话的向量存储
        embeddings = OpenAIEmbeddings()
        vectorstore = Chroma(
            collection_name=f"student_{student_id}",
            embedding_function=embeddings
        )

        self.memory = VectorStoreRetrieverMemory(
            retriever=vectorstore.as_retriever(search_kwargs={"k": 10})
        )

    def add_lesson_content(self, lesson: dict):
        """将课程交互添加到学生内存。"""
        self.memory.save_context(
            {"input": lesson['topic']},
            {"output": lesson['explanation']}
        )

    def get_student_context(self, current_topic: str):
        """为当前主题检索相关的过往课程。"""
        return self.memory.load_memory_variables({
            "prompt": current_topic
        })

Best Practices

最佳实践

1. Choose the Right Technique for Your Use Case

1. 为您的用例选择合适的技术

  • Short conversations (<20 turns): No compression needed
  • Medium conversations (20-50 turns): ConversationSummaryBufferMemory
  • Long conversations (50-100 turns): Hierarchical or rolling summarization
  • Very long (100+ turns): Hybrid (RAG + summarization + caching)
  • Multi-session: VectorStoreRetrieverMemory or Mem0
  • 短对话(<20轮): 无需压缩
  • 中等对话(20-50轮): ConversationSummaryBufferMemory
  • 长对话(50-100轮): 分层或滚动摘要
  • 极长对话(100+轮): 混合(RAG+摘要+缓存)
  • 多会话: VectorStoreRetrieverMemory或Mem0
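The decision ladder above can be written as a tiny selector (the return labels are just the technique names from the list):

```python
def pick_technique(turns: int, multi_session: bool = False) -> str:
    """Map expected conversation length to a compression technique."""
    if multi_session:
        return "VectorStoreRetrieverMemory / Mem0"
    if turns < 20:
        return "none"
    if turns <= 50:
        return "ConversationSummaryBufferMemory"
    if turns <= 100:
        return "hierarchical or rolling summarization"
    return "hybrid (RAG + summarization + caching)"
```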

2. Implement Progressive Compression

2. 实现渐进式压缩

Don't compress aggressively from the start. Use thresholds:
  • 0-70%: Store verbatim
  • 70-85%: Light compression (deduplication)
  • 85-95%: Medium compression (summarization)
  • 95-100%: Aggressive compression (hierarchical)
不要从一开始就深度压缩。使用阈值:
  • 0-70%:原文存储
  • 70-85%:轻度压缩(去重)
  • 85-95%:中度压缩(摘要)
  • 95-100%:深度压缩(分层)
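A minimal sketch of this threshold ladder; the tier names are illustrative:

```python
def compression_tier(usage_ratio: float) -> str:
    """Map context-window usage (0.0-1.0) to a compression tier."""
    if usage_ratio >= 0.95:
        return "aggressive"   # hierarchical summarization
    if usage_ratio >= 0.85:
        return "medium"       # rolling summarization
    if usage_ratio >= 0.70:
        return "light"        # semantic deduplication
    return "verbatim"         # store as-is, no compression needed
```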

3. Combine Techniques

3. 结合多种技术

Single-technique approaches are suboptimal. Best production systems use:
  • Rolling summarization (short-term)
  • RAG retrieval (long-term)
  • Prompt caching (cost optimization)
  • Semantic deduplication (redundancy removal)
单一技术方法不是最优的。最佳生产系统使用:
  • 滚动摘要(短期)
  • RAG检索(长期)
  • Prompt缓存(成本优化)
  • 语义去重(冗余移除)

4. Monitor Quality Metrics

4. 监控质量指标

Track compression impact:
  • Response relevance score
  • Information retention rate
  • User satisfaction metrics
  • Token usage reduction
跟踪压缩的影响:
  • 响应相关性分数
  • 信息保留率
  • 用户满意度指标
  • 令牌使用减少量
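As a cheap proxy for information retention rate, you can measure content-word overlap between the original text and its summary. This is a crude heuristic, not a substitute for human or LLM-based evaluation:

```python
def retention_rate(original: str, summary: str) -> float:
    """Fraction of distinct content words (>4 chars) from the original kept in the summary."""
    orig = {w.lower().strip('.,!?') for w in original.split() if len(w) > 4}
    kept = {w.lower().strip('.,!?') for w in summary.split() if len(w) > 4}
    if not orig:
        return 1.0
    return len(orig & kept) / len(orig)
```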

5. Use Prompt Caching Strategically

5. 战略性使用Prompt Caching

Cache stable content:
  • Conversation summaries
  • System prompts
  • Knowledge base context
  • User profiles
Don't cache frequently changing content:
  • Current user query
  • Real-time data
  • Session-specific state
缓存稳定内容:
  • 对话摘要
  • 系统提示
  • 知识库上下文
  • 用户配置文件
不要缓存频繁变化的内容:
  • 当前用户查询
  • 实时数据
  • 会话特定状态
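In the Anthropic API, stable content is marked with `cache_control` breakpoints, as in the hybrid system above. The sketch below only constructs the request payload (no API call), placing the cacheable summary in the system block and the uncached query in the messages:

```python
def build_request(summary: str, user_query: str) -> dict:
    """Cache the stable summary block; leave the changing query uncached."""
    return {
        "model": "claude-3-5-sonnet-20241022",
        "max_tokens": 1024,
        "system": [{
            "type": "text",
            "text": f"[Conversation Summary]\n{summary}",
            "cache_control": {"type": "ephemeral"},   # stable -> cacheable
        }],
        "messages": [
            {"role": "user", "content": user_query},  # changing -> not cached
        ],
    }
```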

6. Implement Checkpointing

6. 实现检查点

Save compression state for:
  • Recovery from failures
  • Multi-session continuity
  • Analytics and debugging
  • A/B testing different strategies
保存压缩状态以用于:
  • 故障恢复
  • 多会话连续性
  • 分析和调试
  • 不同策略的A/B测试
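A minimal JSON checkpoint of the compression state; the field names follow the `HybridMemorySystem` above, and error handling is omitted:

```python
import json

def save_checkpoint(path: str, state: dict) -> None:
    """Persist compression state (summary, recent messages, counters) as JSON."""
    with open(path, "w") as f:
        json.dump(state, f)

def load_checkpoint(path: str) -> dict:
    """Restore state written by save_checkpoint."""
    with open(path) as f:
        return json.load(f)
```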

7. Tune Compression Parameters

7. 调优压缩参数

Test and optimize:
  • Summary token limits
  • Compression thresholds
  • Retrieval result counts
  • Cache TTLs
  • Chunk sizes for hierarchical
测试并优化:
  • 摘要令牌限制
  • 压缩阈值
  • 检索结果数量
  • 缓存TTL
  • 分层的块大小

8. Handle Edge Cases

8. 处理边缘情况

Plan for:
  • Very long messages (split or compress individually)
  • Code blocks (preserve formatting)
  • Multi-language content
  • Rapidly changing context

为以下情况做计划:
  • 极长消息(拆分或单独压缩)
  • 代码块(保留格式)
  • 多语言内容
  • 快速变化的上下文

Troubleshooting

故障排除

Problem: Summary loses critical information

问题:摘要丢失关键信息

Solutions:
  • Lower compression ratio (less aggressive)
  • Implement importance scoring to preserve key messages
  • Use extractive summarization for critical sections
  • Increase summary token budget
解决方案:
  • 降低压缩比(不那么激进)
  • 实现重要性评分以保留关键消息
  • 对关键部分使用抽取式摘要
  • 增加摘要令牌预算
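Importance scoring does not have to start with an ML model; a keyword-plus-length heuristic like the sketch below is often enough to protect key messages from summarization. The marker words are illustrative and should be tuned per domain:

```python
KEY_MARKERS = ("must", "deadline", "price", "error", "decided", "requirement")

def importance(message: dict) -> float:
    """Heuristic importance in [0, 1]: key markers and message length both add weight."""
    text = message['content'].lower()
    marker_hits = sum(marker in text for marker in KEY_MARKERS)
    length_bonus = min(len(text) / 1000, 0.3)   # longer messages carry more detail
    return min(0.2 * marker_hits + length_bonus, 1.0)
```

Messages scoring above a chosen cutoff can be kept verbatim while the rest are summarized.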

Problem: Retrieval returns irrelevant context

问题:检索返回不相关上下文

Solutions:
  • Improve embedding model quality
  • Add metadata filtering (timestamps, topics)
  • Adjust similarity threshold
  • Use hybrid search (semantic + keyword)
解决方案:
  • 提升嵌入模型质量
  • 添加元数据过滤(时间戳、主题)
  • 调整相似性阈值
  • 使用混合搜索(语义+关键词)
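Hybrid search can be as simple as a weighted fusion of the two relevance signals; the 0.7 weight is an assumption to tune, and both inputs are assumed normalized to [0, 1]:

```python
def hybrid_score(semantic: float, keyword: float, alpha: float = 0.7) -> float:
    """Blend semantic similarity with a keyword score (e.g. BM25), both in [0, 1]."""
    return alpha * semantic + (1 - alpha) * keyword
```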

Problem: High latency from compression

问题:压缩导致高延迟

Solutions:
  • Compress asynchronously (background tasks)
  • Use faster models for summarization (Haiku instead of Sonnet)
  • Cache summaries more aggressively
  • Reduce compression frequency
解决方案:
  • 异步压缩(后台任务)
  • 使用更快的模型进行摘要(Haiku而非Sonnet)
  • 更积极地缓存摘要
  • 降低压缩频率

Problem: Conversations still exceeding context window

问题:对话仍超出上下文窗口

Solutions:
  • Implement hierarchical compression
  • Archive to vector database more aggressively
  • Use more aggressive compression ratios
  • Consider switching to model with larger context window
解决方案:
  • 实现分层压缩
  • 更积极地归档到向量数据库
  • 使用更激进的压缩比
  • 考虑切换到更大上下文窗口的模型

Problem: High costs despite compression

问题:尽管压缩但成本仍很高

Solutions:
  • Implement prompt caching
  • Use cheaper models for summarization (Haiku)
  • Batch summarization operations
  • Reduce summarization frequency
解决方案:
  • 实现Prompt缓存
  • 使用更便宜的模型进行摘要(Haiku)
  • 批量摘要操作
  • 降低摘要频率

Problem: Lost conversation continuity

问题:对话连续性丢失

Solutions:
  • Increase recent message window
  • Include summary in every request
  • Use more descriptive summaries
  • Implement session resumption with context injection

解决方案:
  • 增加近期消息窗口
  • 在每个请求中包含摘要
  • 使用更具描述性的摘要
  • 实现带上下文注入的会话恢复

Advanced Topics

高级主题

Streaming Compression

流式压缩

Compress in real-time as conversation progresses:
python
import asyncio

async def streaming_compress(conversation_stream):
    """Compress while streaming responses."""
    compressor = ProgressiveCompressor()

    async for message in conversation_stream:
        compressor.add_message(message)

        # Compression happens asynchronously
        if compressor.should_compress():
            asyncio.create_task(compressor.compress_async())

    return compressor.get_context()
随着对话推进实时压缩:
python
import asyncio

async def streaming_compress(conversation_stream):
    """在流式响应时进行压缩。"""
    compressor = ProgressiveCompressor()

    async for message in conversation_stream:
        compressor.add_message(message)

        # 异步进行压缩
        if compressor.should_compress():
            asyncio.create_task(compressor.compress_async())

    return compressor.get_context()

Multi-User Session Management

多用户会话管理

Handle concurrent conversations with shared context:
python
import time

class MultiUserMemory:
    def __init__(self):
        self.user_sessions = {}

    def get_or_create_session(self, user_id: str):
        if user_id not in self.user_sessions:
            self.user_sessions[user_id] = HybridMemorySystem(...)
        return self.user_sessions[user_id]

    def cleanup_inactive_sessions(self, timeout: int = 3600):
        """Remove sessions inactive for > timeout seconds."""
        current_time = time.time()
        inactive = [
            user_id for user_id, session in self.user_sessions.items()
            if current_time - session.last_activity > timeout
        ]

        for user_id in inactive:
            self._archive_session(user_id)
            del self.user_sessions[user_id]
处理带有共享上下文的并发对话:
python
import time

class MultiUserMemory:
    def __init__(self):
        self.user_sessions = {}

    def get_or_create_session(self, user_id: str):
        if user_id not in self.user_sessions:
            self.user_sessions[user_id] = HybridMemorySystem(...)
        return self.user_sessions[user_id]

    def cleanup_inactive_sessions(self, timeout: int = 3600):
        """移除超过超时时间未活动的会话。"""
        current_time = time.time()
        inactive = [
            user_id for user_id, session in self.user_sessions.items()
            if current_time - session.last_activity > timeout
        ]

        for user_id in inactive:
            self._archive_session(user_id)
            del self.user_sessions[user_id]
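`cleanup_inactive_sessions` assumes each session exposes a `last_activity` timestamp that is refreshed on every interaction. A quick sketch of that bookkeeping, with a stub session object standing in for the hypothetical `HybridMemorySystem`:

```python
import time

class StubSession:
    """Stand-in for HybridMemorySystem, tracking only activity time."""
    def __init__(self):
        self.last_activity = time.time()

    def touch(self):
        """Call on every user interaction to keep the session alive."""
        self.last_activity = time.time()

memory = {}  # user_id -> session, as in MultiUserMemory.user_sessions
memory["alice"] = StubSession()
memory["bob"] = StubSession()
memory["bob"].last_activity -= 7200  # simulate 2h of inactivity

timeout = 3600
now = time.time()
inactive = [uid for uid, s in memory.items() if now - s.last_activity > timeout]
for uid in inactive:
    del memory[uid]  # a real system would archive before deleting
```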

Custom Importance Scoring

自定义重要性评分

Train ML models to score message importance:
python
from transformers import pipeline

class MLImportanceScorer:
    def __init__(self):
        # Use pre-trained classifier or fine-tune on your data
        self.classifier = pipeline(
            "text-classification",
            model="your-importance-model"
        )

    def score(self, message: dict) -> float:
        """Score message importance (0-1)."""
        result = self.classifier(message['content'])
        return result[0]['score']
训练ML模型为消息重要性评分:
python
from transformers import pipeline

class MLImportanceScorer:
    def __init__(self):
        # 使用预训练分类器或在您的数据上微调
        self.classifier = pipeline(
            "text-classification",
            model="your-importance-model"
        )

    def score(self, message: dict) -> float:
        """为消息重要性评分(0-1)。"""
        result = self.classifier(message['content'])
        return result[0]['score']
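When no fine-tuned model is available, a keyword-and-length heuristic with the same `score` interface is a common starting point and costs nothing to run (the keywords and weights below are purely illustrative):

```python
class HeuristicImportanceScorer:
    """Cheap stand-in for MLImportanceScorer: no model download needed."""

    KEYWORDS = ("error", "decided", "deadline", "requirement", "password")

    def score(self, message: dict) -> float:
        """Score message importance (0-1) from keywords and length."""
        text = message["content"].lower()
        hits = sum(1 for kw in self.KEYWORDS if kw in text)
        length_bonus = min(len(text) / 500, 0.3)  # longer turns carry more info
        return min(0.2 + 0.2 * hits + length_bonus, 1.0)

scorer = HeuristicImportanceScorer()
```

A heuristic like this also makes a useful baseline when evaluating whether a trained classifier is worth its latency.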

Context Window Utilization Optimization

上下文窗口利用率优化

Maximize information density within token budget:
python
def optimize_context_allocation(max_tokens: int) -> dict:
    """
    Optimal allocation (empirically tested):
    - 20% summary
    - 50% recent messages
    - 30% retrieved context
    """
    return {
        'summary': int(max_tokens * 0.20),
        'recent': int(max_tokens * 0.50),
        'retrieval': int(max_tokens * 0.30)
    }

在令牌预算内最大化信息密度:
python
def optimize_context_allocation(max_tokens: int) -> dict:
    """
    最优分配(经验证):
    - 20% 摘要
    - 50% 近期消息
    - 30% 检索上下文
    """
    return {
        'summary': int(max_tokens * 0.20),
        'recent': int(max_tokens * 0.50),
        'retrieval': int(max_tokens * 0.30)
    }
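Applied to a concrete budget, the 20/50/30 split allocates a 100k-token window as follows (the function is restated so the snippet is self-contained; any rounding slack from the `int` truncation is folded back into the recent-message window):

```python
def optimize_context_allocation(max_tokens: int) -> dict:
    """20/50/30 split: summary / recent messages / retrieved context."""
    return {
        'summary': int(max_tokens * 0.20),
        'recent': int(max_tokens * 0.50),
        'retrieval': int(max_tokens * 0.30),
    }

budget = optimize_context_allocation(100_000)
# Fold rounding slack into the recent window so the full budget is used.
budget['recent'] += 100_000 - sum(budget.values())
```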

Future Directions

未来方向

Emerging Techniques (2025+)

新兴技术(2025+)

1. Infinite Attention Mechanisms
  • Models with >10M token context windows (Gemini 1.5, future Claude)
  • Reduces need for compression but doesn't eliminate cost concerns
2. Learned Compression Models
  • Neural networks trained to compress conversation optimally
  • Maintain semantic meaning while minimizing tokens
  • Examples: LLMLingua v2, PromptCompressor
3. Multimodal Session Compression
  • Compress conversations with images, audio, video
  • Maintain cross-modal context relationships
4. Federated Memory Systems
  • Distributed compression across multiple memory stores
  • Privacy-preserving compression for sensitive conversations
5. Adaptive Compression Strategies
  • RL-based systems that learn optimal compression per user/domain
  • Dynamic threshold adjustment based on conversation importance

1. 无限注意力机制
  • 上下文窗口>1000万令牌的模型(Gemini 1.5,未来Claude)
  • 减少对压缩的需求,但无法消除成本顾虑
2. 学习压缩模型
  • 经过训练可最优压缩对话的神经网络
  • 在最小化令牌的同时保持语义含义
  • 示例:LLMLingua v2, PromptCompressor
3. 多模态会话压缩
  • 压缩包含图像、音频、视频的对话
  • 保持跨模态上下文关系
4. 联邦内存系统
  • 跨多个内存存储的分布式压缩
  • 针对敏感对话的隐私保护压缩
5. 自适应压缩策略
  • 基于强化学习的系统,为每个用户/领域学习最优压缩
  • 根据对话重要性动态调整阈值

References

参考资料

Academic Papers

学术论文

  • "Recursively Summarizing Enables Long-Term Dialogue Memory in Large Language Models" (arXiv:2308.15022)
  • "LLMLingua: Compressing Prompts for Accelerated Inference" (arXiv:2310.05736)
  • "Lost in the Middle: How Language Models Use Long Contexts" (arXiv:2307.03172)


Last Updated: 2025-11-30 Version: 1.0.0 License: MIT

最后更新: 2025-11-30 版本: 1.0.0 许可证: MIT