rag-pipeline-builder


RAG Pipeline Builder

Design end-to-end RAG pipelines for accurate document retrieval and generation.

Pipeline Architecture

Documents → Chunking → Embedding → Vector Store → Retrieval → Reranking → Generation
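Before diving into each stage, the flow above can be sketched as a plain function composition. This is a deliberately naive, self-contained toy (word-overlap "retrieval", no real embeddings or LLM; all names are illustrative, not from any library) that only shows how the stages hand off to one another:

```python
# Toy end-to-end sketch of the stage order above. Every stage is a
# naive placeholder; real implementations appear in later sections.

def chunk(documents, size=40):
    # Chunking: fixed-size character windows
    return [doc[i:i + size] for doc in documents
            for i in range(0, len(doc), size)]

def retrieve(chunks, query, k=3):
    # Retrieval: rank chunks by word overlap with the query
    # (stands in for embedding + vector-store similarity search)
    q = set(query.lower().split())
    scored = sorted(chunks, key=lambda c: -len(q & set(c.lower().split())))
    return scored[:k]

def rerank(query, candidates, top_k=2):
    # Reranking: a second, stricter pass over the candidates
    q = set(query.lower().split())
    ranked = sorted(candidates, key=lambda c: -len(q & set(c.lower().split())))
    return ranked[:top_k]

def generate(query, context):
    # Generation: stand-in for the final LLM call
    return f"Answer to {query!r} using {len(context)} chunks"

def rag_pipeline(documents, query):
    chunks = chunk(documents)
    candidates = retrieve(chunks, query)
    context = rerank(query, candidates)
    return generate(query, context)
```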

Chunking Strategy


python

# Semantic chunking (recommended)
from langchain.text_splitter import RecursiveCharacterTextSplitter

splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,    # Characters per chunk
    chunk_overlap=200,  # Overlap between chunks
    separators=["\n\n", "\n", ". ", " ", ""],
    length_function=len,
)
chunks = splitter.split_text(document.text)

# Add metadata to each chunk
for i, chunk in enumerate(chunks):
    chunks[i] = {
        "text": chunk,
        "metadata": {
            "source": document.filename,
            "page": calculate_page(i),
            "chunk_id": f"{document.id}_chunk_{i}",
        },
    }

Metadata Schema


typescript
interface ChunkMetadata {
  // Source information
  document_id: string;
  source: string;
  url?: string;

  // Location
  page?: number;
  section?: string;
  chunk_index: number;

  // Content classification
  content_type: "text" | "code" | "table" | "list";
  language?: string;

  // Timestamps
  created_at: Date;
  updated_at: Date;

  // Retrieval optimization
  keywords: string[];
  summary?: string;
  importance_score?: number;
}
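Since the rest of the pipeline is Python, the TypeScript interface above can be mirrored as a `TypedDict` so chunk metadata is type-checked on the ingestion side as well. A sketch (field names copied from the interface; `total=False` is a simplification that marks every field optional, whereas the interface requires `document_id`, `source`, `chunk_index`, `content_type`, and `keywords`):

```python
from datetime import datetime
from typing import Literal, TypedDict

class ChunkMetadata(TypedDict, total=False):
    # Source information
    document_id: str
    source: str
    url: str
    # Location
    page: int
    section: str
    chunk_index: int
    # Content classification
    content_type: Literal["text", "code", "table", "list"]
    language: str
    # Timestamps
    created_at: datetime
    updated_at: datetime
    # Retrieval optimization
    keywords: list[str]
    summary: str
    importance_score: float
```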

Vector Store Setup


python

# Pinecone example
import pinecone
from langchain.vectorstores import Pinecone
from langchain.embeddings import OpenAIEmbeddings

pinecone.init(api_key="...", environment="...")
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

vectorstore = Pinecone.from_documents(
    documents=chunks,
    embedding=embeddings,
    index_name="knowledge-base",
    namespace="production",
)

Retrieval Strategies


python

# Hybrid search (dense + sparse)
def hybrid_retrieval(query: str, k: int = 5):
    # Dense retrieval (semantic)
    dense_results = vectorstore.similarity_search(query, k=k*2)

    # Sparse retrieval (keyword - BM25)
    sparse_results = bm25_search(query, k=k*2)

    # Combine and rerank
    combined = reciprocal_rank_fusion(dense_results, sparse_results)

    return combined[:k]

# Metadata filtering
results = vectorstore.similarity_search(
    query,
    k=5,
    filter={
        "content_type": "code",
        "language": "python",
    },
)
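`reciprocal_rank_fusion` is referenced above but never defined. A minimal sketch of the standard RRF formula — each list contributes 1/(c + rank) per item, with c conventionally 60 — operating on hashable items such as chunk IDs (real `Document` objects would need a key function, an adaptation left out here):

```python
def reciprocal_rank_fusion(*ranked_lists, c=60):
    # Each list votes 1 / (c + rank) for each of its items (rank is
    # 1-based), so items ranked highly in ANY list float to the top.
    scores = {}
    for ranked in ranked_lists:
        for rank, item in enumerate(ranked, start=1):
            scores[item] = scores.get(item, 0.0) + 1.0 / (c + rank)
    return sorted(scores, key=scores.get, reverse=True)
```

The constant c damps the influence of top ranks so that an item appearing in both lists beats an item that is first in only one.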

Reranking


python
from typing import List

from sentence_transformers import CrossEncoder
from langchain.schema import Document

reranker = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')

def rerank_results(query: str, results: List[Document], top_k: int = 3):
    # Score each result against query
    pairs = [(query, doc.page_content) for doc in results]
    scores = reranker.predict(pairs)

    # Sort by score
    scored_results = list(zip(results, scores))
    scored_results.sort(key=lambda x: x[1], reverse=True)

    return [doc for doc, score in scored_results[:top_k]]

Query Enhancement


python

# Query expansion
def expand_query(query: str) -> list[str]:
    expansion_prompt = f"""
    Generate 3 alternative phrasings of this query: "{query}"
    Return as JSON array of strings.
    """
    alternatives = llm(expansion_prompt)
    return [query] + alternatives

# Multi-query retrieval
def multi_query_retrieval(query: str, k: int = 5):
    queries = expand_query(query)
    all_results = []

    for q in queries:
        results = vectorstore.similarity_search(q, k=k)
        all_results.extend(results)

    # Deduplicate and rerank
    unique_results = deduplicate(all_results)
    return rerank_results(query, unique_results, k)
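`deduplicate` is likewise left undefined above. An order-preserving sketch keyed on each chunk's `chunk_id` metadata, falling back to the raw text when no ID is set (the fallback is an assumption, not from the original):

```python
def deduplicate(documents):
    # Keep the first occurrence of each chunk, preserving retrieval
    # order. Keyed on chunk_id metadata when present, else on the text.
    seen = set()
    unique = []
    for doc in documents:
        key = doc.metadata.get("chunk_id") or doc.page_content
        if key not in seen:
            seen.add(key)
            unique.append(doc)
    return unique
```

Preserving order matters here: the duplicates produced by multi-query retrieval arrive roughly best-first, and the reranker downstream only sees the survivors.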

Evaluation Plan


python

# Define golden dataset
golden_dataset = [
    {
        "query": "How do I authenticate users?",
        "expected_docs": ["auth_guide.md", "user_management.md"],
        "relevant_chunks": ["chunk_123", "chunk_456"],
    },
]

# Metrics
def evaluate_retrieval(dataset):
    results = {
        "precision": [],
        "recall": [],
        "mrr": [],   # Mean Reciprocal Rank
        "ndcg": [],  # Normalized Discounted Cumulative Gain
    }

    for item in dataset:
        retrieved = retrieval_fn(item["query"])
        retrieved_ids = [doc.metadata["chunk_id"] for doc in retrieved]

        # Calculate metrics
        relevant = set(item["relevant_chunks"])
        retrieved_set = set(retrieved_ids)

        precision = len(relevant & retrieved_set) / len(retrieved_set)
        recall = len(relevant & retrieved_set) / len(relevant)

        results["precision"].append(precision)
        results["recall"].append(recall)

    # Average only the metrics that were actually collected
    return {k: sum(v) / len(v) for k, v in results.items() if v}
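The `results` dict above reserves slots for MRR and NDCG but only precision and recall are actually computed. A sketch of the per-query half of MRR (the reciprocal rank; MRR itself is this value averaged over all queries in the golden dataset):

```python
def reciprocal_rank(retrieved_ids, relevant_ids):
    # 1 / (1-based position of the first relevant result), or 0.0 when
    # nothing relevant was retrieved. Unlike precision/recall, this
    # rewards putting a relevant chunk near the TOP of the list.
    relevant = set(relevant_ids)
    for rank, chunk_id in enumerate(retrieved_ids, start=1):
        if chunk_id in relevant:
            return 1.0 / rank
    return 0.0
```

Appending `reciprocal_rank(retrieved_ids, item["relevant_chunks"])` to `results["mrr"]` inside the loop would complete that metric.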

Context Window Management


python
def fit_context_window(chunks: List[Document], max_tokens: int = 4000):
    """Select chunks that fit in context window"""
    total_tokens = 0
    selected_chunks = []

    for chunk in chunks:
        chunk_tokens = count_tokens(chunk.page_content)
        if total_tokens + chunk_tokens <= max_tokens:
            selected_chunks.append(chunk)
            total_tokens += chunk_tokens
        else:
            break

    return selected_chunks
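`count_tokens` is left undefined above. In production it would typically be a real tokenizer (e.g. `tiktoken.get_encoding("cl100k_base")` and `len(enc.encode(text))`), but a dependency-free sketch is the common heuristic of roughly 4 characters per token for English text:

```python
def count_tokens(text: str) -> int:
    # Crude estimate: English averages ~4 characters per token.
    # Swap in a real tokenizer (e.g. tiktoken) for exact budgeting.
    return max(1, len(text) // 4)
```

Because the estimate can undercount, leave headroom below the model's true limit when using it for `max_tokens` budgeting.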

Best Practices


  1. Chunk size: 500-1000 chars for general text
  2. Overlap: 10-20% overlap between chunks
  3. Metadata: Rich metadata for filtering
  4. Hybrid search: Combine semantic + keyword
  5. Reranking: Cross-encoder for final ranking
  6. Evaluation: Golden dataset with metrics
  7. Context management: Don't exceed model limits

Output Checklist


  • Chunking strategy defined
  • Metadata schema documented
  • Vector store configured
  • Retrieval algorithm implemented
  • Reranking pipeline added
  • Query enhancement (optional)
  • Context window management
  • Evaluation dataset created
  • Metrics implementation
  • Performance baseline established