modular-rag-mcp-server

Modular RAG MCP Server

Skill by ara.so — MCP Skills collection.
Expert skill for deploying, configuring, and extending the Modular RAG MCP Server — a pluggable, observable RAG (Retrieval-Augmented Generation) system that exposes tools via Model Context Protocol for AI assistants like Claude Desktop and GitHub Copilot.

What This Project Does

The Modular RAG MCP Server is a complete RAG pipeline featuring:
  • Ingestion Pipeline: PDF → Markdown → Chunking → Embedding → Vector Store (with multimodal image captioning)
  • Hybrid Search: Dense vectors (semantic) + Sparse BM25 (exact match) + RRF fusion + optional reranking
  • MCP Protocol: Standard MCP server exposing the query_knowledge_hub, list_collections, and get_document_summary tools
  • Dashboard: Streamlit-based management UI with 6 pages (overview, data browser, ingestion management, ingestion tracking, query tracking, evaluation)
  • Evaluation Framework: Ragas + custom metrics for regression testing
  • Full Observability: White-box tracing of ingestion and query pipelines
Key Architecture: Every core component (LLM, Embedding, Reranker, Splitter, VectorStore, Evaluator) is pluggable via abstract interfaces. Switch backends through configuration without code changes.
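
The pluggable design can be pictured with a small registry sketch. The names below (`BaseEmbedding`, `register_embedding`, `create_embedding`, `FakeEmbedding`) are hypothetical and not taken from the project; they only illustrate how a provider string from configuration could select an implementation behind an abstract interface.

```python
from abc import ABC, abstractmethod
from typing import Callable, Dict, List, Type


class BaseEmbedding(ABC):
    """Hypothetical abstract interface that every embedding backend implements."""

    @abstractmethod
    def embed(self, texts: List[str]) -> List[List[float]]:
        ...


# Maps a provider name (as it would appear in configuration) to a class.
_EMBEDDING_REGISTRY: Dict[str, Type[BaseEmbedding]] = {}


def register_embedding(name: str) -> Callable[[Type[BaseEmbedding]], Type[BaseEmbedding]]:
    """Class decorator that records a backend under a provider name."""
    def decorator(cls: Type[BaseEmbedding]) -> Type[BaseEmbedding]:
        _EMBEDDING_REGISTRY[name] = cls
        return cls
    return decorator


@register_embedding("fake")
class FakeEmbedding(BaseEmbedding):
    """Toy backend for illustration: a one-dimensional vector holding the text length."""

    def embed(self, texts: List[str]) -> List[List[float]]:
        return [[float(len(t))] for t in texts]


def create_embedding(provider: str) -> BaseEmbedding:
    """Instantiate the backend registered for the given provider name."""
    try:
        return _EMBEDDING_REGISTRY[provider]()
    except KeyError:
        raise ValueError(f"Unknown embedding provider: {provider!r}") from None


embedder = create_embedding("fake")
print(embedder.embed(["hello", "hi"]))
```

With a layout like this, swapping backends only means changing the provider string; calling code depends on the abstract interface alone.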

Installation

Prerequisites

  • Python 3.9+
  • VS Code with GitHub Copilot or Claude Desktop
  • API keys for your chosen providers (OpenAI, Anthropic, Cohere, etc.)

Quick Setup with Setup Skill

The project includes a Setup Skill that automates the entire configuration. Clone the repository, open it in VS Code with Copilot/Claude, and type in chat:
setup
The Setup Skill will:
1. Ask you to select providers (OpenAI, Anthropic, Cohere, etc.)
2. Configure API keys
3. Install dependencies
4. Generate configuration files
5. Launch the dashboard

Manual Setup

bash
# Create virtual environment
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Install dependencies
pip install -r requirements.txt

# Copy and configure environment variables
cp .env.example .env

# Edit .env with your API keys

Configuration

Main Configuration File (src/core/config.py)

The system uses a centralized configuration approach. Key settings:
python
from src.core.config import get_config

config = get_config()

# Access configuration
llm_provider = config.llm.provider  # "openai", "anthropic", etc.
embedding_provider = config.embedding.provider
vector_store_type = config.vector_store.type  # "qdrant", "chroma", etc.

Environment Variables

Create a .env file with the required keys:
bash
# LLM Provider
OPENAI_API_KEY=your_openai_key_here
ANTHROPIC_API_KEY=your_anthropic_key_here

# Embedding Provider
COHERE_API_KEY=your_cohere_key_here

# Reranker (optional)
JINA_API_KEY=your_jina_key_here

# Vector Store (if using cloud)
QDRANT_URL=your_qdrant_url
QDRANT_API_KEY=your_qdrant_key
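
Before starting the server it can help to confirm that the keys are actually visible to the process. The helper below is a minimal sketch, not part of the project: `missing_keys` is a hypothetical name, and the list of required keys is only an example that depends on the providers you chose. It reads the process environment, so it assumes the values from .env have already been loaded or exported.

```python
import os
from typing import Iterable, List, Mapping


def missing_keys(required: Iterable[str], env: Mapping[str, str] = os.environ) -> List[str]:
    """Return the required variable names that are unset or blank in env."""
    return [name for name in required if not env.get(name, "").strip()]


# Example list only: which keys are required depends on the providers you selected.
required = ["OPENAI_API_KEY", "COHERE_API_KEY"]
missing = missing_keys(required)
if missing:
    print("Missing environment variables:", ", ".join(missing))
else:
    print("All required environment variables are set.")
```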

Provider Configuration

Edit src/core/config.py to set default providers:
python
class LLMConfig:
    provider: str = "openai"  # or "anthropic", "cohere"
    model: str = "gpt-4"
    temperature: float = 0.7
    max_tokens: int = 2048

class EmbeddingConfig:
    provider: str = "openai"  # or "cohere", "huggingface"
    model: str = "text-embedding-3-small"
    dimension: int = 1536

class RerankerConfig:
    enabled: bool = True
    provider: str = "cohere"  # or "jina", "cross-encoder"
    model: str = "rerank-english-v3.0"
    top_k: int = 5

Key Components and API

1. Ingestion Pipeline

Ingest documents into the knowledge base:
python
from src.ingestion.pipeline import IngestionPipeline
from src.core.config import get_config

config = get_config()
pipeline = IngestionPipeline(config)

# Ingest a PDF document
result = pipeline.ingest_document(
    file_path="path/to/document.pdf",
    collection_name="my_collection",
    metadata={"source": "internal_docs", "version": "1.0"}
)

print(f"Ingested {result['chunks_created']} chunks")
print(f"Ingestion ID: {result['ingestion_id']}")

2. Hybrid Search and Query

Query the knowledge base with hybrid search:
python
from src.retrieval.hybrid_search import HybridSearchRetriever
from src.core.config import get_config

config = get_config()
retriever = HybridSearchRetriever(config)

# Perform hybrid search
results = retriever.retrieve(
    query="How does the authentication system work?",
    collection_name="my_collection",
    top_k=10,        # Initial retrieval
    rerank_top_k=5   # After reranking
)

for idx, result in enumerate(results):
    print(f"{idx+1}. Score: {result.score:.4f}")
    print(f"   Text: {result.text[:100]}...")
    print(f"   Metadata: {result.metadata}")
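
To make the fusion step concrete, here is a minimal sketch of Reciprocal Rank Fusion (RRF) over a dense and a sparse ranking. It is a generic illustration rather than the project's implementation: `rrf_fuse` and the document IDs are hypothetical, `k = 60` is simply a commonly used default, and the project's retriever may weight the two rankings differently.

```python
from collections import defaultdict
from typing import Dict, List, Sequence


def rrf_fuse(rankings: Sequence[Sequence[str]], k: int = 60) -> List[str]:
    """Fuse ranked lists of document IDs with Reciprocal Rank Fusion.

    Each document scores sum(1 / (k + rank)) over the lists that contain it,
    with rank starting at 1. Higher fused scores come first.
    """
    scores: Dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    return sorted(scores, key=lambda doc_id: scores[doc_id], reverse=True)


dense = ["doc_a", "doc_b", "doc_c"]   # hypothetical semantic ranking
sparse = ["doc_b", "doc_d", "doc_a"]  # hypothetical BM25 ranking
print(rrf_fuse([dense, sparse]))
# → ['doc_b', 'doc_a', 'doc_d', 'doc_c']
```

Documents that appear near the top of both lists (here `doc_b` and `doc_a`) rise above documents found by only one retriever, which is why RRF works without normalizing the two score scales.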

3. MCP Server Integration

The MCP server exposes tools for AI assistants. Start the server:
bash
# Run MCP server (usually configured in Claude Desktop config)
python src/mcp/server.py

Configure in Claude Desktop (`claude_desktop_config.json`):

```json
{
  "mcpServers": {
    "rag-knowledge-hub": {
      "command": "python",
      "args": ["/path/to/project/src/mcp/server.py"],
      "env": {
        "PYTHONPATH": "/path/to/project"
      }
    }
  }
}
```
Available MCP Tools:
  1. query_knowledge_hub: Query the RAG system
    python
    # When Claude calls this tool:
    {
      "query": "What are the deployment requirements?",
      "collection_name": "my_collection",
      "top_k": 5
    }
  2. list_collections: List all available collections
    python
    # Returns: ["collection1", "collection2", ...]
  3. get_document_summary: Get summary of a specific document
    python
    {
      "document_id": "doc_123",
      "collection_name": "my_collection"
    }
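
On the wire, an MCP client wraps tool arguments like these in a JSON-RPC 2.0 `tools/call` request. The sketch below only builds and prints such a message to show its shape; `build_tool_call` is a hypothetical helper, and in practice the client (for example Claude Desktop) constructs the request for you.

```python
import json
from typing import Any, Dict


def build_tool_call(request_id: int, tool_name: str, arguments: Dict[str, Any]) -> str:
    """Serialize an MCP tools/call request as a JSON-RPC 2.0 message."""
    message = {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": tool_name, "arguments": arguments},
    }
    return json.dumps(message, indent=2)


print(
    build_tool_call(
        1,
        "query_knowledge_hub",
        {
            "query": "What are the deployment requirements?",
            "collection_name": "my_collection",
            "top_k": 5,
        },
    )
)
```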

4. Dashboard

Launch the Streamlit dashboard:
bash
streamlit run src/dashboard/app.py
Dashboard pages:
  • Overview: System status, collection stats, recent activity
  • Data Browser: Browse and search ingested documents
  • Ingestion Management: Upload new documents, view ingestion history
  • Ingestion Tracking: Monitor ingestion pipeline steps
  • Query Tracking: Analyze query performance and results
  • Evaluation Panel: Run evaluations with Ragas metrics

5. Evaluation with Ragas

Evaluate RAG performance:
python
from src.evaluation.evaluator import RAGEvaluator
from src.core.config import get_config

config = get_config()
evaluator = RAGEvaluator(config)

# Prepare test dataset
test_cases = [
    {
        "query": "What is the API rate limit?",
        "expected_answer": "The API rate limit is 1000 requests per hour.",
        "ground_truth_context": ["Rate limits are set to 1000 req/hour..."]
    },
    # ... more test cases
]

# Run evaluation
results = evaluator.evaluate(
    test_cases=test_cases,
    collection_name="my_collection",
    metrics=["faithfulness", "answer_relevancy", "context_precision"]
)

print(f"Average Faithfulness: {results['faithfulness']:.3f}")
print(f"Average Answer Relevancy: {results['answer_relevancy']:.3f}")
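
For regression testing, averaged metrics like these can be compared against a stored baseline. The following is a minimal sketch under assumptions: `find_regressions`, the tolerance value, and all scores are made up for illustration and are not produced by the project.

```python
from typing import Dict, List


def find_regressions(
    current: Dict[str, float],
    baseline: Dict[str, float],
    tolerance: float = 0.02,
) -> List[str]:
    """Return the metrics whose current score fell more than `tolerance` below baseline.

    A metric missing from `current` is treated as a score of 0.0.
    """
    return [
        name
        for name, base in baseline.items()
        if current.get(name, 0.0) < base - tolerance
    ]


# Illustrative numbers only, not real evaluation results.
baseline = {"faithfulness": 0.90, "answer_relevancy": 0.85}
current = {"faithfulness": 0.91, "answer_relevancy": 0.80}

regressed = find_regressions(current, baseline)
print("Regressed metrics:", regressed or "none")
```

A check like this can run after each configuration change so that a drop in one metric is noticed before it reaches users.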

Common Patterns

Switching Embedding Providers

To switch from OpenAI to Cohere embeddings:
python
# In src/core/config.py
class EmbeddingConfig:
    provider: str = "cohere"  # Changed from "openai"
    model: str = "embed-english-v3.0"
    dimension: int = 1024  # Cohere dimension

Or programmatically:

```python
from src.core.config import get_config

config = get_config()
config.embedding.provider = "cohere"
config.embedding.model = "embed-english-v3.0"
config.embedding.dimension = 1024
```

Adding Custom Chunking Strategy

Implement a custom text splitter:
python
from src.ingestion.splitters.base import BaseSplitter
from typing import List

class CustomSplitter(BaseSplitter):
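    def __init__(self, chunk_size: int = 500, overlap: int = 50):
        # The overlap must be smaller than the chunk size, or split() would never advance
        if not 0 <= overlap < chunk_size:
            raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")
        self.chunk_size = chunk_size
        self.overlap = overlap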
    
    def split(self, text: str, metadata: dict = None) -> List[dict]:
        chunks = []
        start = 0
        while start < len(text):
            end = start + self.chunk_size
            chunk_text = text[start:end]
            chunks.append({
                "text": chunk_text,
                "metadata": {
                    **(metadata or {}),
                    "chunk_index": len(chunks),
                    "start_char": start
                }
            })
            start += self.chunk_size - self.overlap
        return chunks

# Register and use
from src.ingestion.pipeline import IngestionPipeline

pipeline = IngestionPipeline(config)
pipeline.splitter = CustomSplitter(chunk_size=300, overlap=30)

Implementing Custom Reranker

python
from src.retrieval.rerankers.base import BaseReranker
from typing import List

class CustomReranker(BaseReranker):
    def rerank(self, query: str, documents: List[dict], top_k: int = 5) -> List[dict]:
        # Custom reranking logic
        scored_docs = []
        for doc in documents:
            # Example: simple keyword matching score
            score = sum(1 for word in query.lower().split() 
                       if word in doc['text'].lower())
            scored_docs.append({**doc, 'rerank_score': score})
        
        # Sort by score and return top_k
        scored_docs.sort(key=lambda x: x['rerank_score'], reverse=True)
        return scored_docs[:top_k]

# Use in retriever
from src.retrieval.hybrid_search import HybridSearchRetriever

retriever = HybridSearchRetriever(config)
retriever.reranker = CustomReranker()

Multimodal Image Processing

The system supports image captioning in PDFs:
python
from src.ingestion.pipeline import IngestionPipeline

pipeline = IngestionPipeline(config)

# Enable image captioning
result = pipeline.ingest_document(
    file_path="document_with_images.pdf",
    collection_name="multimodal_docs",
    enable_image_captioning=True,  # Vision LLM generates descriptions
    metadata={"type": "technical_manual"}
)

# Images are converted to text descriptions and embedded with surrounding text

Batch Ingestion

Ingest multiple documents:
python
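from pathlib import Path

from src.core.config import get_config
from src.ingestion.pipeline import IngestionPipeline

config = get_config()
pipeline = IngestionPipeline(config)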
docs_dir = Path("./documents")

results = []
for pdf_file in docs_dir.glob("*.pdf"):
    try:
        result = pipeline.ingest_document(
            file_path=str(pdf_file),
            collection_name="batch_collection",
            metadata={"filename": pdf_file.name}
        )
        results.append(result)
        print(f"✓ Ingested {pdf_file.name}")
    except Exception as e:
        print(f"✗ Failed {pdf_file.name}: {e}")

print(f"Total successful: {len(results)}")

Troubleshooting

MCP Server Not Connecting

Issue: Claude Desktop cannot connect to MCP server
Solution:
  1. Check Claude Desktop config path (macOS: ~/Library/Application Support/Claude/claude_desktop_config.json)
  2. Ensure Python path and project path are absolute
  3. Verify environment variables are set in config:
    json
    {
      "mcpServers": {
        "rag-knowledge-hub": {
          "command": "/usr/bin/python3",
          "args": ["/absolute/path/to/project/src/mcp/server.py"],
          "env": {
            "PYTHONPATH": "/absolute/path/to/project",
            "OPENAI_API_KEY": "sk-..."
          }
        }
      }
    }
  4. Restart Claude Desktop completely
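
The path checks in steps 2 and 3 can be automated with a short script. This is a minimal sketch, not a tool shipped with the project: `check_mcp_config` is a hypothetical helper, and treating only arguments that end in `.py` as script paths is a simplifying assumption.

```python
import json
import os
from typing import List


def check_mcp_config(config_text: str) -> List[str]:
    """Return a list of problems found in a claude_desktop_config.json string."""
    problems: List[str] = []
    servers = json.loads(config_text).get("mcpServers", {})
    for name, server in servers.items():
        command = server.get("command", "")
        if not os.path.isabs(command):
            problems.append(f"{name}: command {command!r} is not an absolute path")
        for arg in server.get("args", []):
            # Heuristic: only arguments ending in .py are treated as script paths.
            if arg.endswith(".py") and not os.path.isabs(arg):
                problems.append(f"{name}: script path {arg!r} is not absolute")
        pythonpath = server.get("env", {}).get("PYTHONPATH", "")
        if pythonpath and not os.path.isabs(pythonpath):
            problems.append(f"{name}: PYTHONPATH {pythonpath!r} is not absolute")
    return problems


# Deliberately broken sample: relative command and relative script path.
sample = """
{
  "mcpServers": {
    "rag-knowledge-hub": {
      "command": "python",
      "args": ["src/mcp/server.py"],
      "env": {"PYTHONPATH": "/absolute/path/to/project"}
    }
  }
}
"""
for problem in check_mcp_config(sample):
    print(problem)
```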

Poor Retrieval Results

Issue: Query returns irrelevant documents
Solutions:
  1. Check chunking strategy: Smaller chunks for precise retrieval, larger for more context
    python
    config.ingestion.chunk_size = 300  # Reduce for precision
    config.ingestion.chunk_overlap = 50
  2. Enable reranking: Use cross-encoder or LLM reranker
    python
    config.reranker.enabled = True
    config.reranker.provider = "cohere"
    config.reranker.top_k = 5
  3. Adjust hybrid search weights:
    python
    from src.retrieval.hybrid_search import HybridSearchRetriever
    
    retriever = HybridSearchRetriever(config)
    retriever.dense_weight = 0.7  # Semantic search
    retriever.sparse_weight = 0.3  # BM25 exact match
  4. Use evaluation to iterate:
    python
    # Create golden test set
    evaluator = RAGEvaluator(config)
    results = evaluator.evaluate(test_cases, collection_name="my_collection")
    # Adjust parameters based on metrics

Vector Store Connection Issues

Issue: Cannot connect to Qdrant/Chroma
Solution:
  1. For Qdrant Cloud:
    bash
    # .env
    QDRANT_URL=https://your-cluster.qdrant.io
    QDRANT_API_KEY=your_api_key
  2. For local Qdrant:
    bash
    # Start Qdrant with Docker
    docker run -p 6333:6333 qdrant/qdrant
    
    # In config
    QDRANT_URL=http://localhost:6333
  3. For Chroma (local):
    python
    # config.py
    class VectorStoreConfig:
        type: str = "chroma"
        persist_directory: str = "./chroma_db"

Out of Memory During Ingestion

Issue: Large PDFs cause OOM errors
Solutions:
  1. Process in batches:
    python
    # Increase chunk size, reduce batch size
    config.ingestion.chunk_size = 800
    config.ingestion.batch_size = 10  # Embed 10 chunks at a time
  2. Use streaming for large documents:
    python
    pipeline = IngestionPipeline(config)
    pipeline.process_streaming(
        file_path="large_document.pdf",
        collection_name="large_docs"
    )
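
The idea behind a batch size setting can be sketched in a few lines: chunks are embedded in fixed-size groups so that only one group is held in a request at a time. `batched` below is a hypothetical helper written for illustration and is not the project's internal code.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def batched(items: Sequence[T], batch_size: int) -> Iterator[List[T]]:
    """Yield consecutive slices of `items`, each holding at most `batch_size` elements."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(items), batch_size):
        yield list(items[start:start + batch_size])


chunks = [f"chunk {i}" for i in range(25)]
for batch in batched(chunks, 10):
    # In a real pipeline, each batch would be sent to the embedding provider here.
    print(len(batch))
```

With 25 chunks and a batch size of 10, the loop processes groups of 10, 10, and 5.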

API Rate Limits

Issue: Hitting provider rate limits
Solutions:
  1. Implement retry with exponential backoff:
    python
    config.llm.max_retries = 5
    config.llm.retry_delay = 2.0  # seconds
  2. Use batch embedding APIs:
    python
    # OpenAI allows batching up to 2048 texts
    config.embedding.batch_size = 100
  3. Switch to providers with higher limits (e.g., Cohere for embeddings)
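
For reference, retry with exponential backoff can be sketched as follows. This is a generic illustration and not necessarily how the project interprets `max_retries` and `retry_delay`: `call_with_retry` and `flaky_call` are hypothetical, and a real implementation would usually catch only the provider's rate-limit error rather than every exception.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retry(
    func: Callable[[], T],
    max_retries: int = 5,
    retry_delay: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying on failure with delays of retry_delay * 2**attempt plus jitter."""
    for attempt in range(max_retries + 1):
        try:
            return func()
        except Exception:
            if attempt == max_retries:
                raise  # Retries exhausted: surface the last error.
            delay = retry_delay * 2 ** attempt
            # Up to 10% random jitter spreads out retries from concurrent callers.
            sleep(delay + random.uniform(0, delay * 0.1))
    raise RuntimeError("unreachable")


attempts = {"count": 0}


def flaky_call() -> str:
    # Stand-in for a provider request that fails twice before succeeding.
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise RuntimeError("rate limited")
    return "ok"


# A short delay keeps the demonstration fast.
print(call_with_retry(flaky_call, max_retries=5, retry_delay=0.01))
```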

Advanced Usage

Custom RAG Pipeline

Build a custom RAG pipeline with specific components:
python
from src.core.config import get_config
from src.retrieval.hybrid_search import HybridSearchRetriever
from src.generation.generator import Generator
from src.evaluation.evaluator import RAGEvaluator

config = get_config()

# Custom retriever configuration
retriever = HybridSearchRetriever(config)
retriever.dense_weight = 0.6
retriever.sparse_weight = 0.4

# Custom generator
generator = Generator(config)
generator.system_prompt = "You are a helpful technical assistant..."

# Run custom RAG
def custom_rag_query(query: str, collection: str):
    # Retrieve
    contexts = retriever.retrieve(query, collection, top_k=5)

    # Generate
    response = generator.generate(
        query=query,
        contexts=[c.text for c in contexts],
        metadata=[c.metadata for c in contexts]
    )

    # Evaluate (optional)
    evaluator = RAGEvaluator(config)
    metrics = evaluator.evaluate_single(
        query=query,
        response=response,
        contexts=[c.text for c in contexts]
    )

    return {
        "response": response,
        "contexts": contexts,
        "metrics": metrics
    }

result = custom_rag_query("What are the system requirements?", "docs")
print(result["response"])

Integrating with Your Own Application

Use the RAG system as a library:
python
from src.rag_system import RAGSystem
from src.core.config import get_config

# Initialize
config = get_config()
rag = RAGSystem(config)

# In your FastAPI/Flask app
@app.post("/ask")
async def ask_question(query: str, collection: str = "default"):
    result = rag.query(
        query=query,
        collection_name=collection,
        top_k=5
    )
    return {
        "answer": result["response"],
        "sources": result["contexts"],
        "confidence": result["metrics"]["answer_relevancy"]
    }

Branch Strategy

  • main: Clean, production-ready code (1 commit with latest complete code)
  • dev: Full commit history showing development progression
  • clean-start: Skeleton with Skills and DEV_SPEC, zero progress (for learning from scratch)
Choose a branch based on your needs:
  • Quick deployment → main
  • Understanding the build process → dev
  • Learning by building yourself → clean-start

Additional Resources

  • DEV_SPEC.md: Complete architecture design and task breakdown
  • Resume Writer Skill: Generate customized resume descriptions for this project
  • QA Tester Skill: Automated testing across unit/integration/E2E layers
  • Package Skill: Clean and package project for distribution
Use these skills in VS Code by typing the skill name in Copilot/Claude chat.