langgraph-implementation

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

LangGraph Implementation

LangGraph 实现

Core Concepts

核心概念

LangGraph builds stateful, multi-actor agent applications using a graph-based architecture:
  • StateGraph: Builder class for defining graphs with shared state
  • Nodes: Functions that read state and return partial updates
  • Edges: Define execution flow (static or conditional)
  • Channels: Internal state management (LastValue, BinaryOperatorAggregate)
  • Checkpointer: Persistence for pause/resume capabilities
LangGraph 采用基于图的架构构建有状态的多角色智能体应用:
  • StateGraph:用于定义带有共享状态的图的构建器类
  • Nodes:读取状态并返回部分更新的函数
  • Edges:定义执行流程(静态或条件式)
  • Channels:内部状态管理(LastValue、BinaryOperatorAggregate)
  • Checkpointer:用于实现暂停/恢复功能的持久化组件

Essential Imports

必要导入

python
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import MessagesState, add_messages
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import Command, Send, interrupt, RetryPolicy
from typing import Annotated
from typing_extensions import TypedDict
python
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import MessagesState, add_messages
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import Command, Send, interrupt, RetryPolicy
from typing import Annotated
from typing_extensions import TypedDict

State Schema Patterns

状态模式

Basic State with TypedDict

基于TypedDict的基础状态

python
class State(TypedDict):
    counter: int                                    # LastValue - stores last value
    messages: Annotated[list, operator.add]         # Reducer - appends lists
    items: Annotated[list, lambda a, b: a + [b] if b else a]  # Custom reducer
python
class State(TypedDict):
    counter: int                                    # LastValue - stores last value
    messages: Annotated[list, operator.add]         # Reducer - appends lists
    items: Annotated[list, lambda a, b: a + [b] if b else a]  # Custom reducer

MessagesState for Chat Applications

聊天应用专用MessagesState

python
from langgraph.graph.message import MessagesState

class State(MessagesState):
    # Inherits: messages: Annotated[list[AnyMessage], add_messages]
    user_id: str
    context: dict
python
from langgraph.graph.message import MessagesState

class State(MessagesState):
    # Inherits: messages: Annotated[list[AnyMessage], add_messages]
    user_id: str
    context: dict

Pydantic State (for validation)

基于Pydantic的状态(用于验证)

python
from pydantic import BaseModel

class State(BaseModel):
    messages: Annotated[list, add_messages]
    validated_field: str  # Pydantic validates on assignment
python
from pydantic import BaseModel

class State(BaseModel):
    messages: Annotated[list, add_messages]
    validated_field: str  # Pydantic validates on assignment

Building Graphs

构建图

Basic Pattern

基础模式

python
builder = StateGraph(State)
python
builder = StateGraph(State)

Add nodes - functions that take state, return partial updates

添加节点 - 接收状态并返回部分更新的函数

builder.add_node("process", process_fn) builder.add_node("decide", decide_fn)
builder.add_node("process", process_fn) builder.add_node("decide", decide_fn)

Add edges

添加边

builder.add_edge(START, "process") builder.add_edge("process", "decide") builder.add_edge("decide", END)
builder.add_edge(START, "process") builder.add_edge("process", "decide") builder.add_edge("decide", END)

Compile

编译

graph = builder.compile()
undefined
graph = builder.compile()
undefined

Node Function Signature

节点函数签名

python
def my_node(state: State) -> dict:
    """Node receives full state, returns partial update."""
    return {"counter": state["counter"] + 1}
python
def my_node(state: State) -> dict:
    """Node receives full state, returns partial update."""
    return {"counter": state["counter"] + 1}

With config access

支持访问配置

def my_node(state: State, config: RunnableConfig) -> dict: thread_id = config["configurable"]["thread_id"] return {"result": process(state, thread_id)}
def my_node(state: State, config: RunnableConfig) -> dict: thread_id = config["configurable"]["thread_id"] return {"result": process(state, thread_id)}

With Runtime context (v0.6+)

支持Runtime上下文(v0.6+)

def my_node(state: State, runtime: Runtime[Context]) -> dict: user_id = runtime.context.get("user_id") return {"result": user_id}
undefined
def my_node(state: State, runtime: Runtime[Context]) -> dict: user_id = runtime.context.get("user_id") return {"result": user_id}
undefined

Conditional Edges

条件边

python
from typing import Literal

def router(state: State) -> Literal["agent", "tools", "__end__"]:
    last_msg = state["messages"][-1]
    if hasattr(last_msg, "tool_calls") and last_msg.tool_calls:
        return "tools"
    return END  # or "__end__"

builder.add_conditional_edges("agent", router)
python
from typing import Literal

def router(state: State) -> Literal["agent", "tools", "__end__"]:
    last_msg = state["messages"][-1]
    if hasattr(last_msg, "tool_calls") and last_msg.tool_calls:
        return "tools"
    return END  # or "__end__"

builder.add_conditional_edges("agent", router)

With path_map for visualization

使用path_map优化可视化

builder.add_conditional_edges( "agent", router, path_map={"agent": "agent", "tools": "tools", "end": END} )
undefined
builder.add_conditional_edges( "agent", router, path_map={"agent": "agent", "tools": "tools", "end": END} )
undefined

Command Pattern (Dynamic Routing + State Update)

命令模式(动态路由+状态更新)

python
from langgraph.types import Command

def dynamic_node(state: State) -> Command[Literal["next", "__end__"]]:
    if state["should_continue"]:
        return Command(goto="next", update={"step": state["step"] + 1})
    return Command(goto=END)
python
from langgraph.types import Command

def dynamic_node(state: State) -> Command[Literal["next", "__end__"]]:
    if state["should_continue"]:
        return Command(goto="next", update={"step": state["step"] + 1})
    return Command(goto=END)

Must declare destinations for visualization

必须声明目标节点以支持可视化

builder.add_node("dynamic", dynamic_node, destinations=["next", END])
undefined
builder.add_node("dynamic", dynamic_node, destinations=["next", END])
undefined

Send Pattern (Fan-out/Map-Reduce)

发送模式(扇出/Map-Reduce)

python
from langgraph.types import Send

def fan_out(state: State) -> list[Send]:
    """Route to multiple node instances with different inputs."""
    return [Send("worker", {"item": item}) for item in state["items"]]

builder.add_conditional_edges(START, fan_out)
builder.add_edge("worker", "aggregate")  # Workers converge
python
from langgraph.types import Send

def fan_out(state: State) -> list[Send]:
    """Route to multiple node instances with different inputs."""
    return [Send("worker", {"item": item}) for item in state["items"]]

builder.add_conditional_edges(START, fan_out)
builder.add_edge("worker", "aggregate")  # 多个Worker汇聚到同一节点

Checkpointing

检查点

Enable Persistence

启用持久化

python
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.checkpoint.sqlite import SqliteSaver  # Development
from langgraph.checkpoint.postgres import PostgresSaver  # Production
python
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.checkpoint.sqlite import SqliteSaver  # 开发环境
from langgraph.checkpoint.postgres import PostgresSaver  # 生产环境

In-memory (testing only)

内存存储(仅用于测试)

graph = builder.compile(checkpointer=InMemorySaver())
graph = builder.compile(checkpointer=InMemorySaver())

SQLite (development)

SQLite(开发环境)

with SqliteSaver.from_conn_string("checkpoints.db") as checkpointer: graph = builder.compile(checkpointer=checkpointer)
with SqliteSaver.from_conn_string("checkpoints.db") as checkpointer: graph = builder.compile(checkpointer=checkpointer)

Thread-based invocation

基于线程的调用

config = {"configurable": {"thread_id": "user-123"}} result = graph.invoke({"messages": [...]}, config)
undefined
config = {"configurable": {"thread_id": "user-123"}} result = graph.invoke({"messages": [...]}, config)
undefined

State Management

状态管理

python
undefined
python
undefined

Get current state

获取当前状态

state = graph.get_state(config)
state = graph.get_state(config)

Get state history

获取状态历史

for state in graph.get_state_history(config): print(state.values, state.next)
for state in graph.get_state_history(config): print(state.values, state.next)

Update state manually

手动更新状态

graph.update_state(config, {"key": "new_value"}, as_node="node_name")
undefined
graph.update_state(config, {"key": "new_value"}, as_node="node_name")
undefined

Human-in-the-Loop

人机协作

Using interrupt()

使用interrupt()

python
from langgraph.types import interrupt, Command

def review_node(state: State) -> dict:
    # Pause and surface value to client
    human_input = interrupt({"question": "Please review", "data": state["draft"]})
    return {"approved": human_input["approved"]}
python
from langgraph.types import interrupt, Command

def review_node(state: State) -> dict:
    # 暂停并将内容推送给客户端
    human_input = interrupt({"question": "Please review", "data": state["draft"]})
    return {"approved": human_input["approved"]}

Resume with Command

使用Command恢复执行

graph.invoke(Command(resume={"approved": True}), config)
undefined
graph.invoke(Command(resume={"approved": True}), config)
undefined

Interrupt Before/After Nodes

在节点执行前后中断

python
graph = builder.compile(
    checkpointer=checkpointer,
    interrupt_before=["human_review"],  # Pause before node
    interrupt_after=["agent"],          # Pause after node
)
python
graph = builder.compile(
    checkpointer=checkpointer,
    interrupt_before=["human_review"],  # 在节点执行前暂停
    interrupt_after=["agent"],          # 在节点执行后暂停
)

Check pending interrupts

检查待处理的中断

state = graph.get_state(config) if state.next: # Has pending nodes # Resume graph.invoke(None, config)
undefined
state = graph.get_state(config) if state.next: # 存在待执行节点 # 恢复执行 graph.invoke(None, config)
undefined

Streaming

流式处理

python
undefined
python
undefined

Stream modes: "values", "updates", "custom", "messages", "debug"

流式模式:"values", "updates", "custom", "messages", "debug"

Updates only (node outputs)

仅获取更新(节点输出)

for chunk in graph.stream(input, stream_mode="updates"): print(chunk) # {"node_name": {"key": "value"}}
for chunk in graph.stream(input, stream_mode="updates"): print(chunk) # {"node_name": {"key": "value"}}

Full state after each step

每步执行后获取完整状态

for chunk in graph.stream(input, stream_mode="values"): print(chunk)
for chunk in graph.stream(input, stream_mode="values"): print(chunk)

Multiple modes

同时使用多种模式

for mode, chunk in graph.stream(input, stream_mode=["updates", "messages"]): if mode == "messages": print("Token:", chunk)
for mode, chunk in graph.stream(input, stream_mode=["updates", "messages"]): if mode == "messages": print("Token:", chunk)

Custom streaming from within nodes

在节点内自定义流式输出

from langgraph.config import get_stream_writer
def my_node(state): writer = get_stream_writer() writer({"progress": 0.5}) # Custom event return {"result": "done"}
undefined
from langgraph.config import get_stream_writer
def my_node(state): writer = get_stream_writer() writer({"progress": 0.5}) # 自定义事件 return {"result": "done"}
undefined

Subgraphs

子图

python
undefined
python
undefined

Define subgraph

定义子图

sub_builder = StateGraph(SubState) sub_builder.add_node("step", step_fn) sub_builder.add_edge(START, "step") subgraph = sub_builder.compile()
sub_builder = StateGraph(SubState) sub_builder.add_node("step", step_fn) sub_builder.add_edge(START, "step") subgraph = sub_builder.compile()

Use as node in parent

在父图中将子图作为节点使用

parent_builder = StateGraph(ParentState) parent_builder.add_node("subprocess", subgraph) parent_builder.add_edge(START, "subprocess")
parent_builder = StateGraph(ParentState) parent_builder.add_node("subprocess", subgraph) parent_builder.add_edge(START, "subprocess")

Subgraph checkpointing

子图检查点配置

subgraph = sub_builder.compile( checkpointer=None, # Inherit from parent (default) # checkpointer=True, # Use persistent checkpointing # checkpointer=False, # Disable checkpointing )
undefined
subgraph = sub_builder.compile( checkpointer=None, # 继承父图的配置(默认) # checkpointer=True, # 使用持久化检查点 # checkpointer=False, # 禁用检查点 )
undefined

Retry and Caching

重试与缓存

python
from langgraph.types import RetryPolicy, CachePolicy

retry = RetryPolicy(
    initial_interval=0.5,
    backoff_factor=2.0,
    max_attempts=3,
    retry_on=ValueError,  # Or callable: lambda e: isinstance(e, ValueError)
)

cache = CachePolicy(ttl=3600)  # Cache for 1 hour

builder.add_node("risky", risky_fn, retry_policy=retry, cache_policy=cache)
python
from langgraph.types import RetryPolicy, CachePolicy

retry = RetryPolicy(
    initial_interval=0.5,
    backoff_factor=2.0,
    max_attempts=3,
    retry_on=ValueError,  # 或使用可调用对象:lambda e: isinstance(e, ValueError)
)

cache = CachePolicy(ttl=3600)  # 缓存1小时

builder.add_node("risky", risky_fn, retry_policy=retry, cache_policy=cache)

Prebuilt Components

预构建组件

create_react_agent (moved to langchain.agents in v1.0)

create_react_agent(v1.0版本迁移至langchain.agents)

python
from langgraph.prebuilt import create_react_agent, ToolNode
python
from langgraph.prebuilt import create_react_agent, ToolNode

Simple agent

简单智能体

graph = create_react_agent( model="anthropic:claude-3-5-sonnet", tools=[my_tool], prompt="You are a helpful assistant", checkpointer=InMemorySaver(), )
graph = create_react_agent( model="anthropic:claude-3-5-sonnet", tools=[my_tool], prompt="You are a helpful assistant", checkpointer=InMemorySaver(), )

Custom tool node

自定义工具节点

tool_node = ToolNode([tool1, tool2]) builder.add_node("tools", tool_node)
undefined
tool_node = ToolNode([tool1, tool2]) builder.add_node("tools", tool_node)
undefined

Common Patterns

常见模式

Agent Loop

智能体循环

python
def should_continue(state) -> Literal["tools", "__end__"]:
    if state["messages"][-1].tool_calls:
        return "tools"
    return END

builder.add_node("agent", call_model)
builder.add_node("tools", ToolNode(tools))
builder.add_edge(START, "agent")
builder.add_conditional_edges("agent", should_continue)
builder.add_edge("tools", "agent")
python
def should_continue(state) -> Literal["tools", "__end__"]:
    if state["messages"][-1].tool_calls:
        return "tools"
    return END

builder.add_node("agent", call_model)
builder.add_node("tools", ToolNode(tools))
builder.add_edge(START, "agent")
builder.add_conditional_edges("agent", should_continue)
builder.add_edge("tools", "agent")

Parallel Execution

并行执行

python
undefined
python
undefined

Multiple nodes execute in parallel when they share the same trigger

多个节点在同一触发器下并行执行

builder.add_edge(START, "node_a") builder.add_edge(START, "node_b") # Runs parallel with node_a builder.add_edge(["node_a", "node_b"], "join") # Wait for both

See [PATTERNS.md](PATTERNS.md) for advanced patterns including multi-agent systems, hierarchical graphs, and complex workflows.
builder.add_edge(START, "node_a") builder.add_edge(START, "node_b") # 与node_a并行运行 builder.add_edge(["node_a", "node_b"], "join") # 等待两个节点都完成

更多高级模式(包括多智能体系统、分层图和复杂工作流)请查看 [PATTERNS.md](PATTERNS.md)。