langgraph-checkpoints
LangGraph Checkpointing
Persist workflow state for recovery and debugging.
Checkpointer Options
```python
from langgraph.checkpoint.memory import MemorySaver
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.checkpoint.postgres import PostgresSaver

# Development: in-memory (state is lost when the process exits)
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)

# Production: SQLite (file-backed, single process)
checkpointer = SqliteSaver.from_conn_string("checkpoints.db")
app = workflow.compile(checkpointer=checkpointer)

# Production: PostgreSQL (shared, durable)
checkpointer = PostgresSaver.from_conn_string("postgresql://...")
app = workflow.compile(checkpointer=checkpointer)
```
Using Thread IDs

```python
# Start a new workflow
config = {"configurable": {"thread_id": "analysis-123"}}
result = app.invoke(initial_state, config=config)

# Resume an interrupted workflow
config = {"configurable": {"thread_id": "analysis-123"}}
result = app.invoke(None, config=config)  # Resumes from checkpoint
```
PostgreSQL Setup

```python
def create_checkpointer():
    """Create a PostgreSQL checkpointer for production."""
    # A checkpoint is saved after each node (super-step) by default.
    return PostgresSaver.from_conn_string(settings.DATABASE_URL)

# Compile with checkpointing
app = workflow.compile(
    checkpointer=create_checkpointer(),
    interrupt_before=["quality_gate"]  # Manual review point
)
```
Inspecting Checkpoints

```python
# Get all checkpoints for a workflow
checkpoints = app.get_state_history(config)
for checkpoint in checkpoints:
    print(f"Step: {checkpoint.metadata['step']}")
    print(f"Source: {checkpoint.metadata['source']}")
    print(f"State: {checkpoint.values}")

# Get current state
current = app.get_state(config)
print(current.values)
```
Resuming After Crash

```python
import logging

async def run_with_recovery(workflow_id: str, initial_state: dict):
    """Run a workflow with automatic recovery."""
    config = {"configurable": {"thread_id": workflow_id}}
    try:
        # Try to resume an existing workflow
        state = app.get_state(config)
        if state.values:
            logging.info(f"Resuming workflow {workflow_id}")
            return await app.ainvoke(None, config=config)
    except Exception:
        pass  # No existing checkpoint
    # Start fresh
    logging.info(f"Starting new workflow {workflow_id}")
    return await app.ainvoke(initial_state, config=config)
```

Step-by-Step Debugging
```python
# Execute one node at a time; stream() yields {node_name: state_update} dicts
for step in app.stream(initial_state, config):
    node, update = next(iter(step.items()))
    print(f"After {node}: {update}")
    input("Press Enter to continue...")

# Rollback to a previous checkpoint
history = list(app.get_state_history(config))
previous_state = history[1]  # One step back (history is newest-first)
app.update_state(config, previous_state.values)
```
Store vs Checkpointer (2026 Best Practice)

```python
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.store.postgres import PostgresStore

# Checkpointer = SHORT-TERM memory (thread-scoped)
# - Conversation history within a session
# - Workflow state for resume/recovery
# - Scoped to thread_id
checkpointer = PostgresSaver.from_conn_string(DATABASE_URL)

# Store = LONG-TERM memory (cross-thread)
# - User preferences across sessions
# - Learned facts about users
# - Shared across ALL threads for a user
store = PostgresStore.from_conn_string(DATABASE_URL)

# Compile with BOTH for full memory support
app = workflow.compile(
    checkpointer=checkpointer,  # Thread-scoped state
    store=store  # Cross-thread memory
)
```
Using Store for Cross-Thread Memory

```python
from datetime import datetime
from langgraph.store.base import BaseStore

async def agent_with_memory(state: AgentState, *, store: BaseStore):
    """Agent that remembers across conversations."""
    user_id = state["user_id"]

    # Read cross-thread memory (user preferences)
    memories = await store.aget(namespace=("users", user_id), key="preferences")

    # Use memories in agent logic
    if memories and memories.value.get("prefers_concise"):
        state["system_prompt"] += "\nBe concise in responses."

    # Update cross-thread memory (learned facts)
    await store.aput(
        namespace=("users", user_id),
        key="last_topic",
        value={"topic": state["current_topic"], "timestamp": datetime.now().isoformat()}
    )
    return state

# Register node with store access
workflow.add_node("agent", agent_with_memory)
```
Memory Architecture

```
┌─────────────────────────────────────────────────────────────┐
│ User: alice                                                 │
├─────────────────────────────────────────────────────────────┤
│ Thread 1 (chat-001)          │ Thread 2 (chat-002)          │
│ ┌─────────────────┐          │ ┌─────────────────┐          │
│ │ Checkpointer    │          │ │ Checkpointer    │          │
│ │ - msg history   │          │ │ - msg history   │          │
│ │ - workflow pos  │          │ │ - workflow pos  │          │
│ └─────────────────┘          │ └─────────────────┘          │
├─────────────────────────────────────────────────────────────┤
│ Store (cross-thread)                                        │
│ namespace=("users", "alice")                                │
│ - preferences: {prefers_concise: true}                      │
│ - last_topic: {topic: "langgraph", timestamp: "..."}        │
└─────────────────────────────────────────────────────────────┘
```

Graph Migrations (2026 Feature)
LangGraph handles topology changes automatically.

Safe changes (handled automatically):
- Adding new nodes
- Removing nodes
- Renaming nodes
- Adding state keys
- Removing state keys

This works for both active and completed threads.

Limitation: a node cannot be removed while a thread is interrupted at that node.
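The interrupted-node limitation can be sketched as a pre-deployment check. This is a toy stand-in for LangGraph's internal migration handling, not its API; `blocking_threads` and the thread map are hypothetical names:

```python
def blocking_threads(node: str, interrupted_at: dict) -> list:
    """Return the thread IDs that prevent removing `node` from the graph.

    `interrupted_at` maps thread_id -> the node where that thread is paused.
    Removing a node is only unsafe while some thread is interrupted at it.
    """
    return [tid for tid, at in interrupted_at.items() if at == node]

# t1 is paused at quality_gate, so quality_gate cannot be removed yet,
# while an unused node like "summarize" could be removed safely.
blockers = blocking_threads("quality_gate", {"t1": "quality_gate", "t2": "draft"})
```

In practice this means draining or resuming interrupted threads before deploying a topology change that deletes their current node.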
Checkpoint Cleanup Strategies

```python
from datetime import datetime, timedelta

# Option 1: TTL-based cleanup (configure at the DB level)
#   CREATE INDEX idx_checkpoints_created ON checkpoints(created_at);
#   DELETE FROM checkpoints WHERE created_at < NOW() - INTERVAL '30 days';

# Option 2: Manual cleanup
async def cleanup_old_checkpoints(db, days: int = 30):
    """Remove checkpoints older than N days."""
    cutoff = datetime.now() - timedelta(days=days)
    await db.execute(
        "DELETE FROM langgraph_checkpoints WHERE created_at < $1",
        cutoff
    )

# Option 3: Per-thread cleanup
async def cleanup_thread(db, thread_id: str, keep_latest: int = 10):
    """Keep only the latest N checkpoints per thread."""
    await db.execute("""
        DELETE FROM langgraph_checkpoints
        WHERE thread_id = $1
        AND id NOT IN (
            SELECT id FROM langgraph_checkpoints
            WHERE thread_id = $1
            ORDER BY created_at DESC
            LIMIT $2
        )
    """, thread_id, keep_latest)
```
Key Decisions

| Decision | Recommendation |
|---|---|
| Development | MemorySaver (fast, no setup) |
| Production | PostgresSaver (shared, durable) |
| Thread ID | Use a deterministic ID (e.g. workflow_id) |
| Short-term memory | Checkpointer (thread-scoped) |
| Long-term memory | Store (cross-thread, namespaced) |
| Cleanup | TTL-based or keep-latest-N per thread |
| Migrations | Automatic for topology changes |
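One way to get the deterministic thread IDs the table recommends is to derive them from stable workflow inputs. A minimal sketch using stdlib `uuid5`; the `workflow/entity` naming scheme is an assumption, not a LangGraph convention:

```python
import uuid

def thread_id_for(workflow: str, entity_id: str) -> str:
    """Derive a stable thread_id from workflow inputs.

    uuid5 is deterministic: the same inputs always produce the same ID,
    so a retry or crash-recovery run computes the same thread_id and
    therefore resumes the same checkpoint history.
    """
    return str(uuid.uuid5(uuid.NAMESPACE_URL, f"{workflow}/{entity_id}"))

config = {"configurable": {"thread_id": thread_id_for("analysis", "order-123")}}
```

Contrast this with `uuid.uuid4()`, which produces a fresh random ID on every run and makes the earlier checkpoints unreachable.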
Common Mistakes
- No checkpointer in production (lose progress)
- Random thread IDs (can't resume)
- Not handling missing checkpoints
- Using only checkpointer for user preferences (lost across threads)
- Not using namespaces in Store (data collisions)
- Not cleaning up old checkpoints (database bloat)
- Removing nodes while threads are interrupted at them
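The namespace-collision mistake is easy to see with a toy dict-backed store — a stand-in that mimics BaseStore's (namespace, key) layout, not the real implementation:

```python
store = {}  # toy store keyed by (namespace, key) pairs

def put(namespace: tuple, key: str, value: dict) -> None:
    """Write a value under a namespace tuple, as BaseStore does."""
    store[(namespace, key)] = value

def get(namespace: tuple, key: str):
    """Read a value; returns None when the namespaced key is absent."""
    return store.get((namespace, key))

# Namespaced per user: alice's and bob's preferences cannot collide.
put(("users", "alice"), "preferences", {"prefers_concise": True})
put(("users", "bob"), "preferences", {"prefers_concise": False})
alice_prefs = get(("users", "alice"), "preferences")

# A flat, un-namespaced "preferences" key would instead be overwritten
# by whichever user wrote last.
```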
Evaluations
See references/evaluations.md for test cases.
Related Skills
- langgraph-state - State schemas that persist well with checkpointing
- langgraph-human-in-loop - Interrupt patterns that leverage checkpoints
- langgraph-supervisor - Checkpoint supervisor progress for fault tolerance
- langgraph-streaming - Stream checkpoint updates to clients
- langgraph-functional - Functional API with automatic checkpointing
- database-schema-designer - PostgreSQL checkpoint table setup
Capability Details
checkpoint-saving
Keywords: save checkpoint, checkpoint, persist state, save state
Solves:
- Save workflow state at key points
- Implement checkpoint strategies
- Handle checkpoint serialization

checkpoint-loading
Keywords: load checkpoint, restore, resume, recovery
Solves:
- Resume workflows from checkpoints
- Implement state recovery
- Handle checkpoint versioning

memory-backends
Keywords: memory backend, MemorySaver, SqliteSaver, PostgresSaver
Solves:
- Configure checkpoint storage backends
- Choose between memory/SQLite/Postgres
- Implement custom checkpoint storage

async-checkpoints
Keywords: async checkpoint, AsyncSqliteSaver, async persistence
Solves:
- Implement async checkpoint operations
- Handle concurrent checkpoint access
- Optimize checkpoint performance

conversation-history
Keywords: conversation, history, message history, thread
Solves:
- Persist conversation history
- Implement thread-based checkpoints
- Manage conversation state