langgraph-persistence

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese
<overview> LangGraph's persistence layer enables durable execution by checkpointing graph state:
  • Checkpointer: Saves/loads graph state at every super-step
  • Thread ID: Identifies separate checkpoint sequences (conversations)
  • Store: Cross-thread memory for user preferences, facts
Two memory types:
  • Short-term (checkpointer): Thread-scoped conversation history
  • Long-term (store): Cross-thread user preferences, facts </overview>
<checkpointer-selection>
CheckpointerUse CaseProduction Ready
InMemorySaver
Testing, developmentNo
SqliteSaver
Local developmentPartial
PostgresSaver
ProductionYes
</checkpointer-selection>
<overview> LangGraph的持久化层通过为图状态创建检查点,实现可持久化执行:
  • Checkpointer:在每个超级步骤保存/加载图状态
  • Thread ID:标识独立的检查点序列(即对话)
  • Store:用于存储用户偏好、事实信息的跨线程内存
两种内存类型:
  • 短期内存(由Checkpointer实现):线程级别的对话历史
  • 长期内存(由Store实现):跨线程的用户偏好、事实信息 </overview>
<checkpointer-selection>
Checkpointer使用场景生产环境可用
InMemorySaver
测试、开发阶段
SqliteSaver
本地开发部分支持
PostgresSaver
生产环境
</checkpointer-selection>

Checkpointer Setup

Checkpointer 配置

<ex-basic-persistence> <python> Set up a basic graph with in-memory checkpointing and thread-based state persistence. ```python from langgraph.checkpoint.memory import InMemorySaver from langgraph.graph import StateGraph, START, END from typing_extensions import TypedDict, Annotated import operator
class State(TypedDict): messages: Annotated[list, operator.add]
def add_message(state: State) -> dict: return {"messages": ["Bot response"]}
checkpointer = InMemorySaver()
graph = ( StateGraph(State) .add_node("respond", add_message) .add_edge(START, "respond") .add_edge("respond", END) .compile(checkpointer=checkpointer) # Pass at compile time )
<ex-basic-persistence> <python> 设置一个基于内存检查点和线程级状态持久化的基础图。 ```python from langgraph.checkpoint.memory import InMemorySaver from langgraph.graph import StateGraph, START, END from typing_extensions import TypedDict, Annotated import operator
class State(TypedDict): messages: Annotated[list, operator.add]
def add_message(state: State) -> dict: return {"messages": ["Bot response"]}
checkpointer = InMemorySaver()
graph = ( StateGraph(State) .add_node("respond", add_message) .add_edge(START, "respond") .add_edge("respond", END) .compile(checkpointer=checkpointer) # Pass at compile time )

ALWAYS provide thread_id

ALWAYS provide thread_id

config = {"configurable": {"thread_id": "conversation-1"}}
result1 = graph.invoke({"messages": ["Hello"]}, config) print(len(result1["messages"])) # 2
result2 = graph.invoke({"messages": ["How are you?"]}, config) print(len(result2["messages"])) # 4 (previous + new)
</python>
<typescript>
Set up a basic graph with in-memory checkpointing and thread-based state persistence.
```typescript
import { MemorySaver, StateGraph, StateSchema, MessagesValue, START, END } from "@langchain/langgraph";
import { HumanMessage } from "@langchain/core/messages";

const State = new StateSchema({ messages: MessagesValue });

const addMessage = async (state: typeof State.State) => {
  return { messages: [{ role: "assistant", content: "Bot response" }] };
};

const checkpointer = new MemorySaver();

const graph = new StateGraph(State)
  .addNode("respond", addMessage)
  .addEdge(START, "respond")
  .addEdge("respond", END)
  .compile({ checkpointer });

// ALWAYS provide thread_id
const config = { configurable: { thread_id: "conversation-1" } };

const result1 = await graph.invoke({ messages: [new HumanMessage("Hello")] }, config);
console.log(result1.messages.length);  // 2

const result2 = await graph.invoke({ messages: [new HumanMessage("How are you?")] }, config);
console.log(result2.messages.length);  // 4 (previous + new)
</typescript> </ex-basic-persistence> <ex-production-postgres> <python> Configure PostgreSQL-backed checkpointing for production deployments. ```python from langgraph.checkpoint.postgres import PostgresSaver
with PostgresSaver.from_conn_string( "postgresql://user:pass@localhost/db" ) as checkpointer: checkpointer.setup() # only needed on first use to create tables graph = builder.compile(checkpointer=checkpointer)
</python>
<typescript>
Configure PostgreSQL-backed checkpointing for production deployments.
```typescript
import { PostgresSaver } from "@langchain/langgraph-checkpoint-postgres";

const checkpointer = PostgresSaver.fromConnString(
  "postgresql://user:pass@localhost/db"
);
await checkpointer.setup(); // only needed on first use to create tables

const graph = builder.compile({ checkpointer });
</typescript> </ex-production-postgres>
config = {"configurable": {"thread_id": "conversation-1"}}
result1 = graph.invoke({"messages": ["Hello"]}, config) print(len(result1["messages"])) # 2
result2 = graph.invoke({"messages": ["How are you?"]}, config) print(len(result2["messages"])) # 4 (previous + new)
</python>
<typescript>
设置一个基于内存检查点和线程级状态持久化的基础图。
```typescript
import { MemorySaver, StateGraph, StateSchema, MessagesValue, START, END } from "@langchain/langgraph";
import { HumanMessage } from "@langchain/core/messages";

const State = new StateSchema({ messages: MessagesValue });

const addMessage = async (state: typeof State.State) => {
  return { messages: [{ role: "assistant", content: "Bot response" }] };
};

const checkpointer = new MemorySaver();

const graph = new StateGraph(State)
  .addNode("respond", addMessage)
  .addEdge(START, "respond")
  .addEdge("respond", END)
  .compile({ checkpointer });

// ALWAYS provide thread_id
const config = { configurable: { thread_id: "conversation-1" } };

const result1 = await graph.invoke({ messages: [new HumanMessage("Hello")] }, config);
console.log(result1.messages.length);  // 2

const result2 = await graph.invoke({ messages: [new HumanMessage("How are you?")] }, config);
console.log(result2.messages.length);  // 4 (previous + new)
</typescript> </ex-basic-persistence> <ex-production-postgres> <python> 为生产环境部署配置基于PostgreSQL的检查点持久化。 ```python from langgraph.checkpoint.postgres import PostgresSaver
with PostgresSaver.from_conn_string( "postgresql://user:pass@localhost/db" ) as checkpointer: checkpointer.setup() # only needed on first use to create tables graph = builder.compile(checkpointer=checkpointer)
</python>
<typescript>
为生产环境部署配置基于PostgreSQL的检查点持久化。
```typescript
import { PostgresSaver } from "@langchain/langgraph-checkpoint-postgres";

const checkpointer = PostgresSaver.fromConnString(
  "postgresql://user:pass@localhost/db"
);
await checkpointer.setup(); // only needed on first use to create tables

const graph = builder.compile({ checkpointer });
</typescript> </ex-production-postgres>

Thread Management

线程管理

<ex-separate-threads> <python> Demonstrate isolated state between different thread IDs. ```python
<ex-separate-threads> <python> 演示不同线程ID之间的状态隔离。 ```python

Different threads maintain separate state

Different threads maintain separate state

alice_config = {"configurable": {"thread_id": "user-alice"}} bob_config = {"configurable": {"thread_id": "user-bob"}}
graph.invoke({"messages": ["Hi from Alice"]}, alice_config) graph.invoke({"messages": ["Hi from Bob"]}, bob_config)
alice_config = {"configurable": {"thread_id": "user-alice"}} bob_config = {"configurable": {"thread_id": "user-bob"}}
graph.invoke({"messages": ["Hi from Alice"]}, alice_config) graph.invoke({"messages": ["Hi from Bob"]}, bob_config)

Alice's state is isolated from Bob's

Alice's state is isolated from Bob's

</python>
<typescript>
Demonstrate isolated state between different thread IDs.
```typescript
// Different threads maintain separate state
const aliceConfig = { configurable: { thread_id: "user-alice" } };
const bobConfig = { configurable: { thread_id: "user-bob" } };

await graph.invoke({ messages: [new HumanMessage("Hi from Alice")] }, aliceConfig);
await graph.invoke({ messages: [new HumanMessage("Hi from Bob")] }, bobConfig);

// Alice's state is isolated from Bob's
</typescript> </ex-separate-threads>
</python>
<typescript>
演示不同线程ID之间的状态隔离。
```typescript
// Different threads maintain separate state
const aliceConfig = { configurable: {"thread_id": "user-alice" } };
const bobConfig = { configurable: {"thread_id": "user-bob" } };

await graph.invoke({ messages: [new HumanMessage("Hi from Alice")] }, aliceConfig);
await graph.invoke({ messages: [new HumanMessage("Hi from Bob")] }, bobConfig);

// Alice's state is isolated from Bob's
</typescript> </ex-separate-threads>

State History & Time Travel

状态历史与回溯

<ex-resume-from-checkpoint> <python> Time travel: browse checkpoint history and replay or fork from a past state. ```python config = {"configurable": {"thread_id": "session-1"}}
result = graph.invoke({"messages": ["start"]}, config)
<ex-resume-from-checkpoint> <python> 状态回溯:浏览检查点历史,并重放或从历史状态分支执行。 ```python config = {"configurable": {"thread_id": "session-1"}}
result = graph.invoke({"messages": ["start"]}, config)

Browse checkpoint history

Browse checkpoint history

states = list(graph.get_state_history(config))
states = list(graph.get_state_history(config))

Replay from a past checkpoint

Replay from a past checkpoint

past = states[-2] result = graph.invoke(None, past.config) # None = resume from checkpoint
past = states[-2] result = graph.invoke(None, past.config) # None = resume from checkpoint

Or fork: update state at a past checkpoint, then resume

Or fork: update state at a past checkpoint, then resume

fork_config = graph.update_state(past.config, {"messages": ["edited"]}) result = graph.invoke(None, fork_config)
</python>
<typescript>
Time travel: browse checkpoint history and replay or fork from a past state.
```typescript
const config = { configurable: { thread_id: "session-1" } };

const result = await graph.invoke({ messages: ["start"] }, config);

// Browse checkpoint history (async iterable, collect to array)
const states: Awaited<ReturnType<typeof graph.getState>>[] = [];
for await (const state of graph.getStateHistory(config)) {
  states.push(state);
}

// Replay from a past checkpoint
const past = states[states.length - 2];
const replayed = await graph.invoke(null, past.config);  // null = resume from checkpoint

// Or fork: update state at a past checkpoint, then resume
const forkConfig = await graph.updateState(past.config, { messages: ["edited"] });
const forked = await graph.invoke(null, forkConfig);
</typescript> </ex-resume-from-checkpoint> <ex-update-state> <python> Manually update graph state before resuming execution. ```python config = {"configurable": {"thread_id": "session-1"}}
fork_config = graph.update_state(past.config, {"messages": ["edited"]}) result = graph.invoke(None, fork_config)
</python>
<typescript>
状态回溯:浏览检查点历史,并重放或从历史状态分支执行。
```typescript
const config = { configurable: {"thread_id": "session-1" } };

const result = await graph.invoke({ messages: ["start"] }, config);

// Browse checkpoint history (async iterable, collect to array)
const states: Awaited<ReturnType<typeof graph.getState>>[] = [];
for await (const state of graph.getStateHistory(config)) {
  states.push(state);
}

// Replay from a past checkpoint
const past = states[states.length - 2];
const replayed = await graph.invoke(null, past.config);  // null = resume from checkpoint

// Or fork: update state at a past checkpoint, then resume
const forkConfig = await graph.updateState(past.config, { messages: ["edited"] });
const forked = await graph.invoke(null, forkConfig);
</typescript> </ex-resume-from-checkpoint> <ex-update-state> <python> 在继续执行前手动更新图状态。 ```python config = {"configurable": {"thread_id": "session-1"}}
// Modify state before resuming graph.update_state(config, {"data": "manually_updated"})
// Resume with updated state result = graph.invoke(None, config)
</python>
<typescript>
在继续执行前手动更新图状态。
```typescript
const config = { configurable: {"thread_id": "session-1" } };

// Modify state before resuming
await graph.updateState(config, { data: "manually_updated" });

// Resume with updated state
const result = await graph.invoke(null, config);
</typescript> </ex-update-state>

Modify state before resuming

子图检查点作用域

graph.update_state(config, {"data": "manually_updated"})
编译子图时,
checkpointer
参数控制持久化行为。这对于使用中断、需要多轮次内存或并行运行的子图至关重要。
<subgraph-checkpointer-scoping-table>
特性
checkpointer=False
None
(默认)
True
中断(HITL)
多轮次内存
多调用(不同子图)警告(可能存在命名空间冲突)
多调用(同一子图)
状态检查警告(仅当前调用可用)
</subgraph-checkpointer-scoping-table> <subgraph-checkpointer-when-to-use>

Resume with updated state

各模式适用场景

result = graph.invoke(None, config)
</python>
<typescript>
Manually update graph state before resuming execution.
```typescript
const config = { configurable: { thread_id: "session-1" } };

// Modify state before resuming
await graph.updateState(config, { data: "manually_updated" });

// Resume with updated state
const result = await graph.invoke(null, config);
</typescript> </ex-update-state>
  • checkpointer=False
    — 子图不需要中断或持久化。最简单的选项,无检查点开销。
  • None
    (默认/省略
    checkpointer
    — 子图需要
    interrupt()
    但不需要多轮次内存。每次调用都会从头开始,但可以暂停/恢复。并行执行可行,因为每次调用都会获得唯一的命名空间。
  • checkpointer=True
    — 子图需要在多次调用间保留状态(多轮对话)。每次调用都会从上次结束的位置继续。
</subgraph-checkpointer-when-to-use> <warning-stateful-subgraphs-parallel>
警告:有状态子图(
checkpointer=True
)不支持在单个节点内并行调用同一子图实例 — 调用会写入同一检查点命名空间,导致冲突。
</warning-stateful-subgraphs-parallel> <ex-subgraph-checkpointer-modes> <python> 为你的子图选择合适的检查点模式。 ```python

Subgraph Checkpointer Scoping

No interrupts needed — opt out of checkpointing

When compiling a subgraph, the
checkpointer
parameter controls persistence behavior. This is critical for subgraphs that use interrupts, need multi-turn memory, or run in parallel.
<subgraph-checkpointer-scoping-table>
Feature
checkpointer=False
None
(default)
True
Interrupts (HITL)NoYesYes
Multi-turn memoryNoNoYes
Multiple calls (different subgraphs)YesYesWarning (namespace conflicts possible)
Multiple calls (same subgraph)YesYesNo
State inspectionNoWarning (current invocation only)Yes
</subgraph-checkpointer-scoping-table> <subgraph-checkpointer-when-to-use>
subgraph = subgraph_builder.compile(checkpointer=False)

When to use each mode

Need interrupts but not cross-invocation persistence (default)

  • checkpointer=False
    — Subgraph doesn't need interrupts or persistence. Simplest option, no checkpoint overhead.
  • None
    (default / omit
    checkpointer
    )
    — Subgraph needs
    interrupt()
    but not multi-turn memory. Each invocation starts fresh but can pause/resume. Parallel execution works because each invocation gets a unique namespace.
  • checkpointer=True
    — Subgraph needs to remember state across invocations (multi-turn conversations). Each call picks up where the last left off.
</subgraph-checkpointer-when-to-use> <warning-stateful-subgraphs-parallel>
Warning: Stateful subgraphs (
checkpointer=True
) do NOT support calling the same subgraph instance multiple times within a single node — the calls write to the same checkpoint namespace and conflict.
</warning-stateful-subgraphs-parallel> <ex-subgraph-checkpointer-modes> <python> Choose the right checkpointer mode for your subgraph. ```python
subgraph = subgraph_builder.compile()

No interrupts needed — opt out of checkpointing

Need cross-invocation persistence (stateful)

subgraph = subgraph_builder.compile(checkpointer=False)
subgraph = subgraph_builder.compile(checkpointer=True)
</python>
<typescript>
为你的子图选择合适的检查点模式。
```typescript
// No interrupts needed — opt out of checkpointing
const subgraph = subgraphBuilder.compile({ checkpointer: false });

// Need interrupts but not cross-invocation persistence (default)
const subgraph = subgraphBuilder.compile();

// Need cross-invocation persistence (stateful)
const subgraph = subgraphBuilder.compile({ checkpointer: true });
</typescript> </ex-subgraph-checkpointer-modes> <parallel-subgraph-namespacing>

Need interrupts but not cross-invocation persistence (default)

并行子图命名空间

subgraph = subgraph_builder.compile()
当多个不同的有状态子图并行运行时,将每个子图包装在带有唯一节点名称的
StateGraph
中,以实现稳定的命名空间隔离:
<python> ```python from langgraph.graph import MessagesState, StateGraph
def create_sub_agent(model, *, name, **kwargs): """Wrap an agent with a unique node name for namespace isolation.""" agent = create_agent(model=model, name=name, **kwargs) return ( StateGraph(MessagesState) .add_node(name, agent) # unique name -> stable namespace .add_edge("start", name) .compile() )
fruit_agent = create_sub_agent( "gpt-4.1-mini", name="fruit_agent", tools=[fruit_info], prompt="...", checkpointer=True, ) veggie_agent = create_sub_agent( "gpt-4.1-mini", name="veggie_agent", tools=[veggie_info], prompt="...", checkpointer=True, )
</python>
<typescript>
```typescript
import { StateGraph, StateSchema, MessagesValue, START } from "@langchain/langgraph";

function createSubAgent(model: string, { name, ...kwargs }: { name: string; [key: string]: any }) {
  const agent = createAgent({ model, name, ...kwargs });
  return new StateGraph(new StateSchema({ messages: MessagesValue }))
    .addNode(name, agent)  // unique name -> stable namespace
    .addEdge(START, name)
    .compile();
}

const fruitAgent = createSubAgent("gpt-4.1-mini", {
  name: "fruit_agent", tools: [fruitInfo], prompt: "...", checkpointer: true,
});
const veggieAgent = createSubAgent("gpt-4.1-mini", {
  name: "veggie_agent", tools: [veggieInfo], prompt: "...", checkpointer: true,
});
</typescript>
注意:通过
add_node
添加为节点的子图会自动获得基于名称的命名空间,无需此包装器。
</parallel-subgraph-namespacing>

Need cross-invocation persistence (stateful)

长期内存(Store)

subgraph = subgraph_builder.compile(checkpointer=True)
</python>
<typescript>
Choose the right checkpointer mode for your subgraph.
```typescript
// No interrupts needed — opt out of checkpointing
const subgraph = subgraphBuilder.compile({ checkpointer: false });

// Need interrupts but not cross-invocation persistence (default)
const subgraph = subgraphBuilder.compile();

// Need cross-invocation persistence (stateful)
const subgraph = subgraphBuilder.compile({ checkpointer: true });
</typescript> </ex-subgraph-checkpointer-modes> <parallel-subgraph-namespacing>
<ex-long-term-memory-store> <python> 使用Store实现跨线程内存,在多个对话间共享用户偏好。 ```python from langgraph.store.memory import InMemoryStore
store = InMemoryStore()

Parallel subgraph namespacing

Save user preference (available across ALL threads)

When multiple different stateful subgraphs run in parallel, wrap each in its own
StateGraph
with a unique node name for stable namespace isolation:
<python> ```python from langgraph.graph import MessagesState, StateGraph
def create_sub_agent(model, *, name, **kwargs): """Wrap an agent with a unique node name for namespace isolation.""" agent = create_agent(model=model, name=name, **kwargs) return ( StateGraph(MessagesState) .add_node(name, agent) # unique name -> stable namespace .add_edge("start", name) .compile() )
fruit_agent = create_sub_agent( "gpt-4.1-mini", name="fruit_agent", tools=[fruit_info], prompt="...", checkpointer=True, ) veggie_agent = create_sub_agent( "gpt-4.1-mini", name="veggie_agent", tools=[veggie_info], prompt="...", checkpointer=True, )
</python>
<typescript>
```typescript
import { StateGraph, StateSchema, MessagesValue, START } from "@langchain/langgraph";

function createSubAgent(model: string, { name, ...kwargs }: { name: string; [key: string]: any }) {
  const agent = createAgent({ model, name, ...kwargs });
  return new StateGraph(new StateSchema({ messages: MessagesValue }))
    .addNode(name, agent)  // unique name -> stable namespace
    .addEdge(START, name)
    .compile();
}

const fruitAgent = createSubAgent("gpt-4.1-mini", {
  name: "fruit_agent", tools: [fruitInfo], prompt: "...", checkpointer: true,
});
const veggieAgent = createSubAgent("gpt-4.1-mini", {
  name: "veggie_agent", tools: [veggieInfo], prompt: "...", checkpointer: true,
});
</typescript>
Note: Subgraphs added as nodes (via
add_node
) already get name-based namespaces automatically and don't need this wrapper.
</parallel-subgraph-namespacing>
store.put(("alice", "preferences"), "language", {"preference": "short responses"})

Long-Term Memory (Store)

Node with store — access via runtime

<ex-long-term-memory-store> <python> Use a Store for cross-thread memory to share user preferences across conversations. ```python from langgraph.store.memory import InMemoryStore
store = InMemoryStore()
from langgraph.runtime import Runtime
def respond(state, runtime: Runtime): prefs = runtime.store.get((state["user_id"], "preferences"), "language") return {"response": f"Using preference: {prefs.value}"}

Save user preference (available across ALL threads)

Compile with BOTH checkpointer and store

store.put(("alice", "preferences"), "language", {"preference": "short responses"})
graph = builder.compile(checkpointer=checkpointer, store=store)

Node with store — access via runtime

Both threads access same long-term memory

from langgraph.runtime import Runtime
def respond(state, runtime: Runtime): prefs = runtime.store.get((state["user_id"], "preferences"), "language") return {"response": f"Using preference: {prefs.value}"}
graph.invoke({"user_id": "alice"}, {"configurable": {"thread_id": "thread-1"}}) graph.invoke({"user_id": "alice"}, {"configurable": {"thread_id": "thread-2"}}) # Same preferences!
</python>
<typescript>
使用Store实现跨线程内存,在多个对话间共享用户偏好。
```typescript
import { MemoryStore } from "@langchain/langgraph";

const store = new MemoryStore();

// Save user preference (available across ALL threads)
await store.put(["alice", "preferences"], "language", { preference: "short responses" });

// Node with store — access via runtime
const respond = async (state: typeof State.State, runtime: any) => {
  const item = await runtime.store?.get(["alice", "preferences"], "language");
  return { response: `Using preference: ${item?.value?.preference}` };
};

// Compile with BOTH checkpointer and store
const graph = builder.compile({ checkpointer, store });

// Both threads access same long-term memory
await graph.invoke({ userId: "alice" }, { configurable: {"thread_id": "thread-1" } });
await graph.invoke({ userId: "alice" }, { configurable: {"thread_id": "thread-2" } });  // Same preferences!
</typescript> </ex-long-term-memory-store> <ex-store-operations> <python> Store基础操作:新增、查询、搜索和删除。 ```python from langgraph.store.memory import InMemoryStore
store = InMemoryStore()
store.put(("user-123", "facts"), "location", {"city": "San Francisco"}) # Put item = store.get(("user-123", "facts"), "location") # Get results = store.search(("user-123", "facts"), filter={"city": "San Francisco"}) # Search store.delete(("user-123", "facts"), "location") # Delete
</python>
</ex-store-operations>

---

Compile with BOTH checkpointer and store

常见问题修复

graph = builder.compile(checkpointer=checkpointer, store=store)
<fix-thread-id-required> <python> 始终在配置中提供thread_id以启用状态持久化。 ```python

Both threads access same long-term memory

WRONG: No thread_id - state NOT persisted!

graph.invoke({"user_id": "alice"}, {"configurable": {"thread_id": "thread-1"}}) graph.invoke({"user_id": "alice"}, {"configurable": {"thread_id": "thread-2"}}) # Same preferences!
</python>
<typescript>
Use a Store for cross-thread memory to share user preferences across conversations.
```typescript
import { MemoryStore } from "@langchain/langgraph";

const store = new MemoryStore();

// Save user preference (available across ALL threads)
await store.put(["alice", "preferences"], "language", { preference: "short responses" });

// Node with store — access via runtime
const respond = async (state: typeof State.State, runtime: any) => {
  const item = await runtime.store?.get(["alice", "preferences"], "language");
  return { response: `Using preference: ${item?.value?.preference}` };
};

// Compile with BOTH checkpointer and store
const graph = builder.compile({ checkpointer, store });

// Both threads access same long-term memory
await graph.invoke({ userId: "alice" }, { configurable: { thread_id: "thread-1" } });
await graph.invoke({ userId: "alice" }, { configurable: { thread_id: "thread-2" } });  // Same preferences!
</typescript> </ex-long-term-memory-store> <ex-store-operations> <python> Basic store operations: put, get, search, and delete. ```python from langgraph.store.memory import InMemoryStore
store = InMemoryStore()
store.put(("user-123", "facts"), "location", {"city": "San Francisco"}) # Put item = store.get(("user-123", "facts"), "location") # Get results = store.search(("user-123", "facts"), filter={"city": "San Francisco"}) # Search store.delete(("user-123", "facts"), "location") # Delete
</python>
</ex-store-operations>

---
graph.invoke({"messages": ["Hello"]}) graph.invoke({"messages": ["What did I say?"]}) # Doesn't remember!

Fixes

CORRECT: Always provide thread_id

<fix-thread-id-required> <python> Always provide thread_id in config to enable state persistence. ```python
config = {"configurable": {"thread_id": "session-1"}} graph.invoke({"messages": ["Hello"]}, config) graph.invoke({"messages": ["What did I say?"]}, config) # Remembers!
</python>
<typescript>
始终在配置中提供thread_id以启用状态持久化。
```typescript
// WRONG: No thread_id - state NOT persisted!
await graph.invoke({ messages: [new HumanMessage("Hello")] });
await graph.invoke({ messages: [new HumanMessage("What did I say?")] });  // Doesn't remember!

// CORRECT: Always provide thread_id
const config = { configurable: {"thread_id": "session-1" } };
await graph.invoke({ messages: [new HumanMessage("Hello")] }, config);
await graph.invoke({ messages: [new HumanMessage("What did I say?")] }, config);  // Remembers!
</typescript> </fix-thread-id-required> <fix-inmemory-not-for-production> <python> 生产环境中使用PostgresSaver替代InMemorySaver实现持久化。 ```python

WRONG: No thread_id - state NOT persisted!

WRONG: Data lost on process restart

graph.invoke({"messages": ["Hello"]}) graph.invoke({"messages": ["What did I say?"]}) # Doesn't remember!
checkpointer = InMemorySaver() # In-memory only!

CORRECT: Always provide thread_id

CORRECT: Use persistent storage for production

config = {"configurable": {"thread_id": "session-1"}} graph.invoke({"messages": ["Hello"]}, config) graph.invoke({"messages": ["What did I say?"]}, config) # Remembers!
</python>
<typescript>
Always provide thread_id in config to enable state persistence.
```typescript
// WRONG: No thread_id - state NOT persisted!
await graph.invoke({ messages: [new HumanMessage("Hello")] });
await graph.invoke({ messages: [new HumanMessage("What did I say?")] });  // Doesn't remember!

// CORRECT: Always provide thread_id
const config = { configurable: { thread_id: "session-1" } };
await graph.invoke({ messages: [new HumanMessage("Hello")] }, config);
await graph.invoke({ messages: [new HumanMessage("What did I say?")] }, config);  // Remembers!
</typescript> </fix-thread-id-required> <fix-inmemory-not-for-production> <python> Use PostgresSaver instead of InMemorySaver for production persistence. ```python
from langgraph.checkpoint.postgres import PostgresSaver with PostgresSaver.from_conn_string("postgresql://...") as checkpointer: checkpointer.setup() # only needed on first use to create tables graph = builder.compile(checkpointer=checkpointer)
</python>
<typescript>
生产环境中使用PostgresSaver替代MemorySaver实现持久化。
```typescript
// WRONG: Data lost on process restart
const checkpointer = new MemorySaver();  // In-memory only!

// CORRECT: Use persistent storage for production
import { PostgresSaver } from "@langchain/langgraph-checkpoint-postgres";
const checkpointer = PostgresSaver.fromConnString("postgresql://...");
await checkpointer.setup(); // only needed on first use to create tables
</typescript> </fix-inmemory-not-for-production> <fix-update-state-with-reducers> <python> 使用Overwrite替换状态值,而非通过 reducer 传递。 ```python from langgraph.types import Overwrite

WRONG: Data lost on process restart

State with reducer: items: Annotated[list, operator.add]

Current state: {"items": ["A", "B"]}

update_state PASSES THROUGH reducers

checkpointer = InMemorySaver() # In-memory only!
graph.update_state(config, {"items": ["C"]}) # Result: ["A", "B", "C"] - Appended!

CORRECT: Use persistent storage for production

To REPLACE instead, use Overwrite

from langgraph.checkpoint.postgres import PostgresSaver with PostgresSaver.from_conn_string("postgresql://...") as checkpointer: checkpointer.setup() # only needed on first use to create tables graph = builder.compile(checkpointer=checkpointer)
</python>
<typescript>
Use PostgresSaver instead of MemorySaver for production persistence.
```typescript
// WRONG: Data lost on process restart
const checkpointer = new MemorySaver();  // In-memory only!

// CORRECT: Use persistent storage for production
import { PostgresSaver } from "@langchain/langgraph-checkpoint-postgres";
const checkpointer = PostgresSaver.fromConnString("postgresql://...");
await checkpointer.setup(); // only needed on first use to create tables
</typescript> </fix-inmemory-not-for-production> <fix-update-state-with-reducers> <python> Use Overwrite to replace state values instead of passing through reducers. ```python from langgraph.types import Overwrite
graph.update_state(config, {"items": Overwrite(["C"])}) # Result: ["C"] - Replaced
</python>
<typescript>
使用Overwrite替换状态值,而非通过 reducer 传递。
```typescript
import { Overwrite } from "@langchain/langgraph";

// State with reducer: items uses concat reducer
// Current state: { items: ["A", "B"] }

// updateState PASSES THROUGH reducers
await graph.updateState(config, { items: ["C"] });  // Result: ["A", "B", "C"] - Appended!

// To REPLACE instead, use Overwrite
await graph.updateState(config, { items: new Overwrite(["C"]) });  // Result: ["C"] - Replaced
</typescript> </fix-update-state-with-reducers> <fix-store-injection> <python> 在图节点中通过Runtime对象访问Store。 ```python

State with reducer: items: Annotated[list, operator.add]

WRONG: Store not available in node

Current state: {"items": ["A", "B"]}

update_state PASSES THROUGH reducers

graph.update_state(config, {"items": ["C"]}) # Result: ["A", "B", "C"] - Appended!
def my_node(state): store.put(...) # NameError! store not defined

To REPLACE instead, use Overwrite

CORRECT: Access store via runtime

graph.update_state(config, {"items": Overwrite(["C"])}) # Result: ["C"] - Replaced
</python>
<typescript>
Use Overwrite to replace state values instead of passing through reducers.
```typescript
import { Overwrite } from "@langchain/langgraph";

// State with reducer: items uses concat reducer
// Current state: { items: ["A", "B"] }

// updateState PASSES THROUGH reducers
await graph.updateState(config, { items: ["C"] });  // Result: ["A", "B", "C"] - Appended!

// To REPLACE instead, use Overwrite
await graph.updateState(config, { items: new Overwrite(["C"]) });  // Result: ["C"] - Replaced
</typescript> </fix-update-state-with-reducers> <fix-store-injection> <python> Access store via the Runtime object in graph nodes. ```python
from langgraph.runtime import Runtime
def my_node(state, runtime: Runtime): runtime.store.put(...) # Correct store instance
</python>
<typescript>
在图节点中通过runtime参数访问Store。
```typescript
// WRONG: Store not available in node
const myNode = async (state) => {
  store.put(...);  // ReferenceError!
};

// CORRECT: Access store via runtime
const myNode = async (state, runtime) => {
  await runtime.store?.put(...);  // Correct store instance
};
</typescript> </fix-store-injection> <boundaries>

WRONG: Store not available in node

不建议的操作

def my_node(state): store.put(...) # NameError! store not defined
  • 生产环境使用
    InMemorySaver
    — 进程重启后数据丢失;应使用
    PostgresSaver
  • 忘记传入
    thread_id
    — 没有它状态无法持久化
  • 期望
    update_state
    绕过reducer — 它会通过reducer处理;使用
    Overwrite
    实现替换
  • 在单个节点内并行运行同一有状态子图(
    checkpointer=True
    ) — 会导致命名空间冲突
  • 在节点中直接访问Store — 通过
    Runtime
    参数的
    runtime.store
    访问
</boundaries>

CORRECT: Access store via runtime

from langgraph.runtime import Runtime
def my_node(state, runtime: Runtime): runtime.store.put(...) # Correct store instance
</python>
<typescript>
Access store via runtime parameter in graph nodes.
```typescript
// WRONG: Store not available in node
const myNode = async (state) => {
  store.put(...);  // ReferenceError!
};

// CORRECT: Access store via runtime
const myNode = async (state, runtime) => {
  await runtime.store?.put(...);  // Correct store instance
};
</typescript> </fix-store-injection> <boundaries>

What You Should NOT Do

  • Use
    InMemorySaver
    in production — data lost on restart; use
    PostgresSaver
  • Forget
    thread_id
    — state won't persist without it
  • Expect
    update_state
    to bypass reducers — it passes through them; use
    Overwrite
    to replace
  • Run the same stateful subgraph (
    checkpointer=True
    ) in parallel within one node — namespace conflict
  • Access store directly in a node — use
    runtime.store
    via the
    Runtime
    param
</boundaries>