langchain-architecture


LangChain & LangGraph Architecture


Master modern LangChain 1.x and LangGraph for building sophisticated LLM applications with agents, state management, memory, and tool integration.

When to Use This Skill


  • Building autonomous AI agents with tool access
  • Implementing complex multi-step LLM workflows
  • Managing conversation memory and state
  • Integrating LLMs with external data sources and APIs
  • Creating modular, reusable LLM application components
  • Implementing document processing pipelines
  • Building production-grade LLM applications

Package Structure (LangChain 1.x)


```
langchain (1.2.x)         # High-level orchestration
langchain-core (1.2.x)    # Core abstractions (messages, prompts, tools)
langchain-community       # Third-party integrations
langgraph                 # Agent orchestration and state management
langchain-openai          # OpenAI integrations
langchain-anthropic       # Anthropic/Claude integrations
langchain-voyageai        # Voyage AI embeddings
langchain-pinecone        # Pinecone vector store
```
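Assuming a standard pip workflow, the packages above can be installed together. The version pin shown is illustrative; pin to the versions your project has tested:

```shell
pip install "langchain>=1.0" langchain-core langchain-community \
    langgraph langchain-openai langchain-anthropic \
    langchain-voyageai langchain-pinecone
```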

Core Concepts


1. LangGraph Agents


LangGraph is the standard for building agents in 2026.

Key Features:
  • StateGraph: Explicit state management with typed state
  • Durable Execution: Agents persist through failures
  • Human-in-the-Loop: Inspect and modify state at any point
  • Memory: Short-term and long-term memory across sessions
  • Checkpointing: Save and resume agent state

Agent Patterns:
  • ReAct: Reasoning + Acting with create_react_agent
  • Plan-and-Execute: Separate planning and execution nodes
  • Multi-Agent: Supervisor routing between specialized agents
  • Tool-Calling: Structured tool invocation with Pydantic schemas

2. State Management


LangGraph uses TypedDict for explicit state:

```python
from typing import Annotated, TypedDict
from langgraph.graph import MessagesState

# Simple message-based state
class AgentState(MessagesState):
    """Extends MessagesState with custom fields."""
    context: Annotated[list, "retrieved documents"]

# Custom state for complex agents
class CustomState(TypedDict):
    messages: Annotated[list, "conversation history"]
    context: Annotated[dict, "retrieved context"]
    current_step: str
    results: list
```

3. Memory Systems


Modern memory implementations:
  • ConversationBufferMemory: Stores all messages (short conversations)
  • ConversationSummaryMemory: Summarizes older messages (long conversations)
  • ConversationTokenBufferMemory: Token-based windowing
  • VectorStoreRetrieverMemory: Semantic similarity retrieval
  • LangGraph Checkpointers: Persistent state across sessions
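The windowing idea behind token-based memory can be sketched in plain Python. This is a minimal illustration, not the library's implementation: `count_tokens` is a crude whitespace approximation (a real implementation uses the model's tokenizer), and `trim_to_token_budget` is a hypothetical helper name.

```python
# Keep only the most recent messages whose combined token count fits a budget.
# count_tokens approximates token count by whitespace splitting.

def count_tokens(text: str) -> int:
    return len(text.split())

def trim_to_token_budget(messages: list[str], max_tokens: int) -> list[str]:
    kept: list[str] = []
    total = 0
    for msg in reversed(messages):  # walk newest-first
        cost = count_tokens(msg)
        if total + cost > max_tokens:
            break
        kept.append(msg)
        total += cost
    return list(reversed(kept))  # restore chronological order

history = ["hello there", "hi how can I help", "summarize this long report please"]
print(trim_to_token_budget(history, 10))
# → ['hi how can I help', 'summarize this long report please']
```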

4. Document Processing


Loading, transforming, and storing documents:
Components:
  • Document Loaders: Load from various sources
  • Text Splitters: Chunk documents intelligently
  • Vector Stores: Store and retrieve embeddings
  • Retrievers: Fetch relevant documents
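As a rough sketch of what a text splitter does: cut the document into fixed-size chunks with some overlap so context isn't lost at chunk boundaries. The real splitters also respect separators such as paragraphs and sentences; `split_text` here is a hypothetical simplification.

```python
def split_text(text: str, chunk_size: int, overlap: int) -> list[str]:
    """Split text into fixed-size chunks that overlap with their neighbors."""
    assert chunk_size > overlap, "overlap must be smaller than chunk_size"
    chunks = []
    start = 0
    while start < len(text):
        chunks.append(text[start:start + chunk_size])
        start += chunk_size - overlap  # step forward, keeping `overlap` chars
    return chunks

print(split_text("abcdefghij", chunk_size=4, overlap=1))
# → ['abcd', 'defg', 'ghij', 'j']
```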

5. Callbacks & Tracing


LangSmith is the standard for observability:
  • Request/response logging
  • Token usage tracking
  • Latency monitoring
  • Error tracking
  • Trace visualization

Quick Start


Modern ReAct Agent with LangGraph


```python
from langgraph.prebuilt import create_react_agent
from langgraph.checkpoint.memory import MemorySaver
from langchain_anthropic import ChatAnthropic
from langchain_core.tools import tool
import ast
import operator

# Initialize LLM (Claude Sonnet 4.5 recommended)
llm = ChatAnthropic(model="claude-sonnet-4-5", temperature=0)

# Define tools
@tool
def search_database(query: str) -> str:
    """Search internal database for information."""
    # Your database search logic
    return f"Results for: {query}"

@tool
def calculate(expression: str) -> str:
    """Safely evaluate a mathematical expression.

    Supports: +, -, *, /, **, %, parentheses
    Example: '(2 + 3) * 4' returns '20'
    """
    # Safe math evaluation using ast
    allowed_operators = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.Pow: operator.pow,
        ast.Mod: operator.mod,
        ast.USub: operator.neg,
    }

    def _eval(node):
        if isinstance(node, ast.Constant):
            return node.value
        elif isinstance(node, ast.BinOp):
            left = _eval(node.left)
            right = _eval(node.right)
            return allowed_operators[type(node.op)](left, right)
        elif isinstance(node, ast.UnaryOp):
            operand = _eval(node.operand)
            return allowed_operators[type(node.op)](operand)
        else:
            raise ValueError(f"Unsupported operation: {type(node)}")

    try:
        tree = ast.parse(expression, mode='eval')
        return str(_eval(tree.body))
    except Exception as e:
        return f"Error: {e}"

tools = [search_database, calculate]

# Create checkpointer for memory persistence
checkpointer = MemorySaver()

# Create ReAct agent
agent = create_react_agent(llm, tools, checkpointer=checkpointer)

# Run agent with a thread ID for memory
config = {"configurable": {"thread_id": "user-123"}}
result = await agent.ainvoke(
    {"messages": [("user", "Search for Python tutorials and calculate 25 * 4")]},
    config=config,
)
```

Architecture Patterns


Pattern 1: RAG with LangGraph


```python
from langgraph.graph import StateGraph, START, END
from langchain_anthropic import ChatAnthropic
from langchain_voyageai import VoyageAIEmbeddings
from langchain_pinecone import PineconeVectorStore
from langchain_core.documents import Document
from langchain_core.prompts import ChatPromptTemplate
from typing import TypedDict, Annotated

class RAGState(TypedDict):
    question: str
    context: Annotated[list[Document], "retrieved documents"]
    answer: str

# Initialize components
llm = ChatAnthropic(model="claude-sonnet-4-5")
embeddings = VoyageAIEmbeddings(model="voyage-3-large")
vectorstore = PineconeVectorStore(index_name="docs", embedding=embeddings)
retriever = vectorstore.as_retriever(search_kwargs={"k": 4})

# Define nodes
async def retrieve(state: RAGState) -> RAGState:
    """Retrieve relevant documents."""
    docs = await retriever.ainvoke(state["question"])
    return {"context": docs}

async def generate(state: RAGState) -> RAGState:
    """Generate answer from context."""
    prompt = ChatPromptTemplate.from_template(
        """Answer based on the context below. If you cannot answer, say so.

        Context: {context}

        Question: {question}

        Answer:"""
    )
    context_text = "\n\n".join(doc.page_content for doc in state["context"])
    response = await llm.ainvoke(
        prompt.format(context=context_text, question=state["question"])
    )
    return {"answer": response.content}

# Build graph
builder = StateGraph(RAGState)
builder.add_node("retrieve", retrieve)
builder.add_node("generate", generate)
builder.add_edge(START, "retrieve")
builder.add_edge("retrieve", "generate")
builder.add_edge("generate", END)
rag_chain = builder.compile()

# Use the chain
result = await rag_chain.ainvoke({"question": "What is the main topic?"})
```

Pattern 2: Custom Agent with Structured Tools


```python
from langchain_core.tools import StructuredTool
from pydantic import BaseModel, Field

class SearchInput(BaseModel):
    """Input for database search."""
    query: str = Field(description="Search query")
    filters: dict = Field(default={}, description="Optional filters")

class EmailInput(BaseModel):
    """Input for sending email."""
    recipient: str = Field(description="Email recipient")
    subject: str = Field(description="Email subject")
    content: str = Field(description="Email body")

async def search_database(query: str, filters: dict = {}) -> str:
    """Search internal database for information."""
    # Your database search logic
    return f"Results for '{query}' with filters {filters}"

async def send_email(recipient: str, subject: str, content: str) -> str:
    """Send an email to the specified recipient."""
    # Email sending logic
    return f"Email sent to {recipient}"

tools = [
    StructuredTool.from_function(
        coroutine=search_database,
        name="search_database",
        description="Search internal database",
        args_schema=SearchInput
    ),
    StructuredTool.from_function(
        coroutine=send_email,
        name="send_email",
        description="Send an email",
        args_schema=EmailInput
    )
]

agent = create_react_agent(llm, tools)
```

Pattern 3: Multi-Step Workflow with StateGraph


```python
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Literal

class WorkflowState(TypedDict):
    text: str
    entities: list
    analysis: str
    summary: str
    current_step: str

async def extract_entities(state: WorkflowState) -> WorkflowState:
    """Extract key entities from text."""
    prompt = f"Extract key entities from: {state['text']}\n\nReturn as JSON list."
    response = await llm.ainvoke(prompt)
    return {"entities": response.content, "current_step": "analyze"}

async def analyze_entities(state: WorkflowState) -> WorkflowState:
    """Analyze extracted entities."""
    prompt = f"Analyze these entities: {state['entities']}\n\nProvide insights."
    response = await llm.ainvoke(prompt)
    return {"analysis": response.content, "current_step": "summarize"}

async def generate_summary(state: WorkflowState) -> WorkflowState:
    """Generate final summary."""
    prompt = f"""Summarize:
    Entities: {state['entities']}
    Analysis: {state['analysis']}

    Provide a concise summary."""
    response = await llm.ainvoke(prompt)
    return {"summary": response.content, "current_step": "complete"}

def route_step(state: WorkflowState) -> Literal["analyze", "summarize", "end"]:
    """Route to the next step based on current state."""
    step = state.get("current_step", "extract")
    if step == "analyze":
        return "analyze"
    elif step == "summarize":
        return "summarize"
    return "end"

# Build workflow
builder = StateGraph(WorkflowState)
builder.add_node("extract", extract_entities)
builder.add_node("analyze", analyze_entities)
builder.add_node("summarize", generate_summary)

builder.add_edge(START, "extract")
builder.add_conditional_edges("extract", route_step, {
    "analyze": "analyze",
    "summarize": "summarize",
    "end": END,
})
builder.add_conditional_edges("analyze", route_step, {
    "summarize": "summarize",
    "end": END,
})
builder.add_edge("summarize", END)
workflow = builder.compile()
```

Pattern 4: Multi-Agent Orchestration


```python
from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import create_react_agent
from langchain_core.messages import HumanMessage
from typing import Literal, TypedDict

class MultiAgentState(TypedDict):
    messages: list
    next_agent: str

# Create specialized agents (research_tools, writing_tools, and
# review_tools are tool lists defined elsewhere)
researcher = create_react_agent(llm, research_tools)
writer = create_react_agent(llm, writing_tools)
reviewer = create_react_agent(llm, review_tools)

async def supervisor(state: MultiAgentState) -> MultiAgentState:
    """Route to the appropriate agent based on the task."""
    prompt = f"""Based on the conversation, which agent should handle this?
Options:
- researcher: For finding information
- writer: For creating content
- reviewer: For reviewing and editing
- FINISH: Task is complete

Messages: {state['messages']}

Respond with just the agent name."""

    response = await llm.ainvoke(prompt)
    return {"next_agent": response.content.strip().lower()}

def route_to_agent(state: MultiAgentState) -> Literal["researcher", "writer", "reviewer", "end"]:
    """Route based on the supervisor's decision."""
    next_agent = state.get("next_agent", "").lower()
    if next_agent == "finish":
        return "end"
    return next_agent if next_agent in ["researcher", "writer", "reviewer"] else "end"

# Build multi-agent graph
builder = StateGraph(MultiAgentState)
builder.add_node("supervisor", supervisor)
builder.add_node("researcher", researcher)
builder.add_node("writer", writer)
builder.add_node("reviewer", reviewer)

builder.add_edge(START, "supervisor")
builder.add_conditional_edges("supervisor", route_to_agent, {
    "researcher": "researcher",
    "writer": "writer",
    "reviewer": "reviewer",
    "end": END,
})

# Each agent returns to the supervisor
for agent in ["researcher", "writer", "reviewer"]:
    builder.add_edge(agent, "supervisor")

multi_agent = builder.compile()
```

Memory Management


Checkpointer-Based Memory with LangGraph

```python
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import create_react_agent

# In-memory checkpointer (development)
checkpointer = MemorySaver()

# Create agent with persistent memory
agent = create_react_agent(llm, tools, checkpointer=checkpointer)

# Each thread_id maintains a separate conversation
config = {"configurable": {"thread_id": "session-abc123"}}

# Messages persist across invocations with the same thread_id
result1 = await agent.ainvoke({"messages": [("user", "My name is Alice")]}, config)
result2 = await agent.ainvoke({"messages": [("user", "What's my name?")]}, config)
# Agent remembers: "Your name is Alice"
```

Production Memory with PostgreSQL


```python
from langgraph.checkpoint.postgres import PostgresSaver

# Production checkpointer
checkpointer = PostgresSaver.from_conn_string(
    "postgresql://user:pass@localhost/langgraph"
)
agent = create_react_agent(llm, tools, checkpointer=checkpointer)
```

Vector Store Memory for Long-Term Context


```python
from langchain_community.vectorstores import Chroma
from langchain_voyageai import VoyageAIEmbeddings

embeddings = VoyageAIEmbeddings(model="voyage-3-large")
memory_store = Chroma(
    collection_name="conversation_memory",
    embedding_function=embeddings,
    persist_directory="./memory_db"
)

async def retrieve_relevant_memory(query: str, k: int = 5) -> list:
    """Retrieve relevant past conversations."""
    docs = await memory_store.asimilarity_search(query, k=k)
    return [doc.page_content for doc in docs]

async def store_memory(content: str, metadata: dict = {}):
    """Store conversation in long-term memory."""
    await memory_store.aadd_texts([content], metadatas=[metadata])
```

Callback System & LangSmith


LangSmith Tracing


```python
import os
from langchain_anthropic import ChatAnthropic

# Enable LangSmith tracing
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-api-key"
os.environ["LANGCHAIN_PROJECT"] = "my-project"

# All LangChain/LangGraph operations are automatically traced
llm = ChatAnthropic(model="claude-sonnet-4-5")
```

Custom Callback Handler


```python
from langchain_core.callbacks import BaseCallbackHandler
from typing import Any, Dict, List

class CustomCallbackHandler(BaseCallbackHandler):
    def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs
    ) -> None:
        print(f"LLM started with {len(prompts)} prompts")

    def on_llm_end(self, response, **kwargs) -> None:
        print(f"LLM completed: {len(response.generations)} generations")

    def on_llm_error(self, error: Exception, **kwargs) -> None:
        print(f"LLM error: {error}")

    def on_tool_start(
        self, serialized: Dict[str, Any], input_str: str, **kwargs
    ) -> None:
        print(f"Tool started: {serialized.get('name')}")

    def on_tool_end(self, output: str, **kwargs) -> None:
        print(f"Tool completed: {output[:100]}...")

# Use callbacks
result = await agent.ainvoke(
    {"messages": [("user", "query")]},
    config={"callbacks": [CustomCallbackHandler()]},
)
```

Streaming Responses


```python
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-sonnet-4-5", streaming=True)

# Stream tokens
async for chunk in llm.astream("Tell me a story"):
    print(chunk.content, end="", flush=True)

# Stream agent events
async for event in agent.astream_events(
    {"messages": [("user", "Search and summarize")]}, version="v2"
):
    if event["event"] == "on_chat_model_stream":
        print(event["data"]["chunk"].content, end="")
    elif event["event"] == "on_tool_start":
        print(f"\n[Using tool: {event['name']}]")
```

Testing Strategies


```python
import pytest
from unittest.mock import AsyncMock, patch

@pytest.mark.asyncio
async def test_agent_tool_selection():
    """Test agent selects the correct tool."""
    with patch.object(llm, 'ainvoke') as mock_llm:
        mock_llm.return_value = AsyncMock(content="Using search_database")

        result = await agent.ainvoke({
            "messages": [("user", "search for documents")]
        })

        # Verify tool was called
        assert "search_database" in str(result)

@pytest.mark.asyncio
async def test_memory_persistence():
    """Test memory persists across invocations."""
    config = {"configurable": {"thread_id": "test-thread"}}

    # First message
    await agent.ainvoke(
        {"messages": [("user", "Remember: the code is 12345")]},
        config
    )

    # Second message should remember
    result = await agent.ainvoke(
        {"messages": [("user", "What was the code?")]},
        config
    )

    assert "12345" in result["messages"][-1].content
```

Performance Optimization


1. Caching with Redis


```python
from langchain_community.cache import RedisCache
from langchain_core.globals import set_llm_cache
import redis

redis_client = redis.Redis.from_url("redis://localhost:6379")
set_llm_cache(RedisCache(redis_client))
```

2. Async Batch Processing


```python
import asyncio
from langchain_core.documents import Document

async def process_documents(documents: list[Document]) -> list:
    """Process documents in parallel."""
    tasks = [process_single(doc) for doc in documents]
    return await asyncio.gather(*tasks)

async def process_single(doc: Document) -> dict:
    """Process a single document."""
    chunks = text_splitter.split_documents([doc])
    embeddings = await embeddings_model.aembed_documents(
        [c.page_content for c in chunks]
    )
    return {"doc_id": doc.metadata.get("id"), "embeddings": embeddings}
```

3. Connection Pooling


```python
import os
from langchain_pinecone import PineconeVectorStore
from pinecone import Pinecone

# Reuse the Pinecone client
pc = Pinecone(api_key=os.environ["PINECONE_API_KEY"])
index = pc.Index("my-index")

# Create vector store with the existing index
vectorstore = PineconeVectorStore(index=index, embedding=embeddings)
```


Common Pitfalls


  1. Using Deprecated APIs: Use LangGraph for agents, not initialize_agent
  2. Memory Overflow: Use checkpointers with TTL for long-running agents
  3. Poor Tool Descriptions: Clear descriptions help the LLM select the correct tool
  4. Context Window Exceeded: Use summarization or sliding-window memory
  5. No Error Handling: Wrap tool functions with try/except
  6. Blocking Operations: Use async methods (ainvoke, astream)
  7. Missing Observability: Always enable LangSmith tracing in production
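Wrapping tool functions with try/except can be done once with a small decorator. This is a minimal sketch in plain Python; `safe_tool` is a hypothetical helper, not a LangChain API.

```python
# A tool that raises becomes an unrecoverable agent failure; wrapping it
# returns an error string the LLM can see and recover from.
def safe_tool(fn):
    def wrapper(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except Exception as e:
            return f"Tool error: {type(e).__name__}: {e}"
    return wrapper

@safe_tool
def divide(a: float, b: float) -> str:
    return str(a / b)

print(divide(10, 4))  # → 2.5
print(divide(1, 0))   # → Tool error: ZeroDivisionError: division by zero
```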

Production Checklist


  • Use LangGraph StateGraph for agent orchestration
  • Implement async patterns throughout (ainvoke, astream)
  • Add a production checkpointer (PostgreSQL, Redis)
  • Enable LangSmith tracing
  • Implement structured tools with Pydantic schemas
  • Add timeout limits for agent execution
  • Implement rate limiting
  • Add comprehensive error handling
  • Set up health checks
  • Version-control prompts and configurations
  • Write integration tests for agent workflows
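The timeout item above can be sketched with only the standard library; `slow_agent` is a stand-in for any agent coroutine, and `invoke_with_timeout` is a hypothetical wrapper, not a LangGraph API.

```python
import asyncio

async def slow_agent(query: str) -> str:
    # Stand-in for an agent invocation that may hang or run too long
    await asyncio.sleep(5)
    return "done"

async def invoke_with_timeout(coro, seconds: float):
    """Cancel the agent call if it exceeds the time budget."""
    try:
        return await asyncio.wait_for(coro, timeout=seconds)
    except asyncio.TimeoutError:
        return "Agent timed out"

result = asyncio.run(invoke_with_timeout(slow_agent("q"), 0.1))
print(result)  # → Agent timed out
```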