
# RAG Builder Skill

Build the RAG (Retrieval-Augmented Generation) server using Qdrant.

## Overview

The RAG server provides vector search capabilities for the workspace:

- Document ingestion with chunking
- Semantic search across collections
- Multi-project isolation via collections

## Prerequisites

```bash
pip install qdrant-client sentence-transformers mcp fastembed
```

## Using the MCP Server

The Reflex plugin includes a pre-configured Qdrant MCP server. Use these tools:

### Store Documents

```
Tool: qdrant-store
Information: "Your document text here..."
Metadata:
  source: "user_upload"
  type: "notes"
```

### Search Documents

```
Tool: qdrant-find
Query: "quantum computing applications"
```

## Build Steps (Custom Server)

### Step 1: Create the RAG Server

File: `mcp/servers/rag-server/server.py`

```python
#!/usr/bin/env python3
"""
RAG MCP Server - Vector search using Qdrant.
"""

import asyncio
import hashlib
import json
import os
from datetime import datetime
from typing import Optional

from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
from mcp.server import Server
from mcp.server.stdio import stdio_server
from sentence_transformers import SentenceTransformer

# Configuration
QDRANT_URL = os.getenv("QDRANT_URL", "http://localhost:6333")
EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "all-MiniLM-L6-v2")
DEFAULT_COLLECTION = os.getenv("COLLECTION_NAME", "default_memories")
CHUNK_SIZE = int(os.getenv("CHUNK_SIZE", "512"))
CHUNK_OVERLAP = int(os.getenv("CHUNK_OVERLAP", "50"))


class RAGServer:
    def __init__(self):
        self.server = Server("rag-server")

        # Initialize Qdrant
        self.client = QdrantClient(url=QDRANT_URL)

        # Initialize embedding model
        self.embedder = SentenceTransformer(EMBEDDING_MODEL)
        self.vector_size = self.embedder.get_sentence_embedding_dimension()

        self._setup_tools()

    def _ensure_collection(self, name: str):
        """Ensure collection exists."""
        collections = self.client.get_collections().collections
        if not any(c.name == name for c in collections):
            self.client.create_collection(
                collection_name=name,
                vectors_config=VectorParams(
                    size=self.vector_size,
                    distance=Distance.COSINE
                )
            )

    def _chunk_text(self, text: str) -> list[str]:
        """Split text into overlapping chunks."""
        words = text.split()
        chunks = []
        for i in range(0, len(words), CHUNK_SIZE - CHUNK_OVERLAP):
            chunk = " ".join(words[i:i + CHUNK_SIZE])
            if chunk:
                chunks.append(chunk)
        return chunks

    def _setup_tools(self):

        @self.server.tool()
        async def ingest(
            content: str,
            collection: str = DEFAULT_COLLECTION,
            metadata: Optional[dict] = None,
            doc_id: Optional[str] = None
        ) -> str:
            """
            Ingest a document into the vector database.

            Args:
                content: Document text to ingest
                collection: Collection name (use project name for isolation)
                metadata: Optional metadata (source, type, date, etc.)
                doc_id: Optional custom document ID
            """
            self._ensure_collection(collection)
            chunks = self._chunk_text(content)

            base_id = doc_id or f"doc_{datetime.now().timestamp()}"

            # Generate embeddings
            embeddings = self.embedder.encode(chunks).tolist()

            # Prepare metadata
            base_meta = metadata or {}
            base_meta["ingested_at"] = datetime.now().isoformat()
            base_meta["source_doc"] = base_id

            # Create points. Use a stable sha256-based 63-bit ID so
            # re-ingesting the same doc_id overwrites its chunks; the
            # built-in hash() is salted per process and would produce
            # different IDs (and duplicate points) on every run.
            points = [
                PointStruct(
                    id=int.from_bytes(
                        hashlib.sha256(f"{base_id}_chunk_{i}".encode()).digest()[:8],
                        "big"
                    ) % (2**63),
                    vector=embeddings[i],
                    payload={**base_meta, "chunk_index": i, "content": chunk}
                )
                for i, chunk in enumerate(chunks)
            ]

            self.client.upsert(collection_name=collection, points=points)

            return json.dumps({
                "status": "success",
                "collection": collection,
                "chunks": len(chunks),
                "doc_id": base_id
            })

        @self.server.tool()
        async def search(
            query: str,
            collection: str = DEFAULT_COLLECTION,
            n_results: int = 5
        ) -> str:
            """
            Search for relevant documents.

            Args:
                query: Search query
                collection: Collection to search
                n_results: Number of results (default 5)
            """
            self._ensure_collection(collection)

            query_embedding = self.embedder.encode([query])[0].tolist()

            results = self.client.search(
                collection_name=collection,
                query_vector=query_embedding,
                limit=n_results
            )

            formatted = [
                {
                    "id": str(r.id),
                    "content": r.payload.get("content", ""),
                    "metadata": {k: v for k, v in r.payload.items() if k != "content"},
                    "score": r.score
                }
                for r in results
            ]

            return json.dumps({
                "query": query,
                "collection": collection,
                "results": formatted
            })

        @self.server.tool()
        async def list_collections() -> str:
            """List all collections."""
            collections = self.client.get_collections()
            return json.dumps({
                "collections": [
                    {"name": c.name}
                    for c in collections.collections
                ]
            })

    async def run(self):
        async with stdio_server() as (read_stream, write_stream):
            await self.server.run(read_stream, write_stream)


def main():
    server = RAGServer()
    asyncio.run(server.run())


if __name__ == "__main__":
    main()
```
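The chunker above slides a word window of `CHUNK_SIZE` words, stepping by `CHUNK_SIZE - CHUNK_OVERLAP` so consecutive chunks share `CHUNK_OVERLAP` words of context. A standalone sketch (same logic, constants inlined for illustration) shows how many chunks a document yields:

```python
# Standalone sketch of the overlapping word-window chunker used in server.py.
# Constants are inlined here; the server reads them from environment variables.
CHUNK_SIZE = 512
CHUNK_OVERLAP = 50

def chunk_text(text: str) -> list[str]:
    words = text.split()
    chunks = []
    # Step by CHUNK_SIZE - CHUNK_OVERLAP = 462, so consecutive
    # chunks share their last/first 50 words
    for i in range(0, len(words), CHUNK_SIZE - CHUNK_OVERLAP):
        chunk = " ".join(words[i:i + CHUNK_SIZE])
        if chunk:
            chunks.append(chunk)
    return chunks

doc = " ".join(f"w{n}" for n in range(1000))  # a 1000-word document
chunks = chunk_text(doc)
print(len(chunks))            # 3 chunks: words 0-511, 462-973, 924-999
print(chunks[1].split()[0])   # "w462" - second chunk starts 462 words in
```

Note the trailing chunk can be much shorter than `CHUNK_SIZE`; for a 1000-word document the third chunk holds only the final 76 words.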

### Step 2: Create Requirements

File: `mcp/servers/rag-server/requirements.txt`

```
mcp>=1.0.0
qdrant-client>=1.7.0
sentence-transformers>=2.2.0
```

### Step 3: Create Test Script

File: `mcp/servers/rag-server/test_rag.py`

```python
#!/usr/bin/env python3
"""Quick test for RAG server components."""

import os
import sys

# Set up path
sys.path.insert(0, os.path.dirname(__file__))


def test_qdrant():
    """Test Qdrant is working."""
    from qdrant_client import QdrantClient
    client = QdrantClient(url="http://localhost:6333")
    collections = client.get_collections()
    print(f"✅ Qdrant working, {len(collections.collections)} collections")


def test_embeddings():
    """Test embedding model."""
    from sentence_transformers import SentenceTransformer
    model = SentenceTransformer('all-MiniLM-L6-v2')
    embedding = model.encode(["test sentence"])
    assert embedding.shape == (1, 384)
    print("✅ Embeddings working")


def test_server_init():
    """Test server initialization."""
    from server import RAGServer
    server = RAGServer()
    assert server.client is not None
    assert server.embedder is not None
    print("✅ Server initialization working")


if __name__ == "__main__":
    test_qdrant()
    test_embeddings()
    test_server_init()
    print("\n✅ All RAG tests passed!")
```

## Verification

```bash
# Start Qdrant (if using Docker)
docker run -d -p 6333:6333 qdrant/qdrant

# Navigate to server directory
cd mcp/servers/rag-server

# Install dependencies
pip install -r requirements.txt

# Run tests
python test_rag.py
```

Expected output:

```
✅ Qdrant working, 0 collections
✅ Embeddings working
✅ Server initialization working

✅ All RAG tests passed!
```

## Usage Examples

Once running as an MCP server:

```python
# Ingest a document
await ingest(
    content="Your document text here...",
    collection="project_alpha_docs",
    metadata={"source": "user_upload", "type": "notes"}
)

# Search
results = await search(
    query="quantum computing applications",
    collection="project_alpha_docs",
    n_results=5
)

# List collections
collections = await list_collections()
```

## Multi-Project Isolation

```python
# Each project gets its own collections
"project_alpha_docs"   # Project Alpha documentation
"project_alpha_code"   # Project Alpha code snippets
"project_beta_docs"    # Project Beta documentation
"shared_knowledge"     # Cross-project shared info
```
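One way to keep this naming convention consistent is a small helper that derives collection names from a project name. The `collection_for` function below is a hypothetical sketch, not part of the server:

```python
import re

def collection_for(project: str, kind: str) -> str:
    """Build a collection name like 'project_alpha_docs'.

    Hypothetical helper: lowercases the project name and replaces
    runs of non-alphanumeric characters with underscores so every
    project maps to one predictable, isolated collection per kind.
    """
    slug = re.sub(r"[^a-z0-9]+", "_", project.lower()).strip("_")
    return f"{slug}_{kind}"

print(collection_for("Project Alpha", "docs"))   # project_alpha_docs
print(collection_for("Project-Beta!", "code"))   # project_beta_code
```

Passing the result as the `collection` argument to `ingest` and `search` keeps each project's documents in their own Qdrant collection.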

## Configuration

Environment variables:

```bash
QDRANT_URL=http://localhost:6333
EMBEDDING_MODEL=all-MiniLM-L6-v2
COLLECTION_NAME=default_memories
CHUNK_SIZE=512
CHUNK_OVERLAP=50
```
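Note that `CHUNK_OVERLAP` must stay below `CHUNK_SIZE`: the chunking loop steps by `CHUNK_SIZE - CHUNK_OVERLAP`, and a step of zero raises a `ValueError` from `range()` while a negative step silently yields no chunks. A small sanity check like the sketch below (hypothetical, not part of the server) can catch a bad override early:

```python
import os

def load_chunk_settings() -> tuple[int, int]:
    """Read and validate chunk settings (hypothetical sanity check)."""
    size = int(os.getenv("CHUNK_SIZE", "512"))
    overlap = int(os.getenv("CHUNK_OVERLAP", "50"))
    if not 0 <= overlap < size:
        # step = size - overlap would be <= 0, breaking the chunk loop
        raise ValueError(
            f"CHUNK_OVERLAP ({overlap}) must be >= 0 and < CHUNK_SIZE ({size})"
        )
    return size, overlap

size, overlap = load_chunk_settings()
print(size - overlap)  # step between chunk starts: 462 with the defaults
```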

## After Building

1. ✅ Run tests to verify
2. Update `CLAUDE.md` status
3. Proceed to `skills/router-builder/SKILL.md`

## Refinement Notes

Add notes here as we build and discover what works/doesn't work.

- Initial implementation
- Tested with real documents
- Integrated with MCP config
- Performance tuned