agentic-rag-for-dummies
Agentic RAG for Dummies
Skill by ara.so — AI Agent Skills collection.
This skill enables you to build modular Agentic RAG (Retrieval-Augmented Generation) systems using LangGraph. The framework provides hierarchical document indexing, conversation memory, query clarification with human-in-the-loop, multi-agent map-reduce for complex queries, self-correction, and context compression.
What This Project Does
Agentic RAG for Dummies is a production-ready framework for building intelligent document retrieval systems that go beyond basic RAG:
- Hierarchical Indexing: Search small child chunks for precision, retrieve large parent chunks for context
- Conversation Memory: Maintains dialogue context across multiple questions
- Query Clarification: Rewrites ambiguous queries or pauses for human clarification
- Multi-Agent Orchestration: Decomposes complex queries into parallel sub-agents using LangGraph
- Self-Correction: Automatically re-queries when initial results are insufficient
- Context Compression: Prevents redundant retrievals across long conversations
- Provider Agnostic: Works with Ollama, OpenAI, Anthropic, Google, or any LangChain-supported LLM
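The hierarchical indexing idea from the list above can be sketched without any vector database. This is a toy illustration under stated assumptions, not the project's implementation: `PARENTS`, `CHILDREN`, `search_children`, and `retrieve_with_context` are hypothetical names, and simple word overlap stands in for embedding similarity.

```python
# Toy parent/child retrieval: match on small child chunks, return the large parent.
PARENTS = {
    "p0": "Installation. Create a virtual environment, then install the requirements file.",
    "p1": "Indexing. Documents are split by headers into parents and then into small children.",
}

# Each child records which parent it was cut from.
CHILDREN = [
    {"text": "create a virtual environment", "parent_id": "p0"},
    {"text": "install the requirements file", "parent_id": "p0"},
    {"text": "split by headers into parents", "parent_id": "p1"},
]

def search_children(query: str) -> dict:
    """Return the child chunk that shares the most words with the query."""
    query_words = set(query.lower().split())
    return max(CHILDREN, key=lambda c: len(query_words & set(c["text"].split())))

def retrieve_with_context(query: str) -> str:
    """Search the small chunks for precision, then hand back the whole parent."""
    best_child = search_children(query)
    return PARENTS[best_child["parent_id"]]

print(retrieve_with_context("how do I install the requirements"))
```

The small chunks decide what matches; the parent decides how much text the LLM gets to read.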
Installation
Clone and Set Up Environment
```bash
git clone https://github.com/GiovanniPasq/agentic-rag-for-dummies.git
cd agentic-rag-for-dummies
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate
pip install -r requirements.txt
```
Install Ollama (for Local LLMs)
```bash
# Download from https://ollama.com or use:
curl -fsSL https://ollama.com/install.sh | sh

# Pull the default model used in this guide
# (models with 7B+ parameters tend to be more reliable at tool calling)
ollama pull qwen3:4b-instruct-2507-q4_K_M

# Or, for better performance:
ollama pull llama3.1:8b-instruct-q4_K_M
```
For Cloud Providers
```bash
# OpenAI
pip install langchain-openai
export OPENAI_API_KEY="your-key-here"

# Anthropic
pip install langchain-anthropic
export ANTHROPIC_API_KEY="your-key-here"

# Google
pip install langchain-google-genai
export GOOGLE_API_KEY="your-key-here"
```
Core Configuration
Initialize Components
```python
import os
from pathlib import Path
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_qdrant.fastembed_sparse import FastEmbedSparse
from qdrant_client import QdrantClient
from langchain_ollama import ChatOllama

# Directory structure
DOCS_DIR = "docs"                           # Your PDF files
MARKDOWN_DIR = "markdown_docs"              # Converted markdown
PARENT_STORE_PATH = "parent_store"          # Parent chunk storage
CHILD_COLLECTION = "document_child_chunks"  # Vector DB collection

os.makedirs(DOCS_DIR, exist_ok=True)
os.makedirs(MARKDOWN_DIR, exist_ok=True)
os.makedirs(PARENT_STORE_PATH, exist_ok=True)

# Initialize LLM (swap provider easily)
llm = ChatOllama(model="qwen3:4b-instruct-2507-q4_K_M", temperature=0)

# Embeddings for hybrid search
dense_embeddings = HuggingFaceEmbeddings(
    model_name="sentence-transformers/all-mpnet-base-v2"
)
sparse_embeddings = FastEmbedSparse(model_name="Qdrant/bm25")

# Vector database
client = QdrantClient(path="qdrant_db")
```
Switch LLM Providers
```python
# OpenAI
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

# Anthropic
from langchain_anthropic import ChatAnthropic
llm = ChatAnthropic(model="claude-sonnet-4-5-20250929", temperature=0)

# Google
from langchain_google_genai import ChatGoogleGenerativeAI
llm = ChatGoogleGenerativeAI(model="gemini-2.5-flash", temperature=0)
```
Document Processing Pipeline
1. Convert PDFs to Markdown
```python
import glob
from pathlib import Path

import pymupdf
import pymupdf4llm

def pdf_to_markdown(pdf_path, output_dir):
    """Convert a single PDF to Markdown."""
    doc = pymupdf.open(pdf_path)
    md = pymupdf4llm.to_markdown(
        doc,
        header=False,
        footer=False,
        page_separators=True,
        ignore_images=True
    )
    # Drop lone surrogates that cannot be written as valid UTF-8
    md_cleaned = md.encode('utf-8', errors='surrogatepass').decode('utf-8', errors='ignore')
    # Build the name from the stem so dots inside the file name are preserved
    output_path = Path(output_dir) / f"{Path(doc.name).stem}.md"
    output_path.write_bytes(md_cleaned.encode('utf-8'))

def pdfs_to_markdowns(path_pattern, overwrite=False):
    """Convert all PDFs matching pattern."""
    output_dir = Path(MARKDOWN_DIR)
    for pdf_path in map(Path, glob.glob(path_pattern)):
        md_path = output_dir / f"{pdf_path.stem}.md"
        # Skip files that were already converted unless overwrite is requested
        if overwrite or not md_path.exists():
            pdf_to_markdown(pdf_path, output_dir)
```
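The encode/decode round trip in `pdf_to_markdown` is easy to misread, so here is a small standalone sketch of what it does. PDF text extraction can produce lone surrogate code points, which cannot be written as UTF-8. The helper name `clean_text` and the sample string are illustrative assumptions.

```python
def clean_text(text: str) -> str:
    """Remove code points that cannot be stored as valid UTF-8 (lone surrogates)."""
    # 'surrogatepass' lets the encoder emit bytes for surrogates instead of raising.
    raw = text.encode("utf-8", errors="surrogatepass")
    # Those bytes are not valid UTF-8, so decoding with 'ignore' drops them.
    return raw.decode("utf-8", errors="ignore")

dirty = "Revenue\ud800 grew 12%"  # "\ud800" is a lone surrogate
cleaned = clean_text(dirty)

# The cleaned string now encodes without errors; `dirty` would raise UnicodeEncodeError.
cleaned.encode("utf-8")
print(cleaned)
```

Ordinary non-ASCII text passes through unchanged; only the unencodable code points are removed.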
```python
# Convert all PDFs in the docs directory
pdfs_to_markdowns(f"{DOCS_DIR}/*.pdf")
```
2. Hierarchical Chunking (Parent/Child)
```python
import json
from pathlib import Path

from langchain_text_splitters import MarkdownHeaderTextSplitter, RecursiveCharacterTextSplitter

def process_document_hierarchical(markdown_path):
    """Split document into parent and child chunks."""
    content = Path(markdown_path).read_text(encoding='utf-8')

    # Parent chunks: split by headers
    header_splitter = MarkdownHeaderTextSplitter(
        headers_to_split_on=[
            ("#", "Header 1"),
            ("##", "Header 2"),
            ("###", "Header 3"),
        ],
        strip_headers=False
    )
    parent_chunks = header_splitter.split_text(content)

    # Child chunks: fixed-size from each parent
    child_splitter = RecursiveCharacterTextSplitter(
        chunk_size=500,
        chunk_overlap=100
    )

    parent_ids = []
    child_chunks = []
    for i, parent in enumerate(parent_chunks):
        parent_id = f"{Path(markdown_path).stem}_parent_{i}"
        parent_ids.append(parent_id)

        # Store parent chunk as JSON on disk
        parent_data = {
            "id": parent_id,
            "content": parent.page_content,
            "metadata": parent.metadata
        }
        parent_file = Path(PARENT_STORE_PATH) / f"{parent_id}.json"
        # Explicit UTF-8: ensure_ascii=False keeps non-ASCII characters as-is
        parent_file.write_text(json.dumps(parent_data, ensure_ascii=False), encoding='utf-8')

        # Create child chunks and link each one back to its parent
        children = child_splitter.split_documents([parent])
        for j, child in enumerate(children):
            child.metadata["parent_id"] = parent_id
            child.metadata["child_index"] = j
            child_chunks.append(child)

    return parent_ids, child_chunks
```
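To make `chunk_size` and `chunk_overlap` concrete, the following sketch cuts text with a plain sliding window. It is a simplification: `RecursiveCharacterTextSplitter` prefers to break on separators such as paragraphs and sentences, so its real output will differ. The function `sliding_chunks` is a hypothetical helper written only for this illustration.

```python
def sliding_chunks(text: str, chunk_size: int, chunk_overlap: int) -> list[str]:
    """Cut text into windows of chunk_size that share chunk_overlap characters."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap  # how far each new window advances
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this window already reached the end of the text
    return chunks

# With the values used above, each window advances 500 - 100 = 400 characters.
lengths = [len(c) for c in sliding_chunks("x" * 1300, chunk_size=500, chunk_overlap=100)]
print(lengths)
```

The overlap means a sentence that straddles a boundary still appears whole in at least one child chunk, at the cost of indexing some text twice.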
3. Index Documents in Vector Database
```python
import glob
from pathlib import Path

from qdrant_client.http import models as qmodels
from langchain_qdrant import QdrantVectorStore, RetrievalMode

def ensure_collection(collection_name):
    """Create Qdrant collection if it doesn't exist."""
    # Probe the embedding model once to learn the dense vector size
    embedding_dimension = len(dense_embeddings.embed_query("test"))
    if not client.collection_exists(collection_name):
        client.create_collection(
            collection_name=collection_name,
            vectors_config=qmodels.VectorParams(
                size=embedding_dimension,
                distance=qmodels.Distance.COSINE
            ),
            sparse_vectors_config={
                "sparse": qmodels.SparseVectorParams()
            },
        )

def index_documents(path_pattern):
    """Index all Markdown files matching path_pattern with hierarchical chunking."""
    ensure_collection(CHILD_COLLECTION)
    vector_store = QdrantVectorStore(
        client=client,
        collection_name=CHILD_COLLECTION,
        embedding=dense_embeddings,
        sparse_embedding=sparse_embeddings,
        retrieval_mode=RetrievalMode.HYBRID,
        # Must match the sparse vector name used in ensure_collection
        sparse_vector_name="sparse",
    )

    all_child_chunks = []
    for md_file in glob.glob(path_pattern):
        parent_ids, child_chunks = process_document_hierarchical(md_file)
        all_child_chunks.extend(child_chunks)
        print(f"Processed {Path(md_file).name}: {len(parent_ids)} parents, {len(child_chunks)} children")

    # Batch index all child chunks (skip the call when nothing was found)
    if all_child_chunks:
        vector_store.add_documents(all_child_chunks)
    return vector_store
```
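Hybrid retrieval combines a dense (semantic) ranking with a sparse (BM25-style) ranking. One widely used way to merge two rankings is reciprocal rank fusion (RRF), sketched below on plain lists. This is an illustration of the general technique only: this guide does not show how the vector store fuses results internally, and `rrf_fuse`, the document IDs, and the constant `k=60` are assumptions.

```python
from collections import defaultdict

def rrf_fuse(rankings: list[list[str]], k: int = 60) -> list[str]:
    """Merge ranked lists: each list contributes 1 / (k + rank) per document."""
    scores: dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    # Highest fused score first; ties broken by ID for a stable order
    return sorted(scores, key=lambda doc_id: (-scores[doc_id], doc_id))

dense_ranking = ["doc_a", "doc_b", "doc_c"]   # e.g. from embedding similarity
sparse_ranking = ["doc_c", "doc_a", "doc_d"]  # e.g. from keyword matching
fused = rrf_fuse([dense_ranking, sparse_ranking])
print(fused)
```

Documents that appear near the top of both lists rise above documents that only one retriever liked, which is the practical benefit of searching with both signals.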
```python
# Index all Markdown documents
vector_store = index_documents(f"{MARKDOWN_DIR}/*.md")
```
Building the Agentic RAG System
Define Agent Tools
```python
import json
from pathlib import Path

from langchain_core.tools import tool

@tool
def retrieve_documents(query: str) -> list[str]:
    """
    Search the knowledge base using hybrid search (dense + sparse embeddings).
    Returns relevant child chunks, each prefixed with its parent_id.

    Args:
        query: The search query
    """
    results = vector_store.similarity_search(query, k=5)
    # Include parent_id so the agent can call get_parent_context on a hit
    return [
        f"[parent_id: {doc.metadata.get('parent_id')}]\n{doc.page_content}"
        for doc in results
    ]

@tool
def get_parent_context(parent_id: str) -> str:
    """
    Retrieve the full parent chunk for additional context.

    Args:
        parent_id: The parent chunk identifier
    """
    parent_file = Path(PARENT_STORE_PATH) / f"{parent_id}.json"
    if parent_file.exists():
        data = json.loads(parent_file.read_text(encoding='utf-8'))
        return data["content"]
    return "Parent chunk not found."

tools = [retrieve_documents, get_parent_context]
```
Define System Prompts
```python
CONVERSATION_SUMMARIZER_PROMPT = """You are a conversation summarizer.
Extract key context from the conversation history that is relevant to the current query.
Focus on: entities mentioned, topics discussed, user intent.

Conversation History:
{history}

Current Query: {query}

Provide a concise summary of relevant context."""

QUERY_CLARIFICATION_PROMPT = """You are a query clarification assistant.
Analyze the query and conversation context.

If the query is:
- Ambiguous or contains pronouns without clear referents: Rewrite it clearly
- Multi-part (multiple questions): Split into focused sub-queries
- Clear and focused: Return it unchanged

Context: {context}
Query: {query}

Return a JSON object:
{{
  "needs_clarification": boolean,
  "clarification_question": string or null,
  "rewritten_queries": [list of clear, focused queries]
}}"""

AGENT_PROMPT = """You are a RAG agent. Use the retrieve_documents tool to search for information.
If results are insufficient, try rephrasing your search query.
If you find relevant parent_id metadata, use get_parent_context for full context.

Available tools:
- retrieve_documents(query: str): Search the knowledge base
- get_parent_context(parent_id: str): Get full parent chunk

Question: {query}
Context: {context}

Provide a comprehensive answer based on retrieved documents."""
```
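Two details of the clarification prompt are worth a short sketch. First, the doubled braces are there because `str.format` treats single braces as placeholders. Second, models often wrap JSON replies in a Markdown code fence, so a tolerant parser helps. `TEMPLATE` is a shortened stand-in for the real prompt, and `parse_json_reply` and the sample reply are hypothetical.

```python
import json
import re

TEMPLATE = """Query: {query}
Return a JSON object:
{{
  "needs_clarification": boolean
}}"""

def parse_json_reply(reply: str) -> dict:
    """Parse a model reply that may wrap its JSON object in extra text or a code fence."""
    match = re.search(r"\{.*\}", reply, flags=re.DOTALL)  # first "{" through last "}"
    if match is None:
        raise ValueError("no JSON object found in reply")
    return json.loads(match.group(0))

# After formatting, "{{" and "}}" become literal "{" and "}".
prompt = TEMPLATE.format(query="What is RAG?")
print(prompt)

# A simulated reply wrapped in a Markdown fence, as some models produce.
fence = "`" * 3
reply = f'{fence}json\n{{"needs_clarification": false, "rewritten_queries": ["What is RAG?"]}}\n{fence}'
parsed = parse_json_reply(reply)
print(parsed["rewritten_queries"])
```

Calling `json.loads` directly on the fenced reply would raise `json.JSONDecodeError`, which is why extracting the object first is a reasonable precaution.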
Define State Models
```python
from typing import TypedDict, Annotated, Sequence
from langgraph.graph.message import add_messages
from langchain_core.messages import BaseMessage

class AgentState(TypedDict):
    """State for individual RAG agents."""
    # add_messages merges new messages into the list instead of overwriting it,
    # so tool results returned by ToolNode do not erase the earlier conversation
    messages: Annotated[Sequence[BaseMessage], add_messages]
    query: str
    context: str
    retrieved_docs: list[str]
    parent_contexts: list[str]
    search_attempts: int
    max_searches: int
    answer: str

class OrchestratorState(TypedDict):
    """State for the main orchestration graph."""
    user_query: str
    conversation_history: list[dict]
    conversation_summary: str
    clarified_queries: list[str]
    needs_human_input: bool
    clarification_question: str
    agent_results: list[dict]
    final_answer: str
```
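LangGraph merges each node's return value into the shared state, and a key can declare a reducer that combines old and new values instead of replacing them. The plain-Python toy below mimics that behavior to show why it matters for a message list. `REDUCERS` and `apply_update` are hypothetical names, and real graphs declare reducers through `Annotated` type hints rather than a lookup table.

```python
import operator

# Keys listed here are combined with the old value; all other keys are replaced.
REDUCERS = {"messages": operator.add}

def apply_update(state: dict, update: dict) -> dict:
    """Merge a node's partial update into the state, honoring per-key reducers."""
    merged = dict(state)
    for key, value in update.items():
        reducer = REDUCERS.get(key)
        if reducer is not None and key in state:
            merged[key] = reducer(state[key], value)  # e.g. list concatenation
        else:
            merged[key] = value                       # last write wins
    return merged

state = {"messages": ["user: hi"], "search_attempts": 0}
state = apply_update(state, {"messages": ["ai: calling tool"], "search_attempts": 1})
state = apply_update(state, {"messages": ["tool: 3 chunks"]})
print(state)
```

Without a reducer on `messages`, the tool result in the last update would have replaced the whole conversation instead of being appended to it.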
Build LangGraph Agent
```python
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode
from langchain_core.messages import HumanMessage, AIMessage

def should_continue(state: AgentState) -> str:
    """Decide if agent should continue searching or finish."""
    if state["answer"]:
        return "end"
    if state["search_attempts"] >= state["max_searches"]:
        return "end"
    return "continue"

def agent_node(state: AgentState) -> AgentState:
    """Main agent reasoning node."""
    llm_with_tools = llm.bind_tools(tools)
    messages = state["messages"]
    if not messages:
        messages = [HumanMessage(content=AGENT_PROMPT.format(
            query=state["query"],
            context=state.get("context", "")
        ))]
    response = llm_with_tools.invoke(messages)

    # Check if we have a final answer (no tool calls)
    if not response.tool_calls:
        return {
            **state,
            "answer": response.content,
            "messages": messages + [response]
        }
    return {
        **state,
        "messages": messages + [response],
        "search_attempts": state["search_attempts"] + 1
    }

def build_agent_graph():
    """Build the RAG agent graph."""
    workflow = StateGraph(AgentState)
    workflow.add_node("agent", agent_node)
    workflow.add_node("tools", ToolNode(tools))
    workflow.set_entry_point("agent")
    workflow.add_conditional_edges(
        "agent",
        should_continue,
        {
            "continue": "tools",
            "end": END
        }
    )
    workflow.add_edge("tools", "agent")
    return workflow.compile()

agent_graph = build_agent_graph()
```
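The control flow of the agent graph can be traced without an LLM. The sketch below replays scripted model outputs through the same routing rules as `agent_node` and `should_continue`: a reply without a tool call is the answer, each tool call counts as a search attempt, and the loop ends once the budget is reached. `run_agent_loop` and the scripted steps are hypothetical.

```python
def run_agent_loop(model_steps: list[dict], max_searches: int) -> dict:
    """Simulate the agent/tools cycle with scripted model outputs.

    Each step is either {"tool_call": query} or {"answer": text}.
    """
    state = {"search_attempts": 0, "answer": "", "searches": []}
    for step in model_steps:
        if "answer" in step:
            state["answer"] = step["answer"]  # no tool call: final answer, stop
            break
        state["search_attempts"] += 1         # the model asked for a search
        if state["search_attempts"] >= state_budget(max_searches):
            break                             # budget reached: end before running the tool
        state["searches"].append(step["tool_call"])  # the tools node would run here
    return state

def state_budget(max_searches: int) -> int:
    """Guard against a non-positive budget in this toy."""
    return max(1, max_searches)

# The model rephrases once, then answers.
ok = run_agent_loop(
    [{"tool_call": "q1"}, {"tool_call": "q1 rephrased"}, {"answer": "done"}],
    max_searches=3,
)
# The model never stops asking for searches.
exhausted = run_agent_loop([{"tool_call": f"q{i}"} for i in range(5)], max_searches=3)
print(ok["answer"], exhausted["answer"] == "")
```

Note the consequence of these rules: when the budget runs out the loop ends with an empty answer, so callers may want to handle that case explicitly.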
Multi-Agent Orchestration
```python
from langgraph.graph import StateGraph, END
import json

def summarize_conversation(state: OrchestratorState) -> OrchestratorState:
    """Summarize conversation history for context."""
    history_text = "\n".join([
        f"{msg['role']}: {msg['content']}"
        for msg in state["conversation_history"][-5:]  # Last 5 messages
    ])
    summary_prompt = CONVERSATION_SUMMARIZER_PROMPT.format(
        history=history_text,
        query=state["user_query"]
    )
    summary = llm.invoke(summary_prompt).content
    return {**state, "conversation_summary": summary}

def clarify_query(state: OrchestratorState) -> OrchestratorState:
    """Clarify and potentially decompose the query."""
    clarification_prompt = QUERY_CLARIFICATION_PROMPT.format(
        context=state.get("conversation_summary", ""),
        query=state["user_query"]
    )
    response = llm.invoke(clarification_prompt).content
    try:
        result = json.loads(response)
    except json.JSONDecodeError:
        # Reply was not valid JSON: fall back to the original query unchanged
        result = {"needs_clarification": False, "rewritten_queries": [state["user_query"]]}
    return {
        **state,
        "needs_human_input": bool(result.get("needs_clarification", False)),
        "clarification_question": result.get("clarification_question") or "",
        "clarified_queries": result.get("rewritten_queries") or [state["user_query"]]
    }

def route_after_clarification(state: OrchestratorState) -> str:
    """Route based on whether human input is needed."""
    if state["needs_human_input"]:
        return "wait_for_human"
    return "execute_agents"

def execute_parallel_agents(state: OrchestratorState) -> OrchestratorState:
    """Run one agent per sub-query (this simple version runs them one after another)."""
    results = []
    for query in state["clarified_queries"]:
        agent_state = {
            "messages": [],
            "query": query,
            "context": state.get("conversation_summary", ""),
            "retrieved_docs": [],
            "parent_contexts": [],
            "search_attempts": 0,
            "max_searches": 3,
            "answer": ""
        }
        # Run agent graph
        final_state = agent_graph.invoke(agent_state)
        results.append({
            "query": query,
            "answer": final_state["answer"],
            "docs": final_state["retrieved_docs"]
        })
    return {**state, "agent_results": results}

def aggregate_results(state: OrchestratorState) -> OrchestratorState:
    """Combine all agent results into final answer."""
    combined = "\n\n".join([
        f"Sub-query: {r['query']}\nAnswer: {r['answer']}"
        for r in state["agent_results"]
    ])
    aggregation_prompt = f"""Synthesize these sub-answers into a coherent response:

{combined}

Original question: {state['user_query']}

Provide a unified, well-structured answer."""
    final_answer = llm.invoke(aggregation_prompt).content
    return {**state, "final_answer": final_answer}

def build_orchestrator_graph():
    """Build the main orchestration graph."""
    workflow = StateGraph(OrchestratorState)
    workflow.add_node("summarize", summarize_conversation)
    workflow.add_node("clarify", clarify_query)
    workflow.add_node("execute", execute_parallel_agents)
    workflow.add_node("aggregate", aggregate_results)
    workflow.set_entry_point("summarize")
    workflow.add_edge("summarize", "clarify")
    workflow.add_conditional_edges(
        "clarify",
        route_after_clarification,
        {
            "wait_for_human": END,  # End the run; the caller asks the user and re-invokes
            "execute_agents": "execute"
        }
    )
    workflow.add_edge("execute", "aggregate")
    workflow.add_edge("aggregate", END)
    return workflow.compile()

orchestrator_graph = build_orchestrator_graph()
```
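The loop in `execute_parallel_agents` handles sub-queries one at a time. One way to actually run them concurrently is a thread pool in a map-reduce shape, sketched below with the standard library. `answer_sub_query` is a stand-in for `agent_graph.invoke`, and `map_reduce` is a hypothetical helper; whether your LLM backend and vector store tolerate concurrent calls is an assumption to verify.

```python
from concurrent.futures import ThreadPoolExecutor

def answer_sub_query(query: str) -> dict:
    """Stand-in for agent_graph.invoke; a real agent would retrieve and reason here."""
    return {"query": query, "answer": f"(answer to: {query})"}

def map_reduce(sub_queries: list[str], max_workers: int = 4) -> str:
    """Answer sub-queries concurrently, then join the partial answers."""
    # Map: one task per sub-query; pool.map returns results in input order.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = list(pool.map(answer_sub_query, sub_queries))
    # Reduce: plain concatenation here; the guide uses an LLM call for this step.
    return "\n\n".join(
        f"Sub-query: {r['query']}\nAnswer: {r['answer']}" for r in results
    )

print(map_reduce(["What is Python?", "What is JavaScript?"]))
```

Threads suit this workload because each agent spends most of its time waiting on network or model I/O rather than computing.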
Usage Patterns
Basic Query Execution
```python
def query_rag_system(user_query: str, conversation_history: list | None = None):
    """Execute a query through the full agentic RAG system."""
    initial_state = {
        "user_query": user_query,
        "conversation_history": conversation_history or [],
        "conversation_summary": "",
        "clarified_queries": [],
        "needs_human_input": False,
        "clarification_question": "",
        "agent_results": [],
        "final_answer": ""
    }
    result = orchestrator_graph.invoke(initial_state)

    # The graph ended early because the query needs clarification
    if result["needs_human_input"]:
        return {
            "needs_clarification": True,
            "question": result["clarification_question"]
        }
    return {
        "needs_clarification": False,
        "answer": result["final_answer"],
        "sub_queries": result["clarified_queries"],
        "sources": [r["docs"] for r in result["agent_results"]]
    }

# Example usage
response = query_rag_system(
    "What is the difference between JavaScript and Python?",
    conversation_history=[
        {"role": "user", "content": "Tell me about programming languages"},
        {"role": "assistant", "content": "Programming languages are..."}
    ]
)
if response["needs_clarification"]:
    print(f"Clarification needed: {response['question']}")
else:
    print(f"Answer: {response['answer']}")
    print(f"Decomposed into: {response['sub_queries']}")
```
Interactive Chat Loop
```python
def chat_loop():
    """Interactive chat session with conversation memory."""
    conversation_history = []
    print("Agentic RAG Chat (type 'quit' to exit)")
    while True:
        user_input = input("\nYou: ").strip()
        if user_input.lower() == 'quit':
            break
        response = query_rag_system(user_input, conversation_history)

        # Keep asking until the query is clear enough to answer
        while response["needs_clarification"]:
            print(f"\nBot: {response['question']}")
            clarification = input("You: ").strip()
            # Re-run with the original question plus the clarification
            user_input = f"{user_input} ({clarification})"
            response = query_rag_system(user_input, conversation_history)

        print(f"\nBot: {response['answer']}")

        # Update history
        conversation_history.append({"role": "user", "content": user_input})
        conversation_history.append({"role": "assistant", "content": response['answer']})

# Run interactive chat
chat_loop()
```
Programmatic Multi-Query
```python
queries = [
    "What is machine learning?",
    "How does neural network training work?",
    "What are common ML frameworks?"
]
results = []
history = []
for q in queries:
    result = query_rag_system(q, history)
    results.append(result)
    history.append({"role": "user", "content": q})
    # "answer" is absent when the system asked for clarification instead
    history.append({"role": "assistant", "content": result.get("answer", "")})

# Results now contain context-aware answers
for i, r in enumerate(results):
    print(f"\nQ{i+1}: {queries[i]}")
    print(f"A: {r.get('answer', '(clarification needed)')}\n")
```
Running the Gradio UI
```python
# Use the provided Gradio interface (run from a shell):
#   python src/chat_app.py

# Or programmatically
from src.chat_app import create_chat_interface

demo = create_chat_interface(
    orchestrator_graph=orchestrator_graph,
    parent_store_path=PARENT_STORE_PATH
)
demo.launch(share=True)
```
Configuration Options
Tuning Retrieval Parameters
python
undefinedpython
undefinedAdjust number of retrieved chunks
调整检索块的数量
vector_store.similarity_search(query, k=10) # Retrieve top 10
vector_store.similarity_search(query, k=10) # 检索前10个结果
Adjust child chunk size
调整子块大小
child_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000, # Larger chunks = more context
chunk_overlap=200 # More overlap = better boundary handling
)
child_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000, # 更大的块=更多上下文
chunk_overlap=200 # 更多重叠=更好的边界处理
)
Adjust max search attempts per agent
调整每个智能体的最大搜索尝试次数
agent_state = {
"max_searches": 5, # Allow more self-correction loops
...
}
undefinedagent_state = {
"max_searches": 5, # 允许更多自我修正循环
...
}
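To get a feel for how `chunk_size` and `chunk_overlap` trade off, here is a minimal sketch using a plain fixed-width sliding window. It is not how `RecursiveCharacterTextSplitter` works internally (that splitter also respects separators such as paragraphs and sentences, so its real output will differ), and the name `sliding_window_chunks` is hypothetical.

```python
def sliding_window_chunks(text: str, chunk_size: int, chunk_overlap: int) -> list[str]:
    """Split text into fixed-width windows that share chunk_overlap characters.

    Hypothetical helper for illustration only; not part of the project.
    """
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap  # how far each window advances
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this window already reaches the end of the text
    return chunks


text = "x" * 5000
small = sliding_window_chunks(text, chunk_size=500, chunk_overlap=100)
large = sliding_window_chunks(text, chunk_size=1000, chunk_overlap=200)
print(len(small), len(large))  # smaller chunks mean more vectors to store and search
```

Smaller chunks give more precise hits but more vectors to index. Larger chunks give fewer, broader hits.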
Adjusting Agent Behavior
python
# Raise the temperature for more varied answers
llm = ChatOllama(model="qwen3:4b-instruct-2507-q4_K_M", temperature=0.3)

# More aggressive query decomposition:
# modify QUERY_CLARIFICATION_PROMPT to split queries more aggressively

# Longer conversation memory
history_text = "\n".join(
    f"{msg['role']}: {msg['content']}"
    for msg in state["conversation_history"][-10:]  # Last 10 messages instead of 5
)
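The history window can be pulled into a small helper so the memory length becomes a single parameter. This is a sketch, assuming messages are dicts with `role` and `content` keys as in the snippet above. The name `format_history` is hypothetical.

```python
def format_history(history: list[dict], max_messages: int = 10) -> str:
    """Render the most recent messages as 'role: content' lines.

    Hypothetical helper; assumes each message has 'role' and 'content' keys.
    """
    if max_messages <= 0:
        return ""  # guard: history[-0:] would return the whole list
    recent = history[-max_messages:]
    return "\n".join(f"{msg['role']}: {msg['content']}" for msg in recent)


history = [
    {"role": "user", "content": "What is machine learning?"},
    {"role": "assistant", "content": "A way to learn patterns from data."},
    {"role": "user", "content": "How is it trained?"},
]
print(format_history(history, max_messages=2))
```

A longer window gives the model more context but also lengthens every prompt, so it is worth raising gradually.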
Troubleshooting
Small Models Ignore Tools
Problem: Ollama models with fewer than 7B parameters ignore tool calls or hallucinate answers.
Solution:
bash
# Use larger models for reliable tool calling
ollama pull llama3.1:8b-instruct-q4_K_M
ollama pull mistral:7b-instruct-q4_K_M
python
# Or switch to a cloud provider
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
Qdrant Collection Errors
Problem: `Collection already exists` or dimension mismatch errors.
Solution:
python
# Delete and recreate the collection (this discards its vectors, so re-index afterwards)
client.delete_collection(CHILD_COLLECTION)
ensure_collection(CHILD_COLLECTION)

# Or use a new collection name
CHILD_COLLECTION = "document_child_chunks_v2"
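Dimension mismatches typically appear after switching embedding models while reusing an old collection. One way to avoid them, sketched below, is to derive the collection name from the embedding model and its vector size, so a new model never writes into an old collection. The helper `collection_name_for` and the naming scheme are assumptions, not part of the project.

```python
import re


def collection_name_for(base: str, model_name: str, dim: int) -> str:
    """Build a collection name that encodes the embedding model and dimension.

    Hypothetical naming scheme for illustration only.
    """
    # Collapse every run of non-alphanumeric characters into one underscore
    slug = re.sub(r"[^a-z0-9]+", "_", model_name.lower()).strip("_")
    return f"{base}__{slug}__{dim}"


# Example values only: substitute the embedding model actually in use
CHILD_COLLECTION = collection_name_for(
    "document_child_chunks", "sentence-transformers/all-MiniLM-L6-v2", 384
)
print(CHILD_COLLECTION)
```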
Parent Chunks Not Found
Problem: `get_parent_context` returns "Parent chunk not found".
Solution:
python
from pathlib import Path

# Check that the parent store exists and contains parent files
print(list(Path(PARENT_STORE_PATH).glob("*.json")))

# Verify metadata in child chunks
results = vector_store.similarity_search("test", k=1)
print(results[0].metadata)  # Should have a "parent_id" key
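The two checks above can be combined into one consistency check that lists every `parent_id` referenced by a child chunk but missing from the parent store. This is a sketch under a stated assumption: the store holds one `<parent_id>.json` file per parent. The `*.json` glob suggests this layout but the source does not confirm it, so adjust the lookup if the project names files differently. The name `find_missing_parents` is hypothetical.

```python
from pathlib import Path


def find_missing_parents(child_metadatas: list[dict], parent_store: Path) -> list[str]:
    """Return parent_ids referenced by child chunks that have no JSON file on disk.

    Assumes one '<parent_id>.json' file per parent (an assumption, see above).
    """
    stored = {p.stem for p in parent_store.glob("*.json")}
    missing: list[str] = []
    for meta in child_metadatas:
        parent_id = meta.get("parent_id")
        if parent_id is None:
            continue  # a chunk without parent_id is a separate indexing problem
        if parent_id not in stored and parent_id not in missing:
            missing.append(parent_id)
    return missing
```

In practice the metadata list could come from `[doc.metadata for doc in vector_store.similarity_search("test", k=50)]`. An empty result means every sampled child chunk points at an existing parent.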
Memory Issues with Large Documents
Problem: Out of memory when processing many large PDFs.
Solution:
python
# Process documents in batches
def index_documents_batched(markdown_files, batch_size=10):
    for i in range(0, len(markdown_files), batch_size):
        batch = markdown_files[i:i+batch_size]
        all_child_chunks = []
        for md_file in batch:
            parent_ids, child_chunks = process_document_hierarchical(md_file)
            all_child_chunks.extend(child_chunks)
        # Flush each batch to the vector store before loading the next one
        vector_store.add_documents(all_child_chunks)
        print(f"Indexed batch {i//batch_size + 1}")
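The slicing above needs the full file list in memory. If files are discovered lazily (for example with `Path.glob`), a generic batching generator works on any iterable. This is a minimal sketch and the helper name `batched` is ours. Python 3.12 ships a similar `itertools.batched` that yields tuples.

```python
from itertools import islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def batched(items: Iterable[T], batch_size: int) -> Iterator[list[T]]:
    """Yield successive lists of at most batch_size items from any iterable."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(items)
    # islice pulls only batch_size items at a time, so nothing is preloaded
    while batch := list(islice(iterator, batch_size)):
        yield batch


for number, batch in enumerate(batched(range(7), 3), start=1):
    print(f"Batch {number}: {batch}")
```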
Agent Loops Indefinitely
Problem: Agent keeps calling tools without producing an answer.
Solution:
python
# Enforce a stricter max_searches
agent_state["max_searches"] = 2

# Add explicit termination in agent_node
def agent_node(state: AgentState) -> AgentState:
    if state["search_attempts"] >= state["max_searches"]:
        return {
            **state,
            "answer": "Unable to find sufficient information after multiple attempts."
        }
    # ... rest of logic
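The guard only fires if `search_attempts` is actually incremented on every search. The sketch below shows the capped loop outside LangGraph. The `AgentState` here is a simplified stand-in for the project's state, and `run_agent` and the `search` callable are hypothetical.

```python
from typing import Callable, TypedDict


class AgentState(TypedDict):
    # Simplified stand-in for the project's agent state (assumption)
    question: str
    search_attempts: int
    max_searches: int
    answer: str


def run_agent(state: AgentState, search: Callable[[str], str]) -> AgentState:
    """Call `search` until it returns a non-empty result or the cap is reached."""
    while state["search_attempts"] < state["max_searches"]:
        # Count the attempt before inspecting the result, otherwise the cap never triggers
        state = {**state, "search_attempts": state["search_attempts"] + 1}
        found = search(state["question"])
        if found:
            return {**state, "answer": found}
    return {**state, "answer": "Unable to find sufficient information after multiple attempts."}


initial: AgentState = {"question": "q", "search_attempts": 0, "max_searches": 2, "answer": ""}
final = run_agent(initial, search=lambda question: "")  # a search that never finds anything
print(final["search_attempts"], final["answer"])
```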
Query Clarification Too Aggressive
Problem: System asks for clarification on clear queries.
Solution:
python
# Adjust QUERY_CLARIFICATION_PROMPT
QUERY_CLARIFICATION_PROMPT = """...
Only set needs_clarification=true if the query is genuinely ambiguous
(contains unresolved pronouns, missing critical context, or is nonsensical).
..."""

# Or skip the clarification node for short queries
def route_after_clarification(state: OrchestratorState) -> str:
    if len(state["user_query"].split()) < 5:  # Short queries skip clarification
        return "execute_agents"
    if state["needs_human_input"]:
        return "wait_for_human"
    return "execute_agents"
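A length-only shortcut has a catch: very short queries such as "What about it?" are often the most ambiguous ones. A possible refinement, sketched below, skips clarification only when a short query contains no unresolved-looking pronoun. The pronoun list is a rough heuristic and an assumption, and the state is a plain dict rather than the project's `OrchestratorState`.

```python
import re

# Rough heuristic list (assumption): words that often refer to earlier context
AMBIGUOUS_PRONOUNS = {"it", "this", "that", "they", "them", "those", "these"}


def route_after_clarification(state: dict) -> str:
    """Skip human clarification only for short queries without dangling pronouns."""
    words = re.findall(r"[a-z']+", state["user_query"].lower())
    is_short = len(words) < 5
    has_pronoun = any(word in AMBIGUOUS_PRONOUNS for word in words)
    if is_short and not has_pronoun:
        return "execute_agents"
    if state["needs_human_input"]:
        return "wait_for_human"
    return "execute_agents"


print(route_after_clarification({"user_query": "What is LangGraph?", "needs_human_input": True}))
print(route_after_clarification({"user_query": "What about it?", "needs_human_input": True}))
```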
Advanced Patterns
Add Observability with Langfuse
python
# Set up Langfuse tracing
import os
os.environ["LANGFUSE_PUBLIC_KEY"] = "your-public-key"
os.environ["LANGFUSE_SECRET_KEY"] = "your-secret-key"
os.environ["LANGFUSE_HOST"] = "https://cloud.langfuse.com"

# Import path for Langfuse SDK v2; SDK v3 moved the handler to langfuse.langchain
from langfuse.callback import CallbackHandler
langfuse_handler = CallbackHandler()

# Add to LLM calls
llm = ChatOllama(
    model="qwen3:4b-instruct-2507-q4_K_M",
    temperature=0,
    callbacks=[langfuse_handler]
)

# Trace graph execution
result = orchestrator_graph.invoke(
    initial_state,
    config={"callbacks": [langfuse_handler]}
)
Custom Embedding Models
python
# Use a different embedding model for domain-specific docs
from langchain_huggingface import HuggingFaceEmbeddings

# Legal documents
legal_embeddings = HuggingFaceEmbeddings(
    model_name="nlpaueb/legal-bert-base-uncased"
)

# Medical documents
medical_embeddings = HuggingFaceEmbeddings(
    model_name="microsoft/BiomedNLP-PubMedBERT-base-uncased-abstract"
)
Multi-Collection RAG
python
# Search multiple collections (e.g., different document types)
def multi_collection_retrieve(query: str) -> list[str]:
    results = []
    for collection in ["technical_docs", "user_guides", "api_reference"]:
        store = QdrantVectorStore(
            client=client,
            collection_name=collection,
            embedding=dense_embeddings
        )
        results.extend(store.similarity_search(query, k=2))
    return [doc.page_content for doc in results]
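The function above concatenates hits in collection order, so a weak match from the first collection can outrank a strong match from the last. A possible refinement is to merge by score and drop duplicate passages. The sketch below works on plain `(text, score)` pairs, such as those that could be built from LangChain's `similarity_search_with_score`. It assumes a higher score means more similar, which depends on the distance metric, and that all collections share the same embedding model so scores are comparable. The name `merge_hits` is hypothetical.

```python
def merge_hits(
    hits_by_collection: dict[str, list[tuple[str, float]]],
    top_k: int = 5,
) -> list[tuple[str, str, float]]:
    """Merge per-collection (text, score) hits into one ranked, de-duplicated list.

    Assumes higher score = more similar (an assumption; depends on the metric).
    """
    best: dict[str, tuple[str, float]] = {}
    for collection, hits in hits_by_collection.items():
        for text, score in hits:
            # Keep only the best-scoring occurrence of each passage
            if text not in best or score > best[text][1]:
                best[text] = (collection, score)
    ranked = sorted(best.items(), key=lambda item: item[1][1], reverse=True)
    return [(text, collection, score) for text, (collection, score) in ranked[:top_k]]


hits = {
    "technical_docs": [("A", 0.9), ("B", 0.5)],
    "user_guides": [("B", 0.7), ("C", 0.6)],
}
for text, collection, score in merge_hits(hits, top_k=2):
    print(f"{score:.2f} [{collection}] {text}")
```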
Resources
- GitHub Repository: https://github.com/GiovanniPasq/agentic-rag-for-dummies
- Interactive Notebook: `notebooks/agentic_rag.ipynb` (or open in Colab: https://colab.research.google.com/github/GiovanniPasq/agentic-rag-for-dummies/blob/main/notebooks/agentic_rag.ipynb)
- PDF Conversion Guide: `notebooks/pdf_to_markdown.ipynb`
- Chunk Inspection Tool: Chunky
- LangGraph Documentation: https://langchain-ai.github.io/langgraph/
- Qdrant Documentation: https://qdrant.tech/documentation/
Key Takeaways
- Use models with at least 7B parameters for reliable tool calling and instruction following
- Hierarchical indexing (parent/child chunks) balances precision and context
- Query clarification prevents misunderstandings early in the pipeline
- Multi-agent decomposition handles complex queries by parallelizing sub-problems
- Self-correction loops improve answer quality through iterative refinement
- Provider-agnostic design allows switching between local and cloud LLMs without changing the rest of the pipeline
This framework is production-ready and designed for extension: swap components, add new tools, or integrate custom agents as needed.