embedding-strategies

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Embedding Strategies

嵌入模型策略

Guide to selecting and optimizing embedding models for vector search applications.
为向量搜索应用选择并优化嵌入模型的指南。

When to Use This Skill

何时使用该技能

  • Choosing embedding models for RAG
  • Optimizing chunking strategies
  • Fine-tuning embeddings for domains
  • Comparing embedding model performance
  • Reducing embedding dimensions
  • Handling multilingual content
  • 为RAG选择嵌入模型
  • 优化分块策略
  • 针对特定领域微调嵌入向量
  • 对比嵌入模型性能
  • 降低嵌入向量维度
  • 处理多语言内容

Core Concepts

核心概念

1. Embedding Model Comparison (2026)

1. 嵌入模型对比(2026年)

ModelDimensionsMax TokensBest For
voyage-3-large102432000Claude apps (Anthropic recommended)
voyage-3102432000Claude apps, cost-effective
voyage-code-3102432000Code search
voyage-finance-2102432000Financial documents
voyage-law-2102432000Legal documents
text-embedding-3-large30728191OpenAI apps, high accuracy
text-embedding-3-small15368191OpenAI apps, cost-effective
bge-large-en-v1.51024512Open source, local deployment
all-MiniLM-L6-v2384256Fast, lightweight
multilingual-e5-large1024512Multi-language
模型名称向量维度最大Token数最佳适用场景
voyage-3-large102432000Claude应用(Anthropic推荐)
voyage-3102432000Claude应用,高性价比
voyage-code-3102432000代码搜索
voyage-finance-2102432000金融文档
voyage-law-2102432000法律文档
text-embedding-3-large30728191OpenAI应用,高精度
text-embedding-3-small15368191OpenAI应用,高性价比
bge-large-en-v1.51024512开源,本地部署
all-MiniLM-L6-v2384256快速、轻量级
multilingual-e5-large1024512多语言场景

2. Embedding Pipeline

2. 嵌入模型流程

Document → Chunking → Preprocessing → Embedding Model → Vector
        [Overlap, Size]  [Clean, Normalize]  [API/Local]
Document → Chunking → Preprocessing → Embedding Model → Vector
        [Overlap, Size]  [Clean, Normalize]  [API/Local]

Templates

模板示例

Template 1: Voyage AI Embeddings (Recommended for Claude)

模板1:Voyage AI嵌入向量(Claude推荐)

python
from langchain_voyageai import VoyageAIEmbeddings
from typing import List
import os
python
from langchain_voyageai import VoyageAIEmbeddings
from typing import List
import os

Initialize Voyage AI embeddings (recommended by Anthropic for Claude)

Initialize Voyage AI embeddings (recommended by Anthropic for Claude)

embeddings = VoyageAIEmbeddings( model="voyage-3-large", voyage_api_key=os.environ.get("VOYAGE_API_KEY") )
def get_embeddings(texts: List[str]) -> List[List[float]]: """Get embeddings from Voyage AI.""" return embeddings.embed_documents(texts)
def get_query_embedding(query: str) -> List[float]: """Get single query embedding.""" return embeddings.embed_query(query)
embeddings = VoyageAIEmbeddings( model="voyage-3-large", voyage_api_key=os.environ.get("VOYAGE_API_KEY") )
def get_embeddings(texts: List[str]) -> List[List[float]]: """Get embeddings from Voyage AI.""" return embeddings.embed_documents(texts)
def get_query_embedding(query: str) -> List[float]: """Get single query embedding.""" return embeddings.embed_query(query)

Specialized models for domains

Specialized models for domains

code_embeddings = VoyageAIEmbeddings(model="voyage-code-3") finance_embeddings = VoyageAIEmbeddings(model="voyage-finance-2") legal_embeddings = VoyageAIEmbeddings(model="voyage-law-2")
undefined
code_embeddings = VoyageAIEmbeddings(model="voyage-code-3") finance_embeddings = VoyageAIEmbeddings(model="voyage-finance-2") legal_embeddings = VoyageAIEmbeddings(model="voyage-law-2")
undefined

Template 2: OpenAI Embeddings

模板2:OpenAI嵌入向量

python
from openai import OpenAI
from typing import List
import numpy as np

client = OpenAI()

def get_embeddings(
    texts: List[str],
    model: str = "text-embedding-3-small",
    dimensions: int = None
) -> List[List[float]]:
    """Get embeddings from OpenAI with optional dimension reduction."""
    # Handle batching for large lists
    batch_size = 100
    all_embeddings = []

    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]

        kwargs = {"input": batch, "model": model}
        if dimensions:
            # Matryoshka dimensionality reduction
            kwargs["dimensions"] = dimensions

        response = client.embeddings.create(**kwargs)
        embeddings = [item.embedding for item in response.data]
        all_embeddings.extend(embeddings)

    return all_embeddings


def get_embedding(text: str, **kwargs) -> List[float]:
    """Get single embedding."""
    return get_embeddings([text], **kwargs)[0]
python
from openai import OpenAI
from typing import List
import numpy as np

client = OpenAI()

def get_embeddings(
    texts: List[str],
    model: str = "text-embedding-3-small",
    dimensions: int = None
) -> List[List[float]]:
    """Get embeddings from OpenAI with optional dimension reduction."""
    # Handle batching for large lists
    batch_size = 100
    all_embeddings = []

    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]

        kwargs = {"input": batch, "model": model}
        if dimensions:
            # Matryoshka dimensionality reduction
            kwargs["dimensions"] = dimensions

        response = client.embeddings.create(**kwargs)
        embeddings = [item.embedding for item in response.data]
        all_embeddings.extend(embeddings)

    return all_embeddings


def get_embedding(text: str, **kwargs) -> List[float]:
    """Get single embedding."""
    return get_embeddings([text], **kwargs)[0]

Dimension reduction with Matryoshka embeddings

Dimension reduction with Matryoshka embeddings

def get_reduced_embedding(text: str, dimensions: int = 512) -> List[float]: """Get embedding with reduced dimensions (Matryoshka).""" return get_embedding( text, model="text-embedding-3-small", dimensions=dimensions )
undefined
def get_reduced_embedding(text: str, dimensions: int = 512) -> List[float]: """Get embedding with reduced dimensions (Matryoshka).""" return get_embedding( text, model="text-embedding-3-small", dimensions=dimensions )
undefined

Template 3: Local Embeddings with Sentence Transformers

模板3:使用Sentence Transformers的本地嵌入向量

python
from sentence_transformers import SentenceTransformer
from typing import List, Optional
import numpy as np

class LocalEmbedder:
    """Local embedding with sentence-transformers."""

    def __init__(
        self,
        model_name: str = "BAAI/bge-large-en-v1.5",
        device: str = "cuda"
    ):
        self.model = SentenceTransformer(model_name, device=device)
        self.model_name = model_name

    def embed(
        self,
        texts: List[str],
        normalize: bool = True,
        show_progress: bool = False
    ) -> np.ndarray:
        """Embed texts with optional normalization."""
        embeddings = self.model.encode(
            texts,
            normalize_embeddings=normalize,
            show_progress_bar=show_progress,
            convert_to_numpy=True
        )
        return embeddings

    def embed_query(self, query: str) -> np.ndarray:
        """Embed a query with appropriate prefix for retrieval models."""
        # BGE and similar models benefit from query prefix
        if "bge" in self.model_name.lower():
            query = f"Represent this sentence for searching relevant passages: {query}"
        return self.embed([query])[0]

    def embed_documents(self, documents: List[str]) -> np.ndarray:
        """Embed documents for indexing."""
        return self.embed(documents)
python
from sentence_transformers import SentenceTransformer
from typing import List, Optional
import numpy as np

class LocalEmbedder:
    """Local embedding with sentence-transformers."""

    def __init__(
        self,
        model_name: str = "BAAI/bge-large-en-v1.5",
        device: str = "cuda"
    ):
        self.model = SentenceTransformer(model_name, device=device)
        self.model_name = model_name

    def embed(
        self,
        texts: List[str],
        normalize: bool = True,
        show_progress: bool = False
    ) -> np.ndarray:
        """Embed texts with optional normalization."""
        embeddings = self.model.encode(
            texts,
            normalize_embeddings=normalize,
            show_progress_bar=show_progress,
            convert_to_numpy=True
        )
        return embeddings

    def embed_query(self, query: str) -> np.ndarray:
        """Embed a query with appropriate prefix for retrieval models."""
        # BGE and similar models benefit from query prefix
        if "bge" in self.model_name.lower():
            query = f"Represent this sentence for searching relevant passages: {query}"
        return self.embed([query])[0]

    def embed_documents(self, documents: List[str]) -> np.ndarray:
        """Embed documents for indexing."""
        return self.embed(documents)

E5 model with instructions

E5 model with instructions

class E5Embedder: def init(self, model_name: str = "intfloat/multilingual-e5-large"): self.model = SentenceTransformer(model_name)
def embed_query(self, query: str) -> np.ndarray:
    """E5 requires 'query:' prefix for queries."""
    return self.model.encode(f"query: {query}")

def embed_document(self, document: str) -> np.ndarray:
    """E5 requires 'passage:' prefix for documents."""
    return self.model.encode(f"passage: {document}")
undefined
class E5Embedder: def init(self, model_name: str = "intfloat/multilingual-e5-large"): self.model = SentenceTransformer(model_name)
def embed_query(self, query: str) -> np.ndarray:
    """E5 requires 'query:' prefix for queries."""
    return self.model.encode(f"query: {query}")

def embed_document(self, document: str) -> np.ndarray:
    """E5 requires 'passage:' prefix for documents."""
    return self.model.encode(f"passage: {document}")
undefined

Template 4: Chunking Strategies

模板4:分块策略

python
from typing import List, Tuple
import re

def chunk_by_tokens(
    text: str,
    chunk_size: int = 512,
    chunk_overlap: int = 50,
    tokenizer=None
) -> List[str]:
    """Chunk text by token count."""
    import tiktoken
    tokenizer = tokenizer or tiktoken.get_encoding("cl100k_base")

    tokens = tokenizer.encode(text)
    chunks = []

    start = 0
    while start < len(tokens):
        end = start + chunk_size
        chunk_tokens = tokens[start:end]
        chunk_text = tokenizer.decode(chunk_tokens)
        chunks.append(chunk_text)
        start = end - chunk_overlap

    return chunks


def chunk_by_sentences(
    text: str,
    max_chunk_size: int = 1000,
    min_chunk_size: int = 100
) -> List[str]:
    """Chunk text by sentences, respecting size limits."""
    import nltk
    sentences = nltk.sent_tokenize(text)

    chunks = []
    current_chunk = []
    current_size = 0

    for sentence in sentences:
        sentence_size = len(sentence)

        if current_size + sentence_size > max_chunk_size and current_chunk:
            chunks.append(" ".join(current_chunk))
            current_chunk = []
            current_size = 0

        current_chunk.append(sentence)
        current_size += sentence_size

    if current_chunk:
        chunks.append(" ".join(current_chunk))

    return chunks


def chunk_by_semantic_sections(
    text: str,
    headers_pattern: str = r'^#{1,3}\s+.+$'
) -> List[Tuple[str, str]]:
    """Chunk markdown by headers, preserving hierarchy."""
    lines = text.split('\n')
    chunks = []
    current_header = ""
    current_content = []

    for line in lines:
        if re.match(headers_pattern, line, re.MULTILINE):
            if current_content:
                chunks.append((current_header, '\n'.join(current_content)))
            current_header = line
            current_content = []
        else:
            current_content.append(line)

    if current_content:
        chunks.append((current_header, '\n'.join(current_content)))

    return chunks


def recursive_character_splitter(
    text: str,
    chunk_size: int = 1000,
    chunk_overlap: int = 200,
    separators: List[str] = None
) -> List[str]:
    """LangChain-style recursive splitter."""
    separators = separators or ["\n\n", "\n", ". ", " ", ""]

    def split_text(text: str, separators: List[str]) -> List[str]:
        if not text:
            return []

        separator = separators[0]
        remaining_separators = separators[1:]

        if separator == "":
            # Character-level split
            return [text[i:i+chunk_size] for i in range(0, len(text), chunk_size - chunk_overlap)]

        splits = text.split(separator)
        chunks = []
        current_chunk = []
        current_length = 0

        for split in splits:
            split_length = len(split) + len(separator)

            if current_length + split_length > chunk_size and current_chunk:
                chunk_text = separator.join(current_chunk)

                # Recursively split if still too large
                if len(chunk_text) > chunk_size and remaining_separators:
                    chunks.extend(split_text(chunk_text, remaining_separators))
                else:
                    chunks.append(chunk_text)

                # Start new chunk with overlap
                overlap_splits = []
                overlap_length = 0
                for s in reversed(current_chunk):
                    if overlap_length + len(s) <= chunk_overlap:
                        overlap_splits.insert(0, s)
                        overlap_length += len(s)
                    else:
                        break
                current_chunk = overlap_splits
                current_length = overlap_length

            current_chunk.append(split)
            current_length += split_length

        if current_chunk:
            chunks.append(separator.join(current_chunk))

        return chunks

    return split_text(text, separators)
python
from typing import List, Tuple
import re

def chunk_by_tokens(
    text: str,
    chunk_size: int = 512,
    chunk_overlap: int = 50,
    tokenizer=None
) -> List[str]:
    """Chunk text by token count."""
    import tiktoken
    tokenizer = tokenizer or tiktoken.get_encoding("cl100k_base")

    tokens = tokenizer.encode(text)
    chunks = []

    start = 0
    while start < len(tokens):
        end = start + chunk_size
        chunk_tokens = tokens[start:end]
        chunk_text = tokenizer.decode(chunk_tokens)
        chunks.append(chunk_text)
        start = end - chunk_overlap

    return chunks


def chunk_by_sentences(
    text: str,
    max_chunk_size: int = 1000,
    min_chunk_size: int = 100
) -> List[str]:
    """Chunk text by sentences, respecting size limits."""
    import nltk
    sentences = nltk.sent_tokenize(text)

    chunks = []
    current_chunk = []
    current_size = 0

    for sentence in sentences:
        sentence_size = len(sentence)

        if current_size + sentence_size > max_chunk_size and current_chunk:
            chunks.append(" ".join(current_chunk))
            current_chunk = []
            current_size = 0

        current_chunk.append(sentence)
        current_size += sentence_size

    if current_chunk:
        chunks.append(" ".join(current_chunk))

    return chunks


def chunk_by_semantic_sections(
    text: str,
    headers_pattern: str = r'^#{1,3}\s+.+$'
) -> List[Tuple[str, str]]:
    """Chunk markdown by headers, preserving hierarchy."""
    lines = text.split('\n')
    chunks = []
    current_header = ""
    current_content = []

    for line in lines:
        if re.match(headers_pattern, line, re.MULTILINE):
            if current_content:
                chunks.append((current_header, '\n'.join(current_content)))
            current_header = line
            current_content = []
        else:
            current_content.append(line)

    if current_content:
        chunks.append((current_header, '\n'.join(current_content)))

    return chunks


def recursive_character_splitter(
    text: str,
    chunk_size: int = 1000,
    chunk_overlap: int = 200,
    separators: List[str] = None
) -> List[str]:
    """LangChain-style recursive splitter."""
    separators = separators or ["\n\n", "\n", ". ", " ", ""]

    def split_text(text: str, separators: List[str]) -> List[str]:
        if not text:
            return []

        separator = separators[0]
        remaining_separators = separators[1:]

        if separator == "":
            # Character-level split
            return [text[i:i+chunk_size] for i in range(0, len(text), chunk_size - chunk_overlap)]

        splits = text.split(separator)
        chunks = []
        current_chunk = []
        current_length = 0

        for split in splits:
            split_length = len(split) + len(separator)

            if current_length + split_length > chunk_size and current_chunk:
                chunk_text = separator.join(current_chunk)

                # Recursively split if still too large
                if len(chunk_text) > chunk_size and remaining_separators:
                    chunks.extend(split_text(chunk_text, remaining_separators))
                else:
                    chunks.append(chunk_text)

                # Start new chunk with overlap
                overlap_splits = []
                overlap_length = 0
                for s in reversed(current_chunk):
                    if overlap_length + len(s) <= chunk_overlap:
                        overlap_splits.insert(0, s)
                        overlap_length += len(s)
                    else:
                        break
                current_chunk = overlap_splits
                current_length = overlap_length

            current_chunk.append(split)
            current_length += split_length

        if current_chunk:
            chunks.append(separator.join(current_chunk))

        return chunks

    return split_text(text, separators)

Template 5: Domain-Specific Embedding Pipeline

模板5:特定领域嵌入模型流程

python
import re
from typing import List, Optional
from dataclasses import dataclass

@dataclass
class EmbeddedDocument:
    id: str
    document_id: str
    chunk_index: int
    text: str
    embedding: List[float]
    metadata: dict

class DomainEmbeddingPipeline:
    """Pipeline for domain-specific embeddings."""

    def __init__(
        self,
        embedding_model: str = "voyage-3-large",
        chunk_size: int = 512,
        chunk_overlap: int = 50,
        preprocessing_fn=None
    ):
        self.embeddings = VoyageAIEmbeddings(model=embedding_model)
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.preprocess = preprocessing_fn or self._default_preprocess

    def _default_preprocess(self, text: str) -> str:
        """Default preprocessing."""
        # Remove excessive whitespace
        text = re.sub(r'\s+', ' ', text)
        # Remove special characters (customize for your domain)
        text = re.sub(r'[^\w\s.,!?-]', '', text)
        return text.strip()

    async def process_documents(
        self,
        documents: List[dict],
        id_field: str = "id",
        content_field: str = "content",
        metadata_fields: Optional[List[str]] = None
    ) -> List[EmbeddedDocument]:
        """Process documents for vector storage."""
        processed = []

        for doc in documents:
            content = doc[content_field]
            doc_id = doc[id_field]

            # Preprocess
            cleaned = self.preprocess(content)

            # Chunk
            chunks = chunk_by_tokens(
                cleaned,
                self.chunk_size,
                self.chunk_overlap
            )

            # Create embeddings
            embeddings = await self.embeddings.aembed_documents(chunks)

            # Create records
            for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
                metadata = {"document_id": doc_id, "chunk_index": i}

                # Add specified metadata fields
                if metadata_fields:
                    for field in metadata_fields:
                        if field in doc:
                            metadata[field] = doc[field]

                processed.append(EmbeddedDocument(
                    id=f"{doc_id}_chunk_{i}",
                    document_id=doc_id,
                    chunk_index=i,
                    text=chunk,
                    embedding=embedding,
                    metadata=metadata
                ))

        return processed
python
import re
from typing import List, Optional
from dataclasses import dataclass

@dataclass
class EmbeddedDocument:
    id: str
    document_id: str
    chunk_index: int
    text: str
    embedding: List[float]
    metadata: dict

class DomainEmbeddingPipeline:
    """Pipeline for domain-specific embeddings."""

    def __init__(
        self,
        embedding_model: str = "voyage-3-large",
        chunk_size: int = 512,
        chunk_overlap: int = 50,
        preprocessing_fn=None
    ):
        self.embeddings = VoyageAIEmbeddings(model=embedding_model)
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.preprocess = preprocessing_fn or self._default_preprocess

    def _default_preprocess(self, text: str) -> str:
        """Default preprocessing."""
        # Remove excessive whitespace
        text = re.sub(r'\s+', ' ', text)
        # Remove special characters (customize for your domain)
        text = re.sub(r'[^\w\s.,!?-]', '', text)
        return text.strip()

    async def process_documents(
        self,
        documents: List[dict],
        id_field: str = "id",
        content_field: str = "content",
        metadata_fields: Optional[List[str]] = None
    ) -> List[EmbeddedDocument]:
        """Process documents for vector storage."""
        processed = []

        for doc in documents:
            content = doc[content_field]
            doc_id = doc[id_field]

            # Preprocess
            cleaned = self.preprocess(content)

            # Chunk
            chunks = chunk_by_tokens(
                cleaned,
                self.chunk_size,
                self.chunk_overlap
            )

            # Create embeddings
            embeddings = await self.embeddings.aembed_documents(chunks)

            # Create records
            for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
                metadata = {"document_id": doc_id, "chunk_index": i}

                # Add specified metadata fields
                if metadata_fields:
                    for field in metadata_fields:
                        if field in doc:
                            metadata[field] = doc[field]

                processed.append(EmbeddedDocument(
                    id=f"{doc_id}_chunk_{i}",
                    document_id=doc_id,
                    chunk_index=i,
                    text=chunk,
                    embedding=embedding,
                    metadata=metadata
                ))

        return processed

Code-specific pipeline

Code-specific pipeline

class CodeEmbeddingPipeline: """Specialized pipeline for code embeddings."""
def __init__(self):
    # Use Voyage's code-specific model
    self.embeddings = VoyageAIEmbeddings(model="voyage-code-3")

def chunk_code(self, code: str, language: str) -> List[dict]:
    """Chunk code by functions/classes using tree-sitter."""
    try:
        import tree_sitter_languages
        parser = tree_sitter_languages.get_parser(language)
        tree = parser.parse(bytes(code, "utf8"))

        chunks = []
        # Extract function and class definitions
        self._extract_nodes(tree.root_node, code, chunks)
        return chunks
    except ImportError:
        # Fallback to simple chunking
        return [{"text": code, "type": "module"}]

def _extract_nodes(self, node, source_code: str, chunks: list):
    """Recursively extract function/class definitions."""
    if node.type in ['function_definition', 'class_definition', 'method_definition']:
        text = source_code[node.start_byte:node.end_byte]
        chunks.append({
            "text": text,
            "type": node.type,
            "name": self._get_name(node),
            "start_line": node.start_point[0],
            "end_line": node.end_point[0]
        })
    for child in node.children:
        self._extract_nodes(child, source_code, chunks)

def _get_name(self, node) -> str:
    """Extract name from function/class node."""
    for child in node.children:
        if child.type == 'identifier' or child.type == 'name':
            return child.text.decode('utf8')
    return "unknown"

async def embed_with_context(
    self,
    chunk: str,
    context: str = ""
) -> List[float]:
    """Embed code with surrounding context."""
    if context:
        combined = f"Context: {context}\n\nCode:\n{chunk}"
    else:
        combined = chunk
    return await self.embeddings.aembed_query(combined)
undefined
class CodeEmbeddingPipeline: """Specialized pipeline for code embeddings."""
def __init__(self):
    # Use Voyage's code-specific model
    self.embeddings = VoyageAIEmbeddings(model="voyage-code-3")

def chunk_code(self, code: str, language: str) -> List[dict]:
    """Chunk code by functions/classes using tree-sitter."""
    try:
        import tree_sitter_languages
        parser = tree_sitter_languages.get_parser(language)
        tree = parser.parse(bytes(code, "utf8"))

        chunks = []
        # Extract function and class definitions
        self._extract_nodes(tree.root_node, code, chunks)
        return chunks
    except ImportError:
        # Fallback to simple chunking
        return [{"text": code, "type": "module"}]

def _extract_nodes(self, node, source_code: str, chunks: list):
    """Recursively extract function/class definitions."""
    if node.type in ['function_definition', 'class_definition', 'method_definition']:
        text = source_code[node.start_byte:node.end_byte]
        chunks.append({
            "text": text,
            "type": node.type,
            "name": self._get_name(node),
            "start_line": node.start_point[0],
            "end_line": node.end_point[0]
        })
    for child in node.children:
        self._extract_nodes(child, source_code, chunks)

def _get_name(self, node) -> str:
    """Extract name from function/class node."""
    for child in node.children:
        if child.type == 'identifier' or child.type == 'name':
            return child.text.decode('utf8')
    return "unknown"

async def embed_with_context(
    self,
    chunk: str,
    context: str = ""
) -> List[float]:
    """Embed code with surrounding context."""
    if context:
        combined = f"Context: {context}\n\nCode:\n{chunk}"
    else:
        combined = chunk
    return await self.embeddings.aembed_query(combined)
undefined

Template 6: Embedding Quality Evaluation

模板6:嵌入模型质量评估

python
import numpy as np
from typing import List, Dict

def evaluate_retrieval_quality(
    queries: List[str],
    relevant_docs: List[List[str]],  # List of relevant doc IDs per query
    retrieved_docs: List[List[str]],  # List of retrieved doc IDs per query
    k: int = 10
) -> Dict[str, float]:
    """Evaluate embedding quality for retrieval."""

    def precision_at_k(relevant: set, retrieved: List[str], k: int) -> float:
        retrieved_k = retrieved[:k]
        relevant_retrieved = len(set(retrieved_k) & relevant)
        return relevant_retrieved / k if k > 0 else 0

    def recall_at_k(relevant: set, retrieved: List[str], k: int) -> float:
        retrieved_k = retrieved[:k]
        relevant_retrieved = len(set(retrieved_k) & relevant)
        return relevant_retrieved / len(relevant) if relevant else 0

    def mrr(relevant: set, retrieved: List[str]) -> float:
        for i, doc in enumerate(retrieved):
            if doc in relevant:
                return 1 / (i + 1)
        return 0

    def ndcg_at_k(relevant: set, retrieved: List[str], k: int) -> float:
        dcg = sum(
            1 / np.log2(i + 2) if doc in relevant else 0
            for i, doc in enumerate(retrieved[:k])
        )
        ideal_dcg = sum(1 / np.log2(i + 2) for i in range(min(len(relevant), k)))
        return dcg / ideal_dcg if ideal_dcg > 0 else 0

    metrics = {
        f"precision@{k}": [],
        f"recall@{k}": [],
        "mrr": [],
        f"ndcg@{k}": []
    }

    for relevant, retrieved in zip(relevant_docs, retrieved_docs):
        relevant_set = set(relevant)
        metrics[f"precision@{k}"].append(precision_at_k(relevant_set, retrieved, k))
        metrics[f"recall@{k}"].append(recall_at_k(relevant_set, retrieved, k))
        metrics["mrr"].append(mrr(relevant_set, retrieved))
        metrics[f"ndcg@{k}"].append(ndcg_at_k(relevant_set, retrieved, k))

    return {name: np.mean(values) for name, values in metrics.items()}


def compute_embedding_similarity(
    embeddings1: np.ndarray,
    embeddings2: np.ndarray,
    metric: str = "cosine"
) -> np.ndarray:
    """Compute similarity matrix between embedding sets."""
    if metric == "cosine":
        # Normalize and compute dot product
        norm1 = embeddings1 / np.linalg.norm(embeddings1, axis=1, keepdims=True)
        norm2 = embeddings2 / np.linalg.norm(embeddings2, axis=1, keepdims=True)
        return norm1 @ norm2.T
    elif metric == "euclidean":
        from scipy.spatial.distance import cdist
        return -cdist(embeddings1, embeddings2, metric='euclidean')
    elif metric == "dot":
        return embeddings1 @ embeddings2.T
    else:
        raise ValueError(f"Unknown metric: {metric}")


def compare_embedding_models(
    texts: List[str],
    models: Dict[str, callable],
    queries: List[str],
    relevant_indices: List[List[int]],
    k: int = 5
) -> Dict[str, Dict[str, float]]:
    """Compare multiple embedding models on retrieval quality."""
    results = {}

    for model_name, embed_fn in models.items():
        # Embed all texts
        doc_embeddings = np.array(embed_fn(texts))

        retrieved_per_query = []
        for query in queries:
            query_embedding = np.array(embed_fn([query])[0])
            # Compute similarities
            similarities = compute_embedding_similarity(
                query_embedding.reshape(1, -1),
                doc_embeddings,
                metric="cosine"
            )[0]
            # Get top-k indices
            top_k_indices = np.argsort(similarities)[::-1][:k]
            retrieved_per_query.append([str(i) for i in top_k_indices])

        # Convert relevant indices to string IDs
        relevant_docs = [[str(i) for i in indices] for indices in relevant_indices]

        results[model_name] = evaluate_retrieval_quality(
            queries, relevant_docs, retrieved_per_query, k
        )

    return results
python
import numpy as np
from typing import List, Dict

def evaluate_retrieval_quality(
    queries: List[str],
    relevant_docs: List[List[str]],  # List of relevant doc IDs per query
    retrieved_docs: List[List[str]],  # List of retrieved doc IDs per query
    k: int = 10
) -> Dict[str, float]:
    """Evaluate embedding quality for retrieval."""

    def precision_at_k(relevant: set, retrieved: List[str], k: int) -> float:
        retrieved_k = retrieved[:k]
        relevant_retrieved = len(set(retrieved_k) & relevant)
        return relevant_retrieved / k if k > 0 else 0

    def recall_at_k(relevant: set, retrieved: List[str], k: int) -> float:
        retrieved_k = retrieved[:k]
        relevant_retrieved = len(set(retrieved_k) & relevant)
        return relevant_retrieved / len(relevant) if relevant else 0

    def mrr(relevant: set, retrieved: List[str]) -> float:
        for i, doc in enumerate(retrieved):
            if doc in relevant:
                return 1 / (i + 1)
        return 0

    def ndcg_at_k(relevant: set, retrieved: List[str], k: int) -> float:
        dcg = sum(
            1 / np.log2(i + 2) if doc in relevant else 0
            for i, doc in enumerate(retrieved[:k])
        )
        ideal_dcg = sum(1 / np.log2(i + 2) for i in range(min(len(relevant), k)))
        return dcg / ideal_dcg if ideal_dcg > 0 else 0

    metrics = {
        f"precision@{k}": [],
        f"recall@{k}": [],
        "mrr": [],
        f"ndcg@{k}": []
    }

    for relevant, retrieved in zip(relevant_docs, retrieved_docs):
        relevant_set = set(relevant)
        metrics[f"precision@{k}"].append(precision_at_k(relevant_set, retrieved, k))
        metrics[f"recall@{k}"].append(recall_at_k(relevant_set, retrieved, k))
        metrics["mrr"].append(mrr(relevant_set, retrieved))
        metrics[f"ndcg@{k}"].append(ndcg_at_k(relevant_set, retrieved, k))

    return {name: np.mean(values) for name, values in metrics.items()}


def compute_embedding_similarity(
    embeddings1: np.ndarray,
    embeddings2: np.ndarray,
    metric: str = "cosine"
) -> np.ndarray:
    """Compute similarity matrix between embedding sets."""
    if metric == "cosine":
        # Normalize and compute dot product
        norm1 = embeddings1 / np.linalg.norm(embeddings1, axis=1, keepdims=True)
        norm2 = embeddings2 / np.linalg.norm(embeddings2, axis=1, keepdims=True)
        return norm1 @ norm2.T
    elif metric == "euclidean":
        from scipy.spatial.distance import cdist
        return -cdist(embeddings1, embeddings2, metric='euclidean')
    elif metric == "dot":
        return embeddings1 @ embeddings2.T
    else:
        raise ValueError(f"Unknown metric: {metric}")


def compare_embedding_models(
    texts: List[str],
    models: Dict[str, callable],
    queries: List[str],
    relevant_indices: List[List[int]],
    k: int = 5
) -> Dict[str, Dict[str, float]]:
    """Compare multiple embedding models on retrieval quality."""
    results = {}

    for model_name, embed_fn in models.items():
        # Embed all texts
        doc_embeddings = np.array(embed_fn(texts))

        retrieved_per_query = []
        for query in queries:
            query_embedding = np.array(embed_fn([query])[0])
            # Compute similarities
            similarities = compute_embedding_similarity(
                query_embedding.reshape(1, -1),
                doc_embeddings,
                metric="cosine"
            )[0]
            # Get top-k indices
            top_k_indices = np.argsort(similarities)[::-1][:k]
            retrieved_per_query.append([str(i) for i in top_k_indices])

        # Convert relevant indices to string IDs
        relevant_docs = [[str(i) for i in indices] for indices in relevant_indices]

        results[model_name] = evaluate_retrieval_quality(
            queries, relevant_docs, retrieved_per_query, k
        )

    return results

Best Practices

最佳实践

Do's

建议事项

  • Match model to use case: Code vs prose vs multilingual
  • Chunk thoughtfully: Preserve semantic boundaries
  • Normalize embeddings: For cosine similarity search
  • Batch requests: More efficient than one-by-one
  • Cache embeddings: Avoid recomputing for static content
  • Use Voyage AI for Claude apps: Recommended by Anthropic
  • 匹配模型与使用场景:代码场景、普通文本场景或多语言场景
  • 合理分块:保留语义边界
  • 标准化嵌入向量:用于余弦相似度搜索
  • 批量请求:比逐个请求更高效
  • 缓存嵌入向量:避免为静态内容重复计算
  • Claude应用使用Voyage AI:Anthropic官方推荐

Don'ts

避免事项

  • Don't ignore token limits: Truncation loses information
  • Don't mix embedding models: Incompatible vector spaces
  • Don't skip preprocessing: Garbage in, garbage out
  • Don't over-chunk: Lose important context
  • Don't forget metadata: Essential for filtering and debugging
  • 不要忽略Token限制:截断会丢失信息
  • 不要混合嵌入模型:向量空间不兼容
  • 不要跳过预处理:输入质量差则输出质量差
  • 不要过度分块:丢失重要上下文
  • 不要忘记元数据:对过滤和调试至关重要

Resources

资源