LangGraph Workflows
Summary
LangGraph is a framework for building stateful, multi-agent applications with LLMs. It implements state machines and directed graphs for orchestration, enabling complex workflows with persistent state management, human-in-the-loop support, and time-travel debugging.
Key Innovation: Transforms agent coordination from sequential chains into cyclic graphs with persistent state, conditional branching, and production-grade debugging capabilities.
When to Use
✅ Use LangGraph When:
- Multi-agent coordination required
- Complex state management needs
- Human-in-the-loop workflows (approval gates, reviews)
- Need debugging/observability (time-travel, replay)
- Conditional branching based on outputs
- Building production agent systems
- State persistence across sessions
❌ Don't Use LangGraph When:
- Simple single-agent tasks
- No state persistence needed
- Prototyping/experimentation phase (use simple chains)
- Team lacks graph/state machine expertise
- Stateless request-response patterns
Quick Start
```python
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator

# 1. Define state schema
class AgentState(TypedDict):
    messages: Annotated[list, operator.add]
    current_step: str

# 2. Create graph
workflow = StateGraph(AgentState)

# 3. Add nodes (agents)
def researcher(state):
    return {"messages": ["Research complete"], "current_step": "research"}

def writer(state):
    return {"messages": ["Article written"], "current_step": "writing"}

workflow.add_node("researcher", researcher)
workflow.add_node("writer", writer)

# 4. Add edges (transitions)
workflow.add_edge("researcher", "writer")
workflow.add_edge("writer", END)

# 5. Set entry point and compile
workflow.set_entry_point("researcher")
app = workflow.compile()

# 6. Execute
result = app.invoke({"messages": [], "current_step": "start"})
print(result)
```

---
Core Concepts
StateGraph
The fundamental building block representing a directed graph of agents with shared state.

```python
from langgraph.graph import StateGraph, END
from typing import TypedDict

class AgentState(TypedDict):
    """State schema shared across all nodes."""
    messages: list
    user_input: str
    final_output: str
    metadata: dict

# Create graph with state schema
workflow = StateGraph(AgentState)
```

**Key Properties**:
- **Nodes**: Agent functions that transform state
- **Edges**: Transitions between nodes (static or conditional)
- **State**: Shared data structure passed between nodes
- **Entry Point**: Starting node of execution
- **END**: Terminal node signaling completion
Nodes
Nodes are functions that receive current state and return state updates.

```python
def research_agent(state: AgentState) -> dict:
    """Node function: receives state, returns updates."""
    query = state["user_input"]
    # Perform research (simplified)
    results = search_web(query)
    # Return state updates (partial state)
    return {
        "messages": state["messages"] + [f"Research: {results}"],
        "metadata": {"research_complete": True}
    }

# Add node to graph
workflow.add_node("researcher", research_agent)
```

**Node Behavior**:
- Receives **full state** as input
- Returns **partial state** (only fields to update)
- Can be sync or async functions
- Can invoke LLMs, call APIs, run computations
Edges
Edges define transitions between nodes.

Static Edges

```python
# Direct transition: researcher → writer
workflow.add_edge("researcher", "writer")

# Transition to END
workflow.add_edge("writer", END)
```
Conditional Edges
```python
def should_continue(state: AgentState) -> str:
    """Routing function: decides next node based on state."""
    last_message = state["messages"][-1]
    if "APPROVED" in last_message:
        return END
    elif "NEEDS_REVISION" in last_message:
        return "writer"
    else:
        return "reviewer"

workflow.add_conditional_edges(
    "reviewer",       # Source node
    should_continue,  # Routing function
    {
        END: END,
        "writer": "writer",
        "reviewer": "reviewer"
    }
)
```
Graph Construction
Complete Workflow Example
```python
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_anthropic import ChatAnthropic

# State schema with reducer
class ResearchState(TypedDict):
    topic: str
    research_notes: Annotated[list, operator.add]  # Reducer: appends to list
    draft: str
    revision_count: int
    approved: bool

# Initialize LLM
llm = ChatAnthropic(model="claude-sonnet-4")

# Node 1: Research
def research_node(state: ResearchState) -> dict:
    """Research the topic and gather information."""
    topic = state["topic"]
    prompt = f"Research key points about: {topic}"
    response = llm.invoke(prompt)
    return {
        "research_notes": [response.content]
    }

# Node 2: Write
def write_node(state: ResearchState) -> dict:
    """Write draft based on research."""
    notes = "\n".join(state["research_notes"])
    prompt = f"Write article based on:\n{notes}"
    response = llm.invoke(prompt)
    return {
        "draft": response.content,
        "revision_count": state.get("revision_count", 0)
    }

# Node 3: Review
def review_node(state: ResearchState) -> dict:
    """Review the draft and decide if approved."""
    draft = state["draft"]
    prompt = f"Review this draft. Reply APPROVED or NEEDS_REVISION:\n{draft}"
    response = llm.invoke(prompt)
    approved = "APPROVED" in response.content
    return {
        "approved": approved,
        "revision_count": state["revision_count"] + 1,
        "research_notes": [f"Review feedback: {response.content}"]
    }

# Routing logic
def should_continue_writing(state: ResearchState) -> str:
    """Decide next step after review."""
    if state["approved"]:
        return END
    elif state["revision_count"] >= 3:
        return END  # Max revisions reached
    else:
        return "writer"

# Build graph
workflow = StateGraph(ResearchState)
workflow.add_node("researcher", research_node)
workflow.add_node("writer", write_node)
workflow.add_node("reviewer", review_node)

# Static edges
workflow.add_edge("researcher", "writer")
workflow.add_edge("writer", "reviewer")

# Conditional edge from reviewer
workflow.add_conditional_edges(
    "reviewer",
    should_continue_writing,
    {
        END: END,
        "writer": "writer"
    }
)
workflow.set_entry_point("researcher")

# Compile
app = workflow.compile()

# Execute
result = app.invoke({
    "topic": "AI Safety",
    "research_notes": [],
    "draft": "",
    "revision_count": 0,
    "approved": False
})
print(f"Final draft: {result['draft']}")
print(f"Revisions: {result['revision_count']}")
```
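Because routing functions are plain Python, the review loop's exit logic can be unit-tested without compiling a graph. The sketch below re-states `should_continue_writing` against a stand-in `END` value (langgraph exports its own sentinel; the stand-in is only so this snippet runs without langgraph installed):

```python
# Stand-in for langgraph's END sentinel, so this sketch runs without langgraph.
END = "__end__"

def should_continue_writing(state: dict) -> str:
    """Decide next step after review (same logic as the workflow above)."""
    if state["approved"]:
        return END
    elif state["revision_count"] >= 3:
        return END  # Max revisions reached
    else:
        return "writer"

# Approved drafts finish immediately.
assert should_continue_writing({"approved": True, "revision_count": 1}) == END
# Unapproved drafts loop back to the writer...
assert should_continue_writing({"approved": False, "revision_count": 2}) == "writer"
# ...until the revision cap is hit.
assert should_continue_writing({"approved": False, "revision_count": 3}) == END
```

Keeping routers pure like this makes the revision cap a one-line test rather than an end-to-end run.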
---
State Management
State Schema with TypedDict
```python
from typing import TypedDict, Annotated, Literal
import operator

class WorkflowState(TypedDict):
    # Simple fields (replaced on update)
    user_id: str
    request_id: str
    status: Literal["pending", "processing", "complete", "failed"]
    # List with reducer (appends instead of replacing)
    messages: Annotated[list, operator.add]
    # Dict with custom reducer
    metadata: Annotated[dict, lambda x, y: {**x, **y}]
    # Optional fields
    error: str | None
    result: dict | None
```
State Reducers
Reducers control how state updates are merged.

```python
import operator
from typing import Annotated, TypedDict

# Built-in reducers
Annotated[list, operator.add]  # Append to list
Annotated[set, operator.or_]   # Union of sets
Annotated[int, operator.add]   # Sum integers

# Custom reducer
def merge_dicts(existing: dict, update: dict) -> dict:
    """Deep merge dictionaries."""
    result = existing.copy()
    for key, value in update.items():
        if key in result and isinstance(result[key], dict) and isinstance(value, dict):
            result[key] = merge_dicts(result[key], value)
        else:
            result[key] = value
    return result

class State(TypedDict):
    config: Annotated[dict, merge_dicts]
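A reducer is just a function of `(existing, update) -> merged`, so its merge behavior can be checked directly. A quick sanity check of `operator.add` on lists and the `merge_dicts` reducer:

```python
import operator

def merge_dicts(existing: dict, update: dict) -> dict:
    """Deep merge dictionaries (same reducer as above)."""
    result = existing.copy()
    for key, value in update.items():
        if key in result and isinstance(result[key], dict) and isinstance(value, dict):
            result[key] = merge_dicts(result[key], value)
        else:
            result[key] = value
    return result

# operator.add on lists appends rather than replacing.
assert operator.add(["a"], ["b"]) == ["a", "b"]

# merge_dicts merges nested keys instead of overwriting the whole sub-dict.
merged = merge_dicts({"llm": {"model": "m1", "temp": 0.2}}, {"llm": {"temp": 0.7}})
assert merged == {"llm": {"model": "m1", "temp": 0.7}}
```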
State Updates
Nodes return partial state - only fields to update:

```python
def node_function(state: State) -> dict:
    """Return only fields that should be updated."""
    return {
        "messages": ["New message"],  # Will be appended
        "status": "processing"        # Will be replaced
    }
    # Other fields remain unchanged
```
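To make the merge semantics concrete, here is a simplified stand-in for what the runtime does with a node's partial return: fields with a reducer are combined with the existing value, all other fields are replaced, and untouched fields persist. This is an illustrative sketch, not langgraph's actual implementation:

```python
import operator

# Per-field reducers; fields without one are replaced on update (illustrative).
REDUCERS = {"messages": operator.add}

def apply_update(state: dict, update: dict) -> dict:
    """Merge a node's partial return into the full state (sketch only)."""
    merged = dict(state)
    for key, value in update.items():
        if key in REDUCERS and key in merged:
            merged[key] = REDUCERS[key](merged[key], value)
        else:
            merged[key] = value
    return merged

state = {"messages": ["hi"], "status": "pending", "user_id": "u1"}
state = apply_update(state, {"messages": ["New message"], "status": "processing"})

assert state["messages"] == ["hi", "New message"]  # appended by reducer
assert state["status"] == "processing"             # replaced
assert state["user_id"] == "u1"                    # untouched fields persist
```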
Conditional Routing
Basic Routing Function
```python
def route_based_on_score(state: State) -> str:
    """Route to different nodes based on state."""
    score = state["quality_score"]
    if score >= 0.9:
        return "publish"
    elif score >= 0.6:
        return "review"
    else:
        return "revise"

workflow.add_conditional_edges(
    "evaluator",
    route_based_on_score,
    {
        "publish": "publisher",
        "review": "reviewer",
        "revise": "reviser"
    }
)
```
Dynamic Routing with LLM
```python
from langchain_anthropic import ChatAnthropic

def llm_router(state: State) -> str:
    """Use LLM to decide next step."""
    llm = ChatAnthropic(model="claude-sonnet-4")
    prompt = f"""
    Current state: {state['current_step']}
    User request: {state['user_input']}
    Which agent should handle this next?
    Options: researcher, coder, writer, FINISH
    Return only one word.
    """
    response = llm.invoke(prompt)
    next_agent = response.content.strip().lower()
    if next_agent == "finish":
        return END
    else:
        return next_agent

workflow.add_conditional_edges(
    "supervisor",
    llm_router,
    {
        "researcher": "researcher",
        "coder": "coder",
        "writer": "writer",
        END: END
    }
)
```
Multi-Condition Routing
```python
def complex_router(state: State) -> str:
    """Route based on multiple conditions."""
    # Check multiple conditions
    has_errors = bool(state.get("errors"))
    is_approved = state.get("approved", False)
    iteration_count = state.get("iterations", 0)
    # Priority-based routing
    if has_errors:
        return "error_handler"
    elif is_approved:
        return END
    elif iteration_count >= 5:
        return "escalation"
    else:
        return "processor"

workflow.add_conditional_edges(
    "validator",
    complex_router,
    {
        "error_handler": "error_handler",
        "processor": "processor",
        "escalation": "escalation",
        END: END
    }
)
```
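The priority ordering in `complex_router` matters: an error wins even when the draft is also approved. Since the router is pure, that ordering can be pinned down with assertions (`END` is stubbed with a stand-in value here so the sketch runs without langgraph installed):

```python
END = "__end__"  # stand-in for langgraph's END sentinel

def complex_router(state: dict) -> str:
    """Route based on multiple conditions (same logic as above)."""
    has_errors = bool(state.get("errors"))
    is_approved = state.get("approved", False)
    iteration_count = state.get("iterations", 0)
    if has_errors:
        return "error_handler"
    elif is_approved:
        return END
    elif iteration_count >= 5:
        return "escalation"
    else:
        return "processor"

# Errors take priority over approval.
assert complex_router({"errors": ["boom"], "approved": True}) == "error_handler"
assert complex_router({"approved": True}) == END
assert complex_router({"iterations": 5}) == "escalation"
assert complex_router({}) == "processor"
```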
Multi-Agent Patterns
Supervisor Pattern
One supervisor coordinates multiple specialized agents.

```python
from langgraph.graph import StateGraph, END
from langchain_anthropic import ChatAnthropic
from typing import TypedDict, Annotated
import operator

class SupervisorState(TypedDict):
    task: str
    agent_history: Annotated[list, operator.add]  # append, so history survives each hop
    result: dict
    next_agent: str

# Specialized agents
class ResearchAgent:
    def __init__(self):
        self.llm = ChatAnthropic(model="claude-sonnet-4")

    def run(self, state: SupervisorState) -> dict:
        response = self.llm.invoke(f"Research: {state['task']}")
        return {
            "agent_history": [f"Research: {response.content}"],
            "result": {"research": response.content}
        }

class CodingAgent:
    def __init__(self):
        self.llm = ChatAnthropic(model="claude-sonnet-4")

    def run(self, state: SupervisorState) -> dict:
        response = self.llm.invoke(f"Code: {state['task']}")
        return {
            "agent_history": [f"Code: {response.content}"],
            "result": {"code": response.content}
        }

class SupervisorAgent:
    def __init__(self):
        self.llm = ChatAnthropic(model="claude-sonnet-4")

    def route(self, state: SupervisorState) -> dict:
        """Decide which agent to use next."""
        history = "\n".join(state.get("agent_history", []))
        prompt = f"""
        Task: {state['task']}
        Progress: {history}
        Which agent should handle the next step?
        Options: researcher, coder, FINISH
        Return only one word.
        """
        response = self.llm.invoke(prompt)
        next_agent = response.content.strip().lower()
        return {"next_agent": next_agent}

# Build supervisor workflow
def create_supervisor_workflow():
    workflow = StateGraph(SupervisorState)
    # Initialize agents
    research_agent = ResearchAgent()
    coding_agent = CodingAgent()
    supervisor = SupervisorAgent()
    # Add nodes
    workflow.add_node("supervisor", supervisor.route)
    workflow.add_node("researcher", research_agent.run)
    workflow.add_node("coder", coding_agent.run)

    # Conditional routing from supervisor
    def route_from_supervisor(state: SupervisorState) -> str:
        next_agent = state.get("next_agent", "finish").lower()
        if next_agent == "finish":
            return END
        return next_agent

    workflow.add_conditional_edges(
        "supervisor",
        route_from_supervisor,
        {
            "researcher": "researcher",
            "coder": "coder",
            END: END
        }
    )
    # Loop back to supervisor after each agent
    workflow.add_edge("researcher", "supervisor")
    workflow.add_edge("coder", "supervisor")
    workflow.set_entry_point("supervisor")
    return workflow.compile()

# Execute
app = create_supervisor_workflow()
result = app.invoke({
    "task": "Build a REST API for user management",
    "agent_history": [],
    "result": {},
    "next_agent": ""
})
```
Hierarchical Multi-Agent
Nested supervisor pattern with sub-teams.

```python
class TeamState(TypedDict):
    task: str
    team_results: dict

def create_backend_team():
    """Sub-graph for backend development."""
    workflow = StateGraph(TeamState)
    workflow.add_node("api_designer", design_api)
    workflow.add_node("database_designer", design_db)
    workflow.add_node("implementer", implement_backend)
    workflow.add_edge("api_designer", "database_designer")
    workflow.add_edge("database_designer", "implementer")
    workflow.add_edge("implementer", END)
    workflow.set_entry_point("api_designer")
    return workflow.compile()

def create_frontend_team():
    """Sub-graph for frontend development."""
    workflow = StateGraph(TeamState)
    workflow.add_node("ui_designer", design_ui)
    workflow.add_node("component_builder", build_components)
    workflow.add_node("integrator", integrate_frontend)
    workflow.add_edge("ui_designer", "component_builder")
    workflow.add_edge("component_builder", "integrator")
    workflow.add_edge("integrator", END)
    workflow.set_entry_point("ui_designer")
    return workflow.compile()

# Top-level coordinator
def create_project_workflow():
    workflow = StateGraph(TeamState)
    # Add team sub-graphs as nodes
    backend_team = create_backend_team()
    frontend_team = create_frontend_team()
    workflow.add_node("backend_team", backend_team)
    workflow.add_node("frontend_team", frontend_team)
    workflow.add_node("integrator", integrate_teams)
    # Both teams feed into the integrator
    workflow.add_edge("backend_team", "integrator")
    workflow.add_edge("frontend_team", "integrator")
    workflow.add_edge("integrator", END)
    workflow.set_entry_point("backend_team")
    return workflow.compile()
```
Swarm Pattern (2025)
Dynamic agent hand-offs with collaborative decision-making.

```python
from langgraph.graph import StateGraph, END
from langchain_anthropic import ChatAnthropic
from typing import TypedDict

class SwarmState(TypedDict):
    task: str
    messages: list
    current_agent: str
    handoff_reason: str

def create_swarm():
    """Agents can dynamically hand off to each other."""
    def research_agent(state: SwarmState) -> dict:
        llm = ChatAnthropic(model="claude-sonnet-4")
        # Perform research
        response = llm.invoke(f"Research: {state['task']}")
        # Decide if handoff needed
        needs_code = "implementation" in response.content.lower()
        if needs_code:
            return {
                "messages": [response.content],
                "current_agent": "coder",
                "handoff_reason": "Need implementation"
            }
        else:
            return {
                "messages": [response.content],
                "current_agent": "writer",
                "handoff_reason": "Ready for documentation"
            }

    def coding_agent(state: SwarmState) -> dict:
        llm = ChatAnthropic(model="claude-sonnet-4")
        response = llm.invoke(f"Implement: {state['task']}")
        return {
            "messages": [response.content],
            "current_agent": "tester",
            "handoff_reason": "Code complete, needs testing"
        }

    def writer_agent(state: SwarmState) -> dict:
        # Minimal documentation agent: the researcher can hand off to "writer",
        # so the graph needs a node for it.
        llm = ChatAnthropic(model="claude-sonnet-4")
        response = llm.invoke(f"Document: {state['task']}")
        return {
            "messages": [response.content],
            "current_agent": "DONE",
            "handoff_reason": "Documentation complete"
        }

    def tester_agent(state: SwarmState) -> dict:
        # Test the code
        tests_pass = True  # Simplified
        if tests_pass:
            return {
                "messages": ["Tests passed"],
                "current_agent": "DONE",
                "handoff_reason": "All tests passing"
            }
        else:
            return {
                "messages": ["Tests failed"],
                "current_agent": "coder",
                "handoff_reason": "Fix failing tests"
            }

    # Build graph
    workflow = StateGraph(SwarmState)
    workflow.add_node("researcher", research_agent)
    workflow.add_node("coder", coding_agent)
    workflow.add_node("writer", writer_agent)
    workflow.add_node("tester", tester_agent)

    # Dynamic routing based on current_agent
    def route_swarm(state: SwarmState) -> str:
        next_agent = state.get("current_agent", "DONE")
        if next_agent == "DONE":
            return END
        return next_agent

    workflow.add_conditional_edges(
        "researcher",
        route_swarm,
        {"coder": "coder", "writer": "writer"}
    )
    workflow.add_conditional_edges(
        "coder",
        route_swarm,
        {"tester": "tester"}
    )
    workflow.add_conditional_edges(
        "writer",
        route_swarm,
        {END: END}
    )
    workflow.add_conditional_edges(
        "tester",
        route_swarm,
        {"coder": "coder", END: END}
    )
    workflow.set_entry_point("researcher")
    return workflow.compile()
```
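Hand-offs in the swarm are just state fields, so the routing step can be exercised in isolation. The sketch below replays the hand-off logic with a stand-in `END` value (langgraph provides the real sentinel):

```python
END = "__end__"  # stand-in for langgraph's END sentinel

def route_swarm(state: dict) -> str:
    """Follow the current_agent field set by the previous node (same logic as above)."""
    next_agent = state.get("current_agent", "DONE")
    if next_agent == "DONE":
        return END
    return next_agent

# A node that sets current_agent to DONE is routed to END.
assert route_swarm({"current_agent": "DONE"}) == END
# A failing tester hands back to the coder.
assert route_swarm({"current_agent": "coder"}) == "coder"
# A missing field defaults to DONE.
assert route_swarm({}) == END
```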
Human-in-the-Loop
Interrupt for Approval
```python
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.graph import StateGraph, END

# Enable checkpointing for interrupts
memory = SqliteSaver.from_conn_string(":memory:")

def create_approval_workflow():
    workflow = StateGraph(dict)
    workflow.add_node("draft", create_draft)
    workflow.add_node("approve", human_approval)  # Interrupt point
    workflow.add_node("publish", publish_content)
    workflow.add_edge("draft", "approve")
    workflow.add_edge("approve", "publish")
    workflow.add_edge("publish", END)
    workflow.set_entry_point("draft")
    # Compile with interrupt before "approve"
    return workflow.compile(
        checkpointer=memory,
        interrupt_before=["approve"]  # Pause here
    )
```
Usage
```python
app = create_approval_workflow()
config = {"configurable": {"thread_id": "1"}}

# Step 1: Run until interrupt
for event in app.stream({"content": "Draft article..."}, config):
    print(event)
# Workflow pauses at "approve" node

# Human reviews draft
print("Draft ready for review. Approve? (y/n)")
approval = input()

# Step 2: Resume with approval decision
if approval == "y":
    # Continue execution
    result = app.invoke(None, config)  # Resume from checkpoint
    print("Published:", result)
else:
    # Optionally update state and retry
    app.update_state(config, {"needs_revision": True})
    result = app.invoke(None, config)
```
Interactive Approval Gates
python
class ApprovalState(TypedDict):
draft: str
approved: bool
feedback: str
def approval_node(state: ApprovalState) -> dict:
"""This node requires human interaction."""
# This is where workflow pauses
# Human provides input through update_state
return state # No automatic changes
def create_interactive_workflow():
workflow = StateGraph(ApprovalState)
workflow.add_node("writer", write_draft)
workflow.add_node("approval_gate", approval_node)
workflow.add_node("reviser", revise_draft)
workflow.add_node("publisher", publish_final)
workflow.add_edge("writer", "approval_gate")
# Route based on approval
def route_after_approval(state: ApprovalState) -> str:
if state.get("approved"):
return "publisher"
else:
return "reviser"
workflow.add_conditional_edges(
"approval_gate",
route_after_approval,
{"publisher": "publisher", "reviser": "reviser"}
)
workflow.add_edge("reviser", "approval_gate") # Loop back
workflow.add_edge("publisher", END)
workflow.set_entry_point("writer")
memory = SqliteSaver.from_conn_string("approvals.db")
return workflow.compile(
checkpointer=memory,
interrupt_before=["approval_gate"]
)
python
class ApprovalState(TypedDict):
draft: str
approved: bool
feedback: str
def approval_node(state: ApprovalState) -> dict:
"""该节点需要人工交互。"""
# 工作流在此处暂停
# 人工通过 update_state 提供输入
return state # 无自动变更
def create_interactive_workflow():
workflow = StateGraph(ApprovalState)
workflow.add_node("writer", write_draft)
workflow.add_node("approval_gate", approval_node)
workflow.add_node("reviser", revise_draft)
workflow.add_node("publisher", publish_final)
workflow.add_edge("writer", "approval_gate")
# 基于审批结果路由
def route_after_approval(state: ApprovalState) -> str:
if state.get("approved"):
return "publisher"
else:
return "reviser"
workflow.add_conditional_edges(
"approval_gate",
route_after_approval,
{"publisher": "publisher", "reviser": "reviser"}
)
workflow.add_edge("reviser", "approval_gate") # 循环回到审批关口
workflow.add_edge("publisher", END)
workflow.set_entry_point("writer")
memory = SqliteSaver.from_conn_string("approvals.db")
return workflow.compile(
checkpointer=memory,
interrupt_before=["approval_gate"]
)
Execute with human intervention
执行时人工介入
app = create_interactive_workflow()
config = {"configurable": {"thread_id": "article_123"}}
app = create_interactive_workflow()
config = {"configurable": {"thread_id": "article_123"}}
Step 1: Generate draft (pauses at approval_gate)
步骤1:生成草稿(在approval_gate暂停)
for event in app.stream({"draft": "", "approved": False, "feedback": ""}, config):
print(event)
for event in app.stream({"draft": "", "approved": False, "feedback": ""}, config):
print(event)
Step 2: Human reviews and provides feedback
步骤2:人工审核并提供反馈
current_state = app.get_state(config)
print(f"Review this draft: {current_state.values['draft']}")
current_state = app.get_state(config)
print(f"Review this draft: {current_state.values['draft']}")
Human decides
人工决策
app.update_state(config, {
"approved": False,
"feedback": "Add more examples in section 2"
})
app.update_state(config, {
"approved": False,
"feedback": "Add more examples in section 2"
})
Step 3: Resume (goes to reviser due to approved=False)
步骤3:恢复执行(因为approved=False,会进入reviser节点)
result = app.invoke(None, config)
---
result = app.invoke(None, config)
---
Checkpointing and Persistence
检查点与持久化
MemorySaver (In-Memory)
MemorySaver(内存级)
python
from langgraph.checkpoint.memory import MemorySaver
python
from langgraph.checkpoint.memory import MemorySaver
In-memory checkpointing (lost on restart)
内存级检查点(重启后丢失)
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)
config = {"configurable": {"thread_id": "conversation_1"}}
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)
config = {"configurable": {"thread_id": "conversation_1"}}
State automatically saved at each step
每一步状态自动保存
result = app.invoke(initial_state, config)
result = app.invoke(initial_state, config)
SQLite Persistence
SQLite 持久化
python
from langgraph.checkpoint.sqlite import SqliteSaver
python
from langgraph.checkpoint.sqlite import SqliteSaver
Persistent checkpointing with SQLite
基于SQLite的持久化检查点
checkpointer = SqliteSaver.from_conn_string("./workflow_state.db")
app = workflow.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "user_session_456"}}
checkpointer = SqliteSaver.from_conn_string("./workflow_state.db")
app = workflow.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "user_session_456"}}
State persists across restarts
状态跨重启持久化
result = app.invoke(initial_state, config)
result = app.invoke(initial_state, config)
Later: Resume from same thread
后续:从同一个线程恢复
result2 = app.invoke(None, config) # Continues from last state
result2 = app.invoke(None, config) # 从最后一个状态继续
PostgreSQL for Production
生产级 PostgreSQL
python
from langgraph.checkpoint.postgres import PostgresSaver
python
from langgraph.checkpoint.postgres import PostgresSaver
Production-grade persistence
生产级持久化
checkpointer = PostgresSaver.from_conn_string(
"postgresql://user:pass@localhost/langgraph_db"
)
app = workflow.compile(checkpointer=checkpointer)
checkpointer = PostgresSaver.from_conn_string(
"postgresql://user:pass@localhost/langgraph_db"
)
app = workflow.compile(checkpointer=checkpointer)
Supports concurrent workflows with isolation
支持带隔离的并发工作流
config1 = {"configurable": {"thread_id": "user_1"}}
config2 = {"configurable": {"thread_id": "user_2"}}
config1 = {"configurable": {"thread_id": "user_1"}}
config2 = {"configurable": {"thread_id": "user_2"}}
Each thread maintains independent state
每个线程维护独立状态
result1 = app.invoke(state1, config1)
result2 = app.invoke(state2, config2)
result1 = app.invoke(state1, config1)
result2 = app.invoke(state2, config2)
Redis for Distributed Systems
分布式系统用 Redis
python
from langgraph.checkpoint.redis import RedisSaver
python
from langgraph.checkpoint.redis import RedisSaver
Distributed checkpointing
分布式检查点
checkpointer = RedisSaver.from_conn_string(
"redis://localhost:6379",
ttl=3600 # 1 hour TTL
)
app = workflow.compile(checkpointer=checkpointer)
checkpointer = RedisSaver.from_conn_string(
"redis://localhost:6379",
ttl=3600 # 1小时过期时间
)
app = workflow.compile(checkpointer=checkpointer)
Supports horizontal scaling
支持水平扩容
Multiple workers can process different threads
多个Worker可以处理不同的线程
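One simple way to let multiple workers share the load is to assign each `thread_id` to a worker deterministically, so every pool member claims its own subset without coordination. This is a stdlib-only sketch of that idea (real deployments might instead use a queue or Redis streams); `worker_for` is a hypothetical helper, not a LangGraph API.

```python
import hashlib

def worker_for(thread_id: str, n_workers: int) -> int:
    """Deterministically map a thread_id to a worker index.

    Uses sha256 rather than hash() so the assignment is stable
    across processes (hash() is salted per interpreter run).
    """
    digest = hashlib.sha256(thread_id.encode()).digest()
    return int.from_bytes(digest[:4], "big") % n_workers

# Each worker processes only the threads that hash to its index
threads = ["user_1", "user_2", "user_3", "user_4"]
assignment = {t: worker_for(t, 3) for t in threads}
```

Because the checkpointer isolates state per thread, any worker that claims a thread can resume it from its last checkpoint.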
---
---
Time-Travel Debugging
时间旅行调试
View State History
查看状态历史
python
from langgraph.checkpoint.sqlite import SqliteSaver
checkpointer = SqliteSaver.from_conn_string("workflow.db")
app = workflow.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "debug_session"}}
python
from langgraph.checkpoint.sqlite import SqliteSaver
checkpointer = SqliteSaver.from_conn_string("workflow.db")
app = workflow.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "debug_session"}}
Run workflow
运行工作流
result = app.invoke(initial_state, config)
result = app.invoke(initial_state, config)
Retrieve full history
获取完整历史
history = app.get_state_history(config)
for i, state_snapshot in enumerate(history):
print(f"Snapshot {i} (newest first):")
print(f" Node: {state_snapshot.next}")
print(f" State: {state_snapshot.values}")
print(f" Metadata: {state_snapshot.metadata}")
print()
history = app.get_state_history(config)
for i, state_snapshot in enumerate(history):
print(f"快照 {i}(最新在前):")
print(f" Node: {state_snapshot.next}")
print(f" State: {state_snapshot.values}")
print(f" Metadata: {state_snapshot.metadata}")
print()
Replay from Checkpoint
从检查点回放
python
python
Get state at specific step
获取特定步骤的状态
history = list(app.get_state_history(config))
checkpoint_3 = history[3] # Snapshots are newest-first: index 3 is three steps back
history = list(app.get_state_history(config))
checkpoint_3 = history[3] # 快照按最新在前排序:索引3即回退三步
Replay from that checkpoint
从该检查点回放
config_replay = {
"configurable": {
"thread_id": "debug_session",
"checkpoint_id": checkpoint_3.config["configurable"]["checkpoint_id"]
}
}
config_replay = {
"configurable": {
"thread_id": "debug_session",
"checkpoint_id": checkpoint_3.config["configurable"]["checkpoint_id"]
}
}
Resume from step 3
从第3步恢复执行
result = app.invoke(None, config_replay)
result = app.invoke(None, config_replay)
Rewind and Modify
回退并修改
python
python
Rewind to step 5
回退到第5步
history = list(app.get_state_history(config))
step_5 = history[5]
history = list(app.get_state_history(config))
step_5 = history[5]
Update state at that point
更新该时间点的状态
app.update_state(
config,
{"messages": ["Modified message"]},
as_node="researcher" # Apply update as if from this node
)
app.update_state(
config,
{"messages": ["Modified message"]},
as_node="researcher" # 模拟从该节点应用更新
)
Continue execution with modified state
使用修改后的状态继续执行
result = app.invoke(None, config)
result = app.invoke(None, config)
Debug Workflow
调试工作流
python
def debug_workflow():
"""Debug workflow step-by-step."""
checkpointer = SqliteSaver.from_conn_string("debug.db")
app = workflow.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "debug"}}
# Execute step-by-step
state = initial_state
for step in range(10): # Max 10 steps
print(f"\n=== Step {step} ===")
# Get current state
current = app.get_state(config)
print(f"Current state: {current.values}")
print(f"Next node: {current.next}")
if not current.next:
print("Workflow complete")
break
# Execute one step
for event in app.stream(None, config):
print(f"Event: {event}")
# Pause for inspection
input("Press Enter to continue...")
# View full history
print("\n=== Full History ===")
for i, snapshot in enumerate(app.get_state_history(config)):
print(f"Step {i}: {snapshot.next} -> {snapshot.values}")
debug_workflow()
python
def debug_workflow():
"""逐步调试工作流。"""
checkpointer = SqliteSaver.from_conn_string("debug.db")
app = workflow.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "debug"}}
# 逐步执行
state = initial_state
for step in range(10): # 最多10步
print(f"\n=== Step {step} ===")
# 获取当前状态
current = app.get_state(config)
print(f"Current state: {current.values}")
print(f"Next node: {current.next}")
if not current.next:
print("Workflow complete")
break
# 执行一步
for event in app.stream(None, config):
print(f"Event: {event}")
# 暂停以便检查
input("Press Enter to continue...")
# 查看完整历史
print("\n=== Full History ===")
for i, snapshot in enumerate(app.get_state_history(config)):
print(f"Step {i}: {snapshot.next} -> {snapshot.values}")
debug_workflow()
Streaming
流式输出
Token Streaming
Token 流式输出
python
from langchain_anthropic import ChatAnthropic
def streaming_node(state: dict) -> dict:
"""Node that streams LLM output."""
llm = ChatAnthropic(model="claude-sonnet-4")
# Use streaming
response = ""
for chunk in llm.stream(state["prompt"]):
content = chunk.content
print(content, end="", flush=True)
response += content
return {"response": response}
python
from langchain_anthropic import ChatAnthropic
def streaming_node(state: dict) -> dict:
"""流式输出LLM结果的节点。"""
llm = ChatAnthropic(model="claude-sonnet-4")
# 使用流式输出
response = ""
for chunk in llm.stream(state["prompt"]):
content = chunk.content
print(content, end="", flush=True)
response += content
return {"response": response}
Stream events from workflow
从工作流流式输出事件
for event in app.stream(initial_state):
print(event)
for event in app.stream(initial_state):
print(event)
Event Streaming
事件流式输出
python
python
Stream all events (node execution, state updates)
流式输出所有事件(节点执行、状态更新)
for event in app.stream(initial_state, stream_mode="updates"):
node_name = next(iter(event))  # dict views aren't subscriptable; event maps node name -> update
state_update = event[node_name]
print(f"Node '{node_name}' updated state: {state_update}")
for event in app.stream(initial_state, stream_mode="updates"):
node_name = next(iter(event))  # dict视图不支持下标;event将节点名映射到状态更新
state_update = event[node_name]
print(f"Node '{node_name}' updated state: {state_update}")
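Each `updates`-mode event is a one-entry dict mapping the node name to the state delta that node produced. A stdlib-only sketch of how such events compose into a final state (plain dicts here; `apply_updates` is a hypothetical helper, not part of LangGraph):

```python
def apply_updates(state: dict, events: list) -> dict:
    """Fold a stream of {node_name: state_delta} events into one state dict."""
    for event in events:
        (node_name, delta), = event.items()  # exactly one node per event
        state = {**state, **delta}           # later updates overwrite earlier keys
    return state

# Two simulated "updates" events, as a stream might yield them
events = [{"researcher": {"notes": "found 3 sources"}},
          {"writer": {"draft": "First draft"}}]
final = apply_updates({"notes": "", "draft": ""}, events)
```

Note this simple merge overwrites keys; reducers declared with `Annotated[..., operator.add]` in the state schema would append instead.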
Custom Streaming
自定义流式输出
python
from typing import AsyncGenerator
async def streaming_workflow(input_data: dict) -> AsyncGenerator:
"""Stream workflow progress in real-time."""
config = {"configurable": {"thread_id": "stream"}}
async for event in app.astream(input_data, config):
# Stream each state update
yield {
"type": "state_update",
"data": event
}
# Stream final result
final_state = app.get_state(config)
yield {
"type": "complete",
"data": final_state.values
}
python
from typing import AsyncGenerator
async def streaming_workflow(input_data: dict) -> AsyncGenerator:
"""实时流式输出工作流进度。"""
config = {"configurable": {"thread_id": "stream"}}
async for event in app.astream(input_data, config):
# 流式输出每个状态更新
yield {
"type": "state_update",
"data": event
}
# 流式输出最终结果
final_state = app.get_state(config)
yield {
"type": "complete",
"data": final_state.values
}
Usage in FastAPI
在FastAPI中使用
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app_api = FastAPI()
@app_api.post("/workflow/stream")
async def stream_workflow(input: dict):
return StreamingResponse(
streaming_workflow(input),
media_type="text/event-stream"
)
---
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app_api = FastAPI()
@app_api.post("/workflow/stream")
async def stream_workflow(input: dict):
return StreamingResponse(
streaming_workflow(input),
media_type="text/event-stream"
)
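With `media_type="text/event-stream"`, each chunk must be framed in the Server-Sent Events format: a `data:` line followed by a blank line. A minimal framing helper (`sse_event` is a hypothetical name for illustration):

```python
import json

def sse_event(payload: dict) -> str:
    """Frame one JSON payload as a text/event-stream message:
    a "data:" line terminated by an empty line, per the SSE format."""
    return f"data: {json.dumps(payload)}\n\n"

frame = sse_event({"type": "state_update", "data": {"node": "writer"}})
```

Browsers consuming the endpoint via `EventSource` receive each frame as a `message` event whose `data` field is the JSON string.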
---
Tool Integration
工具集成
LangChain Tools
LangChain 工具
python
from langchain.tools import Tool
from langchain_community.tools import DuckDuckGoSearchRun
from langchain.agents import AgentExecutor, create_react_agent
python
from langchain.tools import Tool
from langchain_community.tools import DuckDuckGoSearchRun
from langchain.agents import AgentExecutor, create_react_agent
Define tools
定义工具
search = DuckDuckGoSearchRun()
tools = [
Tool(
name="Search",
func=search.run,
description="Search the web for information"
)
]
def agent_with_tools(state: dict) -> dict:
"""Node that uses LangChain tools."""
from langchain_anthropic import ChatAnthropic
llm = ChatAnthropic(model="claude-sonnet-4")
# Create agent with tools
agent = create_react_agent(llm, tools, prompt_template)
agent_executor = AgentExecutor(agent=agent, tools=tools)
# Execute with tools
result = agent_executor.invoke({"input": state["query"]})
return {"result": result["output"]}
search = DuckDuckGoSearchRun()
tools = [
Tool(
name="Search",
func=search.run,
description="Search the web for information"
)
]
def agent_with_tools(state: dict) -> dict:
"""使用LangChain工具的节点。"""
from langchain_anthropic import ChatAnthropic
llm = ChatAnthropic(model="claude-sonnet-4")
# 创建带工具的Agent
agent = create_react_agent(llm, tools, prompt_template)
agent_executor = AgentExecutor(agent=agent, tools=tools)
# 使用工具执行
result = agent_executor.invoke({"input": state["query"]})
return {"result": result["output"]}
Add to graph
添加到图中
workflow.add_node("agent_with_tools", agent_with_tools)
workflow.add_node("agent_with_tools", agent_with_tools)
Custom Tools
自定义工具
python
from langchain.tools import StructuredTool
from pydantic import BaseModel, Field
class CalculatorInput(BaseModel):
expression: str = Field(description="Mathematical expression to evaluate")
def calculate(expression: str) -> str:
"""Evaluate mathematical expression."""
try:
result = eval(expression) # Simplified
return f"Result: {result}"
except Exception as e:
return f"Error: {e}"
calculator_tool = StructuredTool.from_function(
func=calculate,
name="Calculator",
description="Evaluate mathematical expressions",
args_schema=CalculatorInput
)
tools = [calculator_tool]
def tool_using_node(state: dict) -> dict:
"""Use custom tools."""
result = calculator_tool.invoke({"expression": state["math_problem"]})
return {"solution": result}
python
from langchain.tools import StructuredTool
from pydantic import BaseModel, Field
class CalculatorInput(BaseModel):
expression: str = Field(description="Mathematical expression to evaluate")
def calculate(expression: str) -> str:
"""计算数学表达式。"""
try:
result = eval(expression) # 简化版
return f"Result: {result}"
except Exception as e:
return f"Error: {e}"
calculator_tool = StructuredTool.from_function(
func=calculate,
name="Calculator",
description="Evaluate mathematical expressions",
args_schema=CalculatorInput
)
tools = [calculator_tool]
def tool_using_node(state: dict) -> dict:
"""使用自定义工具。"""
result = calculator_tool.invoke({"expression": state["math_problem"]})
return {"solution": result}
Anthropic Tool Use
Anthropic 工具调用
python
from anthropic import Anthropic
import json
def node_with_anthropic_tools(state: dict) -> dict:
"""Use Anthropic's tool use feature."""
client = Anthropic()
# Define tools
tools = [
{
"name": "get_weather",
"description": "Get weather for a location",
"input_schema": {
"type": "object",
"properties": {
"location": {"type": "string"}
},
"required": ["location"]
}
}
]
# First call: Request tool use
response = client.messages.create(
model="claude-sonnet-4",
max_tokens=1024,
tools=tools,
messages=[{"role": "user", "content": state["query"]}]
)
# Execute tool if requested
if response.stop_reason == "tool_use":
tool_use = next(
block for block in response.content
if block.type == "tool_use"
)
# Execute tool
tool_result = execute_tool(tool_use.name, tool_use.input)
# Second call: Provide tool result
final_response = client.messages.create(
model="claude-sonnet-4",
max_tokens=1024,
tools=tools,
messages=[
{"role": "user", "content": state["query"]},
{"role": "assistant", "content": response.content},
{
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": tool_use.id,
"content": str(tool_result)
}
]
}
]
)
return {"response": final_response.content[0].text}
return {"response": response.content[0].text}
python
from anthropic import Anthropic
import json
def node_with_anthropic_tools(state: dict) -> dict:
"""使用Anthropic的工具调用特性。"""
client = Anthropic()
# 定义工具
tools = [
{
"name": "get_weather",
"description": "Get weather for a location",
"input_schema": {
"type": "object",
"properties": {
"location": {"type": "string"}
},
"required": ["location"]
}
}
]
# 第一次调用:请求工具调用
response = client.messages.create(
model="claude-sonnet-4",
max_tokens=1024,
tools=tools,
messages=[{"role": "user", "content": state["query"]}]
)
# 如果请求工具调用则执行
if response.stop_reason == "tool_use":
tool_use = next(
block for block in response.content
if block.type == "tool_use"
)
# 执行工具
tool_result = execute_tool(tool_use.name, tool_use.input)
# 第二次调用:提供工具结果
final_response = client.messages.create(
model="claude-sonnet-4",
max_tokens=1024,
tools=tools,
messages=[
{"role": "user", "content": state["query"]},
{"role": "assistant", "content": response.content},
{
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": tool_use.id,
"content": str(tool_result)
}
]
}
]
)
return {"response": final_response.content[0].text}
return {"response": response.content[0].text}
Error Handling
错误处理
Try-Catch in Nodes
节点内的 Try-Catch
python
def robust_node(state: dict) -> dict:
"""Node with error handling."""
try:
# Main logic
result = risky_operation(state["input"])
return {
"result": result,
"error": None,
"status": "success"
}
except ValueError as e:
# Handle specific error
return {
"error": f"Validation error: {e}",
"status": "validation_failed"
}
except Exception as e:
# Handle unexpected errors
return {
"error": f"Unexpected error: {e}",
"status": "failed"
}
python
def robust_node(state: dict) -> dict:
"""带错误处理的节点。"""
try:
# 核心逻辑
result = risky_operation(state["input"])
return {
"result": result,
"error": None,
"status": "success"
}
except ValueError as e:
# 处理特定错误
return {
"error": f"Validation error: {e}",
"status": "validation_failed"
}
except Exception as e:
# 处理非预期错误
return {
"error": f"Unexpected error: {e}",
"status": "failed"
}
Route based on error status
基于错误状态路由
def error_router(state: dict) -> str:
status = state.get("status")
if status == "success":
return "next_step"
elif status == "validation_failed":
return "validator"
else:
return "error_handler"
workflow.add_conditional_edges(
"robust_node",
error_router,
{
"next_step": "next_step",
"validator": "validator",
"error_handler": "error_handler"
}
)
def error_router(state: dict) -> str:
status = state.get("status")
if status == "success":
return "next_step"
elif status == "validation_failed":
return "validator"
else:
return "error_handler"
workflow.add_conditional_edges(
"robust_node",
error_router,
{
"next_step": "next_step",
"validator": "validator",
"error_handler": "error_handler"
}
)
Retry Logic
重试逻辑
python
from typing import TypedDict
class RetryState(TypedDict):
input: str
result: str
error: str | None
retry_count: int
max_retries: int
def node_with_retry(state: RetryState) -> dict:
"""Node that can be retried on failure."""
try:
result = unstable_operation(state["input"])
return {
"result": result,
"error": None,
"retry_count": 0
}
except Exception as e:
return {
"error": str(e),
"retry_count": state.get("retry_count", 0) + 1
}
def should_retry(state: RetryState) -> str:
"""Decide whether to retry or fail."""
if state.get("error") is None:
return "success"
retry_count = state.get("retry_count", 0)
max_retries = state.get("max_retries", 3)
if retry_count < max_retries:
return "retry"
else:
return "failed"
python
from typing import TypedDict
class RetryState(TypedDict):
input: str
result: str
error: str | None
retry_count: int
max_retries: int
def node_with_retry(state: RetryState) -> dict:
"""失败时可重试的节点。"""
try:
result = unstable_operation(state["input"])
return {
"result": result,
"error": None,
"retry_count": 0
}
except Exception as e:
return {
"error": str(e),
"retry_count": state.get("retry_count", 0) + 1
}
def should_retry(state: RetryState) -> str:
"""决定是重试还是失败。"""
if state.get("error") is None:
return "success"
retry_count = state.get("retry_count", 0)
max_retries = state.get("max_retries", 3)
if retry_count < max_retries:
return "retry"
else:
return "failed"
Build retry loop
构建重试循环
workflow.add_node("operation", node_with_retry)
workflow.add_node("success_handler", handle_success)
workflow.add_node("failure_handler", handle_failure)
workflow.add_conditional_edges(
"operation",
should_retry,
{
"success": "success_handler",
"retry": "operation", # Loop back
"failed": "failure_handler"
}
)
workflow.add_node("operation", node_with_retry)
workflow.add_node("success_handler", handle_success)
workflow.add_node("failure_handler", handle_failure)
workflow.add_conditional_edges(
"operation",
should_retry,
{
"success": "success_handler",
"retry": "operation", # 循环回到节点
"failed": "failure_handler"
}
)
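The retry edge above loops back immediately; in practice a transient failure (rate limit, timeout) usually deserves an increasing delay first. A stdlib-only sketch of the delay schedule a retrying node could sleep for (`backoff_delay` is a hypothetical helper; jitter is omitted for clarity):

```python
def backoff_delay(retry_count: int, base: float = 1.0, cap: float = 30.0) -> float:
    """Exponential backoff with a ceiling: 1s, 2s, 4s, 8s, ... capped at 30s."""
    return min(cap, base * (2 ** retry_count))

# Delay before each retry attempt, derived from the state's retry_count
schedule = [backoff_delay(n) for n in range(5)]
```

A node would call `time.sleep(backoff_delay(state["retry_count"]))` before re-running the unstable operation; production systems typically add random jitter to avoid thundering herds.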
Fallback Strategies
降级策略
python
def node_with_fallback(state: dict) -> dict:
"""Try primary method, fall back to secondary."""
# Try primary LLM
try:
result = primary_llm(state["prompt"])
return {
"result": result,
"method": "primary"
}
except Exception as e_primary:
# Fallback to secondary LLM
try:
result = secondary_llm(state["prompt"])
return {
"result": result,
"method": "fallback",
"primary_error": str(e_primary)
}
except Exception as e_secondary:
# Both failed
return {
"error": f"Both methods failed: {e_primary}, {e_secondary}",
"method": "failed"
}
python
def node_with_fallback(state: dict) -> dict:
"""尝试主方法,失败则降级到备用方法。"""
# 尝试主LLM
try:
result = primary_llm(state["prompt"])
return {
"result": result,
"method": "primary"
}
except Exception as e_primary:
# 降级到备用LLM
try:
result = secondary_llm(state["prompt"])
return {
"result": result,
"method": "fallback",
"primary_error": str(e_primary)
}
except Exception as e_secondary:
# 都失败了
return {
"error": f"Both methods failed: {e_primary}, {e_secondary}",
"method": "failed"
}
Subgraphs and Composition
子图与组合
Subgraphs as Nodes
子图作为节点
python
def create_subgraph():
"""Create reusable subgraph."""
subgraph = StateGraph(dict)
subgraph.add_node("step1", step1_func)
subgraph.add_node("step2", step2_func)
subgraph.add_edge("step1", "step2")
subgraph.add_edge("step2", END)
subgraph.set_entry_point("step1")
return subgraph.compile()
python
def create_subgraph():
"""创建可复用的子图。"""
subgraph = StateGraph(dict)
subgraph.add_node("step1", step1_func)
subgraph.add_node("step2", step2_func)
subgraph.add_edge("step1", "step2")
subgraph.add_edge("step2", END)
subgraph.set_entry_point("step1")
return subgraph.compile()
Use subgraph in main graph
在主图中使用子图
def create_main_graph():
workflow = StateGraph(dict)
# Add subgraph as a node
subgraph_compiled = create_subgraph()
workflow.add_node("subgraph", subgraph_compiled)
workflow.add_node("before", before_func)
workflow.add_node("after", after_func)
workflow.add_edge("before", "subgraph")
workflow.add_edge("subgraph", "after")
workflow.add_edge("after", END)
workflow.set_entry_point("before")
return workflow.compile()
def create_main_graph():
workflow = StateGraph(dict)
# 将子图作为节点添加
subgraph_compiled = create_subgraph()
workflow.add_node("subgraph", subgraph_compiled)
workflow.add_node("before", before_func)
workflow.add_node("after", after_func)
workflow.add_edge("before", "subgraph")
workflow.add_edge("subgraph", "after")
workflow.add_edge("after", END)
workflow.set_entry_point("before")
return workflow.compile()
Parallel Subgraphs
并行子图
python
from langgraph.graph import StateGraph, START, END
def create_parallel_workflow():
"""Execute multiple subgraphs in parallel."""
# Create subgraphs
backend_graph = create_backend_subgraph()
frontend_graph = create_frontend_subgraph()
database_graph = create_database_subgraph()
# Main graph
workflow = StateGraph(dict)
# Add subgraphs as parallel nodes
workflow.add_node("backend", backend_graph)
workflow.add_node("frontend", frontend_graph)
workflow.add_node("database", database_graph)
# Merge results
workflow.add_node("merge", merge_results)
# All subgraphs lead to merge
workflow.add_edge("backend", "merge")
workflow.add_edge("frontend", "merge")
workflow.add_edge("database", "merge")
workflow.add_edge("merge", END)
# Fan out from START so all three run in parallel
# (START is imported from langgraph.graph; repeated
# set_entry_point calls would just overwrite the single entry node)
workflow.add_edge(START, "backend")
workflow.add_edge(START, "frontend")
workflow.add_edge(START, "database")
return workflow.compile()
python
from langgraph.graph import StateGraph, START, END
def create_parallel_workflow():
"""并行执行多个子图。"""
# 创建子图
backend_graph = create_backend_subgraph()
frontend_graph = create_frontend_subgraph()
database_graph = create_database_subgraph()
# 主图
workflow = StateGraph(dict)
# 将子图作为并行节点添加
workflow.add_node("backend", backend_graph)
workflow.add_node("frontend", frontend_graph)
workflow.add_node("database", database_graph)
# 合并结果
workflow.add_node("merge", merge_results)
# 所有子图执行完成后进入合并节点
workflow.add_edge("backend", "merge")
workflow.add_edge("frontend", "merge")
workflow.add_edge("database", "merge")
workflow.add_edge("merge", END)
# 从 START 扇出,使三个子图并行执行
# (START 从 langgraph.graph 导入;重复调用
# set_entry_point 只会覆盖唯一的入口节点)
workflow.add_edge(START, "backend")
workflow.add_edge(START, "frontend")
workflow.add_edge(START, "database")
return workflow.compile()
Map-Reduce Patterns
Map-Reduce 模式
Map-Reduce with LangGraph
基于 LangGraph 的 Map-Reduce
python
from typing import TypedDict, List
class MapReduceState(TypedDict):
documents: List[str]
summaries: List[str]
final_summary: str
def map_node(state: MapReduceState) -> dict:
"""Map: Summarize each document."""
llm = ChatAnthropic(model="claude-sonnet-4")
summaries = []
for doc in state["documents"]:
summary = llm.invoke(f"Summarize: {doc}")
summaries.append(summary.content)
return {"summaries": summaries}
def reduce_node(state: MapReduceState) -> dict:
"""Reduce: Combine summaries into final summary."""
llm = ChatAnthropic(model="claude-sonnet-4")
combined = "\n".join(state["summaries"])
final = llm.invoke(f"Combine these summaries:\n{combined}")
return {"final_summary": final.content}
python
from typing import TypedDict, List
class MapReduceState(TypedDict):
documents: List[str]
summaries: List[str]
final_summary: str
def map_node(state: MapReduceState) -> dict:
"""Map:为每个文档生成摘要。"""
llm = ChatAnthropic(model="claude-sonnet-4")
summaries = []
for doc in state["documents"]:
summary = llm.invoke(f"Summarize: {doc}")
summaries.append(summary.content)
return {"summaries": summaries}
def reduce_node(state: MapReduceState) -> dict:
"""Reduce:将摘要合并为最终摘要。"""
llm = ChatAnthropic(model="claude-sonnet-4")
combined = "\n".join(state["summaries"])
final = llm.invoke(f"Combine these summaries:\n{combined}")
return {"final_summary": final.content}
Build map-reduce workflow
构建 map-reduce 工作流
workflow = StateGraph(MapReduceState)
workflow.add_node("map", map_node)
workflow.add_node("reduce", reduce_node)
workflow.add_edge("map", "reduce")
workflow.add_edge("reduce", END)
workflow.set_entry_point("map")
app = workflow.compile()
workflow = StateGraph(MapReduceState)
workflow.add_node("map", map_node)
workflow.add_node("reduce", reduce_node)
workflow.add_edge("map", "reduce")
workflow.add_edge("reduce", END)
workflow.set_entry_point("map")
app = workflow.compile()
Execute
执行
result = app.invoke({
"documents": ["Doc 1...", "Doc 2...", "Doc 3..."],
"summaries": [],
"final_summary": ""
})
result = app.invoke({
"documents": ["Doc 1...", "Doc 2...", "Doc 3..."],
"summaries": [],
"final_summary": ""
})
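Before parallelizing the map step, documents are typically grouped into fixed-size batches so each burst of calls stays under provider rate limits. A stdlib-only chunking sketch (`batched` here is a hypothetical helper; Python 3.12+ ships `itertools.batched` with similar behavior):

```python
def batched(items: list, size: int) -> list:
    """Split a list into consecutive batches of at most `size` items."""
    return [items[i:i + size] for i in range(0, len(items), size)]

# Five documents in batches of two -> the last batch holds the remainder
batches = batched(["Doc 1", "Doc 2", "Doc 3", "Doc 4", "Doc 5"], 2)
```

The map node can then summarize one batch at a time, or feed each batch to a parallel worker.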
Parallel Map with Batching
带分批的并行 Map
python
import asyncio
from typing import List
async def parallel_map_node(state: dict) -> dict:
"""Map: Process documents in parallel."""
llm = ChatAnthropic(model="claude-sonnet-4")
async def summarize(doc: str) -> str:
response = await llm.ainvoke(f"Summarize: {doc}")
return response.content
# Process all documents in parallel
summaries = await asyncio.gather(
*[summarize(doc) for doc in state["documents"]]
)
return {"summaries": list(summaries)}
python
import asyncio
from typing import List
async def parallel_map_node(state: dict) -> dict:
"""Map:并行处理文档。"""
llm = ChatAnthropic(model="claude-sonnet-4")
async def summarize(doc: str) -> str:
response = await llm.ainvoke(f"Summarize: {doc}")
return response.content
# 并行处理所有文档
summaries = await asyncio.gather(
*[summarize(doc) for doc in state["documents"]]
)
return {"summaries": list(summaries)}
Use async workflow
使用异步工作流
app = workflow.compile()
app = workflow.compile()
Execute with asyncio
用 asyncio 执行
result = asyncio.run(app.ainvoke({
"documents": ["Doc 1", "Doc 2", "Doc 3"],
"summaries": [],
"final_summary": ""
}))
---
result = asyncio.run(app.ainvoke({
"documents": ["Doc 1", "Doc 2", "Doc 3"],
"summaries": [],
"final_summary": ""
}))
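`asyncio.gather` above launches every request at once, which can trip API rate limits on large document sets. A stdlib-only sketch that bounds concurrency with a semaphore while preserving input order (`bounded_gather` and `fake_summarize` are hypothetical names; the latter stands in for an LLM call):

```python
import asyncio

async def bounded_gather(coros, limit: int = 4):
    """Run coroutines concurrently, at most `limit` at a time, order preserved."""
    sem = asyncio.Semaphore(limit)

    async def run(coro):
        async with sem:          # blocks when `limit` tasks are in flight
            return await coro

    return await asyncio.gather(*(run(c) for c in coros))

async def fake_summarize(doc: str) -> str:
    await asyncio.sleep(0)       # stand-in for an awaited LLM call
    return f"summary of {doc}"

results = asyncio.run(
    bounded_gather([fake_summarize(d) for d in ["a", "b", "c"]], limit=2)
)
```

Swapping `fake_summarize` for `llm.ainvoke` inside `parallel_map_node` gives a rate-limited parallel map.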
---
Production Deployment
生产部署
LangGraph Cloud
LangGraph Cloud
python
python
Deploy to LangGraph Cloud (managed service)
部署到 LangGraph Cloud(托管服务)
1. Install LangGraph CLI
1. 安装 LangGraph CLI
pip install langgraph-cli
pip install langgraph-cli
2. Create langgraph.json config
2. 创建 langgraph.json 配置
langgraph_config = {
"dependencies": ["langchain", "langchain-anthropic"],
"graphs": {
"agent": "./graph.py:app"
},
"env": {
"ANTHROPIC_API_KEY": "$ANTHROPIC_API_KEY"
}
}
langgraph_config = {
"dependencies": ["langchain", "langchain-anthropic"],
"graphs": {
"agent": "./graph.py:app"
},
"env": {
"ANTHROPIC_API_KEY": "$ANTHROPIC_API_KEY"
}
}
3. Deploy
3. 部署
langgraph deploy
langgraph deploy
4. Use deployed graph via API
4. 通过 API 使用部署好的图
import requests
response = requests.post(
"https://your-deployment.langgraph.cloud/invoke",
json={
"input": {"messages": ["Hello"]},
"config": {"thread_id": "user_123"}
},
headers={"Authorization": "Bearer YOUR_API_KEY"}
)
result = response.json()
import requests
response = requests.post(
"https://your-deployment.langgraph.cloud/invoke",
json={
"input": {"messages": ["Hello"]},
"config": {"thread_id": "user_123"}
},
headers={"Authorization": "Bearer YOUR_API_KEY"}
)
result = response.json()
Self-Hosted with FastAPI
基于 FastAPI 自托管
python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from langgraph.checkpoint.postgres import PostgresSaver
import uvicorn
python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from langgraph.checkpoint.postgres import PostgresSaver
import uvicorn
Create FastAPI app
创建 FastAPI 应用
app_api = FastAPI()
app_api = FastAPI()
Initialize workflow
初始化工作流
checkpointer = PostgresSaver.from_conn_string(
"postgresql://user:pass@localhost/langgraph"
)
workflow_app = workflow.compile(checkpointer=checkpointer)
class WorkflowRequest(BaseModel):
input: dict
thread_id: str
class WorkflowResponse(BaseModel):
output: dict
thread_id: str
@app_api.post("/invoke", response_model=WorkflowResponse)
async def invoke_workflow(request: WorkflowRequest):
"""Invoke workflow synchronously."""
try:
config = {"configurable": {"thread_id": request.thread_id}}
result = workflow_app.invoke(request.input, config)
return WorkflowResponse(
output=result,
thread_id=request.thread_id
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app_api.post("/stream")
async def stream_workflow(request: WorkflowRequest):
"""Stream workflow execution."""
from fastapi.responses import StreamingResponse
import json  # needed for json.dumps below
config = {"configurable": {"thread_id": request.thread_id}}
async def generate():
async for event in workflow_app.astream(request.input, config):
yield f"data: {json.dumps(event)}\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")
Run server
if __name__ == "__main__":
    uvicorn.run(app_api, host="0.0.0.0", port=8000)
Docker Deployment
dockerfile
Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["uvicorn", "main:app_api", "--host", "0.0.0.0", "--port", "8000"]
yaml
docker-compose.yml
version: '3.8'
services:
app:
build: .
ports:
- "8000:8000"
environment:
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
- POSTGRES_URL=postgresql://user:pass@db:5432/langgraph
depends_on:
- db
- redis
db:
image: postgres:15
environment:
- POSTGRES_USER=user
- POSTGRES_PASSWORD=pass
- POSTGRES_DB=langgraph
volumes:
- postgres_data:/var/lib/postgresql/data
redis:
image: redis:7
ports:
- "6379:6379"
volumes:
postgres_data:
---
LangSmith Integration
Automatic Tracing
python
import os
from langchain_anthropic import ChatAnthropic
Enable LangSmith tracing
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-key"
os.environ["LANGCHAIN_PROJECT"] = "my-langgraph-project"
All LangGraph executions automatically traced
app = workflow.compile()
result = app.invoke(initial_state)
View trace in LangSmith UI: https://smith.langchain.com
Custom Metadata
python
from langsmith import traceable
@traceable(
run_type="chain",
name="research_agent",
metadata={"version": "1.0.0"}
)
def research_agent(state: dict) -> dict:
"""Node with custom LangSmith metadata."""
# Function execution automatically traced
return {"research": "results"}
workflow.add_node("researcher", research_agent)
Evaluation with LangSmith
python
from langsmith import Client
from langsmith.evaluation import evaluate
client = Client()
Create test dataset
examples = [
{
"inputs": {"topic": "AI Safety"},
"outputs": {"quality_score": 0.9}
}
]
dataset = client.create_dataset("langgraph_tests")
for example in examples:
client.create_example(
dataset_id=dataset.id,
inputs=example["inputs"],
outputs=example["outputs"]
)
Evaluate workflow
def workflow_wrapper(inputs: dict) -> dict:
result = app.invoke(inputs)
return result
def quality_evaluator(run, example):
"""Custom evaluator."""
score = calculate_quality(run.outputs)
return {"key": "quality", "score": score}
results = evaluate(
workflow_wrapper,
data="langgraph_tests",
evaluators=[quality_evaluator],
experiment_prefix="langgraph_v1"
)
print(f"Average quality: {results['results']['quality']:.2f}")
---
Real-World Examples
Customer Support Agent
python
from typing import TypedDict, Literal
from langchain_anthropic import ChatAnthropic
class SupportState(TypedDict):
customer_query: str
category: Literal["billing", "technical", "general"]
priority: Literal["low", "medium", "high"]
resolution: str
escalated: bool
def categorize(state: SupportState) -> dict:
"""Categorize customer query."""
llm = ChatAnthropic(model="claude-sonnet-4")
prompt = f"""
Categorize this support query:
"{state['customer_query']}"
Categories: billing, technical, general
Priority: low, medium, high
Return format: category|priority
"""
response = llm.invoke(prompt)
category, priority = response.content.strip().split("|")
return {
"category": category,
"priority": priority
}
def handle_billing(state: SupportState) -> dict:
"""Handle billing issues."""
llm = ChatAnthropic(model="claude-sonnet-4")
response = llm.invoke(f"Resolve billing issue: {state['customer_query']}")
return {"resolution": response.content}
def handle_technical(state: SupportState) -> dict:
"""Handle technical issues."""
llm = ChatAnthropic(model="claude-sonnet-4")
response = llm.invoke(f"Resolve technical issue: {state['customer_query']}")
return {"resolution": response.content}
def escalate(state: SupportState) -> dict:
"""Escalate to human agent."""
return {
"escalated": True,
"resolution": "Escalated to senior support team"
}
def route_by_category(state: SupportState) -> str:
"""Route based on category and priority."""
if state["priority"] == "high":
return "escalate"
category = state["category"]
if category == "billing":
return "billing"
elif category == "technical":
return "technical"
else:
return "general"
Build support workflow
workflow = StateGraph(SupportState)
workflow.add_node("categorize", categorize)
workflow.add_node("billing", handle_billing)
workflow.add_node("technical", handle_technical)
workflow.add_node("general", handle_technical) # Reuse
workflow.add_node("escalate", escalate)
workflow.add_conditional_edges(
"categorize",
route_by_category,
{
"billing": "billing",
"technical": "technical",
"general": "general",
"escalate": "escalate"
}
)
workflow.add_edge("billing", END)
workflow.add_edge("technical", END)
workflow.add_edge("general", END)
workflow.add_edge("escalate", END)
workflow.set_entry_point("categorize")
support_app = workflow.compile()
Usage
result = support_app.invoke({
"customer_query": "I was charged twice for my subscription",
"category": "",
"priority": "",
"resolution": "",
"escalated": False
})
Research Assistant
python
class ResearchState(TypedDict):
topic: str
research_notes: list
outline: str
draft: str
final_article: str
revision_count: int
def research(state: ResearchState) -> dict:
"""Gather information on topic."""
# Use search tools
results = search_web(state["topic"])
return {
"research_notes": [f"Research: {results}"]
}
def create_outline(state: ResearchState) -> dict:
"""Create article outline."""
llm = ChatAnthropic(model="claude-sonnet-4")
notes = "\n".join(state["research_notes"])
response = llm.invoke(f"Create outline for: {notes}")
return {"outline": response.content}
def write_draft(state: ResearchState) -> dict:
"""Write article draft."""
llm = ChatAnthropic(model="claude-sonnet-4")
prompt = f"""
Topic: {state['topic']}
Outline: {state['outline']}
Research: {state['research_notes']}
Write comprehensive article.
"""
response = llm.invoke(prompt)
return {"draft": response.content}
def review_and_revise(state: ResearchState) -> dict:
"""Review draft quality."""
llm = ChatAnthropic(model="claude-sonnet-4")
response = llm.invoke(f"Review and improve:\n{state['draft']}")
return {
"final_article": response.content,
"revision_count": state.get("revision_count", 0) + 1
}
Build research workflow
workflow = StateGraph(ResearchState)
workflow.add_node("research", research)
workflow.add_node("outline", create_outline)
workflow.add_node("write", write_draft)
workflow.add_node("review", review_and_revise)
workflow.add_edge("research", "outline")
workflow.add_edge("outline", "write")
workflow.add_edge("write", "review")
def check_quality(state: ResearchState) -> str:
if state["revision_count"] >= 2:
return END
# Simple quality check
if len(state.get("final_article", "")) > 1000:
return END
else:
return "write" # Revise
workflow.add_conditional_edges(
"review",
check_quality,
{END: END, "write": "write"}
)
workflow.set_entry_point("research")
research_app = workflow.compile()
Code Review Workflow
python
class CodeReviewState(TypedDict):
code: str
language: str
lint_results: list
test_results: dict
security_scan: dict
review_comments: list
approved: bool
def lint_code(state: CodeReviewState) -> dict:
"""Run linters on code."""
# Run appropriate linter
issues = run_linter(state["code"], state["language"])
return {"lint_results": issues}
def run_tests(state: CodeReviewState) -> dict:
"""Execute test suite."""
results = execute_tests(state["code"])
return {"test_results": results}
def security_scan(state: CodeReviewState) -> dict:
"""Scan for security vulnerabilities."""
scan_results = scan_for_vulnerabilities(state["code"])
return {"security_scan": scan_results}
def ai_review(state: CodeReviewState) -> dict:
"""AI-powered code review."""
llm = ChatAnthropic(model="claude-sonnet-4")
prompt = f"""
Review this {state['language']} code:
{state['code']}
Lint issues: {state['lint_results']}
Test results: {state['test_results']}
Security: {state['security_scan']}
Provide review comments.
"""
response = llm.invoke(prompt)
return {"review_comments": [response.content]}
def approve_or_reject(state: CodeReviewState) -> dict:
"""Final approval decision."""
# Auto-approve if all checks pass
lint_ok = len(state["lint_results"]) == 0
tests_ok = state["test_results"].get("passed", 0) == state["test_results"].get("total", 0)
security_ok = len(state["security_scan"].get("vulnerabilities", [])) == 0
approved = lint_ok and tests_ok and security_ok
return {"approved": approved}
Build code review workflow
workflow = StateGraph(CodeReviewState)
workflow.add_node("lint", lint_code)
workflow.add_node("test", run_tests)
workflow.add_node("security", security_scan)
workflow.add_node("ai_review", ai_review)
workflow.add_node("decision", approve_or_reject)
Parallel execution of checks
workflow.add_edge("lint", "ai_review")
workflow.add_edge("test", "ai_review")
workflow.add_edge("security", "ai_review")
workflow.add_edge("ai_review", "decision")
workflow.add_edge("decision", END)
workflow.add_edge(START, "lint")  # START from langgraph.graph; fan out
workflow.add_edge(START, "test")  # so all three checks run in parallel
workflow.add_edge(START, "security")
code_review_app = workflow.compile()
---
Performance Optimization
Parallel Execution
python
Multiple nodes with same parent execute in parallel
workflow.add_edge("start", "task_a")
workflow.add_edge("start", "task_b")
workflow.add_edge("start", "task_c")
task_a, task_b, task_c run concurrently
Caching with Checkpointers
python
from langgraph.checkpoint.sqlite import SqliteSaver
Checkpoint results for reuse
checkpointer = SqliteSaver.from_conn_string("cache.db")
app = workflow.compile(checkpointer=checkpointer)
Same thread_id reuses cached results
config = {"configurable": {"thread_id": "cached_session"}}
First run: executes all nodes
result1 = app.invoke(input_data, config)
Second run: uses cached state
result2 = app.invoke(None, config) # Instant
Batching
python
def batch_processing_node(state: dict) -> dict:
"""Process items in batches."""
items = state["items"]
batch_size = 10
results = []
for i in range(0, len(items), batch_size):
batch = items[i:i+batch_size]
batch_result = process_batch(batch)
results.extend(batch_result)
return {"results": results}
Async Execution
python
from langgraph.graph import StateGraph
import asyncio
async def async_node(state: dict) -> dict:
"""Async node for I/O-bound operations."""
results = await asyncio.gather(
fetch_api_1(state["input"]),
fetch_api_2(state["input"]),
fetch_api_3(state["input"])
)
return {"results": results}
Use async invoke
result = await app.ainvoke(input_data)
---
Comparison with Simple Chains
LangChain LCEL (Simple Chain)
LangChain LCEL(简单链)
python
from langchain_core.runnables import RunnablePassthrough
from langchain_anthropic import ChatAnthropic
llm = ChatAnthropic(model="claude-sonnet-4")
Simple linear chain
chain = (
RunnablePassthrough()
| llm
| {"output": lambda x: x.content}
)
result = chain.invoke("Hello")
**Limitations**:
- ❌ No state persistence
- ❌ No conditional branching
- ❌ No cycles/loops
- ❌ No human-in-the-loop
- ❌ Limited debugging
LangGraph (Cyclic Workflow)
python
from langgraph.graph import StateGraph, END
workflow = StateGraph(dict)
workflow.add_node("step1", step1_func)
workflow.add_node("step2", step2_func)
Conditional loop
workflow.add_conditional_edges(
"step2",
should_continue,
{"step1": "step1", END: END}
)
app = workflow.compile(checkpointer=memory)
**Advantages**:
- ✅ Persistent state across steps
- ✅ Conditional branching
- ✅ Cycles and loops
- ✅ Human-in-the-loop support
- ✅ Time-travel debugging
- ✅ Production-ready
---
Migration from LangChain LCEL
Before: LCEL Chain
python
from langchain_core.runnables import RunnablePassthrough
from langchain_anthropic import ChatAnthropic
llm = ChatAnthropic(model="claude-sonnet-4")
chain = (
{"input": RunnablePassthrough()}
| llm
| {"output": lambda x: x.content}
)
result = chain.invoke("Question")
After: LangGraph
python
from langgraph.graph import StateGraph, END
from langchain_anthropic import ChatAnthropic
from typing import TypedDict
class State(TypedDict):
input: str
output: str
def llm_node(state: State) -> dict:
llm = ChatAnthropic(model="claude-sonnet-4")
response = llm.invoke(state["input"])
return {"output": response.content}
workflow = StateGraph(State)
workflow.add_node("llm", llm_node)
workflow.add_edge("llm", END)
workflow.set_entry_point("llm")
app = workflow.compile()
result = app.invoke({"input": "Question", "output": ""})
Migration Checklist
- Identify stateful vs stateless operations
- Convert chain steps to graph nodes
- Define state schema (TypedDict)
- Add edges between nodes
- Implement conditional routing if needed
- Add checkpointing for state persistence
- Test with same inputs
- Verify output matches original chain
Best Practices
State Design
python
✅ GOOD: Flat, explicit state
class GoodState(TypedDict):
user_id: str
messages: list
current_step: str
result: dict
❌ BAD: Nested, ambiguous state
class BadState(TypedDict):
data: dict # Too generic
context: Any # No type safety
Node Granularity
python
✅ GOOD: Single responsibility per node
def validate_input(state): ...
def process_data(state): ...
def format_output(state): ...
❌ BAD: Monolithic node
def do_everything(state): ...
Error Handling
python
✅ GOOD: Explicit error states
class State(TypedDict):
result: dict | None
error: str | None
status: Literal["pending", "success", "failed"]
def robust_node(state):
try:
result = operation()
return {"result": result, "status": "success"}
except Exception as e:
return {"error": str(e), "status": "failed"}
❌ BAD: Silent failures
def fragile_node(state):
try:
return operation()
except:
return {} # Error lost
Testing
python
Test individual nodes
def test_node():
state = {"input": "test"}
result = my_node(state)
assert result["output"] == "expected"
Test full workflow
def test_workflow():
app = workflow.compile()
result = app.invoke(initial_state)
assert result["final_output"] == "expected"
Test error paths
def test_error_handling():
state = {"input": "invalid"}
result = my_node(state)
assert result["error"] is not None
Monitoring
python
import logging
logger = logging.getLogger(__name__)
def monitored_node(state: dict) -> dict:
"""Node with logging."""
logger.info(f"Executing node with state: {state.keys()}")
try:
result = operation()
logger.info("Node succeeded")
return {"result": result}
except Exception as e:
logger.error(f"Node failed: {e}")
raise
Summary
LangGraph transforms agent development from linear chains into cyclic, stateful workflows with production-grade capabilities:
Core Strengths:
- 🔄 Cyclic workflows with loops and branches
- 💾 Persistent state across sessions
- 🕐 Time-travel debugging and replay
- 👤 Human-in-the-loop approval gates
- 🔧 Tool integration (LangChain, Anthropic, custom)
- 📊 LangSmith integration for tracing
- 🏭 Production deployment (cloud or self-hosted)
When to Choose LangGraph:
- Multi-agent coordination required
- Complex branching logic
- State persistence needed
- Human approvals in workflow
- Production systems with debugging needs
Learning Path:
- Start with StateGraph basics
- Add conditional routing
- Implement checkpointing
- Build multi-agent patterns
- Deploy to production
Production Checklist:
- State schema with TypedDict
- Error handling in all nodes
- Checkpointing configured
- LangSmith tracing enabled
- Monitoring and logging
- Testing (unit + integration)
- Deployment strategy (cloud/self-hosted)
For simple request-response patterns, use LangChain LCEL. For complex stateful workflows, LangGraph is the industry-standard solution.
Related Skills
When using LangGraph, these skills enhance your workflow:
- dspy: DSPy for LLM prompt optimization (complementary to LangGraph orchestration)
- test-driven-development: Testing LangGraph workflows and state machines
- systematic-debugging: Debugging multi-agent workflows and state transitions
[Full documentation available in these skills if deployed in your bundle]