agentic-rag-for-dummies

Agentic RAG for Dummies

Skill by ara.so — AI Agent Skills collection.
This skill enables you to build modular Agentic RAG (Retrieval-Augmented Generation) systems using LangGraph. The framework provides hierarchical document indexing, conversation memory, query clarification with human-in-the-loop, multi-agent map-reduce for complex queries, self-correction, and context compression.

What This Project Does

Agentic RAG for Dummies is a production-ready framework for building intelligent document retrieval systems that go beyond basic RAG:
  • Hierarchical Indexing: Search small child chunks for precision, retrieve large parent chunks for context
  • Conversation Memory: Maintains dialogue context across multiple questions
  • Query Clarification: Rewrites ambiguous queries or pauses for human clarification
  • Multi-Agent Orchestration: Decomposes complex queries into parallel sub-agents using LangGraph
  • Self-Correction: Automatically re-queries when initial results are insufficient
  • Context Compression: Prevents redundant retrievals across long conversations
  • Provider Agnostic: Works with Ollama, OpenAI, Anthropic, Google, or any LangChain-supported LLM

Installation

Clone and Set Up Environment

bash
git clone https://github.com/GiovanniPasq/agentic-rag-for-dummies.git
cd agentic-rag-for-dummies
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate
pip install -r requirements.txt

Install Ollama (for Local LLMs)

bash
# Pull a recommended model (7B+ for reliable tool calling)
ollama pull qwen3:4b-instruct-2507-q4_K_M

# Or for better performance:
ollama pull llama3.1:8b-instruct-q4_K_M

For Cloud Providers

bash
# OpenAI
pip install langchain-openai
export OPENAI_API_KEY="your-key-here"

# Anthropic
pip install langchain-anthropic
export ANTHROPIC_API_KEY="your-key-here"

# Google
pip install langchain-google-genai
export GOOGLE_API_KEY="your-key-here"
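
Before switching to a cloud provider, it can help to confirm that the matching key is actually set. The following is a minimal sketch, not part of the project: `missing_api_keys` and `PROVIDER_KEYS` are hypothetical names, and only the environment variable names come from the commands in this section.

```python
import os

# Environment variable expected for each cloud provider in this guide.
PROVIDER_KEYS = {
    "openai": "OPENAI_API_KEY",
    "anthropic": "ANTHROPIC_API_KEY",
    "google": "GOOGLE_API_KEY",
}

def missing_api_keys(providers, env=None):
    """Return the env var names that are unset or empty for the given providers."""
    env = os.environ if env is None else env
    return [PROVIDER_KEYS[p] for p in providers if not env.get(PROVIDER_KEYS[p])]

# Lists whichever keys are not exported in the current shell
print(missing_api_keys(["openai", "anthropic", "google"]))
```

Passing `env` explicitly keeps the check easy to test without touching the real environment.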

Core Configuration

Initialize Components

python
import os
from pathlib import Path
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_qdrant.fastembed_sparse import FastEmbedSparse
from qdrant_client import QdrantClient
from langchain_ollama import ChatOllama

# Directory structure
DOCS_DIR = "docs"                            # Your PDF files
MARKDOWN_DIR = "markdown_docs"               # Converted markdown
PARENT_STORE_PATH = "parent_store"           # Parent chunk storage
CHILD_COLLECTION = "document_child_chunks"   # Vector DB collection

os.makedirs(DOCS_DIR, exist_ok=True)
os.makedirs(MARKDOWN_DIR, exist_ok=True)
os.makedirs(PARENT_STORE_PATH, exist_ok=True)

# Initialize LLM (swap provider easily)
llm = ChatOllama(model="qwen3:4b-instruct-2507-q4_K_M", temperature=0)

# Embeddings for hybrid search
dense_embeddings = HuggingFaceEmbeddings(
    model_name="sentence-transformers/all-mpnet-base-v2"
)
sparse_embeddings = FastEmbedSparse(model_name="Qdrant/bm25")

# Vector database
client = QdrantClient(path="qdrant_db")

Switch LLM Providers

python
# OpenAI
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

# Anthropic
from langchain_anthropic import ChatAnthropic
llm = ChatAnthropic(model="claude-sonnet-4-5-20250929", temperature=0)

# Google
from langchain_google_genai import ChatGoogleGenerativeAI
llm = ChatGoogleGenerativeAI(model="gemini-2.5-flash", temperature=0)
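
If the provider should be chosen by name (for example from a config value) rather than by editing imports, a small registry is one option. This is a sketch under assumptions: `PROVIDERS`, `resolve_provider`, and `make_llm` are hypothetical helpers, while the module names, class names, and model names are the ones used in this guide.

```python
import importlib

# (module, class, default model) for each provider shown in this guide.
PROVIDERS = {
    "ollama": ("langchain_ollama", "ChatOllama", "qwen3:4b-instruct-2507-q4_K_M"),
    "openai": ("langchain_openai", "ChatOpenAI", "gpt-4o-mini"),
    "anthropic": ("langchain_anthropic", "ChatAnthropic", "claude-sonnet-4-5-20250929"),
    "google": ("langchain_google_genai", "ChatGoogleGenerativeAI", "gemini-2.5-flash"),
}

def resolve_provider(name):
    """Look up the import path and default model for a provider name."""
    try:
        return PROVIDERS[name.lower()]
    except KeyError:
        raise ValueError(f"Unknown provider {name!r}; expected one of {sorted(PROVIDERS)}") from None

def make_llm(name, temperature=0):
    """Import the provider package lazily, so only the selected one must be installed."""
    module_name, class_name, model = resolve_provider(name)
    chat_class = getattr(importlib.import_module(module_name), class_name)
    return chat_class(model=model, temperature=temperature)
```

With this in place, `llm = make_llm("ollama")` would stand in for the direct constructor calls, assuming the corresponding package is installed.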

Document Processing Pipeline

1. Convert PDFs to Markdown

python
import pymupdf
import pymupdf4llm
import glob

def pdf_to_markdown(pdf_path, output_dir):
    """Convert a single PDF to Markdown."""
    doc = pymupdf.open(pdf_path)
    md = pymupdf4llm.to_markdown(
        doc, 
        header=False, 
        footer=False, 
        page_separators=True,
        ignore_images=True
    )
    md_cleaned = md.encode('utf-8', errors='surrogatepass').decode('utf-8', errors='ignore')
    output_path = Path(output_dir) / Path(doc.name).stem
    Path(output_path).with_suffix(".md").write_bytes(md_cleaned.encode('utf-8'))

def pdfs_to_markdowns(path_pattern, overwrite=False):
    """Convert all PDFs matching pattern."""
    output_dir = Path(MARKDOWN_DIR)
    for pdf_path in map(Path, glob.glob(path_pattern)):
        md_path = (output_dir / pdf_path.stem).with_suffix(".md")
        if overwrite or not md_path.exists():
            pdf_to_markdown(pdf_path, output_dir)

# Convert all PDFs in docs directory
pdfs_to_markdowns(f"{DOCS_DIR}/*.pdf")
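
The `encode(..., errors='surrogatepass').decode(..., errors='ignore')` step in `pdf_to_markdown` is easy to misread. As far as the code shows, its purpose is to drop unpaired surrogate code points that PDF extraction can leave behind and that a strict UTF-8 write would reject. A standalone sketch of the same idiom, with the hypothetical helper name `drop_lone_surrogates`:

```python
def drop_lone_surrogates(text: str) -> str:
    """Remove unpaired surrogate code points that would break a strict UTF-8 write."""
    # surrogatepass lets the lone surrogate through as raw bytes; the strict
    # decoder then rejects those bytes and errors="ignore" discards them.
    return text.encode("utf-8", errors="surrogatepass").decode("utf-8", errors="ignore")

dirty = "ok\ud800text"
print(drop_lone_surrogates(dirty))  # prints "oktext"
```

Ordinary non-ASCII text passes through unchanged, so the cleanup only affects characters that could not be written anyway.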

2. Hierarchical Chunking (Parent/Child)

python
from langchain_text_splitters import MarkdownHeaderTextSplitter, RecursiveCharacterTextSplitter
import json

def process_document_hierarchical(markdown_path):
    """Split document into parent and child chunks."""
    content = Path(markdown_path).read_text(encoding='utf-8')
    
    # Parent chunks: split by headers
    header_splitter = MarkdownHeaderTextSplitter(
        headers_to_split_on=[
            ("#", "Header 1"),
            ("##", "Header 2"),
            ("###", "Header 3"),
        ],
        strip_headers=False
    )
    parent_chunks = header_splitter.split_text(content)
    
    # Child chunks: fixed-size from each parent
    child_splitter = RecursiveCharacterTextSplitter(
        chunk_size=500,
        chunk_overlap=100
    )
    
    parent_ids = []
    child_chunks = []
    
    for i, parent in enumerate(parent_chunks):
        parent_id = f"{Path(markdown_path).stem}_parent_{i}"
        parent_ids.append(parent_id)
        
        # Store parent chunk
        parent_data = {
            "id": parent_id,
            "content": parent.page_content,
            "metadata": parent.metadata
        }
        parent_file = Path(PARENT_STORE_PATH) / f"{parent_id}.json"
        parent_file.write_text(json.dumps(parent_data, ensure_ascii=False))
        
        # Create child chunks
        children = child_splitter.split_documents([parent])
        for j, child in enumerate(children):
            child.metadata["parent_id"] = parent_id
            child.metadata["child_index"] = j
            child_chunks.append(child)
    
    return parent_ids, child_chunks
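
To make `chunk_size=500` and `chunk_overlap=100` concrete, here is a library-free sketch of overlapping windows. It is a simplification: `RecursiveCharacterTextSplitter` prefers to break on separators such as paragraphs and sentences, whereas the hypothetical `sliding_chunks` below cuts purely by character count.

```python
def sliding_chunks(text: str, chunk_size: int = 500, chunk_overlap: int = 100) -> list[str]:
    """Split text into fixed-size windows; consecutive windows share chunk_overlap characters."""
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be non-negative and smaller than chunk_size")
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this window already reaches the end of the text
    return chunks

parent = "x" * 1200
children = [
    {"parent_id": "doc_parent_0", "child_index": j, "text": chunk}
    for j, chunk in enumerate(sliding_chunks(parent))
]
print([len(child["text"]) for child in children])  # prints [500, 500, 400]
```

Each child keeps its `parent_id`, which is what lets a small, precise match be expanded back to the full section later.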

3. Index Documents in Vector Database

python
from qdrant_client.http import models as qmodels
from langchain_qdrant import QdrantVectorStore, RetrievalMode

def ensure_collection(collection_name):
    """Create Qdrant collection if it doesn't exist."""
    embedding_dimension = len(dense_embeddings.embed_query("test"))
    
    if not client.collection_exists(collection_name):
        client.create_collection(
            collection_name=collection_name,
            vectors_config=qmodels.VectorParams(
                size=embedding_dimension,
                distance=qmodels.Distance.COSINE
            ),
            sparse_vectors_config={
                "sparse": qmodels.SparseVectorParams()
            },
        )

def index_documents(path_pattern):
    """Index all Markdown files matching path_pattern with hierarchical chunking."""
    ensure_collection(CHILD_COLLECTION)
    
    vector_store = QdrantVectorStore(
        client=client,
        collection_name=CHILD_COLLECTION,
        embedding=dense_embeddings,
        sparse_embedding=sparse_embeddings,
        retrieval_mode=RetrievalMode.HYBRID,
        # Must match the sparse vector name created in ensure_collection
        sparse_vector_name="sparse",
    )
    
    all_child_chunks = []
    for md_file in glob.glob(path_pattern):
        parent_ids, child_chunks = process_document_hierarchical(md_file)
        all_child_chunks.extend(child_chunks)
        print(f"Processed {Path(md_file).name}: {len(parent_ids)} parents, {len(child_chunks)} children")
    
    # Batch index all child chunks
    vector_store.add_documents(all_child_chunks)
    return vector_store

# Index all markdown documents
vector_store = index_documents(f"{MARKDOWN_DIR}/*.md")
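
Hybrid mode queries both the dense and the sparse index and then has to merge two ranked lists. One common way to do that is reciprocal rank fusion (RRF). The sketch below illustrates the idea only; which fusion is actually applied depends on the Qdrant and langchain-qdrant versions in use, and `reciprocal_rank_fusion` with `k=60` is an illustrative assumption, not the library's code.

```python
def reciprocal_rank_fusion(rankings: list[list[str]], k: int = 60) -> list[str]:
    """Merge ranked ID lists; each list adds 1 / (k + rank) to an ID's score."""
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    # Highest score first; ties fall back to ID order so the result is deterministic
    return sorted(scores, key=lambda doc_id: (-scores[doc_id], doc_id))

dense = ["c1", "c2", "c3"]   # ranking from the embedding search
sparse = ["c3", "c1", "c4"]  # ranking from the BM25 search
print(reciprocal_rank_fusion([dense, sparse]))  # prints ['c1', 'c3', 'c2', 'c4']
```

Chunks that appear in both rankings rise to the top, which is why hybrid search tends to help with queries that mix exact terms and paraphrase.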

Building the Agentic RAG System

Define Agent Tools

python
from langchain_core.tools import tool

@tool
def retrieve_documents(query: str) -> list[str]:
    """
    Search the knowledge base using hybrid search (dense + sparse embeddings).
    Returns relevant document chunks.
    
    Args:
        query: The search query
    """
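    results = vector_store.similarity_search(query, k=5)
    # Prefix each chunk with its parent_id so the agent can call get_parent_context
    return [
        f"[parent_id: {doc.metadata.get('parent_id', 'unknown')}]\n{doc.page_content}"
        for doc in results
    ]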

@tool
def get_parent_context(parent_id: str) -> str:
    """
    Retrieve the full parent chunk for additional context.
    
    Args:
        parent_id: The parent chunk identifier
    """
    parent_file = Path(PARENT_STORE_PATH) / f"{parent_id}.json"
    if parent_file.exists():
        data = json.loads(parent_file.read_text())
        return data["content"]
    return "Parent chunk not found."

tools = [retrieve_documents, get_parent_context]
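
The parent store behind `get_parent_context` is just one JSON file per parent chunk. The round trip can be tried in isolation with a temporary directory. This is a sketch: `save_parent` and `load_parent` are hypothetical names, and the explicit `encoding="utf-8"` is an addition so that non-ASCII text reads back the same on every platform.

```python
import json
import tempfile
from pathlib import Path

def save_parent(store: Path, parent_id: str, content: str, metadata: dict) -> Path:
    """Write one parent chunk to <store>/<parent_id>.json."""
    path = store / f"{parent_id}.json"
    payload = {"id": parent_id, "content": content, "metadata": metadata}
    path.write_text(json.dumps(payload, ensure_ascii=False), encoding="utf-8")
    return path

def load_parent(store: Path, parent_id: str) -> str:
    """Return the parent chunk text, or a fallback message when the file is missing."""
    path = store / f"{parent_id}.json"
    if not path.exists():
        return "Parent chunk not found."
    return json.loads(path.read_text(encoding="utf-8"))["content"]

with tempfile.TemporaryDirectory() as tmp:
    store = Path(tmp)
    save_parent(store, "guide_parent_0", "# Intro\nFull section text", {"Header 1": "Intro"})
    print(load_parent(store, "guide_parent_0"))
    print(load_parent(store, "guide_parent_9"))  # prints "Parent chunk not found."
```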

Define System Prompts

python
CONVERSATION_SUMMARIZER_PROMPT = """You are a conversation summarizer. 
Extract key context from the conversation history that is relevant to the current query.
Focus on: entities mentioned, topics discussed, user intent.

Conversation History:
{history}

Current Query: {query}

Provide a concise summary of relevant context."""

QUERY_CLARIFICATION_PROMPT = """You are a query clarification assistant.
Analyze the query and conversation context.

If the query is:
- Ambiguous or contains pronouns without clear referents: Rewrite it clearly
- Multi-part (multiple questions): Split into focused sub-queries
- Clear and focused: Return it unchanged

Context: {context}
Query: {query}

Return a JSON object:
{{
    "needs_clarification": boolean,
    "clarification_question": string or null,
    "rewritten_queries": [list of clear, focused queries]
}}"""

AGENT_PROMPT = """You are a RAG agent. Use the retrieve_documents tool to search for information.
If results are insufficient, try rephrasing your search query.
If you find relevant parent_id metadata, use get_parent_context for full context.

Available tools:
- retrieve_documents(query: str): Search the knowledge base
- get_parent_context(parent_id: str): Get full parent chunk

Question: {query}
Context: {context}

Provide a comprehensive answer based on retrieved documents."""
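
The doubled braces in `QUERY_CLARIFICATION_PROMPT` are deliberate: the prompts are filled with `str.format`, which treats single braces as placeholders, so literal JSON braces have to be written as `{{` and `}}`. A short demonstration with a cut-down, hypothetical template:

```python
TEMPLATE = """Query: {query}

Return a JSON object:
{{
    "needs_clarification": boolean
}}"""

# {query} is substituted; {{ and }} collapse to literal braces
rendered = TEMPLATE.format(query="What is RAG?")
print(rendered)
```

Forgetting the doubling typically surfaces as a `KeyError` or `ValueError` from `format`, raised before the model is ever called.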

Define State Models

python
from typing import TypedDict, Annotated, Sequence
from langgraph.graph.message import add_messages
from langchain_core.messages import BaseMessage

class AgentState(TypedDict):
    """State for individual RAG agents."""
    # add_messages appends node updates (such as ToolNode output) instead of overwriting the history
    messages: Annotated[Sequence[BaseMessage], add_messages]
    query: str
    context: str
    retrieved_docs: list[str]
    parent_contexts: list[str]
    search_attempts: int
    max_searches: int
    answer: str

class OrchestratorState(TypedDict):
    """State for the main orchestration graph."""
    user_query: str
    conversation_history: list[dict]
    conversation_summary: str
    clarified_queries: list[str]
    needs_human_input: bool
    clarification_question: str
    agent_results: list[dict]
    final_answer: str
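
A `TypedDict` documents the expected keys but does not enforce them at runtime, so a node that indexes `state["answer"]` fails with `KeyError` if the initial state omits that key. One way to guard against this is a small constructor. The sketch below uses a library-free stand-in, `AgentStateSketch`, and a hypothetical helper `new_agent_state`; neither is part of the project.

```python
from typing import TypedDict

class AgentStateSketch(TypedDict):
    """Library-free stand-in for AgentState (messages typed as a plain list here)."""
    messages: list
    query: str
    context: str
    retrieved_docs: list[str]
    parent_contexts: list[str]
    search_attempts: int
    max_searches: int
    answer: str

def new_agent_state(query: str, context: str = "", max_searches: int = 3) -> AgentStateSketch:
    """Build a fully populated initial state so nodes can index every key safely."""
    return {
        "messages": [], "query": query, "context": context,
        "retrieved_docs": [], "parent_contexts": [],
        "search_attempts": 0, "max_searches": max_searches, "answer": "",
    }

state = new_agent_state("What is hybrid search?")
# Every declared key is present, which TypedDict alone would not guarantee
assert set(state) == set(AgentStateSketch.__annotations__)
```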

Build LangGraph Agent

python
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode
from langchain_core.messages import HumanMessage, AIMessage

def should_continue(state: AgentState) -> str:
    """Decide if agent should continue searching or finish."""
    if state["answer"]:
        return "end"
    if state["search_attempts"] >= state["max_searches"]:
        return "end"
    return "continue"

def agent_node(state: AgentState) -> AgentState:
    """Main agent reasoning node."""
    llm_with_tools = llm.bind_tools(tools)
    
    messages = state["messages"]
    if not messages:
        messages = [HumanMessage(content=AGENT_PROMPT.format(
            query=state["query"],
            context=state.get("context", "")
        ))]
    
    response = llm_with_tools.invoke(messages)
    
    # Check if we have a final answer (no tool calls)
    if not response.tool_calls:
        return {
            **state,
            "answer": response.content,
            "messages": messages + [response]
        }
    
    return {
        **state,
        "messages": messages + [response],
        "search_attempts": state["search_attempts"] + 1
    }

def build_agent_graph():
    """Build the RAG agent graph."""
    workflow = StateGraph(AgentState)
    
    workflow.add_node("agent", agent_node)
    workflow.add_node("tools", ToolNode(tools))
    
    workflow.set_entry_point("agent")
    workflow.add_conditional_edges(
        "agent",
        should_continue,
        {
            "continue": "tools",
            "end": END
        }
    )
    workflow.add_edge("tools", "agent")
    
    return workflow.compile()

agent_graph = build_agent_graph()
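
The control flow of this graph (agent, then tools, then back to agent until an answer exists or the search budget runs out) can be traced without LangGraph or an LLM. The sketch below is a simulation under assumptions: `run_loop`, `fake_agent`, and `fake_tool` are hypothetical stand-ins, and only the routing rule mirrors `should_continue`.

```python
def should_continue(state: dict) -> str:
    """Same routing rule as the graph: stop on an answer or when the search budget is spent."""
    if state["answer"] or state["search_attempts"] >= state["max_searches"]:
        return "end"
    return "continue"

def run_loop(state: dict, agent, tool) -> dict:
    """Alternate agent and tool steps until should_continue returns 'end'."""
    while True:
        state = agent(state)
        if should_continue(state) == "end":
            return state
        state = tool(state)

def fake_agent(state: dict) -> dict:
    # Stand-in for the LLM: answer once at least two documents have been retrieved
    if len(state["retrieved_docs"]) >= 2:
        return {**state, "answer": f"Based on {len(state['retrieved_docs'])} documents"}
    return {**state, "search_attempts": state["search_attempts"] + 1}

def fake_tool(state: dict) -> dict:
    # Stand-in for the tools node: each call adds one retrieved document
    doc = f"doc-{len(state['retrieved_docs'])}"
    return {**state, "retrieved_docs": state["retrieved_docs"] + [doc]}

start = {"answer": "", "search_attempts": 0, "max_searches": 3, "retrieved_docs": []}
final = run_loop(start, fake_agent, fake_tool)
print(final["answer"], final["search_attempts"])  # prints "Based on 2 documents 2"
```

Note the budget case: when `search_attempts` reaches `max_searches` first, the loop ends with an empty `answer`, so callers should be prepared for that.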

Multi-Agent Orchestration

python
from langgraph.graph import StateGraph, END
import json

def summarize_conversation(state: OrchestratorState) -> OrchestratorState:
    """Summarize conversation history for context."""
    history_text = "\n".join([
        f"{msg['role']}: {msg['content']}" 
        for msg in state["conversation_history"][-5:]  # Last 5 messages
    ])
    
    summary_prompt = CONVERSATION_SUMMARIZER_PROMPT.format(
        history=history_text,
        query=state["user_query"]
    )
    summary = llm.invoke(summary_prompt).content
    
    return {**state, "conversation_summary": summary}

def clarify_query(state: OrchestratorState) -> OrchestratorState:
    """Clarify and potentially decompose the query."""
    clarification_prompt = QUERY_CLARIFICATION_PROMPT.format(
        context=state.get("conversation_summary", ""),
        query=state["user_query"]
    )
    
    response = llm.invoke(clarification_prompt).content
    result = json.loads(response)
    
    return {
        **state,
        "needs_human_input": result["needs_clarification"],
        "clarification_question": result.get("clarification_question"),
        "clarified_queries": result["rewritten_queries"]
    }

def route_after_clarification(state: OrchestratorState) -> str:
    """Route based on whether human input is needed."""
    if state["needs_human_input"]:
        return "wait_for_human"
    return "execute_agents"

def execute_parallel_agents(state: OrchestratorState) -> OrchestratorState:
    """Run one agent per clarified query (the map step). Note: this loop runs the agents sequentially."""
    results = []
    
    for query in state["clarified_queries"]:
        agent_state = {
            "messages": [],
            "query": query,
            "context": state.get("conversation_summary", ""),
            "retrieved_docs": [],
            "parent_contexts": [],
            "search_attempts": 0,
            "max_searches": 3,
            "answer": ""
        }
        
        # Run agent graph
        final_state = agent_graph.invoke(agent_state)
        results.append({
            "query": query,
            "answer": final_state["answer"],
            "docs": final_state["retrieved_docs"]
        })
    
    return {**state, "agent_results": results}

def aggregate_results(state: OrchestratorState) -> OrchestratorState:
    """Combine all agent results into final answer."""
    combined = "\n\n".join([
        f"Sub-query: {r['query']}\nAnswer: {r['answer']}"
        for r in state["agent_results"]
    ])
    
    aggregation_prompt = f"""Synthesize these sub-answers into a coherent response:

{combined}

Original question: {state['user_query']}

Provide a unified, well-structured answer."""
    
    final_answer = llm.invoke(aggregation_prompt).content
    
    return {**state, "final_answer": final_answer}

def build_orchestrator_graph():
    """Build the main orchestration graph."""
    workflow = StateGraph(OrchestratorState)
    
    workflow.add_node("summarize", summarize_conversation)
    workflow.add_node("clarify", clarify_query)
    workflow.add_node("execute", execute_parallel_agents)
    workflow.add_node("aggregate", aggregate_results)
    
    workflow.set_entry_point("summarize")
    workflow.add_edge("summarize", "clarify")
    workflow.add_conditional_edges(
        "clarify",
        route_after_clarification,
        {
            "wait_for_human": END,  # Pause for human input
            "execute_agents": "execute"
        }
    )
    workflow.add_edge("execute", "aggregate")
    workflow.add_edge("aggregate", END)
    
    return workflow.compile()

orchestrator_graph = build_orchestrator_graph()
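
`clarify_query` passes the model's reply straight to `json.loads`, which fails if the model wraps the JSON in a Markdown fence or adds a sentence around it, something smaller local models do often. A more tolerant parser is sketched below; `parse_clarification` is a hypothetical helper, and falling back to the original query on any parse failure is a design assumption rather than the project's behaviour.

```python
import json
import re

def parse_clarification(raw: str, original_query: str) -> dict:
    """Parse the clarifier's reply, tolerating Markdown fences and surrounding prose."""
    fallback = {
        "needs_clarification": False,
        "clarification_question": None,
        "rewritten_queries": [original_query],
    }
    # Take the outermost {...} span, which skips fences and chatty preambles
    match = re.search(r"\{.*\}", raw, flags=re.DOTALL)
    if not match:
        return fallback
    try:
        data = json.loads(match.group(0))
    except json.JSONDecodeError:
        return fallback
    return {
        "needs_clarification": bool(data.get("needs_clarification", False)),
        "clarification_question": data.get("clarification_question"),
        # An empty or missing list would leave the agents with nothing to run
        "rewritten_queries": data.get("rewritten_queries") or [original_query],
    }

reply = 'Here you go: {"needs_clarification": false, "rewritten_queries": ["What is RAG?"]} Hope it helps.'
print(parse_clarification(reply, "what is it?"))
```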
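
The loop in `execute_parallel_agents` invokes the agents one after another. Because each agent spends most of its time waiting on LLM and retrieval calls, a thread pool is one simple way to overlap them. The sketch below is an assumption-laden illustration: `run_agents_in_parallel` is a hypothetical helper, `fake_agent` stands in for `agent_graph.invoke`, and whether the compiled graph and the local Qdrant client tolerate concurrent calls should be verified before relying on it.

```python
from concurrent.futures import ThreadPoolExecutor

def run_agents_in_parallel(queries: list[str], run_agent, max_workers: int = 4) -> list[dict]:
    """Run run_agent(query) for every query on a thread pool, keeping input order."""
    if not queries:
        return []
    with ThreadPoolExecutor(max_workers=min(max_workers, len(queries))) as pool:
        # pool.map yields results in the order of `queries`, not completion order
        answers = list(pool.map(run_agent, queries))
    return [{"query": q, "answer": a} for q, a in zip(queries, answers)]

def fake_agent(query: str) -> str:
    # Stand-in for agent_graph.invoke(...)["answer"]
    return f"answer to {query!r}"

print(run_agents_in_parallel(["What is RAG?", "What is BM25?"], fake_agent))
```

Keeping the results in input order means the aggregation step sees sub-answers in the same order as the sub-queries.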

Usage Patterns

Basic Query Execution

python
def query_rag_system(user_query: str, conversation_history: list = None):
    """Execute a query through the full agentic RAG system."""
    initial_state = {
        "user_query": user_query,
        "conversation_history": conversation_history or [],
        "conversation_summary": "",
        "clarified_queries": [],
        "needs_human_input": False,
        "clarification_question": "",
        "agent_results": [],
        "final_answer": ""
    }
    
    result = orchestrator_graph.invoke(initial_state)
    
    if result["needs_human_input"]:
        return {
            "needs_clarification": True,
            "question": result["clarification_question"]
        }
    
    return {
        "needs_clarification": False,
        "answer": result["final_answer"],
        "sub_queries": result["clarified_queries"],
        "sources": [r["docs"] for r in result["agent_results"]]
    }

# Example usage
response = query_rag_system(
    "What is the difference between JavaScript and Python?",
    conversation_history=[
        {"role": "user", "content": "Tell me about programming languages"},
        {"role": "assistant", "content": "Programming languages are..."},
    ],
)

if response["needs_clarification"]:
    print(f"Clarification needed: {response['question']}")
else:
    print(f"Answer: {response['answer']}")
    print(f"Decomposed into: {response['sub_queries']}")

Interactive Chat Loop

python
def chat_loop():
    """Interactive chat session with conversation memory."""
    conversation_history = []
    print("Agentic RAG Chat (type 'quit' to exit)")

    while True:
        user_input = input("\nYou: ").strip()
        if user_input.lower() == "quit":
            break

        response = query_rag_system(user_input, conversation_history)

        if response["needs_clarification"]:
            print(f"\nBot: {response['question']}")
            clarification = input("You: ").strip()
            # Re-run with the original question plus the clarification,
            # so the original question is not lost
            user_input = f"{user_input} ({clarification})"
            response = query_rag_system(user_input, conversation_history)

        if response["needs_clarification"]:
            # Still ambiguous: there is no "answer" key, so start a new turn
            print(f"\nBot: {response['question']}")
            continue

        print(f"\nBot: {response['answer']}")

        # Update history
        conversation_history.append({"role": "user", "content": user_input})
        conversation_history.append({"role": "assistant", "content": response["answer"]})

# Run interactive chat
chat_loop()

Programmatic Multi-Query

python
queries = [
    "What is machine learning?",
    "How does neural network training work?",
    "What are common ML frameworks?"
]

results = []
history = []

for q in queries:
    result = query_rag_system(q, history)
    results.append(result)

    # A clarification response carries "question" instead of "answer"
    reply = result.get("answer", result.get("question", ""))
    history.append({"role": "user", "content": q})
    history.append({"role": "assistant", "content": reply})

# Results now contain context-aware answers
for i, r in enumerate(results):
    print(f"\nQ{i+1}: {queries[i]}")
    print(f"A: {r.get('answer', r.get('question', ''))}\n")

Running the Gradio UI

bash
# Use the provided Gradio interface
python src/chat_app.py

python
# Or programmatically
from src.chat_app import create_chat_interface

demo = create_chat_interface(
    orchestrator_graph=orchestrator_graph,
    parent_store_path=PARENT_STORE_PATH,
)
demo.launch(share=True)  # share=True also creates a temporary public URL

Configuration Options

Tuning Retrieval Parameters

python
# Adjust number of retrieved chunks
vector_store.similarity_search(query, k=10)  # Retrieve top 10

# Adjust child chunk size
child_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,    # Larger chunks = more context
    chunk_overlap=200,  # More overlap = better boundary handling
)

# Adjust max search attempts per agent
agent_state = {
    "max_searches": 5,  # Allow more self-correction loops
    # ...
}
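
To get a feel for how `chunk_size` and `chunk_overlap` interact, here is a minimal sketch using a plain sliding window. It is not how `RecursiveCharacterTextSplitter` works internally (that splitter prefers natural separators such as paragraph breaks); `sliding_window_chunks` is a hypothetical helper used only for illustration.

```python
def sliding_window_chunks(text: str, chunk_size: int, chunk_overlap: int) -> list:
    """Split text into fixed-size windows that share chunk_overlap characters."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the last window already reaches the end of the text
    return chunks


text = "x" * 2500
small = sliding_window_chunks(text, chunk_size=500, chunk_overlap=100)
large = sliding_window_chunks(text, chunk_size=1000, chunk_overlap=200)
print(len(small), len(large))
```

Larger chunks produce fewer vectors to search but make each hit less precise, which is the trade-off the child chunk size controls.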

Adjusting Agent Behavior

python
# Higher temperature for more creative answers
llm = ChatOllama(model="qwen3:4b-instruct-2507-q4_K_M", temperature=0.3)

# More aggressive query decomposition:
# modify QUERY_CLARIFICATION_PROMPT to split more aggressively

# Longer conversation memory
history_text = "\n".join([
    f"{msg['role']}: {msg['content']}"
    for msg in state["conversation_history"][-10:]  # Last 10 instead of 5
])
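
If the history window needs to be configurable, the slice above can be wrapped in a small helper. This is a sketch only: `format_recent_history` and its `max_messages` parameter are hypothetical names, not part of the project.

```python
def format_recent_history(history: list, max_messages: int = 10) -> str:
    """Render the last max_messages entries as 'role: content' lines."""
    # history[-0:] would return the whole list, so handle zero explicitly
    recent = history[-max_messages:] if max_messages > 0 else []
    return "\n".join(f"{msg['role']}: {msg['content']}" for msg in recent)


history = [{"role": "user", "content": f"message {i}"} for i in range(12)]
history_text = format_recent_history(history, max_messages=10)
print(history_text.splitlines()[0])
```

Keep in mind that a longer window means more prompt tokens on every clarification call.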

Troubleshooting

Small Models Ignore Tools

Problem: Ollama models with fewer than 7B parameters often ignore tool calls or hallucinate answers.
Solution:
bash
# Use larger models for reliable tool calling
ollama pull llama3.1:8b-instruct-q4_K_M
ollama pull mistral:7b-instruct-q4_K_M

python
# Or switch to cloud providers
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

Qdrant Collection Errors

Problem: "Collection already exists" or dimension mismatch errors.
Solution:
python
# Delete and recreate collection
# (this discards the indexed vectors, so re-index your documents afterwards)
client.delete_collection(CHILD_COLLECTION)
ensure_collection(CHILD_COLLECTION)

# Or use a new collection name
CHILD_COLLECTION = "document_child_chunks_v2"
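
Dimension mismatches usually appear after switching embedding models. A rough way to check before deleting anything is to compare the collection's vector size with what the current model produces. The sketch below assumes a LangChain-style embeddings object exposing `embed_query`; `embedding_dimension`, `needs_recreate`, and `FakeEmbeddings` are hypothetical names for illustration, and the existing size is passed in as a plain integer rather than read from Qdrant.

```python
from typing import Optional


def embedding_dimension(embeddings) -> int:
    """Return the vector size produced by a LangChain-style embeddings object."""
    return len(embeddings.embed_query("dimension probe"))


def needs_recreate(existing_dim: Optional[int], embeddings) -> bool:
    """True when a collection exists and its vector size differs from the model's."""
    return existing_dim is not None and existing_dim != embedding_dimension(embeddings)


class FakeEmbeddings:
    """Stand-in for a real embedding model, used only to demonstrate the check."""

    def __init__(self, dim: int):
        self.dim = dim

    def embed_query(self, text: str) -> list:
        return [0.0] * self.dim


print(needs_recreate(384, FakeEmbeddings(768)))   # collection built with another model
print(needs_recreate(768, FakeEmbeddings(768)))   # sizes agree
print(needs_recreate(None, FakeEmbeddings(768)))  # no collection yet
```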

Parent Chunks Not Found

Problem: get_parent_context returns "Parent chunk not found".
Solution:
python
from pathlib import Path

# Check parent store exists
print(list(Path(PARENT_STORE_PATH).glob("*.json")))

# Verify metadata in child chunks
results = vector_store.similarity_search("test", k=1)
print(results[0].metadata)  # Should have "parent_id" key
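
The two checks above can be combined into one pass that reports which parent IDs have no file on disk. This sketch assumes each parent chunk is stored as `<parent_id>.json` inside the parent store directory; that naming scheme is an assumption, so adjust it to match how your store writes files. `find_missing_parents` is a hypothetical helper.

```python
import json
import tempfile
from pathlib import Path


def find_missing_parents(parent_ids, parent_store_path) -> list:
    """Return parent IDs that have no matching <parent_id>.json file."""
    available = {p.stem for p in Path(parent_store_path).glob("*.json")}
    return sorted(set(parent_ids) - available)


# Demonstration with a temporary directory standing in for the parent store
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "doc1_parent_0.json").write_text(json.dumps({"content": "..."}))
    missing = find_missing_parents(["doc1_parent_0", "doc1_parent_1"], tmp)
    print(missing)
```

In practice the `parent_ids` argument would be collected from the `parent_id` metadata of the child chunks returned by the vector store.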

Memory Issues with Large Documents

Problem: Out of memory when processing many large PDFs.
Solution:
python
# Process documents in batches
def index_documents_batched(markdown_files, batch_size=10):
    for i in range(0, len(markdown_files), batch_size):
        batch = markdown_files[i:i + batch_size]
        all_child_chunks = []
        for md_file in batch:
            parent_ids, child_chunks = process_document_hierarchical(md_file)
            all_child_chunks.extend(child_chunks)
        vector_store.add_documents(all_child_chunks)
        print(f"Indexed batch {i // batch_size + 1}")
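
The slicing logic can also be factored into a reusable generator, so only one batch of chunks is held in memory at a time. This is a sketch; `iter_batches` is a hypothetical helper, not part of the project.

```python
from typing import Iterator, Sequence


def iter_batches(items: Sequence, batch_size: int) -> Iterator[Sequence]:
    """Yield consecutive slices of at most batch_size items."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


files = [f"doc_{i}.md" for i in range(25)]
batch_sizes = [len(batch) for batch in iter_batches(files, batch_size=10)]
print(batch_sizes)
```

If memory is still tight, lowering `batch_size` trades indexing speed for a smaller peak footprint.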

Agent Loops Indefinitely

Problem: Agent keeps calling tools without producing an answer.
Solution:
python
# Enforce stricter max_searches
agent_state["max_searches"] = 2

# Add explicit termination in agent_node
def agent_node(state: AgentState) -> AgentState:
    if state["search_attempts"] >= state["max_searches"]:
        return {
            **state,
            "answer": "Unable to find sufficient information after multiple attempts.",
        }
    # ... rest of logic
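
The termination guard can be seen in isolation with a framework-free sketch. Here `run_bounded_search`, `search_fn`, and `is_sufficient` are hypothetical stand-ins for the agent's tool call and its self-check; the real agent runs inside a LangGraph graph, which this example does not reproduce.

```python
def run_bounded_search(search_fn, is_sufficient, query: str, max_searches: int = 2) -> dict:
    """Call search_fn up to max_searches times, stopping early once results suffice."""
    attempts = 0
    collected = []
    while attempts < max_searches:
        attempts += 1
        collected.extend(search_fn(query, attempts))
        if is_sufficient(collected):
            return {"answer_ready": True, "attempts": attempts, "docs": collected}
    # Budget exhausted: return instead of looping forever
    return {"answer_ready": False, "attempts": attempts, "docs": collected}


# A search that never finds anything stops after max_searches attempts
empty = run_bounded_search(lambda q, n: [], lambda docs: len(docs) > 0, "obscure topic")
# A search that succeeds on the second try stops as soon as it has results
found = run_bounded_search(
    lambda q, n: ["doc"] if n == 2 else [],
    lambda docs: len(docs) > 0,
    "known topic",
    max_searches=5,
)
print(empty["attempts"], found["attempts"])
```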

Query Clarification Too Aggressive

Problem: The system asks for clarification on clear queries.
Solution:
python
# Adjust QUERY_CLARIFICATION_PROMPT
QUERY_CLARIFICATION_PROMPT = """...
Only set needs_clarification=true if the query is genuinely ambiguous
(contains unresolved pronouns, missing critical context, or is nonsensical).
..."""

# Or skip clarification node for simple queries
def route_after_clarification(state: OrchestratorState) -> str:
    if len(state["user_query"].split()) < 5:  # Short queries skip
        return "execute_agents"
    if state["needs_human_input"]:
        return "wait_for_human"
    return "execute_agents"
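
Before adopting the word-count shortcut, it is worth checking how it treats short but ambiguous queries. The sketch below repeats the router with a hypothetical minimal `OrchestratorState` containing only the two fields the router reads, then runs it on two queries that were both flagged as needing clarification.

```python
from typing import TypedDict


class OrchestratorState(TypedDict):
    # Hypothetical minimal state: only the fields the router reads
    user_query: str
    needs_human_input: bool


def route_after_clarification(state: OrchestratorState) -> str:
    if len(state["user_query"].split()) < 5:  # Short queries skip clarification
        return "execute_agents"
    if state["needs_human_input"]:
        return "wait_for_human"
    return "execute_agents"


short_route = route_after_clarification(
    {"user_query": "What about it?", "needs_human_input": True}
)
long_route = route_after_clarification(
    {"user_query": "How does it compare to the previous one?", "needs_human_input": True}
)
print(short_route, long_route)
```

The three-word query bypasses clarification even though it contains an unresolved pronoun, so the threshold is a trade-off between fewer interruptions and more misread queries.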

Advanced Patterns

Add Observability with Langfuse

python
# Set up Langfuse tracing
import os

os.environ["LANGFUSE_PUBLIC_KEY"] = "your-public-key"
os.environ["LANGFUSE_SECRET_KEY"] = "your-secret-key"
os.environ["LANGFUSE_HOST"] = "https://cloud.langfuse.com"

# Import path used by the Langfuse v2 SDK; newer SDK versions may expose
# the handler from a different module
from langfuse.callback import CallbackHandler

langfuse_handler = CallbackHandler()

# Add to LLM calls
llm = ChatOllama(
    model="qwen3:4b-instruct-2507-q4_K_M",
    temperature=0,
    callbacks=[langfuse_handler],
)

# Trace graph execution
result = orchestrator_graph.invoke(
    initial_state,
    config={"callbacks": [langfuse_handler]},
)

Custom Embedding Models

python
# Use different embedding for domain-specific docs.
# A different model may produce a different vector size, so index into a
# new collection rather than reusing one built with another model.
from langchain_huggingface import HuggingFaceEmbeddings

# Legal documents
legal_embeddings = HuggingFaceEmbeddings(
    model_name="nlpaueb/legal-bert-base-uncased"
)

# Medical documents
medical_embeddings = HuggingFaceEmbeddings(
    model_name="microsoft/BiomedNLP-PubMedBERT-base-uncased-abstract"
)

Multi-Collection RAG

python
# Search multiple collections (e.g., different document types)
def multi_collection_retrieve(query: str) -> list[str]:
    results = []
    for collection in ["technical_docs", "user_guides", "api_reference"]:
        store = QdrantVectorStore(
            client=client,
            collection_name=collection,
            embedding=dense_embeddings,
        )
        results.extend(store.similarity_search(query, k=2))
    return [doc.page_content for doc in results]
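
When the same passage is indexed in more than one collection, the combined list can contain duplicates, and simple concatenation lets the first collection dominate. One possible refinement, sketched below with plain strings in place of retrieved documents, interleaves the per-collection hits and drops repeats. `merge_results` and its `limit` parameter are hypothetical and not part of the project.

```python
from itertools import zip_longest


def merge_results(per_collection: dict, limit: int = 5) -> list:
    """Interleave hits from each collection, skipping duplicates, up to limit items."""
    seen = set()
    merged = []
    # zip_longest walks rank 1 of every collection, then rank 2, and so on
    for rank_row in zip_longest(*per_collection.values()):
        for text in rank_row:
            if text is None or text in seen:
                continue
            seen.add(text)
            merged.append(text)
            if len(merged) == limit:
                return merged
    return merged


hits = {
    "technical_docs": ["A", "B"],
    "user_guides": ["B", "C"],
    "api_reference": ["D"],
}
merged = merge_results(hits, limit=4)
print(merged)
```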

Resources

Key Takeaways

  1. Prefer models with 7B+ parameters for reliable tool calling and instruction following
  2. Hierarchical indexing (parent/child chunks) balances precision and context
  3. Query clarification prevents misunderstandings early in the pipeline
  4. Multi-agent decomposition handles complex queries by parallelizing sub-problems
  5. Self-correction loops improve answer quality through iterative refinement
  6. Provider-agnostic design allows switching between local and cloud LLMs

This framework is production-ready and designed for extension: swap components, add new tools, or integrate custom agents as needed.