# LangChain & LangGraph Architecture
Master modern LangChain 1.x and LangGraph for building sophisticated LLM applications with agents, state management, memory, and tool integration.
## When to Use This Skill
- Building autonomous AI agents with tool access
- Implementing complex multi-step LLM workflows
- Managing conversation memory and state
- Integrating LLMs with external data sources and APIs
- Creating modular, reusable LLM application components
- Implementing document processing pipelines
- Building production-grade LLM applications
## Package Structure (LangChain 1.x)
```
langchain (1.2.x)       # High-level orchestration
langchain-core (1.2.x)  # Core abstractions (messages, prompts, tools)
langchain-community     # Third-party integrations
langgraph               # Agent orchestration and state management
langchain-openai        # OpenAI integrations
langchain-anthropic     # Anthropic/Claude integrations
langchain-voyageai      # Voyage AI embeddings
langchain-pinecone      # Pinecone vector store
```

## Core Concepts
### 1. LangGraph Agents
LangGraph is the standard for building agents in 2026. It provides:

Key Features:
- StateGraph: Explicit state management with typed state
- Durable Execution: Agents persist through failures
- Human-in-the-Loop: Inspect and modify state at any point
- Memory: Short-term and long-term memory across sessions
- Checkpointing: Save and resume agent state

Agent Patterns:
- ReAct: Reasoning + acting with `create_react_agent`
- Plan-and-Execute: Separate planning and execution nodes
- Multi-Agent: Supervisor routing between specialized agents
- Tool-Calling: Structured tool invocation with Pydantic schemas
### 2. State Management

LangGraph uses TypedDict for explicit, typed state:

```python
from typing import Annotated, TypedDict
from langgraph.graph import MessagesState

# Simple message-based state
class AgentState(MessagesState):
    """Extends MessagesState with custom fields."""
    context: Annotated[list, "retrieved documents"]

# Custom state for complex agents
class CustomState(TypedDict):
    messages: Annotated[list, "conversation history"]
    context: Annotated[dict, "retrieved context"]
    current_step: str
    results: list
```
### 3. Memory Systems
Modern memory implementations:

- ConversationBufferMemory: Stores all messages (short conversations)
- ConversationSummaryMemory: Summarizes older messages (long conversations)
- ConversationTokenBufferMemory: Token-based windowing
- VectorStoreRetrieverMemory: Semantic similarity retrieval
- LangGraph Checkpointers: Persistent state across sessions
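The token-based windowing idea behind ConversationTokenBufferMemory can be sketched in plain Python (the function name and whitespace "tokenizer" here are illustrative, not a LangChain API): keep the newest messages and drop the oldest once an approximate token budget is exceeded:

```python
def trim_to_token_budget(messages, max_tokens, count_tokens=lambda m: len(m.split())):
    """Keep the most recent messages whose combined (approximate)
    token count fits within max_tokens; drop the oldest first."""
    kept, total = [], 0
    for msg in reversed(messages):
        cost = count_tokens(msg)
        if total + cost > max_tokens:
            break
        kept.append(msg)
        total += cost
    return list(reversed(kept))

history = ["hello there", "how are you today", "fine thanks", "tell me a joke"]
print(trim_to_token_budget(history, max_tokens=7))
```

A real implementation would count tokens with the model's own tokenizer rather than whitespace splitting, but the trimming logic is the same.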
### 4. Document Processing
Loading, transforming, and storing documents:

Components:
- Document Loaders: Load from various sources
- Text Splitters: Chunk documents intelligently
- Vector Stores: Store and retrieve embeddings
- Retrievers: Fetch relevant documents
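The "chunk documents intelligently" step can be approximated in plain Python (a simplified sketch of the recursive splitting strategy, not the langchain-text-splitters implementation): split on the coarsest separator first, then recurse into oversized pieces with finer separators:

```python
def recursive_split(text, chunk_size, separators=("\n\n", "\n", " ")):
    """Split text on the coarsest separator first; recurse with finer
    separators into any piece that still exceeds chunk_size."""
    if len(text) <= chunk_size or not separators:
        return [text]
    sep, rest = separators[0], separators[1:]
    chunks = []
    for piece in text.split(sep):
        if len(piece) <= chunk_size:
            chunks.append(piece)
        else:
            chunks.extend(recursive_split(piece, chunk_size, rest))
    return [c for c in chunks if c]
```

Library splitters also add chunk overlap and length functions, but this captures why paragraph boundaries are preferred over mid-word cuts.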
### 5. Callbacks & Tracing
LangSmith is the standard for observability:

- Request/response logging
- Token usage tracking
- Latency monitoring
- Error tracking
- Trace visualization
## Quick Start

### Modern ReAct Agent with LangGraph
```python
from langgraph.prebuilt import create_react_agent
from langgraph.checkpoint.memory import MemorySaver
from langchain_anthropic import ChatAnthropic
from langchain_core.tools import tool
import ast
import operator

# Initialize LLM (Claude Sonnet 4.5 recommended)
llm = ChatAnthropic(model="claude-sonnet-4-5", temperature=0)

# Define tools with Pydantic schemas
@tool
def search_database(query: str) -> str:
    """Search the internal database for information."""
    # Your database search logic
    return f"Results for: {query}"

@tool
def calculate(expression: str) -> str:
    """Safely evaluate a mathematical expression.

    Supports: +, -, *, /, **, %, parentheses
    Example: '(2 + 3) * 4' returns '20'
    """
    # Safe math evaluation using ast
    allowed_operators = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.Pow: operator.pow,
        ast.Mod: operator.mod,
        ast.USub: operator.neg,
    }

    def _eval(node):
        if isinstance(node, ast.Constant):
            return node.value
        elif isinstance(node, ast.BinOp):
            left = _eval(node.left)
            right = _eval(node.right)
            return allowed_operators[type(node.op)](left, right)
        elif isinstance(node, ast.UnaryOp):
            operand = _eval(node.operand)
            return allowed_operators[type(node.op)](operand)
        else:
            raise ValueError(f"Unsupported operation: {type(node)}")

    try:
        tree = ast.parse(expression, mode='eval')
        return str(_eval(tree.body))
    except Exception as e:
        return f"Error: {e}"

tools = [search_database, calculate]

# Create a checkpointer for memory persistence
checkpointer = MemorySaver()

# Create the ReAct agent
agent = create_react_agent(
    llm,
    tools,
    checkpointer=checkpointer
)

# Run the agent with a thread ID for memory
config = {"configurable": {"thread_id": "user-123"}}
result = await agent.ainvoke(
    {"messages": [("user", "Search for Python tutorials and calculate 25 * 4")]},
    config=config
)
```
## Architecture Patterns
### Pattern 1: RAG with LangGraph
```python
from langgraph.graph import StateGraph, START, END
from langchain_anthropic import ChatAnthropic
from langchain_voyageai import VoyageAIEmbeddings
from langchain_pinecone import PineconeVectorStore
from langchain_core.documents import Document
from langchain_core.prompts import ChatPromptTemplate
from typing import TypedDict, Annotated

class RAGState(TypedDict):
    question: str
    context: Annotated[list[Document], "retrieved documents"]
    answer: str

# Initialize components
llm = ChatAnthropic(model="claude-sonnet-4-5")
embeddings = VoyageAIEmbeddings(model="voyage-3-large")
vectorstore = PineconeVectorStore(index_name="docs", embedding=embeddings)
retriever = vectorstore.as_retriever(search_kwargs={"k": 4})

# Define nodes
async def retrieve(state: RAGState) -> RAGState:
    """Retrieve relevant documents."""
    docs = await retriever.ainvoke(state["question"])
    return {"context": docs}

async def generate(state: RAGState) -> RAGState:
    """Generate an answer from the retrieved context."""
    prompt = ChatPromptTemplate.from_template(
        """Answer based on the context below. If you cannot answer, say so.

Context: {context}

Question: {question}

Answer:"""
    )
    context_text = "\n\n".join(doc.page_content for doc in state["context"])
    response = await llm.ainvoke(
        prompt.format(context=context_text, question=state["question"])
    )
    return {"answer": response.content}

# Build the graph
builder = StateGraph(RAGState)
builder.add_node("retrieve", retrieve)
builder.add_node("generate", generate)
builder.add_edge(START, "retrieve")
builder.add_edge("retrieve", "generate")
builder.add_edge("generate", END)
rag_chain = builder.compile()

# Use the chain
result = await rag_chain.ainvoke({"question": "What is the main topic?"})
```
### Pattern 2: Custom Agent with Structured Tools
```python
from langchain_core.tools import StructuredTool
from pydantic import BaseModel, Field

class SearchInput(BaseModel):
    """Input for database search."""
    query: str = Field(description="Search query")
    filters: dict = Field(default={}, description="Optional filters")

class EmailInput(BaseModel):
    """Input for sending email."""
    recipient: str = Field(description="Email recipient")
    subject: str = Field(description="Email subject")
    content: str = Field(description="Email body")

async def search_database(query: str, filters: dict = {}) -> str:
    """Search the internal database for information."""
    # Your database search logic
    return f"Results for '{query}' with filters {filters}"

async def send_email(recipient: str, subject: str, content: str) -> str:
    """Send an email to the specified recipient."""
    # Email sending logic
    return f"Email sent to {recipient}"

tools = [
    StructuredTool.from_function(
        coroutine=search_database,
        name="search_database",
        description="Search internal database",
        args_schema=SearchInput
    ),
    StructuredTool.from_function(
        coroutine=send_email,
        name="send_email",
        description="Send an email",
        args_schema=EmailInput
    )
]

agent = create_react_agent(llm, tools)
```

### Pattern 3: Multi-Step Workflow with StateGraph
```python
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Literal

class WorkflowState(TypedDict):
    text: str
    entities: list
    analysis: str
    summary: str
    current_step: str

async def extract_entities(state: WorkflowState) -> WorkflowState:
    """Extract key entities from the text."""
    prompt = f"Extract key entities from: {state['text']}\n\nReturn as a JSON list."
    response = await llm.ainvoke(prompt)
    return {"entities": response.content, "current_step": "analyze"}

async def analyze_entities(state: WorkflowState) -> WorkflowState:
    """Analyze the extracted entities."""
    prompt = f"Analyze these entities: {state['entities']}\n\nProvide insights."
    response = await llm.ainvoke(prompt)
    return {"analysis": response.content, "current_step": "summarize"}

async def generate_summary(state: WorkflowState) -> WorkflowState:
    """Generate the final summary."""
    prompt = f"""Summarize:
Entities: {state['entities']}
Analysis: {state['analysis']}

Provide a concise summary."""
    response = await llm.ainvoke(prompt)
    return {"summary": response.content, "current_step": "complete"}

def route_step(state: WorkflowState) -> Literal["analyze", "summarize", "end"]:
    """Route to the next step based on the current state."""
    step = state.get("current_step", "extract")
    if step == "analyze":
        return "analyze"
    elif step == "summarize":
        return "summarize"
    return "end"

# Build the workflow
builder = StateGraph(WorkflowState)
builder.add_node("extract", extract_entities)
builder.add_node("analyze", analyze_entities)
builder.add_node("summarize", generate_summary)
builder.add_edge(START, "extract")
builder.add_conditional_edges("extract", route_step, {
    "analyze": "analyze",
    "summarize": "summarize",
    "end": END
})
builder.add_conditional_edges("analyze", route_step, {
    "summarize": "summarize",
    "end": END
})
builder.add_edge("summarize", END)
workflow = builder.compile()
```
### Pattern 4: Multi-Agent Orchestration
```python
from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import create_react_agent
from langchain_core.messages import HumanMessage
from typing import TypedDict, Literal

class MultiAgentState(TypedDict):
    messages: list
    next_agent: str

# Create specialized agents (tool lists assumed defined elsewhere)
researcher = create_react_agent(llm, research_tools)
writer = create_react_agent(llm, writing_tools)
reviewer = create_react_agent(llm, review_tools)

async def supervisor(state: MultiAgentState) -> MultiAgentState:
    """Route to the appropriate agent based on the task."""
    prompt = f"""Based on the conversation, which agent should handle this?

Options:
- researcher: For finding information
- writer: For creating content
- reviewer: For reviewing and editing
- FINISH: Task is complete

Messages: {state['messages']}

Respond with just the agent name."""
    response = await llm.ainvoke(prompt)
    return {"next_agent": response.content.strip().lower()}

def route_to_agent(state: MultiAgentState) -> Literal["researcher", "writer", "reviewer", "end"]:
    """Route based on the supervisor's decision."""
    next_agent = state.get("next_agent", "").lower()
    if next_agent == "finish":
        return "end"
    return next_agent if next_agent in ["researcher", "writer", "reviewer"] else "end"

# Build the multi-agent graph
builder = StateGraph(MultiAgentState)
builder.add_node("supervisor", supervisor)
builder.add_node("researcher", researcher)
builder.add_node("writer", writer)
builder.add_node("reviewer", reviewer)
builder.add_edge(START, "supervisor")
builder.add_conditional_edges("supervisor", route_to_agent, {
    "researcher": "researcher",
    "writer": "writer",
    "reviewer": "reviewer",
    "end": END
})

# Each agent returns to the supervisor
for agent in ["researcher", "writer", "reviewer"]:
    builder.add_edge(agent, "supervisor")

multi_agent = builder.compile()
```
## Memory Management
### Thread-Scoped Memory with LangGraph Checkpointers
```python
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import create_react_agent

# In-memory checkpointer (development)
checkpointer = MemorySaver()

# Create an agent with persistent memory
agent = create_react_agent(llm, tools, checkpointer=checkpointer)

# Each thread_id maintains a separate conversation
config = {"configurable": {"thread_id": "session-abc123"}}

# Messages persist across invocations with the same thread_id
result1 = await agent.ainvoke({"messages": [("user", "My name is Alice")]}, config)
result2 = await agent.ainvoke({"messages": [("user", "What's my name?")]}, config)
# The agent remembers: "Your name is Alice"
```
### Production Memory with PostgreSQL
```python
from langgraph.checkpoint.postgres import PostgresSaver

# Production checkpointer
checkpointer = PostgresSaver.from_conn_string(
    "postgresql://user:pass@localhost/langgraph"
)
agent = create_react_agent(llm, tools, checkpointer=checkpointer)
```
undefinedVector Store Memory for Long-Term Context
用于长期上下文的向量存储记忆
```python
from langchain_community.vectorstores import Chroma
from langchain_voyageai import VoyageAIEmbeddings

embeddings = VoyageAIEmbeddings(model="voyage-3-large")
memory_store = Chroma(
    collection_name="conversation_memory",
    embedding_function=embeddings,
    persist_directory="./memory_db"
)

async def retrieve_relevant_memory(query: str, k: int = 5) -> list:
    """Retrieve relevant past conversations."""
    docs = await memory_store.asimilarity_search(query, k=k)
    return [doc.page_content for doc in docs]

async def store_memory(content: str, metadata: dict = {}):
    """Store a conversation turn in long-term memory."""
    await memory_store.aadd_texts([content], metadatas=[metadata])
```
## Callback System & LangSmith
### LangSmith Tracing
```python
import os
from langchain_anthropic import ChatAnthropic

# Enable LangSmith tracing
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-api-key"
os.environ["LANGCHAIN_PROJECT"] = "my-project"

# All LangChain/LangGraph operations are traced automatically
llm = ChatAnthropic(model="claude-sonnet-4-5")
```
### Custom Callback Handler
```python
from langchain_core.callbacks import BaseCallbackHandler
from typing import Any, Dict, List

class CustomCallbackHandler(BaseCallbackHandler):
    def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs
    ) -> None:
        print(f"LLM started with {len(prompts)} prompts")

    def on_llm_end(self, response, **kwargs) -> None:
        print(f"LLM completed: {len(response.generations)} generations")

    def on_llm_error(self, error: Exception, **kwargs) -> None:
        print(f"LLM error: {error}")

    def on_tool_start(
        self, serialized: Dict[str, Any], input_str: str, **kwargs
    ) -> None:
        print(f"Tool started: {serialized.get('name')}")

    def on_tool_end(self, output: str, **kwargs) -> None:
        print(f"Tool completed: {output[:100]}...")

# Use the callbacks
result = await agent.ainvoke(
    {"messages": [("user", "query")]},
    config={"callbacks": [CustomCallbackHandler()]}
)
```
## Streaming Responses
```python
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-sonnet-4-5", streaming=True)

# Stream tokens
async for chunk in llm.astream("Tell me a story"):
    print(chunk.content, end="", flush=True)

# Stream agent events
async for event in agent.astream_events(
    {"messages": [("user", "Search and summarize")]},
    version="v2"
):
    if event["event"] == "on_chat_model_stream":
        print(event["data"]["chunk"].content, end="")
    elif event["event"] == "on_tool_start":
        print(f"\n[Using tool: {event['name']}]")
```
## Testing Strategies
```python
import pytest
from unittest.mock import AsyncMock, patch

@pytest.mark.asyncio
async def test_agent_tool_selection():
    """Test that the agent selects the correct tool."""
    with patch.object(llm, 'ainvoke') as mock_llm:
        mock_llm.return_value = AsyncMock(content="Using search_database")
        result = await agent.ainvoke({
            "messages": [("user", "search for documents")]
        })
        # Verify the tool was called
        assert "search_database" in str(result)

@pytest.mark.asyncio
async def test_memory_persistence():
    """Test that memory persists across invocations."""
    config = {"configurable": {"thread_id": "test-thread"}}

    # First message
    await agent.ainvoke(
        {"messages": [("user", "Remember: the code is 12345")]},
        config
    )

    # The second message should recall it
    result = await agent.ainvoke(
        {"messages": [("user", "What was the code?")]},
        config
    )
    assert "12345" in result["messages"][-1].content
```
## Performance Optimization
### 1. Caching with Redis
```python
from langchain_community.cache import RedisCache
from langchain_core.globals import set_llm_cache
import redis

redis_client = redis.Redis.from_url("redis://localhost:6379")
set_llm_cache(RedisCache(redis_client))
```
### 2. Async Batch Processing
```python
import asyncio
from langchain_core.documents import Document

# text_splitter and embeddings_model are assumed initialized elsewhere

async def process_documents(documents: list[Document]) -> list:
    """Process documents in parallel."""
    tasks = [process_single(doc) for doc in documents]
    return await asyncio.gather(*tasks)

async def process_single(doc: Document) -> dict:
    """Process a single document."""
    chunks = text_splitter.split_documents([doc])
    embeddings = await embeddings_model.aembed_documents(
        [c.page_content for c in chunks]
    )
    return {"doc_id": doc.metadata.get("id"), "embeddings": embeddings}
```
### 3. Connection Pooling
```python
import os
from langchain_pinecone import PineconeVectorStore
from pinecone import Pinecone

# Reuse the Pinecone client
pc = Pinecone(api_key=os.environ["PINECONE_API_KEY"])
index = pc.Index("my-index")

# Create the vector store from the existing index
vectorstore = PineconeVectorStore(index=index, embedding=embeddings)
```
## Common Pitfalls
- Using Deprecated APIs: Use LangGraph for agents, not `initialize_agent`
- Memory Overflow: Use checkpointers with TTL for long-running agents
- Poor Tool Descriptions: Clear descriptions help the LLM select the correct tool
- Context Window Exceeded: Use summarization or sliding-window memory
- No Error Handling: Wrap tool functions with try/except
- Blocking Operations: Use async methods (`ainvoke`, `astream`)
- Missing Observability: Always enable LangSmith tracing in production
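For the error-handling pitfall, a small decorator can convert tool exceptions into strings the model can read and recover from, instead of crashing the agent loop (a generic sketch; `safe_tool` is an illustrative name, not a LangChain API):

```python
import functools

def safe_tool(fn):
    """Wrap a tool so exceptions come back as readable error strings."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except Exception as e:
            return f"Tool '{fn.__name__}' failed: {e}"
    return wrapper

@safe_tool
def divide(a: float, b: float) -> str:
    return str(a / b)

print(divide(10, 2))  # "5.0"
print(divide(1, 0))   # "Tool 'divide' failed: division by zero"
```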
## Production Checklist
- Use LangGraph StateGraph for agent orchestration
- Implement async patterns throughout (`ainvoke`, `astream`)
- Add a production checkpointer (PostgreSQL, Redis)
- Enable LangSmith tracing
- Implement structured tools with Pydantic schemas
- Add timeout limits for agent execution
- Implement rate limiting
- Add comprehensive error handling
- Set up health checks
- Version-control prompts and configurations
- Write integration tests for agent workflows
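The timeout item in the checklist can be enforced by wrapping agent calls in `asyncio.wait_for`; a minimal sketch (`run_with_timeout` and `slow_agent` are illustrative names, not LangGraph APIs):

```python
import asyncio

async def run_with_timeout(coro, seconds: float):
    """Cancel the agent run if it exceeds the time budget."""
    try:
        return await asyncio.wait_for(coro, timeout=seconds)
    except asyncio.TimeoutError:
        return {"error": f"Agent run exceeded {seconds}s and was cancelled"}

async def slow_agent():
    # Stands in for agent.ainvoke(...) taking too long
    await asyncio.sleep(10)
    return {"answer": "done"}

result = asyncio.run(run_with_timeout(slow_agent(), seconds=0.1))
print(result["error"])
```

`wait_for` cancels the underlying task on timeout, so any checkpointed state written before the deadline is preserved for a later resume.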