langgraph-checkpoints

LangGraph Checkpointing

Persist workflow state for recovery and debugging.

Checkpointer Options

python
from langgraph.checkpoint.memory import MemorySaver
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.checkpoint.postgres import PostgresSaver

# Development: In-memory (state is lost when the process exits)
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)

# Production: SQLite
# In recent releases from_conn_string is a context manager, so the
# checkpointer is only usable inside the with block.
with SqliteSaver.from_conn_string("checkpoints.db") as checkpointer:
    app = workflow.compile(checkpointer=checkpointer)

# Production: PostgreSQL
with PostgresSaver.from_conn_string("postgresql://...") as checkpointer:
    checkpointer.setup()  # Creates the checkpoint tables on first use
    app = workflow.compile(checkpointer=checkpointer)

Using Thread IDs

python
# Start new workflow
config = {"configurable": {"thread_id": "analysis-123"}}
result = app.invoke(initial_state, config=config)

# Resume interrupted workflow
config = {"configurable": {"thread_id": "analysis-123"}}
result = app.invoke(None, config=config)  # Resumes from checkpoint
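
Resuming only works if the same thread_id can be reproduced after a restart. The sketch below shows one way to derive a stable ID from business keys using the standard library. The helper name `thread_config`, the namespace string, and the `workflow_type:business_key` format are all hypothetical choices for illustration, not part of LangGraph.

```python
import uuid

# Hypothetical namespace for this application; any fixed UUID works.
THREAD_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "workflows.example.invalid")


def thread_config(workflow_type: str, business_key: str) -> dict:
    """Build a config dict whose thread_id is stable across restarts."""
    # uuid5 is a deterministic hash: the same inputs always give the same UUID.
    thread_id = str(uuid.uuid5(THREAD_NAMESPACE, f"{workflow_type}:{business_key}"))
    return {"configurable": {"thread_id": thread_id}}


first = thread_config("analysis", "123")
after_restart = thread_config("analysis", "123")
```

Because the ID is computed rather than generated at random, a process that restarts can rebuild the same config and pick up the existing thread.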

PostgreSQL Setup

python
from psycopg import Connection
from psycopg.rows import dict_row

def create_checkpointer():
    """Create PostgreSQL checkpointer for production."""
    # settings is assumed to be the application's own config object
    conn = Connection.connect(
        settings.DATABASE_URL,
        autocommit=True,
        prepare_threshold=0,
        row_factory=dict_row,
    )
    checkpointer = PostgresSaver(conn)
    checkpointer.setup()  # Creates the checkpoint tables if they do not exist
    return checkpointer

# Compile with checkpointing (state is saved after every step by default)
app = workflow.compile(
    checkpointer=create_checkpointer(),
    interrupt_before=["quality_gate"],  # Manual review point
)

Inspecting Checkpoints

python
# Get all checkpoints for a workflow (most recent first)
checkpoints = app.get_state_history(config)
for checkpoint in checkpoints:
    print(f"Step: {checkpoint.metadata['step']}")
    print(f"Source: {checkpoint.metadata['source']}")  # "input", "loop", or "update"
    print(f"Next nodes: {checkpoint.next}")
    print(f"State: {checkpoint.values}")

# Get current state
current = app.get_state(config)
print(current.values)
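
When a history is long, it can help to jump straight to one step instead of printing everything. The following is a minimal sketch that assumes each snapshot exposes `metadata`, `values`, and `next` attributes as used above. The helper `find_snapshot_at_step` is hypothetical, and the `SimpleNamespace` objects are stand-ins so the example runs without a graph.

```python
from types import SimpleNamespace
from typing import Any, Iterable, Optional


def find_snapshot_at_step(history: Iterable[Any], step: int) -> Optional[Any]:
    """Return the first snapshot whose metadata records the given step, if any."""
    for snapshot in history:
        if snapshot.metadata.get("step") == step:
            return snapshot
    return None


# Stand-ins for the snapshots yielded by get_state_history (newest first).
fake_history = [
    SimpleNamespace(metadata={"step": 2, "source": "loop"}, values={"count": 2}, next=()),
    SimpleNamespace(metadata={"step": 1, "source": "loop"}, values={"count": 1}, next=("finish",)),
    SimpleNamespace(metadata={"step": 0, "source": "loop"}, values={"count": 0}, next=("work",)),
]

snapshot = find_snapshot_at_step(fake_history, 1)
```

With a real graph, the same helper could be called as `find_snapshot_at_step(app.get_state_history(config), 1)`.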

Resuming After Crash

python
import logging

def run_with_recovery(workflow_id: str, initial_state: dict):
    """Run workflow with automatic recovery."""
    config = {"configurable": {"thread_id": workflow_id}}

    # A thread with no checkpoint returns an empty snapshot instead of raising
    state = app.get_state(config)
    if state.next:
        # Nodes are still pending, so an earlier run stopped partway through
        logging.info(f"Resuming workflow {workflow_id}")
        return app.invoke(None, config=config)

    # Start fresh
    logging.info(f"Starting new workflow {workflow_id}")
    return app.invoke(initial_state, config=config)
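
To make the recovery idea concrete, here is a toy model of checkpoint-and-resume written with the standard library only. It is not LangGraph's implementation: the `checkpoints` dict, the `run` function, and the step functions are all hypothetical and exist only to show why passing None as input can continue from the last saved step.

```python
from typing import Callable, Dict, List, Optional, Tuple

Step = Tuple[str, Callable[[dict], dict]]

# Toy checkpoint storage: thread_id -> (index of next step, state so far).
checkpoints: Dict[str, Tuple[int, dict]] = {}


def run(steps: List[Step], thread_id: str, initial_state: Optional[dict]) -> dict:
    """Run steps in order, saving a checkpoint after each one.

    Passing initial_state=None resumes from the saved checkpoint.
    """
    if initial_state is None:
        next_index, state = checkpoints[thread_id]
    else:
        next_index, state = 0, dict(initial_state)

    for index in range(next_index, len(steps)):
        _name, fn = steps[index]
        state = fn(state)  # May raise; the previous checkpoint stays intact
        checkpoints[thread_id] = (index + 1, state)
    return state


calls = {"load": 0, "flaky": 0}


def load(state: dict) -> dict:
    calls["load"] += 1
    return {**state, "loaded": True}


def flaky(state: dict) -> dict:
    calls["flaky"] += 1
    if calls["flaky"] == 1:
        raise RuntimeError("simulated crash")
    return {**state, "analyzed": True}


steps: List[Step] = [("load", load), ("flaky", flaky)]

try:
    run(steps, "analysis-123", {"input": "data"})
except RuntimeError:
    pass  # The checkpoint written after "load" survived the crash

result = run(steps, "analysis-123", None)  # Resumes at "flaky"
```

The second call skips the step that already completed, which is the behavior a persistent checkpointer is meant to provide across process restarts.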

Step-by-Step Debugging

python
# Execute one node at a time
for step in app.stream(initial_state, config, stream_mode="updates"):
    # Each chunk maps the node that just ran to the state update it returned
    for node, update in step.items():
        print(f"After {node}: {update}")
    input("Press Enter to continue...")

# Rollback to previous checkpoint
history = list(app.get_state_history(config))  # Newest first
previous_state = history[1]  # One step back
# Invoking with that checkpoint's config re-runs the graph from that point
result = app.invoke(None, config=previous_state.config)
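
In the `updates` stream mode, each streamed chunk is, to my understanding, a dict keyed by the name of the node that just ran. The sketch below formats such chunks for a debugging log. The helper `describe_chunk` and the sample data are hypothetical, and the list stands in for a real `app.stream(...)` call so the example runs on its own.

```python
from typing import Any, Dict, List


def describe_chunk(chunk: Dict[str, Any]) -> List[str]:
    """Turn one "updates" chunk ({node_name: update}) into printable lines."""
    return [f"After {node}: {update}" for node, update in chunk.items()]


# Stand-in for what app.stream(..., stream_mode="updates") might yield.
fake_stream = [
    {"fetch": {"documents": 3}},
    {"summarize": {"summary": "ok"}},
]

lines = [line for chunk in fake_stream for line in describe_chunk(chunk)]
```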

Store vs Checkpointer (2026 Best Practice)

python
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.store.postgres import PostgresStore

# Checkpointer = SHORT-TERM memory (thread-scoped)
# - Conversation history within a session
# - Workflow state for resume/recovery
# - Scoped to thread_id
#
# Store = LONG-TERM memory (cross-thread)
# - User preferences across sessions
# - Learned facts about users
# - Shared across ALL threads for a user

# Compile with BOTH for full memory support.
# from_conn_string is a context manager in recent releases, so both
# objects are only usable inside the with blocks.
with PostgresSaver.from_conn_string(DATABASE_URL) as checkpointer:
    with PostgresStore.from_conn_string(DATABASE_URL) as store:
        app = workflow.compile(
            checkpointer=checkpointer,  # Thread-scoped state
            store=store,  # Cross-thread memory
        )

Using Store for Cross-Thread Memory

python
from datetime import datetime

from langgraph.store.base import BaseStore

# AgentState is assumed to be the graph's state schema, defined elsewhere
async def agent_with_memory(state: AgentState, *, store: BaseStore):
    """Agent that remembers across conversations."""
    user_id = state["user_id"]

    # Read cross-thread memory (user preferences)
    memories = await store.aget(namespace=("users", user_id), key="preferences")

    # Use memories in agent logic
    if memories and memories.value.get("prefers_concise"):
        state["system_prompt"] += "\nBe concise in responses."

    # Update cross-thread memory (learned facts)
    await store.aput(
        namespace=("users", user_id),
        key="last_topic",
        value={"topic": state["current_topic"], "timestamp": datetime.now().isoformat()}
    )

    return state

# Register node with store access
workflow.add_node("agent", agent_with_memory)

Memory Architecture

┌─────────────────────────────────────────────────────────────┐
│                    User: alice                               │
├─────────────────────────────────────────────────────────────┤
│  Thread 1 (chat-001)    │  Thread 2 (chat-002)              │
│  ┌─────────────────┐    │  ┌─────────────────┐              │
│  │ Checkpointer    │    │  │ Checkpointer    │              │
│  │ - msg history   │    │  │ - msg history   │              │
│  │ - workflow pos  │    │  │ - workflow pos  │              │
│  └─────────────────┘    │  └─────────────────┘              │
├─────────────────────────────────────────────────────────────┤
│                     Store (cross-thread)                     │
│  namespace=("users", "alice")                                │
│  - preferences: {prefers_concise: true}                     │
│  - last_topic: {topic: "langgraph", timestamp: "..."}       │
└─────────────────────────────────────────────────────────────┘
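
The diagram can be mimicked with a few lines of plain Python. This is a toy stand-in, not LangGraph's Store: the `ToyStore` class and its `put`/`get` methods are hypothetical and only illustrate how a namespace tuple keeps users apart while letting every thread of one user see the same data.

```python
from typing import Any, Dict, Optional, Tuple

Namespace = Tuple[str, ...]


class ToyStore:
    """In-memory stand-in for a namespaced key-value store."""

    def __init__(self) -> None:
        self._data: Dict[Tuple[Namespace, str], Dict[str, Any]] = {}

    def put(self, namespace: Namespace, key: str, value: Dict[str, Any]) -> None:
        self._data[(namespace, key)] = value

    def get(self, namespace: Namespace, key: str) -> Optional[Dict[str, Any]]:
        return self._data.get((namespace, key))


toy_store = ToyStore()

# Written while handling thread chat-001 for alice...
toy_store.put(("users", "alice"), "preferences", {"prefers_concise": True})
# ...and a different user with the same key does not collide.
toy_store.put(("users", "bob"), "preferences", {"prefers_concise": False})

# Read later while handling thread chat-002 for alice.
alice_prefs = toy_store.get(("users", "alice"), "preferences")
```

Without the user ID in the namespace, the second write would have overwritten the first, which is the data-collision mistake listed later in this document.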

Graph Migrations (2026 Feature)

LangGraph handles most topology changes automatically for threads that already have checkpoints.

Safe changes (handled automatically):

- Adding new nodes
- Removing nodes
- Renaming nodes
- Adding state keys
- Removing state keys

These changes work for both active and completed threads.

Limitation: a node cannot be removed or renamed while a thread is interrupted at that node.
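
Before deploying a graph that drops nodes, it may be worth checking whether any interrupted thread is still waiting on one of them. The sketch below is one hypothetical way to do that check. The function `blocked_removals` and the sample thread data are illustrative, and it assumes you have already collected each thread's pending nodes (for example from the `next` field of its current state).

```python
from typing import Dict, Iterable, List, Set


def blocked_removals(
    nodes_to_remove: Iterable[str],
    pending_by_thread: Dict[str, Iterable[str]],
) -> Dict[str, List[str]]:
    """Map each node that is unsafe to remove to the threads waiting on it."""
    removing: Set[str] = set(nodes_to_remove)
    blocked: Dict[str, List[str]] = {}
    for thread_id, pending in pending_by_thread.items():
        for node in pending:
            if node in removing:
                blocked.setdefault(node, []).append(thread_id)
    return blocked


# Hypothetical snapshot of which node each thread would run next.
pending = {
    "analysis-123": ["quality_gate"],
    "analysis-456": [],
    "analysis-789": ["quality_gate"],
}

report = blocked_removals(["quality_gate", "legacy_cleanup"], pending)
```

An empty report suggests the removal is safe for the threads that were checked. A non-empty one lists the threads to finish or migrate first.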

Checkpoint Cleanup Strategies

python
from datetime import datetime, timedelta

# The table and column names below (checkpoints, langgraph_checkpoints,
# created_at, id) are assumed; match them to your actual schema.
# db is assumed to be an asyncpg-style connection ($1 placeholders).

# Option 1: TTL-based cleanup (configure at DB level)
# CREATE INDEX idx_checkpoints_created ON checkpoints(created_at);
# DELETE FROM checkpoints WHERE created_at < NOW() - INTERVAL '30 days';

# Option 2: Manual cleanup
async def cleanup_old_checkpoints(db, days: int = 30):
    """Remove checkpoints older than N days."""
    cutoff = datetime.now() - timedelta(days=days)
    await db.execute(
        "DELETE FROM langgraph_checkpoints WHERE created_at < $1",
        cutoff
    )

# Option 3: Per-thread cleanup
async def cleanup_thread(db, thread_id: str, keep_latest: int = 10):
    """Keep only latest N checkpoints per thread."""
    await db.execute("""
        DELETE FROM langgraph_checkpoints
        WHERE thread_id = $1
        AND id NOT IN (
            SELECT id FROM langgraph_checkpoints
            WHERE thread_id = $1
            ORDER BY created_at DESC
            LIMIT $2
        )
    """, thread_id, keep_latest)
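
The keep-latest-N query can be tried out safely against an in-memory SQLite database before pointing it at production data. This is a sketch under stated assumptions: the `demo_checkpoints` table, its columns, and the `keep_latest` helper are hypothetical and do not reflect the schema any LangGraph checkpointer actually creates.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE demo_checkpoints ("
    "id INTEGER PRIMARY KEY, thread_id TEXT, created_at INTEGER)"
)
# Five checkpoints for thread "a", two for thread "b"; created_at is a counter.
rows = [("a", t) for t in range(1, 6)] + [("b", t) for t in range(1, 3)]
conn.executemany(
    "INSERT INTO demo_checkpoints (thread_id, created_at) VALUES (?, ?)", rows
)


def keep_latest(conn: sqlite3.Connection, thread_id: str, keep: int) -> int:
    """Delete all but the newest `keep` rows for one thread; return rows deleted."""
    cursor = conn.execute(
        """
        DELETE FROM demo_checkpoints
        WHERE thread_id = ?
          AND id NOT IN (
              SELECT id FROM demo_checkpoints
              WHERE thread_id = ?
              ORDER BY created_at DESC
              LIMIT ?
          )
        """,
        (thread_id, thread_id, keep),
    )
    conn.commit()
    return cursor.rowcount


deleted = keep_latest(conn, "a", keep=2)
remaining_a = [
    row[0]
    for row in conn.execute(
        "SELECT created_at FROM demo_checkpoints "
        "WHERE thread_id = 'a' ORDER BY created_at"
    )
]
remaining_b = conn.execute(
    "SELECT COUNT(*) FROM demo_checkpoints WHERE thread_id = 'b'"
).fetchone()[0]
```

Only the targeted thread loses rows, and the newest entries are the ones that survive.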

Key Decisions

| Decision | Recommendation |
|---|---|
| Development | MemorySaver (fast, no setup) |
| Production | PostgresSaver (shared, durable) |
| Thread ID | Use deterministic ID (workflow_id) |
| Short-term memory | Checkpointer (thread-scoped) |
| Long-term memory | Store (cross-thread, namespaced) |
| Cleanup | TTL-based or keep-latest-N per thread |
| Migrations | Automatic for topology changes |

Common Mistakes

  • No checkpointer in production (lose progress)
  • Random thread IDs (can't resume)
  • Not handling missing checkpoints
  • Using only checkpointer for user preferences (lost across threads)
  • Not using namespaces in Store (data collisions)
  • Not cleaning up old checkpoints (database bloat)
  • Removing nodes while threads are interrupted at them

Evaluations

See references/evaluations.md for test cases.

Related Skills

  • langgraph-state
    - State schemas that persist well with checkpointing
  • langgraph-human-in-loop
    - Interrupt patterns that leverage checkpoints
  • langgraph-supervisor
    - Checkpoint supervisor progress for fault tolerance
  • langgraph-streaming
    - Stream checkpoint updates to clients
  • langgraph-functional
    - Functional API with automatic checkpointing
  • database-schema-designer
    - PostgreSQL checkpoint table setup

Capability Details

checkpoint-saving

Keywords: save checkpoint, checkpoint, persist state, save state

Solves:
  • Save workflow state at key points
  • Implement checkpoint strategies
  • Handle checkpoint serialization

checkpoint-loading

Keywords: load checkpoint, restore, resume, recovery

Solves:
  • Resume workflows from checkpoints
  • Implement state recovery
  • Handle checkpoint versioning

memory-backends

Keywords: memory backend, MemorySaver, SqliteSaver, PostgresSaver

Solves:
  • Configure checkpoint storage backends
  • Choose between memory/SQLite/Postgres
  • Implement custom checkpoint storage

async-checkpoints

Keywords: async checkpoint, AsyncSqliteSaver, async persistence

Solves:
  • Implement async checkpoint operations
  • Handle concurrent checkpoint access
  • Optimize checkpoint performance

conversation-history

Keywords: conversation, history, message history, thread

Solves:
  • Persist conversation history
  • Implement thread-based checkpoints
  • Manage conversation state