langgraph

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

LangGraph Workflows

LangGraph 工作流

Summary

概述

LangGraph is a framework for building stateful, multi-agent applications with LLMs. It implements state machines and directed graphs for orchestration, enabling complex workflows with persistent state management, human-in-the-loop support, and time-travel debugging.
Key Innovation: Transforms agent coordination from sequential chains into cyclic graphs with persistent state, conditional branching, and production-grade debugging capabilities.
LangGraph是一款用于基于LLM构建有状态多Agent应用的框架。它通过实现状态机和有向图进行编排,支持具备持久化状态管理、人在回路支持和时间旅行调试能力的复杂工作流。
核心创新:将Agent协调从顺序链转换为具备持久化状态、条件分支和生产级调试能力的循环图。

When to Use

适用场景

Use LangGraph When:
  • Multi-agent coordination required
  • Complex state management needs
  • Human-in-the-loop workflows (approval gates, reviews)
  • Need debugging/observability (time-travel, replay)
  • Conditional branching based on outputs
  • Building production agent systems
  • State persistence across sessions
Don't Use LangGraph When:
  • Simple single-agent tasks
  • No state persistence needed
  • Prototyping/experimentation phase (use simple chains)
  • Team lacks graph/state machine expertise
  • Stateless request-response patterns
适合使用LangGraph的场景
  • 需要多Agent协调
  • 有复杂状态管理需求
  • 人在回路工作流(审批关口、审核)
  • 需要调试/可观测能力(时间旅行、回放)
  • 需要基于输出的条件分支
  • 构建生产级Agent系统
  • 需要跨会话状态持久化
不适合使用LangGraph的场景
  • 简单的单Agent任务
  • 不需要状态持久化
  • 原型/实验阶段(建议使用简单链)
  • 团队缺乏图/状态机相关专业知识
  • 无状态请求-响应模式

Quick Start

快速开始

python
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
python
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator

1. Define state schema

1. 定义状态 schema

class AgentState(TypedDict): messages: Annotated[list, operator.add] current_step: str
class AgentState(TypedDict): messages: Annotated[list, operator.add] current_step: str

2. Create graph

2. 创建图

workflow = StateGraph(AgentState)
workflow = StateGraph(AgentState)

3. Add nodes (agents)

3. 添加节点(Agent)

def researcher(state): return {"messages": ["Research complete"], "current_step": "research"}
def writer(state): return {"messages": ["Article written"], "current_step": "writing"}
workflow.add_node("researcher", researcher) workflow.add_node("writer", writer)
def researcher(state): return {"messages": ["Research complete"], "current_step": "research"}
def writer(state): return {"messages": ["Article written"], "current_step": "writing"}
workflow.add_node("researcher", researcher) workflow.add_node("writer", writer)

4. Add edges (transitions)

4. 添加边(流转规则)

workflow.add_edge("researcher", "writer") workflow.add_edge("writer", END)
workflow.add_edge("researcher", "writer") workflow.add_edge("writer", END)

5. Set entry point and compile

5. 设置入口点并编译

workflow.set_entry_point("researcher") app = workflow.compile()
workflow.set_entry_point("researcher") app = workflow.compile()

6. Execute

6. 执行

result = app.invoke({"messages": [], "current_step": "start"}) print(result)

---
result = app.invoke({"messages": [], "current_step": "start"}) print(result)

---

Core Concepts

核心概念

StateGraph

StateGraph

The fundamental building block representing a directed graph of agents with shared state.
python
from langgraph.graph import StateGraph, END
from typing import TypedDict

class AgentState(TypedDict):
    """State schema shared across all nodes."""
    messages: list
    user_input: str
    final_output: str
    metadata: dict
是构成Agent有向图的基础构建块,所有节点共享状态。
python
from langgraph.graph import StateGraph, END
from typing import TypedDict

class AgentState(TypedDict):
    """所有节点共享的状态 schema。"""
    messages: list
    user_input: str
    final_output: str
    metadata: dict

Create graph with state schema

使用状态 schema 创建图

workflow = StateGraph(AgentState)

**Key Properties**:
- **Nodes**: Agent functions that transform state
- **Edges**: Transitions between nodes (static or conditional)
- **State**: Shared data structure passed between nodes
- **Entry Point**: Starting node of execution
- **END**: Terminal node signaling completion
workflow = StateGraph(AgentState)

**核心属性**:
- **节点**:用于转换状态的Agent函数
- **边**:节点之间的流转规则(静态或条件)
- **状态**:在节点之间传递的共享数据结构
- **入口点**:执行的起始节点
- **END**:标识执行完成的终端节点

Nodes

节点

Nodes are functions that receive current state and return state updates.
python
def research_agent(state: AgentState) -> dict:
    """Node function: receives state, returns updates."""
    query = state["user_input"]

    # Perform research (simplified)
    results = search_web(query)

    # Return state updates (partial state)
    return {
        "messages": state["messages"] + [f"Research: {results}"],
        "metadata": {"research_complete": True}
    }
节点是接收当前状态并返回状态更新的函数。
python
def research_agent(state: AgentState) -> dict:
    """节点函数:接收状态,返回更新。"""
    query = state["user_input"]

    # 执行调研(简化版)
    results = search_web(query)

    # 返回状态更新(部分状态)
    return {
        "messages": state["messages"] + [f"Research: {results}"],
        "metadata": {"research_complete": True}
    }

Add node to graph

向图中添加节点

workflow.add_node("researcher", research_agent)

**Node Behavior**:
- Receives **full state** as input
- Returns **partial state** (only fields to update)
- Can be sync or async functions
- Can invoke LLMs, call APIs, run computations
workflow.add_node("researcher", research_agent)

**节点行为**:
- 接收**完整状态**作为输入
- 返回**部分状态**(仅需要更新的字段)
- 可以是同步或异步函数
- 可以调用LLM、API、执行计算

Edges

Edges define transitions between nodes.
边定义了节点之间的流转规则。

Static Edges

静态边

python
undefined
python
undefined

Direct transition: researcher → writer

直接流转:researcher → writer

workflow.add_edge("researcher", "writer")
workflow.add_edge("researcher", "writer")

Transition to END

流转到 END

workflow.add_edge("writer", END)
undefined
workflow.add_edge("writer", END)
undefined

Conditional Edges

条件边

python
def should_continue(state: AgentState) -> str:
    """Routing function: decides next node based on state."""
    last_message = state["messages"][-1]

    if "APPROVED" in last_message:
        return END
    elif "NEEDS_REVISION" in last_message:
        return "writer"
    else:
        return "reviewer"

workflow.add_conditional_edges(
    "reviewer",  # Source node
    should_continue,  # Routing function
    {
        END: END,
        "writer": "writer",
        "reviewer": "reviewer"
    }
)

python
def should_continue(state: AgentState) -> str:
    """路由函数:根据状态决定下一个节点。"""
    last_message = state["messages"][-1]

    if "APPROVED" in last_message:
        return END
    elif "NEEDS_REVISION" in last_message:
        return "writer"
    else:
        return "reviewer"

workflow.add_conditional_edges(
    "reviewer",  # 源节点
    should_continue,  # 路由函数
    {
        END: END,
        "writer": "writer",
        "reviewer": "reviewer"
    }
)

Graph Construction

图构建

Complete Workflow Example

完整工作流示例

python
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_anthropic import ChatAnthropic
python
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_anthropic import ChatAnthropic

State schema with reducer

带 reducer 的状态 schema

class ResearchState(TypedDict): topic: str research_notes: Annotated[list, operator.add] # Reducer: appends to list draft: str revision_count: int approved: bool
class ResearchState(TypedDict): topic: str research_notes: Annotated[list, operator.add] # Reducer:追加到列表 draft: str revision_count: int approved: bool

Initialize LLM

初始化 LLM

llm = ChatAnthropic(model="claude-sonnet-4")
llm = ChatAnthropic(model="claude-sonnet-4")

Node 1: Research

节点1:调研

def research_node(state: ResearchState) -> dict: """Research the topic and gather information.""" topic = state["topic"]
prompt = f"Research key points about: {topic}"
response = llm.invoke(prompt)

return {
    "research_notes": [response.content]
}
def research_node(state: ResearchState) -> dict: """调研主题并收集信息。""" topic = state["topic"]
prompt = f"Research key points about: {topic}"
response = llm.invoke(prompt)

return {
    "research_notes": [response.content]
}

Node 2: Write

节点2:写作

def write_node(state: ResearchState) -> dict: """Write draft based on research.""" notes = "\n".join(state["research_notes"])
prompt = f"Write article based on:\n{notes}"
response = llm.invoke(prompt)

return {
    "draft": response.content,
    "revision_count": state.get("revision_count", 0)
}
def write_node(state: ResearchState) -> dict: """基于调研内容撰写草稿。""" notes = "\n".join(state["research_notes"])
prompt = f"Write article based on:\n{notes}"
response = llm.invoke(prompt)

return {
    "draft": response.content,
    "revision_count": state.get("revision_count", 0)
}

Node 3: Review

节点3:审核

def review_node(state: ResearchState) -> dict: """Review the draft and decide if approved.""" draft = state["draft"]
prompt = f"Review this draft. Reply APPROVED or NEEDS_REVISION:\n{draft}"
response = llm.invoke(prompt)

approved = "APPROVED" in response.content

return {
    "approved": approved,
    "revision_count": state["revision_count"] + 1,
    "research_notes": [f"Review feedback: {response.content}"]
}
def review_node(state: ResearchState) -> dict: """审核草稿并决定是否通过。""" draft = state["draft"]
prompt = f"Review this draft. Reply APPROVED or NEEDS_REVISION:\n{draft}"
response = llm.invoke(prompt)

approved = "APPROVED" in response.content

return {
    "approved": approved,
    "revision_count": state["revision_count"] + 1,
    "research_notes": [f"Review feedback: {response.content}"]
}

Routing logic

路由逻辑

def should_continue_writing(state: ResearchState) -> str: """Decide next step after review.""" if state["approved"]: return END elif state["revision_count"] >= 3: return END # Max revisions reached else: return "writer"
def should_continue_writing(state: ResearchState) -> str: """审核后决定下一步操作。""" if state["approved"]: return END elif state["revision_count"] >= 3: return END # 达到最大修订次数 else: return "writer"

Build graph

构建图

workflow = StateGraph(ResearchState)
workflow.add_node("researcher", research_node) workflow.add_node("writer", write_node) workflow.add_node("reviewer", review_node)
workflow = StateGraph(ResearchState)
workflow.add_node("researcher", research_node) workflow.add_node("writer", write_node) workflow.add_node("reviewer", review_node)

Static edges

静态边

workflow.add_edge("researcher", "writer") workflow.add_edge("writer", "reviewer")
workflow.add_edge("researcher", "writer") workflow.add_edge("writer", "reviewer")

Conditional edge from reviewer

来自 reviewer 的条件边

workflow.add_conditional_edges( "reviewer", should_continue_writing, { END: END, "writer": "writer" } )
workflow.set_entry_point("researcher")
workflow.add_conditional_edges( "reviewer", should_continue_writing, { END: END, "writer": "writer" } )
workflow.set_entry_point("researcher")

Compile

编译

app = workflow.compile()
app = workflow.compile()

Execute

执行

result = app.invoke({ "topic": "AI Safety", "research_notes": [], "draft": "", "revision_count": 0, "approved": False })
print(f"Final draft: {result['draft']}") print(f"Revisions: {result['revision_count']}")

---
result = app.invoke({ "topic": "AI Safety", "research_notes": [], "draft": "", "revision_count": 0, "approved": False })
print(f"Final draft: {result['draft']}") print(f"Revisions: {result['revision_count']}")

---

State Management

状态管理

State Schema with TypedDict

基于 TypedDict 的状态 Schema

python
from typing import TypedDict, Annotated, Literal
import operator

class WorkflowState(TypedDict):
    # Simple fields (replaced on update)
    user_id: str
    request_id: str
    status: Literal["pending", "processing", "complete", "failed"]

    # List with reducer (appends instead of replacing)
    messages: Annotated[list, operator.add]

    # Dict with custom reducer
    metadata: Annotated[dict, lambda x, y: {**x, **y}]

    # Optional fields
    error: str | None
    result: dict | None
python
from typing import TypedDict, Annotated, Literal
import operator

class WorkflowState(TypedDict):
    # 简单字段(更新时直接替换)
    user_id: str
    request_id: str
    status: Literal["pending", "processing", "complete", "failed"]

    # 带 reducer 的列表(追加而非替换)
    messages: Annotated[list, operator.add]

    # 带自定义 reducer 的字典
    metadata: Annotated[dict, lambda x, y: {**x, **y}]

    # 可选字段
    error: str | None
    result: dict | None

State Reducers

状态 Reducer

Reducers control how state updates are merged.
python
import operator
from typing import Annotated
Reducer 控制状态更新的合并方式。
python
import operator
from typing import Annotated

Built-in reducers

内置 reducer

Annotated[list, operator.add] # Append to list Annotated[set, operator.or_] # Union of sets Annotated[int, operator.add] # Sum integers
Annotated[list, operator.add] # 追加到列表 Annotated[set, operator.or_] # 集合求并集 Annotated[int, operator.add] # 整数求和

Custom reducer

自定义 reducer

def merge_dicts(existing: dict, update: dict) -> dict: """Deep merge dictionaries.""" result = existing.copy() for key, value in update.items(): if key in result and isinstance(result[key], dict) and isinstance(value, dict): result[key] = merge_dicts(result[key], value) else: result[key] = value return result
class State(TypedDict): config: Annotated[dict, merge_dicts]
undefined
def merge_dicts(existing: dict, update: dict) -> dict: """深度合并字典。""" result = existing.copy() for key, value in update.items(): if key in result and isinstance(result[key], dict) and isinstance(value, dict): result[key] = merge_dicts(result[key], value) else: result[key] = value return result
class State(TypedDict): config: Annotated[dict, merge_dicts]
undefined

State Updates

状态更新

Nodes return partial state - only fields to update:
python
def node_function(state: State) -> dict:
    """Return only fields that should be updated."""
    return {
        "messages": ["New message"],  # Will be appended
        "status": "processing"  # Will be replaced
    }
    # Other fields remain unchanged

节点返回部分状态——仅需要更新的字段:
python
def node_function(state: State) -> dict:
    """仅返回需要更新的字段。"""
    return {
        "messages": ["New message"],  # 会被追加
        "status": "processing"  # 会被替换
    }
    # 其他字段保持不变

Conditional Routing

条件路由

Basic Routing Function

基础路由函数

python
def route_based_on_score(state: State) -> str:
    """Route to different nodes based on state."""
    score = state["quality_score"]

    if score >= 0.9:
        return "publish"
    elif score >= 0.6:
        return "review"
    else:
        return "revise"

workflow.add_conditional_edges(
    "evaluator",
    route_based_on_score,
    {
        "publish": "publisher",
        "review": "reviewer",
        "revise": "reviser"
    }
)
python
def route_based_on_score(state: State) -> str:
    """根据状态路由到不同节点。"""
    score = state["quality_score"]

    if score >= 0.9:
        return "publish"
    elif score >= 0.6:
        return "review"
    else:
        return "revise"

workflow.add_conditional_edges(
    "evaluator",
    route_based_on_score,
    {
        "publish": "publisher",
        "review": "reviewer",
        "revise": "reviser"
    }
)

Dynamic Routing with LLM

基于 LLM 的动态路由

python
from langchain_anthropic import ChatAnthropic

def llm_router(state: State) -> str:
    """Use LLM to decide next step."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    prompt = f"""
    Current state: {state['current_step']}
    User request: {state['user_input']}

    Which agent should handle this next?
    Options: researcher, coder, writer, FINISH

    Return only one word.
    """

    response = llm.invoke(prompt)
    next_agent = response.content.strip().lower()

    if next_agent == "finish":
        return END
    else:
        return next_agent

workflow.add_conditional_edges(
    "supervisor",
    llm_router,
    {
        "researcher": "researcher",
        "coder": "coder",
        "writer": "writer",
        END: END
    }
)
python
from langchain_anthropic import ChatAnthropic

def llm_router(state: State) -> str:
    """使用 LLM 决定下一步操作。"""
    llm = ChatAnthropic(model="claude-sonnet-4")

    prompt = f"""
    Current state: {state['current_step']}
    User request: {state['user_input']}

    Which agent should handle this next?
    Options: researcher, coder, writer, FINISH

    Return only one word.
    """

    response = llm.invoke(prompt)
    next_agent = response.content.strip().lower()

    if next_agent == "finish":
        return END
    else:
        return next_agent

workflow.add_conditional_edges(
    "supervisor",
    llm_router,
    {
        "researcher": "researcher",
        "coder": "coder",
        "writer": "writer",
        END: END
    }
)

Multi-Condition Routing

多条件路由

python
def complex_router(state: State) -> str:
    """Route based on multiple conditions."""
    # Check multiple conditions
    has_errors = bool(state.get("errors"))
    is_approved = state.get("approved", False)
    iteration_count = state.get("iterations", 0)

    # Priority-based routing
    if has_errors:
        return "error_handler"
    elif is_approved:
        return END
    elif iteration_count >= 5:
        return "escalation"
    else:
        return "processor"

workflow.add_conditional_edges(
    "validator",
    complex_router,
    {
        "error_handler": "error_handler",
        "processor": "processor",
        "escalation": "escalation",
        END: END
    }
)

python
def complex_router(state: State) -> str:
    """基于多个条件进行路由。"""
    # 检查多个条件
    has_errors = bool(state.get("errors"))
    is_approved = state.get("approved", False)
    iteration_count = state.get("iterations", 0)

    # 基于优先级的路由
    if has_errors:
        return "error_handler"
    elif is_approved:
        return END
    elif iteration_count >= 5:
        return "escalation"
    else:
        return "processor"

workflow.add_conditional_edges(
    "validator",
    complex_router,
    {
        "error_handler": "error_handler",
        "processor": "processor",
        "escalation": "escalation",
        END: END
    }
)

Multi-Agent Patterns

多Agent模式

Supervisor Pattern

主管模式

One supervisor coordinates multiple specialized agents.
python
from langgraph.graph import StateGraph, END
from langchain_anthropic import ChatAnthropic
from typing import TypedDict, Literal

class SupervisorState(TypedDict):
    task: str
    agent_history: list
    result: dict
    next_agent: str
由一个主管Agent协调多个专业化Agent。
python
from langgraph.graph import StateGraph, END
from langchain_anthropic import ChatAnthropic
from typing import TypedDict, Literal

class SupervisorState(TypedDict):
    task: str
    agent_history: list
    result: dict
    next_agent: str

Specialized agents

专业化Agent

class ResearchAgent: def init(self): self.llm = ChatAnthropic(model="claude-sonnet-4")
def run(self, state: SupervisorState) -> dict:
    response = self.llm.invoke(f"Research: {state['task']}")
    return {
        "agent_history": [f"Research: {response.content}"],
        "result": {"research": response.content}
    }
class CodingAgent: def init(self): self.llm = ChatAnthropic(model="claude-sonnet-4")
def run(self, state: SupervisorState) -> dict:
    response = self.llm.invoke(f"Code: {state['task']}")
    return {
        "agent_history": [f"Code: {response.content}"],
        "result": {"code": response.content}
    }
class SupervisorAgent: def init(self): self.llm = ChatAnthropic(model="claude-sonnet-4")
def route(self, state: SupervisorState) -> dict:
    """Decide which agent to use next."""
    history = "\n".join(state.get("agent_history", []))

    prompt = f"""
    Task: {state['task']}
    Progress: {history}

    Which agent should handle the next step?
    Options: researcher, coder, FINISH

    Return only one word.
    """

    response = self.llm.invoke(prompt)
    next_agent = response.content.strip().lower()

    return {"next_agent": next_agent}
class ResearchAgent: def init(self): self.llm = ChatAnthropic(model="claude-sonnet-4")
def run(self, state: SupervisorState) -> dict:
    response = self.llm.invoke(f"Research: {state['task']}")
    return {
        "agent_history": [f"Research: {response.content}"],
        "result": {"research": response.content}
    }
class CodingAgent: def init(self): self.llm = ChatAnthropic(model="claude-sonnet-4")
def run(self, state: SupervisorState) -> dict:
    response = self.llm.invoke(f"Code: {state['task']}")
    return {
        "agent_history": [f"Code: {response.content}"],
        "result": {"code": response.content}
    }
class SupervisorAgent: def init(self): self.llm = ChatAnthropic(model="claude-sonnet-4")
def route(self, state: SupervisorState) -> dict:
    """决定下一步使用哪个Agent。"""
    history = "\n".join(state.get("agent_history", []))

    prompt = f"""
    Task: {state['task']}
    Progress: {history}

    Which agent should handle the next step?
    Options: researcher, coder, FINISH

    Return only one word.
    """

    response = self.llm.invoke(prompt)
    next_agent = response.content.strip().lower()

    return {"next_agent": next_agent}

Build supervisor workflow

构建主管工作流

def create_supervisor_workflow(): workflow = StateGraph(SupervisorState)
# Initialize agents
research_agent = ResearchAgent()
coding_agent = CodingAgent()
supervisor = SupervisorAgent()

# Add nodes
workflow.add_node("supervisor", supervisor.route)
workflow.add_node("researcher", research_agent.run)
workflow.add_node("coder", coding_agent.run)

# Conditional routing from supervisor
def route_from_supervisor(state: SupervisorState) -> str:
    next_agent = state.get("next_agent", "FINISH")
    if next_agent == "finish":
        return END
    return next_agent

workflow.add_conditional_edges(
    "supervisor",
    route_from_supervisor,
    {
        "researcher": "researcher",
        "coder": "coder",
        END: END
    }
)

# Loop back to supervisor after each agent
workflow.add_edge("researcher", "supervisor")
workflow.add_edge("coder", "supervisor")

workflow.set_entry_point("supervisor")

return workflow.compile()
def create_supervisor_workflow(): workflow = StateGraph(SupervisorState)
# 初始化Agent
research_agent = ResearchAgent()
coding_agent = CodingAgent()
supervisor = SupervisorAgent()

# 添加节点
workflow.add_node("supervisor", supervisor.route)
workflow.add_node("researcher", research_agent.run)
workflow.add_node("coder", coding_agent.run)

# 来自主管的条件路由
def route_from_supervisor(state: SupervisorState) -> str:
    next_agent = state.get("next_agent", "FINISH")
    if next_agent == "finish":
        return END
    return next_agent

workflow.add_conditional_edges(
    "supervisor",
    route_from_supervisor,
    {
        "researcher": "researcher",
        "coder": "coder",
        END: END
    }
)

# 每个Agent执行完成后回到主管
workflow.add_edge("researcher", "supervisor")
workflow.add_edge("coder", "supervisor")

workflow.set_entry_point("supervisor")

return workflow.compile()

Execute

执行

app = create_supervisor_workflow() result = app.invoke({ "task": "Build a REST API for user management", "agent_history": [], "result": {}, "next_agent": "" })
undefined
app = create_supervisor_workflow() result = app.invoke({ "task": "Build a REST API for user management", "agent_history": [], "result": {}, "next_agent": "" })
undefined

Hierarchical Multi-Agent

分层多Agent

Nested supervisor pattern with sub-teams.
python
class TeamState(TypedDict):
    task: str
    team_results: dict

def create_backend_team():
    """Sub-graph for backend development."""
    workflow = StateGraph(TeamState)

    workflow.add_node("api_designer", design_api)
    workflow.add_node("database_designer", design_db)
    workflow.add_node("implementer", implement_backend)

    workflow.add_edge("api_designer", "database_designer")
    workflow.add_edge("database_designer", "implementer")
    workflow.add_edge("implementer", END)

    workflow.set_entry_point("api_designer")

    return workflow.compile()

def create_frontend_team():
    """Sub-graph for frontend development."""
    workflow = StateGraph(TeamState)

    workflow.add_node("ui_designer", design_ui)
    workflow.add_node("component_builder", build_components)
    workflow.add_node("integrator", integrate_frontend)

    workflow.add_edge("ui_designer", "component_builder")
    workflow.add_edge("component_builder", "integrator")
    workflow.add_edge("integrator", END)

    workflow.set_entry_point("ui_designer")

    return workflow.compile()
嵌套的主管模式,包含子团队。
python
class TeamState(TypedDict):
    task: str
    team_results: dict

def create_backend_team():
    """后端开发子图。"""
    workflow = StateGraph(TeamState)

    workflow.add_node("api_designer", design_api)
    workflow.add_node("database_designer", design_db)
    workflow.add_node("implementer", implement_backend)

    workflow.add_edge("api_designer", "database_designer")
    workflow.add_edge("database_designer", "implementer")
    workflow.add_edge("implementer", END)

    workflow.set_entry_point("api_designer")

    return workflow.compile()

def create_frontend_team():
    """前端开发子图。"""
    workflow = StateGraph(TeamState)

    workflow.add_node("ui_designer", design_ui)
    workflow.add_node("component_builder", build_components)
    workflow.add_node("integrator", integrate_frontend)

    workflow.add_edge("ui_designer", "component_builder")
    workflow.add_edge("component_builder", "integrator")
    workflow.add_edge("integrator", END)

    workflow.set_entry_point("ui_designer")

    return workflow.compile()

Top-level coordinator

顶层协调器

def create_project_workflow(): workflow = StateGraph(TeamState)
# Add team sub-graphs as nodes
backend_team = create_backend_team()
frontend_team = create_frontend_team()

workflow.add_node("backend_team", backend_team)
workflow.add_node("frontend_team", frontend_team)
workflow.add_node("integrator", integrate_teams)

# Parallel execution of teams
workflow.add_edge("backend_team", "integrator")
workflow.add_edge("frontend_team", "integrator")
workflow.add_edge("integrator", END)

workflow.set_entry_point("backend_team")

return workflow.compile()
undefined
def create_project_workflow(): workflow = StateGraph(TeamState)
# 将团队子图作为节点添加
backend_team = create_backend_team()
frontend_team = create_frontend_team()

workflow.add_node("backend_team", backend_team)
workflow.add_node("frontend_team", frontend_team)
workflow.add_node("integrator", integrate_teams)

# 团队并行执行
workflow.add_edge("backend_team", "integrator")
workflow.add_edge("frontend_team", "integrator")
workflow.add_edge("integrator", END)

workflow.set_entry_point("backend_team")

return workflow.compile()
undefined

Swarm Pattern (2025)

蜂群模式(2025)

Dynamic agent hand-offs with collaborative decision-making.
python
from typing import Literal

class SwarmState(TypedDict):
    task: str
    messages: list
    current_agent: str
    handoff_reason: str

def create_swarm():
    """Agents can dynamically hand off to each other."""

    def research_agent(state: SwarmState) -> dict:
        llm = ChatAnthropic(model="claude-sonnet-4")

        # Perform research
        response = llm.invoke(f"Research: {state['task']}")

        # Decide if handoff needed
        needs_code = "implementation" in response.content.lower()

        if needs_code:
            return {
                "messages": [response.content],
                "current_agent": "coder",
                "handoff_reason": "Need implementation"
            }
        else:
            return {
                "messages": [response.content],
                "current_agent": "writer",
                "handoff_reason": "Ready for documentation"
            }

    def coding_agent(state: SwarmState) -> dict:
        llm = ChatAnthropic(model="claude-sonnet-4")

        response = llm.invoke(f"Implement: {state['task']}")

        return {
            "messages": [response.content],
            "current_agent": "tester",
            "handoff_reason": "Code complete, needs testing"
        }

    def tester_agent(state: SwarmState) -> dict:
        # Test the code
        tests_pass = True  # Simplified

        if tests_pass:
            return {
                "messages": ["Tests passed"],
                "current_agent": "DONE",
                "handoff_reason": "All tests passing"
            }
        else:
            return {
                "messages": ["Tests failed"],
                "current_agent": "coder",
                "handoff_reason": "Fix failing tests"
            }

    # Build graph
    workflow = StateGraph(SwarmState)

    workflow.add_node("researcher", research_agent)
    workflow.add_node("coder", coding_agent)
    workflow.add_node("tester", tester_agent)

    # Dynamic routing based on current_agent
    def route_swarm(state: SwarmState) -> str:
        next_agent = state.get("current_agent", "DONE")
        if next_agent == "DONE":
            return END
        return next_agent

    workflow.add_conditional_edges(
        "researcher",
        route_swarm,
        {"coder": "coder", "writer": "writer"}
    )

    workflow.add_conditional_edges(
        "coder",
        route_swarm,
        {"tester": "tester"}
    )

    workflow.add_conditional_edges(
        "tester",
        route_swarm,
        {"coder": "coder", END: END}
    )

    workflow.set_entry_point("researcher")

    return workflow.compile()

动态Agent交接,支持协同决策。
python
from typing import Literal

class SwarmState(TypedDict):
    task: str
    messages: list
    current_agent: str
    handoff_reason: str

def create_swarm():
    """Agent之间可以动态交接。"""

    def research_agent(state: SwarmState) -> dict:
        llm = ChatAnthropic(model="claude-sonnet-4")

        # 执行调研
        response = llm.invoke(f"Research: {state['task']}")

        # 决定是否需要交接
        needs_code = "implementation" in response.content.lower()

        if needs_code:
            return {
                "messages": [response.content],
                "current_agent": "coder",
                "handoff_reason": "Need implementation"
            }
        else:
            return {
                "messages": [response.content],
                "current_agent": "writer",
                "handoff_reason": "Ready for documentation"
            }

    def coding_agent(state: SwarmState) -> dict:
        llm = ChatAnthropic(model="claude-sonnet-4")

        response = llm.invoke(f"Implement: {state['task']}")

        return {
            "messages": [response.content],
            "current_agent": "tester",
            "handoff_reason": "Code complete, needs testing"
        }

    def tester_agent(state: SwarmState) -> dict:
        # 测试代码
        tests_pass = True  # 简化版

        if tests_pass:
            return {
                "messages": ["Tests passed"],
                "current_agent": "DONE",
                "handoff_reason": "All tests passing"
            }
        else:
            return {
                "messages": ["Tests failed"],
                "current_agent": "coder",
                "handoff_reason": "Fix failing tests"
            }

    # 构建图
    workflow = StateGraph(SwarmState)

    workflow.add_node("researcher", research_agent)
    workflow.add_node("coder", coding_agent)
    workflow.add_node("tester", tester_agent)

    # 基于 current_agent 动态路由
    def route_swarm(state: SwarmState) -> str:
        next_agent = state.get("current_agent", "DONE")
        if next_agent == "DONE":
            return END
        return next_agent

    workflow.add_conditional_edges(
        "researcher",
        route_swarm,
        {"coder": "coder", "writer": "writer"}
    )

    workflow.add_conditional_edges(
        "coder",
        route_swarm,
        {"tester": "tester"}
    )

    workflow.add_conditional_edges(
        "tester",
        route_swarm,
        {"coder": "coder", END: END}
    )

    workflow.set_entry_point("researcher")

    return workflow.compile()

Human-in-the-Loop

人在回路

Interrupt for Approval

中断等待审批

python
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.graph import StateGraph, END
python
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.graph import StateGraph, END

Enable checkpointing for interrupts

启用检查点以支持中断

memory = SqliteSaver.from_conn_string(":memory:")
def create_approval_workflow(): workflow = StateGraph(dict)
workflow.add_node("draft", create_draft)
workflow.add_node("approve", human_approval)  # Interrupt point
workflow.add_node("publish", publish_content)

workflow.add_edge("draft", "approve")
workflow.add_edge("approve", "publish")
workflow.add_edge("publish", END)

workflow.set_entry_point("draft")

# Compile with interrupt before "approve"
return workflow.compile(
    checkpointer=memory,
    interrupt_before=["approve"]  # Pause here
)
memory = SqliteSaver.from_conn_string(":memory:")
def create_approval_workflow(): workflow = StateGraph(dict)
workflow.add_node("draft", create_draft)
workflow.add_node("approve", human_approval)  # 中断点
workflow.add_node("publish", publish_content)

workflow.add_edge("draft", "approve")
workflow.add_edge("approve", "publish")
workflow.add_edge("publish", END)

workflow.set_entry_point("draft")

# 编译时配置在"approve"节点前中断
return workflow.compile(
    checkpointer=memory,
    interrupt_before=["approve"]  # 在此处暂停
)

Usage

使用方法

app = create_approval_workflow() config = {"configurable": {"thread_id": "1"}}
app = create_approval_workflow() config = {"configurable": {"thread_id": "1"}}

Step 1: Run until interrupt

步骤1:运行到中断点

for event in app.stream({"content": "Draft article..."}, config): print(event) # Workflow pauses at "approve" node
for event in app.stream({"content": "Draft article..."}, config): print(event) # 工作流在"approve"节点暂停

Human reviews draft

人工审核草稿

print("Draft ready for review. Approve? (y/n)") approval = input()
print("Draft ready for review. Approve? (y/n)") approval = input()

Step 2: Resume with approval decision

步骤2:携带审批结果恢复执行

if approval == "y": # Continue execution result = app.invoke(None, config) # Resume from checkpoint print("Published:", result) else: # Optionally update state and retry app.update_state(config, {"needs_revision": True}) result = app.invoke(None, config)
undefined
if approval == "y": # 继续执行 result = app.invoke(None, config) # 从检查点恢复 print("Published:", result) else: # 可选:更新状态并重试 app.update_state(config, {"needs_revision": True}) result = app.invoke(None, config)
undefined

Interactive Approval Gates

交互式审批关口

python
class ApprovalState(TypedDict):
    draft: str
    approved: bool
    feedback: str

def approval_node(state: ApprovalState) -> dict:
    """This node requires human interaction."""
    # This is where workflow pauses
    # Human provides input through update_state
    return state  # No automatic changes

def create_interactive_workflow():
    workflow = StateGraph(ApprovalState)

    workflow.add_node("writer", write_draft)
    workflow.add_node("approval_gate", approval_node)
    workflow.add_node("reviser", revise_draft)
    workflow.add_node("publisher", publish_final)

    workflow.add_edge("writer", "approval_gate")

    # Route based on approval
    def route_after_approval(state: ApprovalState) -> str:
        if state.get("approved"):
            return "publisher"
        else:
            return "reviser"

    workflow.add_conditional_edges(
        "approval_gate",
        route_after_approval,
        {"publisher": "publisher", "reviser": "reviser"}
    )

    workflow.add_edge("reviser", "approval_gate")  # Loop back
    workflow.add_edge("publisher", END)

    workflow.set_entry_point("writer")

    memory = SqliteSaver.from_conn_string("approvals.db")
    return workflow.compile(
        checkpointer=memory,
        interrupt_before=["approval_gate"]
    )
python
class ApprovalState(TypedDict):
    draft: str
    approved: bool
    feedback: str

def approval_node(state: ApprovalState) -> dict:
    """该节点需要人工交互。"""
    # 工作流在此处暂停
    # 人工通过 update_state 提供输入
    return state  # 无自动变更

def create_interactive_workflow():
    workflow = StateGraph(ApprovalState)

    workflow.add_node("writer", write_draft)
    workflow.add_node("approval_gate", approval_node)
    workflow.add_node("reviser", revise_draft)
    workflow.add_node("publisher", publish_final)

    workflow.add_edge("writer", "approval_gate")

    # 基于审批结果路由
    def route_after_approval(state: ApprovalState) -> str:
        if state.get("approved"):
            return "publisher"
        else:
            return "reviser"

    workflow.add_conditional_edges(
        "approval_gate",
        route_after_approval,
        {"publisher": "publisher", "reviser": "reviser"}
    )

    workflow.add_edge("reviser", "approval_gate")  # 循环回到审批关口
    workflow.add_edge("publisher", END)

    workflow.set_entry_point("writer")

    memory = SqliteSaver.from_conn_string("approvals.db")
    return workflow.compile(
        checkpointer=memory,
        interrupt_before=["approval_gate"]
    )

Execute with human intervention

执行时人工介入

app = create_interactive_workflow() config = {"configurable": {"thread_id": "article_123"}}
app = create_interactive_workflow() config = {"configurable": {"thread_id": "article_123"}}

Step 1: Generate draft (pauses at approval_gate)

步骤1:生成草稿(在approval_gate暂停)

for event in app.stream({"draft": "", "approved": False, "feedback": ""}, config): print(event)
for event in app.stream({"draft": "", "approved": False, "feedback": ""}, config): print(event)

Step 2: Human reviews and provides feedback

步骤2:人工审核并提供反馈

current_state = app.get_state(config) print(f"Review this draft: {current_state.values['draft']}")
current_state = app.get_state(config) print(f"Review this draft: {current_state.values['draft']}")

Human decides

人工决策

app.update_state(config, { "approved": False, "feedback": "Add more examples in section 2" })
app.update_state(config, { "approved": False, "feedback": "Add more examples in section 2" })

Step 3: Resume (goes to reviser due to approved=False)

步骤3:恢复执行(因为approved=False,会进入reviser节点)

result = app.invoke(None, config)

---
result = app.invoke(None, config)

---

Checkpointing and Persistence

检查点与持久化

MemorySaver (In-Memory)

MemorySaver(内存级)

python
from langgraph.checkpoint.memory import MemorySaver
python
from langgraph.checkpoint.memory import MemorySaver

In-memory checkpointing (lost on restart)

内存级检查点(重启后丢失)

memory = MemorySaver()
app = workflow.compile(checkpointer=memory)
config = {"configurable": {"thread_id": "conversation_1"}}
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)
config = {"configurable": {"thread_id": "conversation_1"}}

State automatically saved at each step

每一步状态自动保存

result = app.invoke(initial_state, config)
undefined
result = app.invoke(initial_state, config)
undefined

SQLite Persistence

SQLite 持久化

python
from langgraph.checkpoint.sqlite import SqliteSaver
python
from langgraph.checkpoint.sqlite import SqliteSaver

Persistent checkpointing with SQLite

基于SQLite的持久化检查点

checkpointer = SqliteSaver.from_conn_string("./workflow_state.db")
app = workflow.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "user_session_456"}}
checkpointer = SqliteSaver.from_conn_string("./workflow_state.db")
app = workflow.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "user_session_456"}}

State persists across restarts

状态跨重启持久化

result = app.invoke(initial_state, config)
result = app.invoke(initial_state, config)

Later: Resume from same thread

后续:从同一个线程恢复

result2 = app.invoke(None, config) # Continues from last state
undefined
result2 = app.invoke(None, config) # 从最后一个状态继续
undefined

PostgreSQL for Production

生产级 PostgreSQL

python
from langgraph.checkpoint.postgres import PostgresSaver
python
from langgraph.checkpoint.postgres import PostgresSaver

Production-grade persistence

生产级持久化

checkpointer = PostgresSaver.from_conn_string( "postgresql://user:pass@localhost/langgraph_db" )
app = workflow.compile(checkpointer=checkpointer)
checkpointer = PostgresSaver.from_conn_string( "postgresql://user:pass@localhost/langgraph_db" )
app = workflow.compile(checkpointer=checkpointer)

Supports concurrent workflows with isolation

支持带隔离的并发工作流

config1 = {"configurable": {"thread_id": "user_1"}} config2 = {"configurable": {"thread_id": "user_2"}}
config1 = {"configurable": {"thread_id": "user_1"}} config2 = {"configurable": {"thread_id": "user_2"}}

Each thread maintains independent state

每个线程维护独立状态

result1 = app.invoke(state1, config1) result2 = app.invoke(state2, config2)
undefined
result1 = app.invoke(state1, config1) result2 = app.invoke(state2, config2)
undefined

Redis for Distributed Systems

分布式系统用 Redis

python
from langgraph.checkpoint.redis import RedisSaver
python
from langgraph.checkpoint.redis import RedisSaver

Distributed checkpointing

分布式检查点

checkpointer = RedisSaver.from_conn_string( "redis://localhost:6379", ttl=3600 # 1 hour TTL )
app = workflow.compile(checkpointer=checkpointer)
checkpointer = RedisSaver.from_conn_string( "redis://localhost:6379", ttl=3600 # 1小时过期时间 )
app = workflow.compile(checkpointer=checkpointer)

Supports horizontal scaling

支持水平扩容

Multiple workers can process different threads

多个Worker可以处理不同的线程


---

---

Time-Travel Debugging

时间旅行调试

View State History

查看状态历史

python
from langgraph.checkpoint.sqlite import SqliteSaver

checkpointer = SqliteSaver.from_conn_string("workflow.db")
app = workflow.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "debug_session"}}
python
from langgraph.checkpoint.sqlite import SqliteSaver

checkpointer = SqliteSaver.from_conn_string("workflow.db")
app = workflow.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "debug_session"}}

Run workflow

运行工作流

result = app.invoke(initial_state, config)
result = app.invoke(initial_state, config)

Retrieve full history

获取完整历史

history = app.get_state_history(config)
for i, state_snapshot in enumerate(history): print(f"Step {i}:") print(f" Node: {state_snapshot.next}") print(f" State: {state_snapshot.values}") print(f" Metadata: {state_snapshot.metadata}") print()
undefined
history = app.get_state_history(config)
for i, state_snapshot in enumerate(history): print(f"Step {i}:") print(f" Node: {state_snapshot.next}") print(f" State: {state_snapshot.values}") print(f" Metadata: {state_snapshot.metadata}") print()
undefined

Replay from Checkpoint

从检查点回放

python
undefined
python
undefined

Get state at specific step

获取特定步骤的状态

history = list(app.get_state_history(config)) checkpoint_3 = history[3] # Get state at step 3
history = list(app.get_state_history(config)) checkpoint_3 = history[3] # 获取第3步的状态

Replay from that checkpoint

从该检查点回放

config_replay = { "configurable": { "thread_id": "debug_session", "checkpoint_id": checkpoint_3.config["configurable"]["checkpoint_id"] } }
config_replay = { "configurable": { "thread_id": "debug_session", "checkpoint_id": checkpoint_3.config["configurable"]["checkpoint_id"] } }

Resume from step 3

从第3步恢复执行

result = app.invoke(None, config_replay)
undefined
result = app.invoke(None, config_replay)
undefined

Rewind and Modify

回退并修改

python
undefined
python
undefined

Rewind to step 5

回退到第5步

history = list(app.get_state_history(config)) step_5 = history[5]
history = list(app.get_state_history(config)) step_5 = history[5]

Update state at that point

更新该时间点的状态

app.update_state( config, {"messages": ["Modified message"]}, as_node="researcher" # Apply update as if from this node )
app.update_state( config, {"messages": ["Modified message"]}, as_node="researcher" # 模拟从该节点应用更新 )

Continue execution with modified state

使用修改后的状态继续执行

result = app.invoke(None, config)
undefined
result = app.invoke(None, config)
undefined

Debug Workflow

调试工作流

python
def debug_workflow():
    """Debug workflow step-by-step."""
    checkpointer = SqliteSaver.from_conn_string("debug.db")
    app = workflow.compile(checkpointer=checkpointer)

    config = {"configurable": {"thread_id": "debug"}}

    # Execute step-by-step
    state = initial_state
    for step in range(10):  # Max 10 steps
        print(f"\n=== Step {step} ===")

        # Get current state
        current = app.get_state(config)
        print(f"Current state: {current.values}")
        print(f"Next node: {current.next}")

        if not current.next:
            print("Workflow complete")
            break

        # Execute one step
        for event in app.stream(None, config):
            print(f"Event: {event}")

        # Pause for inspection
        input("Press Enter to continue...")

    # View full history
    print("\n=== Full History ===")
    for i, snapshot in enumerate(app.get_state_history(config)):
        print(f"Step {i}: {snapshot.next} -> {snapshot.values}")

debug_workflow()

python
def debug_workflow():
    """逐步调试工作流。"""
    checkpointer = SqliteSaver.from_conn_string("debug.db")
    app = workflow.compile(checkpointer=checkpointer)

    config = {"configurable": {"thread_id": "debug"}}

    # 逐步执行
    state = initial_state
    for step in range(10):  # 最多10步
        print(f"\n=== Step {step} ===")

        # 获取当前状态
        current = app.get_state(config)
        print(f"Current state: {current.values}")
        print(f"Next node: {current.next}")

        if not current.next:
            print("Workflow complete")
            break

        # 执行一步
        for event in app.stream(None, config):
            print(f"Event: {event}")

        # 暂停以便检查
        input("Press Enter to continue...")

    # 查看完整历史
    print("\n=== Full History ===")
    for i, snapshot in enumerate(app.get_state_history(config)):
        print(f"Step {i}: {snapshot.next} -> {snapshot.values}")

debug_workflow()

Streaming

流式输出

Token Streaming

Token 流式输出

python
from langchain_anthropic import ChatAnthropic

def streaming_node(state: dict) -> dict:
    """Node that streams LLM output."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    # Use streaming
    response = ""
    for chunk in llm.stream(state["prompt"]):
        content = chunk.content
        print(content, end="", flush=True)
        response += content

    return {"response": response}
python
from langchain_anthropic import ChatAnthropic

def streaming_node(state: dict) -> dict:
    """流式输出LLM结果的节点。"""
    llm = ChatAnthropic(model="claude-sonnet-4")

    # 使用流式输出
    response = ""
    for chunk in llm.stream(state["prompt"]):
        content = chunk.content
        print(content, end="", flush=True)
        response += content

    return {"response": response}

Stream events from workflow

从工作流流式输出事件

for event in app.stream(initial_state): print(event)
undefined
for event in app.stream(initial_state): print(event)
undefined

Event Streaming

事件流式输出

python
undefined
python
undefined

Stream all events (node execution, state updates)

流式输出所有事件(节点执行、状态更新)

for event in app.stream(initial_state, stream_mode="updates"): node_name = event.keys()[0] state_update = event[node_name] print(f"Node '{node_name}' updated state: {state_update}")
undefined
for event in app.stream(initial_state, stream_mode="updates"): node_name = event.keys()[0] state_update = event[node_name] print(f"Node '{node_name}' updated state: {state_update}")
undefined

Custom Streaming

自定义流式输出

python
from typing import AsyncGenerator

async def streaming_workflow(input_data: dict) -> AsyncGenerator:
    """Stream workflow progress in real-time."""
    config = {"configurable": {"thread_id": "stream"}}

    async for event in app.astream(input_data, config):
        # Stream each state update
        yield {
            "type": "state_update",
            "data": event
        }

    # Stream final result
    final_state = app.get_state(config)
    yield {
        "type": "complete",
        "data": final_state.values
    }
python
from typing import AsyncGenerator

async def streaming_workflow(input_data: dict) -> AsyncGenerator:
    """实时流式输出工作流进度。"""
    config = {"configurable": {"thread_id": "stream"}}

    async for event in app.astream(input_data, config):
        # 流式输出每个状态更新
        yield {
            "type": "state_update",
            "data": event
        }

    # 流式输出最终结果
    final_state = app.get_state(config)
    yield {
        "type": "complete",
        "data": final_state.values
    }

Usage in FastAPI

在FastAPI中使用

from fastapi import FastAPI from fastapi.responses import StreamingResponse
app_api = FastAPI()
@app_api.post("/workflow/stream") async def stream_workflow(input: dict): return StreamingResponse( streaming_workflow(input), media_type="text/event-stream" )

---
from fastapi import FastAPI from fastapi.responses import StreamingResponse
app_api = FastAPI()
@app_api.post("/workflow/stream") async def stream_workflow(input: dict): return StreamingResponse( streaming_workflow(input), media_type="text/event-stream" )

---

Tool Integration

工具集成

LangChain Tools

LangChain 工具

python
from langchain.tools import Tool
from langchain_community.tools import DuckDuckGoSearchRun
from langchain.agents import AgentExecutor, create_react_agent
python
from langchain.tools import Tool
from langchain_community.tools import DuckDuckGoSearchRun
from langchain.agents import AgentExecutor, create_react_agent

Define tools

定义工具

search = DuckDuckGoSearchRun()
tools = [ Tool( name="Search", func=search.run, description="Search the web for information" ) ]
def agent_with_tools(state: dict) -> dict: """Node that uses LangChain tools.""" from langchain_anthropic import ChatAnthropic
llm = ChatAnthropic(model="claude-sonnet-4")

# Create agent with tools
agent = create_react_agent(llm, tools, prompt_template)
agent_executor = AgentExecutor(agent=agent, tools=tools)

# Execute with tools
result = agent_executor.invoke({"input": state["query"]})

return {"result": result["output"]}
search = DuckDuckGoSearchRun()
tools = [ Tool( name="Search", func=search.run, description="Search the web for information" ) ]
def agent_with_tools(state: dict) -> dict: """使用LangChain工具的节点。""" from langchain_anthropic import ChatAnthropic
llm = ChatAnthropic(model="claude-sonnet-4")

# 创建带工具的Agent
agent = create_react_agent(llm, tools, prompt_template)
agent_executor = AgentExecutor(agent=agent, tools=tools)

# 使用工具执行
result = agent_executor.invoke({"input": state["query"]})

return {"result": result["output"]}

Add to graph

添加到图中

workflow.add_node("agent_with_tools", agent_with_tools)
undefined
workflow.add_node("agent_with_tools", agent_with_tools)
undefined

Custom Tools

自定义工具

python
from langchain.tools import StructuredTool
from pydantic import BaseModel, Field

class CalculatorInput(BaseModel):
    expression: str = Field(description="Mathematical expression to evaluate")

def calculate(expression: str) -> str:
    """Evaluate mathematical expression."""
    try:
        result = eval(expression)  # Simplified
        return f"Result: {result}"
    except Exception as e:
        return f"Error: {e}"

calculator_tool = StructuredTool.from_function(
    func=calculate,
    name="Calculator",
    description="Evaluate mathematical expressions",
    args_schema=CalculatorInput
)

tools = [calculator_tool]

def tool_using_node(state: dict) -> dict:
    """Use custom tools."""
    result = calculator_tool.invoke({"expression": state["math_problem"]})
    return {"solution": result}
python
from langchain.tools import StructuredTool
from pydantic import BaseModel, Field

class CalculatorInput(BaseModel):
    expression: str = Field(description="Mathematical expression to evaluate")

def calculate(expression: str) -> str:
    """计算数学表达式。"""
    try:
        result = eval(expression)  # 简化版
        return f"Result: {result}"
    except Exception as e:
        return f"Error: {e}"

calculator_tool = StructuredTool.from_function(
    func=calculate,
    name="Calculator",
    description="Evaluate mathematical expressions",
    args_schema=CalculatorInput
)

tools = [calculator_tool]

def tool_using_node(state: dict) -> dict:
    """使用自定义工具。"""
    result = calculator_tool.invoke({"expression": state["math_problem"]})
    return {"solution": result}

Anthropic Tool Use

Anthropic 工具调用

python
from anthropic import Anthropic
import json

def node_with_anthropic_tools(state: dict) -> dict:
    """Use Anthropic's tool use feature."""
    client = Anthropic()

    # Define tools
    tools = [
        {
            "name": "get_weather",
            "description": "Get weather for a location",
            "input_schema": {
                "type": "object",
                "properties": {
                    "location": {"type": "string"}
                },
                "required": ["location"]
            }
        }
    ]

    # First call: Request tool use
    response = client.messages.create(
        model="claude-sonnet-4",
        max_tokens=1024,
        tools=tools,
        messages=[{"role": "user", "content": state["query"]}]
    )

    # Execute tool if requested
    if response.stop_reason == "tool_use":
        tool_use = next(
            block for block in response.content
            if block.type == "tool_use"
        )

        # Execute tool
        tool_result = execute_tool(tool_use.name, tool_use.input)

        # Second call: Provide tool result
        final_response = client.messages.create(
            model="claude-sonnet-4",
            max_tokens=1024,
            tools=tools,
            messages=[
                {"role": "user", "content": state["query"]},
                {"role": "assistant", "content": response.content},
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "tool_result",
                            "tool_use_id": tool_use.id,
                            "content": str(tool_result)
                        }
                    ]
                }
            ]
        )

        return {"response": final_response.content[0].text}

    return {"response": response.content[0].text}

python
from anthropic import Anthropic
import json

def node_with_anthropic_tools(state: dict) -> dict:
    """使用Anthropic的工具调用特性。"""
    client = Anthropic()

    # 定义工具
    tools = [
        {
            "name": "get_weather",
            "description": "Get weather for a location",
            "input_schema": {
                "type": "object",
                "properties": {
                    "location": {"type": "string"}
                },
                "required": ["location"]
            }
        }
    ]

    # 第一次调用:请求工具调用
    response = client.messages.create(
        model="claude-sonnet-4",
        max_tokens=1024,
        tools=tools,
        messages=[{"role": "user", "content": state["query"]}]
    )

    # 如果请求工具调用则执行
    if response.stop_reason == "tool_use":
        tool_use = next(
            block for block in response.content
            if block.type == "tool_use"
        )

        # 执行工具
        tool_result = execute_tool(tool_use.name, tool_use.input)

        # 第二次调用:提供工具结果
        final_response = client.messages.create(
            model="claude-sonnet-4",
            max_tokens=1024,
            tools=tools,
            messages=[
                {"role": "user", "content": state["query"]},
                {"role": "assistant", "content": response.content},
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "tool_result",
                            "tool_use_id": tool_use.id,
                            "content": str(tool_result)
                        }
                    ]
                }
            ]
        )

        return {"response": final_response.content[0].text}

    return {"response": response.content[0].text}

Error Handling

错误处理

Try-Catch in Nodes

节点内的 Try-Catch

python
def robust_node(state: dict) -> dict:
    """Node with error handling."""
    try:
        # Main logic
        result = risky_operation(state["input"])

        return {
            "result": result,
            "error": None,
            "status": "success"
        }

    except ValueError as e:
        # Handle specific error
        return {
            "error": f"Validation error: {e}",
            "status": "validation_failed"
        }

    except Exception as e:
        # Handle unexpected errors
        return {
            "error": f"Unexpected error: {e}",
            "status": "failed"
        }
python
def robust_node(state: dict) -> dict:
    """带错误处理的节点。"""
    try:
        # 核心逻辑
        result = risky_operation(state["input"])

        return {
            "result": result,
            "error": None,
            "status": "success"
        }

    except ValueError as e:
        # 处理特定错误
        return {
            "error": f"Validation error: {e}",
            "status": "validation_failed"
        }

    except Exception as e:
        # 处理非预期错误
        return {
            "error": f"Unexpected error: {e}",
            "status": "failed"
        }

Route based on error status

基于错误状态路由

def error_router(state: dict) -> str: status = state.get("status")
if status == "success":
    return "next_step"
elif status == "validation_failed":
    return "validator"
else:
    return "error_handler"
workflow.add_conditional_edges( "robust_node", error_router, { "next_step": "next_step", "validator": "validator", "error_handler": "error_handler" } )
undefined
def error_router(state: dict) -> str: status = state.get("status")
if status == "success":
    return "next_step"
elif status == "validation_failed":
    return "validator"
else:
    return "error_handler"
workflow.add_conditional_edges( "robust_node", error_router, { "next_step": "next_step", "validator": "validator", "error_handler": "error_handler" } )
undefined

Retry Logic

重试逻辑

python
from typing import TypedDict

class RetryState(TypedDict):
    input: str
    result: str
    error: str | None
    retry_count: int
    max_retries: int

def node_with_retry(state: RetryState) -> dict:
    """Node that can be retried on failure."""
    try:
        result = unstable_operation(state["input"])

        return {
            "result": result,
            "error": None,
            "retry_count": 0
        }

    except Exception as e:
        return {
            "error": str(e),
            "retry_count": state.get("retry_count", 0) + 1
        }

def should_retry(state: RetryState) -> str:
    """Decide whether to retry or fail."""
    if state.get("error") is None:
        return "success"

    retry_count = state.get("retry_count", 0)
    max_retries = state.get("max_retries", 3)

    if retry_count < max_retries:
        return "retry"
    else:
        return "failed"
python
from typing import TypedDict

class RetryState(TypedDict):
    input: str
    result: str
    error: str | None
    retry_count: int
    max_retries: int

def node_with_retry(state: RetryState) -> dict:
    """失败时可重试的节点。"""
    try:
        result = unstable_operation(state["input"])

        return {
            "result": result,
            "error": None,
            "retry_count": 0
        }

    except Exception as e:
        return {
            "error": str(e),
            "retry_count": state.get("retry_count", 0) + 1
        }

def should_retry(state: RetryState) -> str:
    """决定是重试还是失败。"""
    if state.get("error") is None:
        return "success"

    retry_count = state.get("retry_count", 0)
    max_retries = state.get("max_retries", 3)

    if retry_count < max_retries:
        return "retry"
    else:
        return "failed"

Build retry loop

构建重试循环

workflow.add_node("operation", node_with_retry) workflow.add_node("success_handler", handle_success) workflow.add_node("failure_handler", handle_failure)
workflow.add_conditional_edges( "operation", should_retry, { "success": "success_handler", "retry": "operation", # Loop back "failed": "failure_handler" } )
undefined
workflow.add_node("operation", node_with_retry) workflow.add_node("success_handler", handle_success) workflow.add_node("failure_handler", handle_failure)
workflow.add_conditional_edges( "operation", should_retry, { "success": "success_handler", "retry": "operation", # 循环回到节点 "failed": "failure_handler" } )
undefined

Fallback Strategies

降级策略

python
def node_with_fallback(state: dict) -> dict:
    """Try primary method, fall back to secondary."""
    # Try primary LLM
    try:
        result = primary_llm(state["prompt"])
        return {
            "result": result,
            "method": "primary"
        }
    except Exception as e_primary:
        # Fallback to secondary LLM
        try:
            result = secondary_llm(state["prompt"])
            return {
                "result": result,
                "method": "fallback",
                "primary_error": str(e_primary)
            }
        except Exception as e_secondary:
            # Both failed
            return {
                "error": f"Both methods failed: {e_primary}, {e_secondary}",
                "method": "failed"
            }

python
def node_with_fallback(state: dict) -> dict:
    """尝试主方法,失败则降级到备用方法。"""
    # 尝试主LLM
    try:
        result = primary_llm(state["prompt"])
        return {
            "result": result,
            "method": "primary"
        }
    except Exception as e_primary:
        # 降级到备用LLM
        try:
            result = secondary_llm(state["prompt"])
            return {
                "result": result,
                "method": "fallback",
                "primary_error": str(e_primary)
            }
        except Exception as e_secondary:
            # 都失败了
            return {
                "error": f"Both methods failed: {e_primary}, {e_secondary}",
                "method": "failed"
            }

Subgraphs and Composition

子图与组合

Subgraphs as Nodes

子图作为节点

python
def create_subgraph():
    """Create reusable subgraph."""
    subgraph = StateGraph(dict)

    subgraph.add_node("step1", step1_func)
    subgraph.add_node("step2", step2_func)

    subgraph.add_edge("step1", "step2")
    subgraph.add_edge("step2", END)

    subgraph.set_entry_point("step1")

    return subgraph.compile()
python
def create_subgraph():
    """创建可复用的子图。"""
    subgraph = StateGraph(dict)

    subgraph.add_node("step1", step1_func)
    subgraph.add_node("step2", step2_func)

    subgraph.add_edge("step1", "step2")
    subgraph.add_edge("step2", END)

    subgraph.set_entry_point("step1")

    return subgraph.compile()

Use subgraph in main graph

在主图中使用子图

def create_main_graph(): workflow = StateGraph(dict)
# Add subgraph as a node
subgraph_compiled = create_subgraph()
workflow.add_node("subgraph", subgraph_compiled)

workflow.add_node("before", before_func)
workflow.add_node("after", after_func)

workflow.add_edge("before", "subgraph")
workflow.add_edge("subgraph", "after")
workflow.add_edge("after", END)

workflow.set_entry_point("before")

return workflow.compile()
undefined
def create_main_graph(): workflow = StateGraph(dict)
# 将子图作为节点添加
subgraph_compiled = create_subgraph()
workflow.add_node("subgraph", subgraph_compiled)

workflow.add_node("before", before_func)
workflow.add_node("after", after_func)

workflow.add_edge("before", "subgraph")
workflow.add_edge("subgraph", "after")
workflow.add_edge("after", END)

workflow.set_entry_point("before")

return workflow.compile()
undefined

Parallel Subgraphs

并行子图

python
from langgraph.graph import StateGraph, END

def create_parallel_workflow():
    """Execute multiple subgraphs in parallel."""

    # Create subgraphs
    backend_graph = create_backend_subgraph()
    frontend_graph = create_frontend_subgraph()
    database_graph = create_database_subgraph()

    # Main graph
    workflow = StateGraph(dict)

    # Add subgraphs as parallel nodes
    workflow.add_node("backend", backend_graph)
    workflow.add_node("frontend", frontend_graph)
    workflow.add_node("database", database_graph)

    # Merge results
    workflow.add_node("merge", merge_results)

    # All subgraphs lead to merge
    workflow.add_edge("backend", "merge")
    workflow.add_edge("frontend", "merge")
    workflow.add_edge("database", "merge")

    workflow.add_edge("merge", END)

    # Set all as entry points (parallel execution)
    workflow.set_entry_point("backend")
    workflow.set_entry_point("frontend")
    workflow.set_entry_point("database")

    return workflow.compile()

python
from langgraph.graph import StateGraph, END

def create_parallel_workflow():
    """并行执行多个子图。"""

    # 创建子图
    backend_graph = create_backend_subgraph()
    frontend_graph = create_frontend_subgraph()
    database_graph = create_database_subgraph()

    # 主图
    workflow = StateGraph(dict)

    # 将子图作为并行节点添加
    workflow.add_node("backend", backend_graph)
    workflow.add_node("frontend", frontend_graph)
    workflow.add_node("database", database_graph)

    # 合并结果
    workflow.add_node("merge", merge_results)

    # 所有子图执行完成后进入合并节点
    workflow.add_edge("backend", "merge")
    workflow.add_edge("frontend", "merge")
    workflow.add_edge("database", "merge")

    workflow.add_edge("merge", END)

    # 将所有子图设为入口点(并行执行)
    workflow.set_entry_point("backend")
    workflow.set_entry_point("frontend")
    workflow.set_entry_point("database")

    return workflow.compile()

Map-Reduce Patterns

Map-Reduce 模式

Map-Reduce with LangGraph

基于 LangGraph 的 Map-Reduce

python
from typing import TypedDict, List

class MapReduceState(TypedDict):
    documents: List[str]
    summaries: List[str]
    final_summary: str

def map_node(state: MapReduceState) -> dict:
    """Map: Summarize each document."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    summaries = []
    for doc in state["documents"]:
        summary = llm.invoke(f"Summarize: {doc}")
        summaries.append(summary.content)

    return {"summaries": summaries}

def reduce_node(state: MapReduceState) -> dict:
    """Reduce: Combine summaries into final summary."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    combined = "\n".join(state["summaries"])
    final = llm.invoke(f"Combine these summaries:\n{combined}")

    return {"final_summary": final.content}
python
from typing import TypedDict, List

class MapReduceState(TypedDict):
    documents: List[str]
    summaries: List[str]
    final_summary: str

def map_node(state: MapReduceState) -> dict:
    """Map:为每个文档生成摘要。"""
    llm = ChatAnthropic(model="claude-sonnet-4")

    summaries = []
    for doc in state["documents"]:
        summary = llm.invoke(f"Summarize: {doc}")
        summaries.append(summary.content)

    return {"summaries": summaries}

def reduce_node(state: MapReduceState) -> dict:
    """Reduce:将摘要合并为最终摘要。"""
    llm = ChatAnthropic(model="claude-sonnet-4")

    combined = "\n".join(state["summaries"])
    final = llm.invoke(f"Combine these summaries:\n{combined}")

    return {"final_summary": final.content}

Build map-reduce workflow

构建 map-reduce 工作流

workflow = StateGraph(MapReduceState)
workflow.add_node("map", map_node) workflow.add_node("reduce", reduce_node)
workflow.add_edge("map", "reduce") workflow.add_edge("reduce", END)
workflow.set_entry_point("map")
app = workflow.compile()
workflow = StateGraph(MapReduceState)
workflow.add_node("map", map_node) workflow.add_node("reduce", reduce_node)
workflow.add_edge("map", "reduce") workflow.add_edge("reduce", END)
workflow.set_entry_point("map")
app = workflow.compile()

Execute

执行

result = app.invoke({ "documents": ["Doc 1...", "Doc 2...", "Doc 3..."], "summaries": [], "final_summary": "" })
undefined
result = app.invoke({ "documents": ["Doc 1...", "Doc 2...", "Doc 3..."], "summaries": [], "final_summary": "" })
undefined

Parallel Map with Batching

带分批的并行 Map

python
import asyncio
from typing import List

async def parallel_map_node(state: dict) -> dict:
    """Map: Process documents in parallel."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    async def summarize(doc: str) -> str:
        response = await llm.ainvoke(f"Summarize: {doc}")
        return response.content

    # Process all documents in parallel
    summaries = await asyncio.gather(
        *[summarize(doc) for doc in state["documents"]]
    )

    return {"summaries": list(summaries)}
python
import asyncio
from typing import List

async def parallel_map_node(state: dict) -> dict:
    """Map:并行处理文档。"""
    llm = ChatAnthropic(model="claude-sonnet-4")

    async def summarize(doc: str) -> str:
        response = await llm.ainvoke(f"Summarize: {doc}")
        return response.content

    # 并行处理所有文档
    summaries = await asyncio.gather(
        *[summarize(doc) for doc in state["documents"]]
    )

    return {"summaries": list(summaries)}

Use async workflow

使用异步工作流

app = workflow.compile()
app = workflow.compile()

Execute with asyncio

用 asyncio 执行

result = asyncio.run(app.ainvoke({ "documents": ["Doc 1", "Doc 2", "Doc 3"], "summaries": [], "final_summary": "" }))

---
result = asyncio.run(app.ainvoke({ "documents": ["Doc 1", "Doc 2", "Doc 3"], "summaries": [], "final_summary": "" }))

---

Production Deployment

生产部署

LangGraph Cloud

LangGraph Cloud

python
undefined
python
undefined

Deploy to LangGraph Cloud (managed service)

部署到 LangGraph Cloud(托管服务)

1. Install LangGraph CLI

1. 安装 LangGraph CLI

pip install langgraph-cli

pip install langgraph-cli

2. Create langgraph.json config

2. 创建 langgraph.json 配置

langgraph_config = { "dependencies": ["langchain", "langchain-anthropic"], "graphs": { "agent": "./graph.py:app" }, "env": { "ANTHROPIC_API_KEY": "$ANTHROPIC_API_KEY" } }
langgraph_config = { "dependencies": ["langchain", "langchain-anthropic"], "graphs": { "agent": "./graph.py:app" }, "env": { "ANTHROPIC_API_KEY": "$ANTHROPIC_API_KEY" } }

3. Deploy

3. 部署

langgraph deploy

langgraph deploy

4. Use deployed graph via API

4. 通过 API 使用部署好的图

import requests
response = requests.post( "https://your-deployment.langraph.cloud/invoke", json={ "input": {"messages": ["Hello"]}, "config": {"thread_id": "user_123"} }, headers={"Authorization": "Bearer YOUR_API_KEY"} )
result = response.json()
undefined
import requests
response = requests.post( "https://your-deployment.langraph.cloud/invoke", json={ "input": {"messages": ["Hello"]}, "config": {"thread_id": "user_123"} }, headers={"Authorization": "Bearer YOUR_API_KEY"} )
result = response.json()
undefined

Self-Hosted with FastAPI

基于 FastAPI 自托管

python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from langgraph.checkpoint.postgres import PostgresSaver
import uvicorn
python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from langgraph.checkpoint.postgres import PostgresSaver
import uvicorn

Create FastAPI app

创建 FastAPI 应用

app_api = FastAPI()
app_api = FastAPI()

Initialize workflow

初始化工作流

checkpointer = PostgresSaver.from_conn_string( "postgresql://user:pass@localhost/langgraph" ) workflow_app = workflow.compile(checkpointer=checkpointer)
class WorkflowRequest(BaseModel): input: dict thread_id: str
class WorkflowResponse(BaseModel): output: dict thread_id: str
@app_api.post("/invoke", response_model=WorkflowResponse) async def invoke_workflow(request: WorkflowRequest): """Invoke workflow synchronously.""" try: config = {"configurable": {"thread_id": request.thread_id}} result = workflow_app.invoke(request.input, config)
    return WorkflowResponse(
        output=result,
        thread_id=request.thread_id
    )
except Exception as e:
    raise HTTPException(status_code=500, detail=str(e))
@app_api.post("/stream") async def stream_workflow(request: WorkflowRequest): """Stream workflow execution.""" from fastapi.responses import StreamingResponse
config = {"configurable": {"thread_id": request.thread_id}}

async def generate():
    async for event in workflow_app.astream(request.input, config):
        yield f"data: {json.dumps(event)}\n\n"

return StreamingResponse(generate(), media_type="text/event-stream")
checkpointer = PostgresSaver.from_conn_string( "postgresql://user:pass@localhost/langgraph" ) workflow_app = workflow.compile(checkpointer=checkpointer)
class WorkflowRequest(BaseModel): input: dict thread_id: str
class WorkflowResponse(BaseModel): output: dict thread_id: str
@app_api.post("/invoke", response_model=WorkflowResponse) async def invoke_workflow(request: WorkflowRequest): """同步调用工作流。""" try: config = {"configurable": {"thread_id": request.thread_id}} result = workflow_app.invoke(request.input, config)
    return WorkflowResponse(
        output=result,
        thread_id=request.thread_id
    )
except Exception as e:
    raise HTTPException(status_code=500, detail=str(e))
@app_api.post("/stream") async def stream_workflow(request: WorkflowRequest): """流式输出工作流执行过程。""" from fastapi.responses import StreamingResponse
config = {"configurable": {"thread_id": request.thread_id}}

async def generate():
    async for event in workflow_app.astream(request.input, config):
        yield f"data: {json.dumps(event)}\n\n"

return StreamingResponse(generate(), media_type="text/event-stream")

Run server

运行服务

if name == "main": uvicorn.run(app_api, host="0.0.0.0", port=8000)
undefined
if name == "main": uvicorn.run(app_api, host="0.0.0.0", port=8000)
undefined

Docker Deployment

Docker 部署

dockerfile
undefined
dockerfile
undefined

Dockerfile

Dockerfile

FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["uvicorn", "main:app_api", "--host", "0.0.0.0", "--port", "8000"]

```yaml
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["uvicorn", "main:app_api", "--host", "0.0.0.0", "--port", "8000"]

```yaml

docker-compose.yml

docker-compose.yml

version: '3.8'
services: app: build: . ports: - "8000:8000" environment: - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY} - POSTGRES_URL=postgresql://user:pass@db:5432/langgraph depends_on: - db - redis
db: image: postgres:15 environment: - POSTGRES_USER=user - POSTGRES_PASSWORD=pass - POSTGRES_DB=langgraph volumes: - postgres_data:/var/lib/postgresql/data
redis: image: redis:7 ports: - "6379:6379"
volumes: postgres_data:

---
version: '3.8'
services: app: build: . ports: - "8000:8000" environment: - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY} - POSTGRES_URL=postgresql://user:pass@db:5432/langgraph depends_on: - db - redis
db: image: postgres:15 environment: - POSTGRES_USER=user - POSTGRES_PASSWORD=pass - POSTGRES_DB=langgraph volumes: - postgres_data:/var/lib/postgresql/data
redis: image: redis:7 ports: - "6379:6379"
volumes: postgres_data:

---

LangSmith Integration

LangSmith 集成

Automatic Tracing

自动链路追踪

python
import os
from langchain_anthropic import ChatAnthropic
python
import os
from langchain_anthropic import ChatAnthropic

Enable LangSmith tracing

启用 LangSmith 追踪

os.environ["LANGCHAIN_TRACING_V2"] = "true" os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-key" os.environ["LANGCHAIN_PROJECT"] = "my-langgraph-project"
os.environ["LANGCHAIN_TRACING_V2"] = "true" os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-key" os.environ["LANGCHAIN_PROJECT"] = "my-langgraph-project"

All LangGraph executions automatically traced

所有 LangGraph 执行会自动被追踪

app = workflow.compile()
result = app.invoke(initial_state)
app = workflow.compile()
result = app.invoke(initial_state)

View trace in LangSmith UI: https://smith.langchain.com

在 LangSmith UI 中查看链路:https://smith.langchain.com

undefined
undefined

Custom Metadata

自定义元数据

python
from langsmith import traceable

@traceable(
    run_type="chain",
    name="research_agent",
    metadata={"version": "1.0.0"}
)
def research_agent(state: dict) -> dict:
    """Node with custom LangSmith metadata."""
    # Function execution automatically traced
    return {"research": "results"}

workflow.add_node("researcher", research_agent)
python
from langsmith import traceable

@traceable(
    run_type="chain",
    name="research_agent",
    metadata={"version": "1.0.0"}
)
def research_agent(state: dict) -> dict:
    """带自定义 LangSmith 元数据的节点。"""
    # 函数执行会自动被追踪
    return {"research": "results"}

workflow.add_node("researcher", research_agent)

Evaluation with LangSmith

基于 LangSmith 的评估

python
from langsmith import Client
from langsmith.evaluation import evaluate

client = Client()
python
from langsmith import Client
from langsmith.evaluation import evaluate

client = Client()

Create test dataset

创建测试数据集

examples = [ { "inputs": {"topic": "AI Safety"}, "outputs": {"quality_score": 0.9} } ]
dataset = client.create_dataset("langgraph_tests") for example in examples: client.create_example( dataset_id=dataset.id, inputs=example["inputs"], outputs=example["outputs"] )
examples = [ { "inputs": {"topic": "AI Safety"}, "outputs": {"quality_score": 0.9} } ]
dataset = client.create_dataset("langgraph_tests") for example in examples: client.create_example( dataset_id=dataset.id, inputs=example["inputs"], outputs=example["outputs"] )

Evaluate workflow

评估工作流

def workflow_wrapper(inputs: dict) -> dict: result = app.invoke(inputs) return result
def quality_evaluator(run, example): """Custom evaluator.""" score = calculate_quality(run.outputs) return {"key": "quality", "score": score}
results = evaluate( workflow_wrapper, data="langgraph_tests", evaluators=[quality_evaluator], experiment_prefix="langgraph_v1" )
print(f"Average quality: {results['results']['quality']:.2f}")

---
def workflow_wrapper(inputs: dict) -> dict: result = app.invoke(inputs) return result
def quality_evaluator(run, example): """自定义评估器。""" score = calculate_quality(run.outputs) return {"key": "quality", "score": score}
results = evaluate( workflow_wrapper, data="langgraph_tests", evaluators=[quality_evaluator], experiment_prefix="langgraph_v1" )
print(f"Average quality: {results['results']['quality']:.2f}")

---

Real-World Examples

实际案例

Customer Support Agent

客户支持Agent

python
from typing import TypedDict, Literal
from langchain_anthropic import ChatAnthropic

class SupportState(TypedDict):
    customer_query: str
    category: Literal["billing", "technical", "general"]
    priority: Literal["low", "medium", "high"]
    resolution: str
    escalated: bool

def categorize(state: SupportState) -> dict:
    """Categorize customer query."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    prompt = f"""
    Categorize this support query:
    "{state['customer_query']}"

    Categories: billing, technical, general
    Priority: low, medium, high

    Return format: category|priority
    """

    response = llm.invoke(prompt)
    category, priority = response.content.strip().split("|")

    return {
        "category": category,
        "priority": priority
    }

def handle_billing(state: SupportState) -> dict:
    """Handle billing issues."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    response = llm.invoke(f"Resolve billing issue: {state['customer_query']}")

    return {"resolution": response.content}

def handle_technical(state: SupportState) -> dict:
    """Handle technical issues."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    response = llm.invoke(f"Resolve technical issue: {state['customer_query']}")

    return {"resolution": response.content}

def escalate(state: SupportState) -> dict:
    """Escalate to human agent."""
    return {
        "escalated": True,
        "resolution": "Escalated to senior support team"
    }

def route_by_category(state: SupportState) -> str:
    """Route based on category and priority."""
    if state["priority"] == "high":
        return "escalate"

    category = state["category"]
    if category == "billing":
        return "billing"
    elif category == "technical":
        return "technical"
    else:
        return "general"
python
from typing import TypedDict, Literal
from langchain_anthropic import ChatAnthropic

class SupportState(TypedDict):
    customer_query: str
    category: Literal["billing", "technical", "general"]
    priority: Literal["low", "medium", "high"]
    resolution: str
    escalated: bool

def categorize(state: SupportState) -> dict:
    """对客户查询进行分类。"""
    llm = ChatAnthropic(model="claude-sonnet-4")

    prompt = f"""
    Categorize this support query:
    "{state['customer_query']}"

    Categories: billing, technical, general
    Priority: low, medium, high

    Return format: category|priority
    """

    response = llm.invoke(prompt)
    category, priority = response.content.strip().split("|")

    return {
        "category": category,
        "priority": priority
    }

def handle_billing(state: SupportState) -> dict:
    """处理账单问题。"""
    llm = ChatAnthropic(model="claude-sonnet-4")

    response = llm.invoke(f"Resolve billing issue: {state['customer_query']}")

    return {"resolution": response.content}

def handle_technical(state: SupportState) -> dict:
    """处理技术问题。"""
    llm = ChatAnthropic(model="claude-sonnet-4")

    response = llm.invoke(f"Resolve technical issue: {state['customer_query']}")

    return {"resolution": response.content}

def escalate(state: SupportState) -> dict:
    """升级到人工Agent。"""
    return {
        "escalated": True,
        "resolution": "Escalated to senior support team"
    }

def route_by_category(state: SupportState) -> str:
    """基于分类和优先级路由。"""
    if state["priority"] == "high":
        return "escalate"

    category = state["category"]
    if category == "billing":
        return "billing"
    elif category == "technical":
        return "technical"
    else:
        return "general"

Build support workflow

构建支持工作流

workflow = StateGraph(SupportState)
workflow.add_node("categorize", categorize) workflow.add_node("billing", handle_billing) workflow.add_node("technical", handle_technical) workflow.add_node("general", handle_technical) # Reuse workflow.add_node("escalate", escalate)
workflow.add_edge("categorize", "router")
workflow.add_conditional_edges( "categorize", route_by_category, { "billing": "billing", "technical": "technical", "general": "general", "escalate": "escalate" } )
workflow.add_edge("billing", END) workflow.add_edge("technical", END) workflow.add_edge("general", END) workflow.add_edge("escalate", END)
workflow.set_entry_point("categorize")
support_app = workflow.compile()
workflow = StateGraph(SupportState)
workflow.add_node("categorize", categorize) workflow.add_node("billing", handle_billing) workflow.add_node("technical", handle_technical) workflow.add_node("general", handle_technical) # 复用 workflow.add_node("escalate", escalate)
workflow.add_edge("categorize", "router")
workflow.add_conditional_edges( "categorize", route_by_category, { "billing": "billing", "technical": "technical", "general": "general", "escalate": "escalate" } )
workflow.add_edge("billing", END) workflow.add_edge("technical", END) workflow.add_edge("general", END) workflow.add_edge("escalate", END)
workflow.set_entry_point("categorize")
support_app = workflow.compile()

Usage

使用

result = support_app.invoke({ "customer_query": "I was charged twice for my subscription", "category": "", "priority": "", "resolution": "", "escalated": False })
undefined
result = support_app.invoke({ "customer_query": "I was charged twice for my subscription", "category": "", "priority": "", "resolution": "", "escalated": False })
undefined

Research Assistant

调研助手

python
class ResearchState(TypedDict):
    topic: str
    research_notes: list
    outline: str
    draft: str
    final_article: str
    revision_count: int

def research(state: ResearchState) -> dict:
    """Gather information on topic."""
    # Use search tools
    results = search_web(state["topic"])

    return {
        "research_notes": [f"Research: {results}"]
    }

def create_outline(state: ResearchState) -> dict:
    """Create article outline."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    notes = "\n".join(state["research_notes"])
    response = llm.invoke(f"Create outline for: {notes}")

    return {"outline": response.content}

def write_draft(state: ResearchState) -> dict:
    """Write article draft."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    prompt = f"""
    Topic: {state['topic']}
    Outline: {state['outline']}
    Research: {state['research_notes']}

    Write comprehensive article.
    """

    response = llm.invoke(prompt)

    return {"draft": response.content}

def review_and_revise(state: ResearchState) -> dict:
    """Review draft quality."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    response = llm.invoke(f"Review and improve:\n{state['draft']}")

    return {
        "final_article": response.content,
        "revision_count": state.get("revision_count", 0) + 1
    }
python
class ResearchState(TypedDict):
    topic: str
    research_notes: list
    outline: str
    draft: str
    final_article: str
    revision_count: int

def research(state: ResearchState) -> dict:
    """收集主题相关信息。"""
    # 使用搜索工具
    results = search_web(state["topic"])

    return {
        "research_notes": [f"Research: {results}"]
    }

def create_outline(state: ResearchState) -> dict:
    """生成文章大纲。"""
    llm = ChatAnthropic(model="claude-sonnet-4")

    notes = "\n".join(state["research_notes"])
    response = llm.invoke(f"Create outline for: {notes}")

    return {"outline": response.content}

def write_draft(state: ResearchState) -> dict:
    """撰写文章草稿。"""
    llm = ChatAnthropic(model="claude-sonnet-4")

    prompt = f"""
    Topic: {state['topic']}
    Outline: {state['outline']}
    Research: {state['research_notes']}

    Write comprehensive article.
    """

    response = llm.invoke(prompt)

    return {"draft": response.content}

def review_and_revise(state: ResearchState) -> dict:
    """审核草稿质量。"""
    llm = ChatAnthropic(model="claude-sonnet-4")

    response = llm.invoke(f"Review and improve:\n{state['draft']}")

    return {
        "final_article": response.content,
        "revision_count": state.get("revision_count", 0) + 1
    }

Build research workflow

构建调研工作流

workflow = StateGraph(ResearchState)
workflow.add_node("research", research) workflow.add_node("outline", create_outline) workflow.add_node("write", write_draft) workflow.add_node("review", review_and_revise)
workflow.add_edge("research", "outline") workflow.add_edge("outline", "write") workflow.add_edge("write", "review")
def check_quality(state: ResearchState) -> str: if state["revision_count"] >= 2: return END
# Simple quality check
if len(state.get("final_article", "")) > 1000:
    return END
else:
    return "write"  # Revise
workflow.add_conditional_edges( "review", check_quality, {END: END, "write": "write"} )
workflow.set_entry_point("research")
research_app = workflow.compile()
undefined
workflow = StateGraph(ResearchState)
workflow.add_node("research", research) workflow.add_node("outline", create_outline) workflow.add_node("write", write_draft) workflow.add_node("review", review_and_revise)
workflow.add_edge("research", "outline") workflow.add_edge("outline", "write") workflow.add_edge("write", "review")
def check_quality(state: ResearchState) -> str: if state["revision_count"] >= 2: return END
# 简单质量检查
if len(state.get("final_article", "")) > 1000:
    return END
else:
    return "write"  # 修订
workflow.add_conditional_edges( "review", check_quality, {END: END, "write": "write"} )
workflow.set_entry_point("research")
research_app = workflow.compile()
undefined

Code Review Workflow

代码评审工作流

python
class CodeReviewState(TypedDict):
    code: str
    language: str
    lint_results: list
    test_results: dict
    security_scan: dict
    review_comments: list
    approved: bool

def lint_code(state: CodeReviewState) -> dict:
    """Run linters on code."""
    # Run appropriate linter
    issues = run_linter(state["code"], state["language"])

    return {"lint_results": issues}

def run_tests(state: CodeReviewState) -> dict:
    """Execute test suite."""
    results = execute_tests(state["code"])

    return {"test_results": results}

def security_scan(state: CodeReviewState) -> dict:
    """Scan for security vulnerabilities."""
    scan_results = scan_for_vulnerabilities(state["code"])

    return {"security_scan": scan_results}

def ai_review(state: CodeReviewState) -> dict:
    """AI-powered code review."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    prompt = f"""
    Review this {state['language']} code:

    {state['code']}

    Lint issues: {state['lint_results']}
    Test results: {state['test_results']}
    Security: {state['security_scan']}

    Provide review comments.
    """

    response = llm.invoke(prompt)

    return {"review_comments": [response.content]}

def approve_or_reject(state: CodeReviewState) -> dict:
    """Final approval decision."""
    # Auto-approve if all checks pass
    lint_ok = len(state["lint_results"]) == 0
    tests_ok = state["test_results"].get("passed", 0) == state["test_results"].get("total", 0)
    security_ok = len(state["security_scan"].get("vulnerabilities", [])) == 0

    approved = lint_ok and tests_ok and security_ok

    return {"approved": approved}
python
class CodeReviewState(TypedDict):
    code: str
    language: str
    lint_results: list
    test_results: dict
    security_scan: dict
    review_comments: list
    approved: bool

def lint_code(state: CodeReviewState) -> dict:
    """对代码运行静态检查。"""
    # 运行对应的 linter
    issues = run_linter(state["code"], state["language"])

    return {"lint_results": issues}

def run_tests(state: CodeReviewState) -> dict:
    """执行测试套件。"""
    results = execute_tests(state["code"])

    return {"test_results": results}

def security_scan(state: CodeReviewState) -> dict:
    """扫描安全漏洞。"""
    scan_results = scan_for_vulnerabilities(state["code"])

    return {"security_scan": scan_results}

def ai_review(state: CodeReviewState) -> dict:
    """AI驱动的代码评审。"""
    llm = ChatAnthropic(model="claude-sonnet-4")

    prompt = f"""
    Review this {state['language']} code:

    {state['code']}

    Lint issues: {state['lint_results']}
    Test results: {state['test_results']}
    Security: {state['security_scan']}

    Provide review comments.
    """

    response = llm.invoke(prompt)

    return {"review_comments": [response.content]}

def approve_or_reject(state: CodeReviewState) -> dict:
    """最终审批决策。"""
    # 如果所有检查通过则自动审批
    lint_ok = len(state["lint_results"]) == 0
    tests_ok = state["test_results"].get("passed", 0) == state["test_results"].get("total", 0)
    security_ok = len(state["security_scan"].get("vulnerabilities", [])) == 0

    approved = lint_ok and tests_ok and security_ok

    return {"approved": approved}

Build code review workflow

构建代码评审工作流

workflow = StateGraph(CodeReviewState)
workflow.add_node("lint", lint_code) workflow.add_node("test", run_tests) workflow.add_node("security", security_scan) workflow.add_node("ai_review", ai_review) workflow.add_node("decision", approve_or_reject)
workflow = StateGraph(CodeReviewState)
workflow.add_node("lint", lint_code) workflow.add_node("test", run_tests) workflow.add_node("security", security_scan) workflow.add_node("ai_review", ai_review) workflow.add_node("decision", approve_or_reject)

Parallel execution of checks

检查并行执行

workflow.add_edge("lint", "ai_review") workflow.add_edge("test", "ai_review") workflow.add_edge("security", "ai_review") workflow.add_edge("ai_review", "decision") workflow.add_edge("decision", END)
workflow.set_entry_point("lint") workflow.set_entry_point("test") workflow.set_entry_point("security")
code_review_app = workflow.compile()

---
workflow.add_edge("lint", "ai_review") workflow.add_edge("test", "ai_review") workflow.add_edge("security", "ai_review") workflow.add_edge("ai_review", "decision") workflow.add_edge("decision", END)
workflow.set_entry_point("lint") workflow.set_entry_point("test") workflow.set_entry_point("security")
code_review_app = workflow.compile()

---

Performance Optimization

性能优化

Parallel Execution

并行执行

python
undefined
python
undefined

Multiple nodes with same parent execute in parallel

同一个父节点的多个子节点会并行执行

workflow.add_edge("start", "task_a") workflow.add_edge("start", "task_b") workflow.add_edge("start", "task_c")
workflow.add_edge("start", "task_a") workflow.add_edge("start", "task_b") workflow.add_edge("start", "task_c")

task_a, task_b, task_c run concurrently

task_a、task_b、task_c 并发运行

undefined
undefined

Caching with Checkpointers

基于检查点的缓存

python
from langgraph.checkpoint.sqlite import SqliteSaver
python
from langgraph.checkpoint.sqlite import SqliteSaver

Checkpoint results for reuse

检查点结果可复用

checkpointer = SqliteSaver.from_conn_string("cache.db")
app = workflow.compile(checkpointer=checkpointer)
checkpointer = SqliteSaver.from_conn_string("cache.db")
app = workflow.compile(checkpointer=checkpointer)

Same thread_id reuses cached results

相同的 thread_id 会复用缓存结果

config = {"configurable": {"thread_id": "cached_session"}}
config = {"configurable": {"thread_id": "cached_session"}}

First run: executes all nodes

第一次运行:执行所有节点

result1 = app.invoke(input_data, config)
result1 = app.invoke(input_data, config)

Second run: uses cached state

第二次运行:使用缓存的状态

result2 = app.invoke(None, config) # Instant
undefined
result2 = app.invoke(None, config) # 瞬间完成
undefined

Batching

分批处理

python
def batch_processing_node(state: dict) -> dict:
    """Process items in batches."""
    items = state["items"]
    batch_size = 10

    results = []
    for i in range(0, len(items), batch_size):
        batch = items[i:i+batch_size]
        batch_result = process_batch(batch)
        results.extend(batch_result)

    return {"results": results}
python
def batch_processing_node(state: dict) -> dict:
    """分批处理条目。"""
    items = state["items"]
    batch_size = 10

    results = []
    for i in range(0, len(items), batch_size):
        batch = items[i:i+batch_size]
        batch_result = process_batch(batch)
        results.extend(batch_result)

    return {"results": results}

Async Execution

异步执行

python
from langgraph.graph import StateGraph
import asyncio

async def async_node(state: dict) -> dict:
    """Async node for I/O-bound operations."""
    results = await asyncio.gather(
        fetch_api_1(state["input"]),
        fetch_api_2(state["input"]),
        fetch_api_3(state["input"])
    )

    return {"results": results}
python
from langgraph.graph import StateGraph
import asyncio

async def async_node(state: dict) -> dict:
    """适用于IO密集型操作的异步节点。"""
    results = await asyncio.gather(
        fetch_api_1(state["input"]),
        fetch_api_2(state["input"]),
        fetch_api_3(state["input"])
    )

    return {"results": results}

Use async invoke

使用异步调用

result = await app.ainvoke(input_data)

---
result = await app.ainvoke(input_data)

---

Comparison with Simple Chains

与简单链的对比

LangChain LCEL (Simple Chain)

LangChain LCEL(简单链)

python
from langchain_core.runnables import RunnablePassthrough
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-sonnet-4")
python
from langchain_core.runnables import RunnablePassthrough
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-sonnet-4")

Simple linear chain

简单线性链

chain = ( RunnablePassthrough() | llm | {"output": lambda x: x.content} )
result = chain.invoke("Hello")

**Limitations**:
- ❌ No state persistence
- ❌ No conditional branching
- ❌ No cycles/loops
- ❌ No human-in-the-loop
- ❌ Limited debugging
chain = ( RunnablePassthrough() | llm | {"output": lambda x: x.content} )
result = chain.invoke("Hello")

**局限性**:
- ❌ 无状态持久化
- ❌ 无条件分支
- ❌ 无循环/回路
- ❌ 无人在回路支持
- ❌ 调试能力有限

LangGraph (Cyclic Workflow)

LangGraph(循环工作流)

python
from langgraph.graph import StateGraph, END

workflow = StateGraph(dict)

workflow.add_node("step1", step1_func)
workflow.add_node("step2", step2_func)
python
from langgraph.graph import StateGraph, END

workflow = StateGraph(dict)

workflow.add_node("step1", step1_func)
workflow.add_node("step2", step2_func)

Conditional loop

条件循环

workflow.add_conditional_edges( "step2", should_continue, {"step1": "step1", END: END} )
app = workflow.compile(checkpointer=memory)

**Advantages**:
- ✅ Persistent state across steps
- ✅ Conditional branching
- ✅ Cycles and loops
- ✅ Human-in-the-loop support
- ✅ Time-travel debugging
- ✅ Production-ready

---
workflow.add_conditional_edges( "step2", should_continue, {"step1": "step1", END: END} )
app = workflow.compile(checkpointer=memory)

**优势**:
- ✅ 跨步骤持久化状态
- ✅ 条件分支
- ✅ 循环和回路
- ✅ 人在回路支持
- ✅ 时间旅行调试
- ✅ 生产就绪

---

Migration from LangChain LCEL

从 LangChain LCEL 迁移

Before: LCEL Chain

迁移前:LCEL 链

python
from langchain_core.runnables import RunnablePassthrough
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-sonnet-4")

chain = (
    {"input": RunnablePassthrough()}
    | llm
    | {"output": lambda x: x.content}
)

result = chain.invoke("Question")
python
from langchain_core.runnables import RunnablePassthrough
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-sonnet-4")

chain = (
    {"input": RunnablePassthrough()}
    | llm
    | {"output": lambda x: x.content}
)

result = chain.invoke("Question")

After: LangGraph

迁移后:LangGraph

python
from langgraph.graph import StateGraph, END
from typing import TypedDict

class State(TypedDict):
    input: str
    output: str

def llm_node(state: State) -> dict:
    llm = ChatAnthropic(model="claude-sonnet-4")
    response = llm.invoke(state["input"])
    return {"output": response.content}

workflow = StateGraph(State)
workflow.add_node("llm", llm_node)
workflow.add_edge("llm", END)
workflow.set_entry_point("llm")

app = workflow.compile()

result = app.invoke({"input": "Question", "output": ""})
python
from langgraph.graph import StateGraph, END
from typing import TypedDict

class State(TypedDict):
    input: str
    output: str

def llm_node(state: State) -> dict:
    llm = ChatAnthropic(model="claude-sonnet-4")
    response = llm.invoke(state["input"])
    return {"output": response.content}

workflow = StateGraph(State)
workflow.add_node("llm", llm_node)
workflow.add_edge("llm", END)
workflow.set_entry_point("llm")

app = workflow.compile()

result = app.invoke({"input": "Question", "output": ""})

Migration Checklist

迁移检查清单

  • Identify stateful vs stateless operations
  • Convert chain steps to graph nodes
  • Define state schema (TypedDict)
  • Add edges between nodes
  • Implement conditional routing if needed
  • Add checkpointing for state persistence
  • Test with same inputs
  • Verify output matches original chain

  • 识别有状态和无状态操作
  • 将链步骤转换为图节点
  • 定义状态 schema(TypedDict)
  • 添加节点之间的边
  • 若需要则实现条件路由
  • 添加检查点以支持状态持久化
  • 使用相同输入测试
  • 验证输出与原链一致

Best Practices

最佳实践

State Design

状态设计

python
undefined
python
undefined

✅ GOOD: Flat, explicit state

✅ 好的设计:扁平化、明确的状态

class GoodState(TypedDict): user_id: str messages: list current_step: str result: dict
class GoodState(TypedDict): user_id: str messages: list current_step: str result: dict

❌ BAD: Nested, ambiguous state

❌ 坏的设计:嵌套、模糊的状态

class BadState(TypedDict): data: dict # Too generic context: Any # No type safety
undefined
class BadState(TypedDict): data: dict # 过于通用 context: Any # 无类型安全
undefined

Node Granularity

节点粒度

python
undefined
python
undefined

✅ GOOD: Single responsibility per node

✅ 好的设计:每个节点单一职责

def validate_input(state): ... def process_data(state): ... def format_output(state): ...
def validate_input(state): ... def process_data(state): ... def format_output(state): ...

❌ BAD: Monolithic node

❌ 坏的设计:单体节点

def do_everything(state): ...
undefined
def do_everything(state): ...
undefined

Error Handling

错误处理

python
undefined
python
undefined

✅ GOOD: Explicit error states

✅ 好的设计:明确的错误状态

class State(TypedDict): result: dict | None error: str | None status: Literal["pending", "success", "failed"]
def robust_node(state): try: result = operation() return {"result": result, "status": "success"} except Exception as e: return {"error": str(e), "status": "failed"}
class State(TypedDict): result: dict | None error: str | None status: Literal["pending", "success", "failed"]
def robust_node(state): try: result = operation() return {"result": result, "status": "success"} except Exception as e: return {"error": str(e), "status": "failed"}

❌ BAD: Silent failures

❌ 坏的设计:静默失败

def fragile_node(state): try: return operation() except: return {} # Error lost
undefined
def fragile_node(state): try: return operation() except: return {} # 错误丢失
undefined

Testing

测试

python
undefined
python
undefined

Test individual nodes

测试单个节点

def test_node(): state = {"input": "test"} result = my_node(state) assert result["output"] == "expected"
def test_node(): state = {"input": "test"} result = my_node(state) assert result["output"] == "expected"

Test full workflow

测试完整工作流

def test_workflow(): app = workflow.compile() result = app.invoke(initial_state) assert result["final_output"] == "expected"
def test_workflow(): app = workflow.compile() result = app.invoke(initial_state) assert result["final_output"] == "expected"

Test error paths

测试错误路径

def test_error_handling(): state = {"input": "invalid"} result = my_node(state) assert result["error"] is not None
undefined
def test_error_handling(): state = {"input": "invalid"} result = my_node(state) assert result["error"] is not None
undefined

Monitoring

监控

python
import logging

logger = logging.getLogger(__name__)

def monitored_node(state: dict) -> dict:
    """Node with logging."""
    logger.info(f"Executing node with state: {state.keys()}")

    try:
        result = operation()
        logger.info("Node succeeded")
        return {"result": result}
    except Exception as e:
        logger.error(f"Node failed: {e}")
        raise

python
import logging

logger = logging.getLogger(__name__)

def monitored_node(state: dict) -> dict:
    """带日志的节点。"""
    logger.info(f"Executing node with state: {state.keys()}")

    try:
        result = operation()
        logger.info("Node succeeded")
        return {"result": result}
    except Exception as e:
        logger.error(f"Node failed: {e}")
        raise

Summary

总结

LangGraph transforms agent development from linear chains into cyclic, stateful workflows with production-grade capabilities:
Core Strengths:
  • 🔄 Cyclic workflows with loops and branches
  • 💾 Persistent state across sessions
  • 🕐 Time-travel debugging and replay
  • 👤 Human-in-the-loop approval gates
  • 🔧 Tool integration (LangChain, Anthropic, custom)
  • 📊 LangSmith integration for tracing
  • 🏭 Production deployment (cloud or self-hosted)
When to Choose LangGraph:
  • Multi-agent coordination required
  • Complex branching logic
  • State persistence needed
  • Human approvals in workflow
  • Production systems with debugging needs
Learning Path:
  1. Start with StateGraph basics
  2. Add conditional routing
  3. Implement checkpointing
  4. Build multi-agent patterns
  5. Deploy to production
Production Checklist:
  • State schema with TypedDict
  • Error handling in all nodes
  • Checkpointing configured
  • LangSmith tracing enabled
  • Monitoring and logging
  • Testing (unit + integration)
  • Deployment strategy (cloud/self-hosted)
For simple request-response patterns, use LangChain LCEL. For complex stateful workflows, LangGraph is the industry-standard solution.
LangGraph将Agent开发从线性链转变为具备生产级能力的循环有状态工作流
核心优势
  • 🔄 带循环和分支的循环工作流
  • 💾 跨会话持久化状态
  • 🕐 时间旅行调试和回放
  • 👤 人在回路审批关口
  • 🔧 工具集成(LangChain、Anthropic、自定义)
  • 📊 LangSmith集成支持链路追踪
  • 🏭 生产部署(云或自托管)
选择LangGraph的场景
  • 需要多Agent协调
  • 复杂分支逻辑
  • 需要状态持久化
  • 工作流中需要人工审批
  • 需要调试能力的生产系统
学习路径
  1. 从StateGraph基础开始
  2. 添加条件路由
  3. 实现检查点
  4. 构建多Agent模式
  5. 部署到生产
生产检查清单
  • 基于TypedDict的状态schema
  • 所有节点都有错误处理
  • 配置了检查点
  • 启用了LangSmith追踪
  • 监控和日志
  • 测试(单元+集成)
  • 部署策略(云/自托管)
对于简单的请求-响应模式,使用LangChain LCEL即可。对于复杂的有状态工作流,LangGraph是行业标准解决方案。

Related Skills

相关技能

When using Langgraph, these skills enhance your workflow:
  • dspy: DSPy for LLM prompt optimization (complementary to LangGraph orchestration)
  • test-driven-development: Testing LangGraph workflows and state machines
  • systematic-debugging: Debugging multi-agent workflows and state transitions
[Full documentation available in these skills if deployed in your bundle]
使用Langgraph时,以下技能可以提升你的工作流能力:
  • dspy:用于LLM提示优化的DSPy(是LangGraph编排的补充)
  • test-driven-development:测试LangGraph工作流和状态机
  • systematic-debugging:调试多Agent工作流和状态流转
[如果你的技能包中部署了这些技能,可以查看完整文档]