langgraph-persistence
<overview>
LangGraph's persistence layer enables durable execution by checkpointing graph state:
- Checkpointer: Saves/loads graph state at every super-step
- Thread ID: Identifies separate checkpoint sequences (conversations)
- Store: Cross-thread memory for user preferences, facts
Two memory types:
- Short-term (checkpointer): Thread-scoped conversation history
- Long-term (store): Cross-thread user preferences, facts
</overview>
<checkpointer-selection>
| Checkpointer | Use Case | Production Ready |
|---|---|---|
| InMemorySaver | Testing, development | No |
| SqliteSaver | Local development | Partial |
| PostgresSaver | Production | Yes |
</checkpointer-selection>
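Conceptually, a checkpointer is a per-thread log of state snapshots. The toy model below is plain Python, not the LangGraph API (`ToyCheckpointer` and its methods are made-up names), and sketches why state accumulates within one thread_id while staying isolated between threads:

```python
# Toy model of a checkpointer (NOT the LangGraph API): state snapshots
# are keyed by thread_id, so each conversation accumulates independently.
class ToyCheckpointer:
    def __init__(self):
        self._checkpoints = {}  # thread_id -> list of state snapshots

    def save(self, thread_id, state):
        self._checkpoints.setdefault(thread_id, []).append(dict(state))

    def load(self, thread_id):
        history = self._checkpoints.get(thread_id)
        return dict(history[-1]) if history else {"messages": []}

def invoke(cp, thread_id, message):
    # load latest checkpoint, run one "super-step", save a new checkpoint
    state = cp.load(thread_id)
    state["messages"] = state["messages"] + [message, "Bot response"]
    cp.save(thread_id, state)
    return state

cp = ToyCheckpointer()
r1 = invoke(cp, "conversation-1", "Hello")
r2 = invoke(cp, "conversation-1", "How are you?")
other = invoke(cp, "conversation-2", "Hi")
print(len(r1["messages"]), len(r2["messages"]), len(other["messages"]))  # 2 4 2
```

The real checkpointers in the table above follow the same shape; they differ only in where the snapshots are stored.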
Checkpointer Setup
<ex-basic-persistence>
<python>
Set up a basic graph with in-memory checkpointing and thread-based state persistence.
```python
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict, Annotated
import operator
class State(TypedDict):
    messages: Annotated[list, operator.add]

def add_message(state: State) -> dict:
    return {"messages": ["Bot response"]}
checkpointer = InMemorySaver()
graph = (
StateGraph(State)
.add_node("respond", add_message)
.add_edge(START, "respond")
.add_edge("respond", END)
.compile(checkpointer=checkpointer) # Pass at compile time
)
# ALWAYS provide thread_id
config = {"configurable": {"thread_id": "conversation-1"}}
result1 = graph.invoke({"messages": ["Hello"]}, config)
print(len(result1["messages"])) # 2

result2 = graph.invoke({"messages": ["How are you?"]}, config)
print(len(result2["messages"])) # 4 (previous + new)
```
</python>
<typescript>
Set up a basic graph with in-memory checkpointing and thread-based state persistence.
```typescript
import { MemorySaver, StateGraph, StateSchema, MessagesValue, START, END } from "@langchain/langgraph";
import { HumanMessage } from "@langchain/core/messages";
const State = new StateSchema({ messages: MessagesValue });
const addMessage = async (state: typeof State.State) => {
return { messages: [{ role: "assistant", content: "Bot response" }] };
};
const checkpointer = new MemorySaver();
const graph = new StateGraph(State)
.addNode("respond", addMessage)
.addEdge(START, "respond")
.addEdge("respond", END)
.compile({ checkpointer });
// ALWAYS provide thread_id
const config = { configurable: { thread_id: "conversation-1" } };
const result1 = await graph.invoke({ messages: [new HumanMessage("Hello")] }, config);
console.log(result1.messages.length); // 2
const result2 = await graph.invoke({ messages: [new HumanMessage("How are you?")] }, config);
console.log(result2.messages.length); // 2

const result2 = await graph.invoke({ messages: [new HumanMessage("How are you?")] }, config);
console.log(result2.messages.length); // 4 (previous + new)
```
</typescript>
</ex-basic-persistence>

<python>
Configure PostgreSQL-backed checkpointing for production deployments.
```python
from langgraph.checkpoint.postgres import PostgresSaver

with PostgresSaver.from_conn_string(
    "postgresql://user:pass@localhost/db"
) as checkpointer:
    checkpointer.setup()  # only needed on first use to create tables
    graph = builder.compile(checkpointer=checkpointer)
```
</python>
<typescript>
Configure PostgreSQL-backed checkpointing for production deployments.
```typescript
import { PostgresSaver } from "@langchain/langgraph-checkpoint-postgres";

const checkpointer = PostgresSaver.fromConnString(
  "postgresql://user:pass@localhost/db"
);
await checkpointer.setup(); // only needed on first use to create tables
const graph = builder.compile({ checkpointer });
```
</typescript>
Thread Management
<ex-separate-threads>
<python>
Demonstrate isolated state between different thread IDs.
```python
# Different threads maintain separate state
alice_config = {"configurable": {"thread_id": "user-alice"}}
bob_config = {"configurable": {"thread_id": "user-bob"}}

graph.invoke({"messages": ["Hi from Alice"]}, alice_config)
graph.invoke({"messages": ["Hi from Bob"]}, bob_config)

# Alice's state is isolated from Bob's
```
</python>
<typescript>
Demonstrate isolated state between different thread IDs.
```typescript
// Different threads maintain separate state
const aliceConfig = { configurable: { thread_id: "user-alice" } };
const bobConfig = { configurable: { thread_id: "user-bob" } };
await graph.invoke({ messages: [new HumanMessage("Hi from Alice")] }, aliceConfig);
await graph.invoke({ messages: [new HumanMessage("Hi from Bob")] }, bobConfig);
// Alice's state is isolated from Bob's
```
</typescript>
</ex-separate-threads>
State History & Time Travel
<ex-resume-from-checkpoint>
<python>
Time travel: browse checkpoint history and replay or fork from a past state.
```python
config = {"configurable": {"thread_id": "session-1"}}
result = graph.invoke({"messages": ["start"]}, config)
# Browse checkpoint history
states = list(graph.get_state_history(config))

# Replay from a past checkpoint
past = states[-2]
result = graph.invoke(None, past.config)  # None = resume from checkpoint

# Or fork: update state at a past checkpoint, then resume
fork_config = graph.update_state(past.config, {"messages": ["edited"]})
result = graph.invoke(None, fork_config)
```
</python>
<typescript>
Time travel: browse checkpoint history and replay or fork from a past state.
```typescript
const config = { configurable: { thread_id: "session-1" } };
const result = await graph.invoke({ messages: ["start"] }, config);
// Browse checkpoint history (async iterable, collect to array)
const states: Awaited<ReturnType<typeof graph.getState>>[] = [];
for await (const state of graph.getStateHistory(config)) {
states.push(state);
}
// Replay from a past checkpoint
const past = states[states.length - 2];
const replayed = await graph.invoke(null, past.config); // null = resume from checkpoint
// Or fork: update state at a past checkpoint, then resume
const forkConfig = await graph.updateState(past.config, { messages: ["edited"] });
const forked = await graph.invoke(null, forkConfig);
```
</typescript>
</ex-resume-from-checkpoint>
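The replay and fork mechanics can be sketched without LangGraph. This is a toy model (the real APIs are get_state_history, update_state, and invoke(None, config)); it shows that a history is just a list of snapshots, replay resumes from one, and fork edits a copy first:

```python
# Toy model (NOT the LangGraph API): a checkpoint history is a list of
# snapshots; replay resumes from a past snapshot, fork edits one first.
history = []

def invoke(state, resume_from=None):
    if resume_from is not None:
        state = dict(history[resume_from])  # resume from a past checkpoint
    else:
        state = dict(state)
    state["messages"] = state.get("messages", []) + ["step"]
    history.append(dict(state))  # every super-step writes a checkpoint
    return state

invoke({"messages": ["start"]})  # checkpoint 0
invoke(None, resume_from=0)      # replay from checkpoint 0

# Fork: copy a past checkpoint, edit it, resume from the edited copy
forked = dict(history[0])
forked["messages"] = ["edited"]
history.append(forked)
result = invoke(None, resume_from=len(history) - 1)
print(result["messages"])  # ['edited', 'step']
```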
<python>
Manually update graph state before resuming execution.
```python
config = {"configurable": {"thread_id": "session-1"}}

# Modify state before resuming
graph.update_state(config, {"data": "manually_updated"})

# Resume with updated state
result = graph.invoke(None, config)
```
</python>
<typescript>
Manually update graph state before resuming execution.
```typescript
const config = { configurable: { thread_id: "session-1" } };

// Modify state before resuming
await graph.updateState(config, { data: "manually_updated" });

// Resume with updated state
const result = await graph.invoke(null, config);
```
</typescript>
Subgraph Checkpointer Scoping

When compiling a subgraph, the checkpointer parameter controls persistence behavior. This is critical for subgraphs that use interrupts, need multi-turn memory, or run in parallel.
<subgraph-checkpointer-scoping-table>
| Feature | checkpointer=False | None (default) | checkpointer=True |
|---|---|---|---|
| Interrupts (HITL) | No | Yes | Yes |
| Multi-turn memory | No | No | Yes |
| Multiple calls (different subgraphs) | Yes | Yes | Warning (namespace conflicts possible) |
| Multiple calls (same subgraph) | Yes | Yes | No |
| State inspection | No | Warning (current invocation only) | Yes |
</subgraph-checkpointer-scoping-table>

When to use each mode
- checkpointer=False — Subgraph doesn't need interrupts or persistence. Simplest option, no checkpoint overhead.
- None (default / omit checkpointer) — Subgraph needs interrupt() but not multi-turn memory. Each invocation starts fresh but can pause/resume. Parallel execution works because each invocation gets a unique namespace.
- checkpointer=True — Subgraph needs to remember state across invocations (multi-turn conversations). Each call picks up where the last left off.

<warning-stateful-subgraphs-parallel>
Warning: Stateful subgraphs (checkpointer=True) do NOT support calling the same subgraph instance multiple times within a single node — the calls write to the same checkpoint namespace and conflict.
</warning-stateful-subgraphs-parallel>
<ex-subgraph-checkpointer-modes>
<python>
Choose the right checkpointer mode for your subgraph.
```python
# No interrupts needed — opt out of checkpointing
subgraph = subgraph_builder.compile(checkpointer=False)

# Need interrupts but not cross-invocation persistence (default)
subgraph = subgraph_builder.compile()

# Need cross-invocation persistence (stateful)
subgraph = subgraph_builder.compile(checkpointer=True)
```
</python>
<typescript>
Choose the right checkpointer mode for your subgraph.
```typescript
// No interrupts needed — opt out of checkpointing
const statelessSubgraph = subgraphBuilder.compile({ checkpointer: false });

// Need interrupts but not cross-invocation persistence (default)
const defaultSubgraph = subgraphBuilder.compile();

// Need cross-invocation persistence (stateful)
const statefulSubgraph = subgraphBuilder.compile({ checkpointer: true });
```
</typescript>
</ex-subgraph-checkpointer-modes>

Parallel subgraph namespacing
<parallel-subgraph-namespacing>
When multiple different stateful subgraphs run in parallel, wrap each in its own StateGraph with a unique node name for stable namespace isolation:
<python>
```python
from langchain.agents import create_agent
from langgraph.graph import MessagesState, StateGraph, START

def create_sub_agent(model, *, name, **kwargs):
    """Wrap an agent with a unique node name for namespace isolation."""
    agent = create_agent(model=model, name=name, **kwargs)
    return (
        StateGraph(MessagesState)
        .add_node(name, agent)  # unique name -> stable namespace
        .add_edge(START, name)
        .compile()
    )

fruit_agent = create_sub_agent(
    "gpt-4.1-mini", name="fruit_agent",
    tools=[fruit_info], prompt="...", checkpointer=True,
)
veggie_agent = create_sub_agent(
    "gpt-4.1-mini", name="veggie_agent",
    tools=[veggie_info], prompt="...", checkpointer=True,
)
```
</python>
<typescript>
```typescript
import { createAgent } from "langchain"; // createAgent used below
import { StateGraph, StateSchema, MessagesValue, START } from "@langchain/langgraph";
function createSubAgent(model: string, { name, ...kwargs }: { name: string; [key: string]: any }) {
const agent = createAgent({ model, name, ...kwargs });
return new StateGraph(new StateSchema({ messages: MessagesValue }))
.addNode(name, agent) // unique name -> stable namespace
.addEdge(START, name)
.compile();
}
const fruitAgent = createSubAgent("gpt-4.1-mini", {
name: "fruit_agent", tools: [fruitInfo], prompt: "...", checkpointer: true,
});
const veggieAgent = createSubAgent("gpt-4.1-mini", {
name: "veggie_agent", tools: [veggieInfo], prompt: "...", checkpointer: true,
});
```
</typescript>

Note: Subgraphs added as nodes (via add_node) already get name-based namespaces automatically and don't need this wrapper.
</parallel-subgraph-namespacing>

Long-Term Memory (Store)
<ex-long-term-memory-store>
<python>
Use a Store for cross-thread memory to share user preferences across conversations.
```python
from langgraph.store.memory import InMemoryStore
store = InMemoryStore()
# Save user preference (available across ALL threads)
store.put(("alice", "preferences"), "language", {"preference": "short responses"})

# Node with store — access via runtime
from langgraph.runtime import Runtime

def respond(state, runtime: Runtime):
    prefs = runtime.store.get((state["user_id"], "preferences"), "language")
    return {"response": f"Using preference: {prefs.value}"}

# Compile with BOTH checkpointer and store
graph = builder.compile(checkpointer=checkpointer, store=store)

# Both threads access same long-term memory
graph.invoke({"user_id": "alice"}, {"configurable": {"thread_id": "thread-1"}})
graph.invoke({"user_id": "alice"}, {"configurable": {"thread_id": "thread-2"}})  # Same preferences!
```
</python>
<typescript>
Use a Store for cross-thread memory to share user preferences across conversations.
```typescript
import { MemoryStore } from "@langchain/langgraph";

const store = new MemoryStore();

// Save user preference (available across ALL threads)
await store.put(["alice", "preferences"], "language", { preference: "short responses" });

// Node with store — access via runtime
const respond = async (state: typeof State.State, runtime: any) => {
  const item = await runtime.store?.get(["alice", "preferences"], "language");
  return { response: `Using preference: ${item?.value?.preference}` };
};

// Compile with BOTH checkpointer and store
const graph = builder.compile({ checkpointer, store });

// Both threads access same long-term memory
await graph.invoke({ userId: "alice" }, { configurable: { thread_id: "thread-1" } });
await graph.invoke({ userId: "alice" }, { configurable: { thread_id: "thread-2" } }); // Same preferences!
```
</typescript>
</ex-long-term-memory-store>

<ex-store-operations>
<python>
Basic Store operations: put, get, search, delete.
```python
from langgraph.store.memory import InMemoryStore

store = InMemoryStore()
store.put(("user-123", "facts"), "location", {"city": "San Francisco"})  # Put
item = store.get(("user-123", "facts"), "location")  # Get
results = store.search(("user-123", "facts"), filter={"city": "San Francisco"})  # Search
store.delete(("user-123", "facts"), "location")  # Delete
```
</python>
</ex-store-operations>
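The split between the two memory types can be sketched in plain Python (a toy model, not the LangGraph API): short-term memory is keyed by thread_id, while the long-term store is keyed by (namespace, key) and visible from every thread.

```python
# Toy model (NOT the LangGraph API): short-term memory is per-thread,
# the long-term store is shared across all threads.
thread_memory = {}  # thread_id -> messages (short-term, per conversation)
store = {}          # (namespace, key) -> value (long-term, cross-thread)

store[("alice", "preferences"), "language"] = {"preference": "short responses"}

def invoke(thread_id, message):
    prefs = store[("alice", "preferences"), "language"]  # same in every thread
    thread_memory.setdefault(thread_id, []).append(message)
    return f"[{prefs['preference']}] {len(thread_memory[thread_id])} message(s) in {thread_id}"

print(invoke("thread-1", "Hi"))  # [short responses] 1 message(s) in thread-1
print(invoke("thread-2", "Hi"))  # fresh thread memory, same preferences
```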
---

Fixes
<fix-thread-id-required>
<python>
Always provide thread_id in config to enable state persistence.
```python
# WRONG: No thread_id - state NOT persisted!
graph.invoke({"messages": ["Hello"]})
graph.invoke({"messages": ["What did I say?"]})  # Doesn't remember!

# CORRECT: Always provide thread_id
config = {"configurable": {"thread_id": "session-1"}}
graph.invoke({"messages": ["Hello"]}, config)
graph.invoke({"messages": ["What did I say?"]}, config)  # Remembers!
```
</python>
<typescript>
Always provide thread_id in config to enable state persistence.
```typescript
// WRONG: No thread_id - state NOT persisted!
await graph.invoke({ messages: [new HumanMessage("Hello")] });
await graph.invoke({ messages: [new HumanMessage("What did I say?")] }); // Doesn't remember!
// CORRECT: Always provide thread_id
const config = { configurable: { thread_id: "session-1" } };
await graph.invoke({ messages: [new HumanMessage("Hello")] }, config);
await graph.invoke({ messages: [new HumanMessage("What did I say?")] }, config); // Remembers!
```
</typescript>
</fix-thread-id-required>

<python>
Use PostgresSaver instead of InMemorySaver for production persistence.
```python
# WRONG: Data lost on process restart
checkpointer = InMemorySaver()  # In-memory only!

# CORRECT: Use persistent storage for production
from langgraph.checkpoint.postgres import PostgresSaver

with PostgresSaver.from_conn_string("postgresql://...") as checkpointer:
    checkpointer.setup()  # only needed on first use to create tables
    graph = builder.compile(checkpointer=checkpointer)
```
</python>
<typescript>
Use PostgresSaver instead of MemorySaver for production persistence.
```typescript
// WRONG: Data lost on process restart
const checkpointer = new MemorySaver(); // In-memory only!
// CORRECT: Use persistent storage for production
import { PostgresSaver } from "@langchain/langgraph-checkpoint-postgres";
const checkpointer = PostgresSaver.fromConnString("postgresql://...");
await checkpointer.setup(); // only needed on first use to create tables
```
</typescript>

<python>
Use Overwrite to replace state values instead of passing through reducers.
```python
# State with reducer: items: Annotated[list, operator.add]
# Current state: {"items": ["A", "B"]}

# update_state PASSES THROUGH reducers
graph.update_state(config, {"items": ["C"]})  # Result: ["A", "B", "C"] - Appended!

# To REPLACE instead, use Overwrite
graph.update_state(config, {"items": Overwrite(["C"])})  # Result: ["C"] - Replaced
```
</python>
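The reducer pass-through behavior can be sketched without LangGraph. This is a toy model: the `Overwrite` class and `apply_update` function below are stand-ins, not the library's implementation, but they show why an update is appended under an operator.add reducer unless explicitly marked as a replacement.

```python
# Toy model (NOT the LangGraph API): updates pass through per-key
# reducers unless wrapped in an Overwrite marker.
import operator

class Overwrite:
    def __init__(self, value):
        self.value = value

def apply_update(state, update, reducers):
    new_state = dict(state)
    for key, value in update.items():
        if isinstance(value, Overwrite):
            new_state[key] = value.value  # replace outright
        elif key in reducers:
            new_state[key] = reducers[key](state[key], value)  # pass through reducer
        else:
            new_state[key] = value
    return new_state

reducers = {"items": operator.add}
state = {"items": ["A", "B"]}
print(apply_update(state, {"items": ["C"]}, reducers))             # {'items': ['A', 'B', 'C']}
print(apply_update(state, {"items": Overwrite(["C"])}, reducers))  # {'items': ['C']}
```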
<typescript>
Use Overwrite to replace state values instead of passing through reducers.
```typescript
import { Overwrite } from "@langchain/langgraph";
// State with reducer: items uses concat reducer
// Current state: { items: ["A", "B"] }
// updateState PASSES THROUGH reducers
await graph.updateState(config, { items: ["C"] }); // Result: ["A", "B", "C"] - Appended!
// To REPLACE instead, use Overwrite
await graph.updateState(config, { items: new Overwrite(["C"]) }); // Result: ["C"] - Replaced
```
</typescript>

<python>
Access store via runtime parameter in graph nodes.
```python
# WRONG: Store not available in node
def my_node(state):
    store.put(...)  # NameError! store not defined

# CORRECT: Access store via runtime
from langgraph.runtime import Runtime

def my_node(state, runtime: Runtime):
    runtime.store.put(...)  # Correct store instance
```
</python>
<typescript>
Access store via runtime parameter in graph nodes.
```typescript
// WRONG: Store not available in node
const myNode = async (state) => {
store.put(...); // ReferenceError!
};
// CORRECT: Access store via runtime
const myNode = async (state, runtime) => {
await runtime.store?.put(...); // Correct store instance
};
```
</typescript>

What You Should NOT Do
- Use InMemorySaver in production — data lost on restart; use PostgresSaver
- Forget thread_id — state won't persist without it
- Expect update_state to bypass reducers — it passes through them; use Overwrite to replace
- Run the same stateful subgraph (checkpointer=True) in parallel within one node — namespace conflict
- Access store directly in a node — use runtime.store via the Runtime param
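The namespace-conflict pitfall can be illustrated with a toy model (plain Python, not the LangGraph API; `write_checkpoint` is a made-up helper): distinct subgraphs write under distinct namespaces, but two parallel calls to the same stateful subgraph write conflicting checkpoints under one namespace.

```python
# Toy model (NOT the LangGraph API): why two parallel calls to the same
# stateful subgraph collide — both write checkpoints under one namespace.
checkpoints = {}

def write_checkpoint(namespace, step, state):
    key = (namespace, step)
    if key in checkpoints and checkpoints[key] != state:
        raise RuntimeError(f"checkpoint conflict in namespace {namespace!r}")
    checkpoints[key] = state

# Two different subgraphs: unique namespaces, no conflict
write_checkpoint("fruit_agent", 0, {"msg": "apple"})
write_checkpoint("veggie_agent", 0, {"msg": "carrot"})

# Same subgraph called twice in one node: same namespace, conflicting writes
write_checkpoint("shared_subgraph", 0, {"msg": "call-1"})
try:
    write_checkpoint("shared_subgraph", 0, {"msg": "call-2"})
except RuntimeError as e:
    print(e)  # checkpoint conflict in namespace 'shared_subgraph'
```

This is why the warning above recommends unique node names (and hence unique namespaces) per stateful subgraph.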