# Vector Database Management
## Introduction
Vector databases are specialized systems designed to store, index, and query high-dimensional vector embeddings efficiently. They power modern AI applications including semantic search, recommendation systems, RAG (Retrieval Augmented Generation), and similarity-based matching.
## Key Concepts
- Vector Embeddings: Numerical representations of data (text, images, audio) in high-dimensional space
- Similarity Search: Finding vectors that are "close" to a query vector using distance metrics
- Metadata Filtering: Combining vector similarity with structured data filtering
- Indexing: Optimization structures (HNSW, IVF, etc.) for fast approximate nearest neighbor search
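The distance metrics behind similarity search reduce to a few lines of arithmetic. As a minimal sketch in plain Python (no vector database required), here is cosine similarity next to Euclidean distance:

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity: 1.0 = same direction, 0.0 = orthogonal."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

def euclidean_distance(a: list[float], b: list[float]) -> float:
    """Straight-line distance: smaller = more similar."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))

query = [1.0, 0.0, 1.0]
doc_a = [0.9, 0.1, 0.8]  # Points in a similar direction to the query
doc_b = [0.0, 1.0, 0.0]  # Orthogonal to the query

print(cosine_similarity(query, doc_a))  # Close to 1.0
print(cosine_similarity(query, doc_b))  # 0.0
```

Which metric to configure in the database depends on the embedding model: models with normalized outputs make cosine and dot product equivalent.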
## Database Comparison
| Feature | Pinecone | Weaviate | Chroma |
|---|---|---|---|
| Deployment | Fully managed | Managed or self-hosted | Self-hosted or cloud |
| Index Types | Serverless, Pods | HNSW | HNSW |
| Metadata Filtering | Advanced | GraphQL-based | Simple |
| Hybrid Search | Sparse-Dense | Built-in | Limited |
| Scale | Massive | Large | Small-Medium |
| Best For | Production RAG | Knowledge graphs | Local development |
## Vector Embeddings Fundamentals

### Understanding Vector Representations
Vector embeddings transform unstructured data into numerical arrays that capture semantic meaning:

```python
# Text to embeddings using OpenAI
from openai import OpenAI

client = OpenAI(api_key="YOUR_API_KEY")

def generate_embedding(text: str, model: str = "text-embedding-3-small") -> list[float]:
    """Generate embeddings from text using OpenAI."""
    response = client.embeddings.create(
        input=text,
        model=model
    )
    return response.data[0].embedding

# Example usage
text = "Vector databases enable semantic search capabilities"
embedding = generate_embedding(text)
print(f"Embedding dimension: {len(embedding)}")  # 1536 dimensions
print(f"First 5 values: {embedding[:5]}")
```
### Popular Embedding Models
```python
# 1. OpenAI Embeddings (production-grade)
from openai import OpenAI

def openai_embeddings(texts: list[str]) -> list[list[float]]:
    """Batch generate OpenAI embeddings."""
    client = OpenAI(api_key="YOUR_API_KEY")
    response = client.embeddings.create(
        input=texts,
        model="text-embedding-3-large"  # 3072 dimensions
    )
    return [item.embedding for item in response.data]

# 2. Sentence Transformers (open-source)
from sentence_transformers import SentenceTransformer

def sentence_transformer_embeddings(texts: list[str]) -> list[list[float]]:
    """Generate embeddings using Sentence Transformers."""
    model = SentenceTransformer('all-MiniLM-L6-v2')  # 384 dimensions
    embeddings = model.encode(texts)
    return embeddings.tolist()

# 3. Cohere Embeddings
import cohere

def cohere_embeddings(texts: list[str]) -> list[list[float]]:
    """Generate embeddings using Cohere."""
    co = cohere.Client("YOUR_API_KEY")
    response = co.embed(
        texts=texts,
        model="embed-english-v3.0",
        input_type="search_document"
    )
    return response.embeddings
```
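All of these providers accept batched inputs but cap how much can go into one request. A small batching helper keeps corpus-scale embedding jobs under those caps — the batch size of 96 here is an illustrative value, not a documented limit for any particular provider:

```python
from typing import Iterator

def batch_texts(texts: list[str], batch_size: int = 96) -> Iterator[list[str]]:
    """Yield successive fixed-size batches from a list of texts."""
    for start in range(0, len(texts), batch_size):
        yield texts[start:start + batch_size]

corpus = [f"document {i}" for i in range(250)]
batches = list(batch_texts(corpus, batch_size=96))
print([len(b) for b in batches])  # [96, 96, 58]
```

Each batch can then be passed to any of the embedding functions above in turn.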
### Embedding Dimensions & Trade-offs
```python
# Different embedding models for different use cases
EMBEDDING_CONFIGS = {
    "openai-small": {
        "model": "text-embedding-3-small",
        "dimensions": 1536,
        "cost_per_1m": 0.02,
        "use_case": "General purpose, cost-effective"
    },
    "openai-large": {
        "model": "text-embedding-3-large",
        "dimensions": 3072,
        "cost_per_1m": 0.13,
        "use_case": "High accuracy requirements"
    },
    "sentence-transformers": {
        "model": "all-MiniLM-L6-v2",
        "dimensions": 384,
        "cost_per_1m": 0.00,  # Open-source
        "use_case": "Local development, privacy-sensitive"
    },
    "cohere-multilingual": {
        "model": "embed-multilingual-v3.0",
        "dimensions": 1024,
        "cost_per_1m": 0.10,
        "use_case": "Multi-language applications"
    }
}
```
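The storage side of this trade-off can be made concrete: raw vector storage grows linearly with dimension at 4 bytes per float32 component. A rough back-of-the-envelope helper (this deliberately ignores index overhead such as HNSW graph links and metadata, so real footprints run higher):

```python
def estimate_index_size_gb(num_vectors: int, dimensions: int) -> float:
    """Raw vector storage at 4 bytes (float32) per component."""
    return num_vectors * dimensions * 4 / 1e9

# At 10M vectors, 3072-dim embeddings need 8x the storage of 384-dim ones
for name, dims in [("openai-large", 3072), ("openai-small", 1536), ("all-MiniLM-L6-v2", 384)]:
    print(f"{name}: {estimate_index_size_gb(10_000_000, dims):.1f} GB")
```

Halving dimensions roughly halves both memory and per-query distance computation, which is why smaller models often win when accuracy requirements allow.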
## Database Setup & Configuration

### Pinecone Setup
```python
# Install Pinecone SDK:
#   pip install pinecone-client
from pinecone import Pinecone, ServerlessSpec

# Initialize Pinecone client
pc = Pinecone(api_key="YOUR_API_KEY")

# List existing indexes
indexes = pc.list_indexes()
print(f"Existing indexes: {[idx.name for idx in indexes]}")

# Create serverless index (recommended for production)
index_name = "production-search"
if index_name not in [idx.name for idx in pc.list_indexes()]:
    pc.create_index(
        name=index_name,
        dimension=1536,   # Match your embedding model
        metric="cosine",  # cosine, dotproduct, or euclidean
        spec=ServerlessSpec(
            cloud="aws",
            region="us-east-1"
        ),
        deletion_protection="enabled",  # Prevent accidental deletion
        tags={
            "environment": "production",
            "team": "ml",
            "project": "semantic-search"
        }
    )
    print(f"Created index: {index_name}")

# Connect to index
index = pc.Index(index_name)

# Get index stats
stats = index.describe_index_stats()
print(f"Index stats: {stats}")
```
### Selective Metadata Indexing (Pinecone)
```python
# Configure which metadata fields to index for filtering.
# This optimizes memory usage and query performance.
from pinecone import Pinecone, ServerlessSpec

pc = Pinecone(api_key="YOUR_API_KEY")

# Create index with metadata configuration
pc.create_index(
    name="optimized-index",
    dimension=1536,
    metric="cosine",
    spec=ServerlessSpec(
        cloud="aws",
        region="us-east-1",
        schema={
            "fields": {
                # Index these fields for filtering
                "document_id": {"filterable": True},
                "category": {"filterable": True},
                "created_at": {"filterable": True},
                "tags": {"filterable": True},
                # Store but don't index (saves memory)
                "document_title": {"filterable": False},
                "document_url": {"filterable": False},
                "full_content": {"filterable": False}
            }
        }
    )
)

# This configuration allows you to:
# 1. Filter by document_id, category, created_at, tags
# 2. Retrieve document_title, document_url, full_content in results
# 3. Save memory by not indexing non-filterable fields
```
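Once fields are filterable, queries combine vector similarity with Mongo-style filter dicts (operators such as `$eq` and `$in`). To make the semantics concrete, here is a toy evaluator for a small subset of that filter syntax — purely illustrative, not Pinecone's implementation:

```python
def matches_filter(metadata: dict, flt: dict) -> bool:
    """Evaluate a tiny subset of Mongo-style filters: bare equality, $eq, $in."""
    for field, condition in flt.items():
        value = metadata.get(field)
        if isinstance(condition, dict):
            if "$eq" in condition and value != condition["$eq"]:
                return False
            if "$in" in condition and value not in condition["$in"]:
                return False
        elif value != condition:  # A bare value means equality
            return False
    return True

doc = {"category": "education", "document_id": "doc-42"}
print(matches_filter(doc, {"category": {"$eq": "education"}}))            # True
print(matches_filter(doc, {"document_id": {"$in": ["doc-1", "doc-7"]}}))  # False
```

In a real query the database evaluates the filter server-side over only the fields you marked filterable, which is why leaving large fields unindexed saves memory without losing retrievability.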
### Weaviate Setup
```python
# Install Weaviate client:
#   pip install weaviate-client
import weaviate
from weaviate.classes.config import Configure, Property, DataType

# Connect to Weaviate
client = weaviate.connect_to_local()

# Or connect to Weaviate Cloud:
# client = weaviate.connect_to_wcs(
#     cluster_url="YOUR_WCS_URL",
#     auth_credentials=weaviate.auth.AuthApiKey("YOUR_API_KEY")
# )

# Create collection (schema)
try:
    collection = client.collections.create(
        name="Documents",
        vectorizer_config=Configure.Vectorizer.text2vec_openai(
            model="text-embedding-3-small"
        ),
        properties=[
            Property(name="title", data_type=DataType.TEXT),
            Property(name="content", data_type=DataType.TEXT),
            Property(name="category", data_type=DataType.TEXT),
            Property(name="created_at", data_type=DataType.DATE),
            Property(name="tags", data_type=DataType.TEXT_ARRAY)
        ]
    )
    print("Created collection: Documents")
except Exception as e:
    print(f"Collection exists or error: {e}")

# Get collection
documents = client.collections.get("Documents")

# Check collection info
print(documents.config.get())
```
### Chroma Setup
```python
# Install Chroma:
#   pip install chromadb
import chromadb

# Initialize Chroma client (persistent)
client = chromadb.PersistentClient(path="./chroma_db")

# Or use ephemeral (in-memory):
# client = chromadb.EphemeralClient()

# Create or get collection
collection = client.get_or_create_collection(
    name="documents",
    metadata={
        "description": "Document collection for semantic search",
        "hnsw:space": "cosine"  # cosine, l2, or ip (inner product)
    }
)

# List all collections
collections = client.list_collections()
print(f"Available collections: {[c.name for c in collections]}")

# Get collection info
print(f"Collection count: {collection.count()}")
```
## Index Operations

### Creating Indexes with Different Configurations
```python
from pinecone import Pinecone, ServerlessSpec, PodSpec

pc = Pinecone(api_key="YOUR_API_KEY")

# 1. Serverless index (auto-scaling, pay-per-use)
pc.create_index(
    name="serverless-index",
    dimension=1536,
    metric="cosine",
    spec=ServerlessSpec(
        cloud="aws",
        region="us-east-1"
    )
)

# 2. Pod-based index (dedicated resources)
pc.create_index(
    name="pod-index",
    dimension=1536,
    metric="dotproduct",
    spec=PodSpec(
        environment="us-east-1-aws",
        pod_type="p1.x1",  # Performance tier
        pods=2,            # Number of pods
        replicas=2,        # Replicas for high availability
        shards=1
    )
)

# 3. Sparse index (for BM25-like search)
pc.create_index(
    name="sparse-index",
    dimension=None,  # Sparse vectors don't have a fixed dimension
    metric="dotproduct",
    spec=ServerlessSpec(
        cloud="aws",
        region="us-east-1"
    )
)
```
### Index Management Operations
```python
from pinecone import Pinecone

pc = Pinecone(api_key="YOUR_API_KEY")

# List all indexes
indexes = pc.list_indexes()
for idx in indexes:
    print(f"Name: {idx.name}, Status: {idx.status.state}, Host: {idx.host}")

# Describe specific index
index_info = pc.describe_index("production-search")
print(f"Dimension: {index_info.dimension}")
print(f"Metric: {index_info.metric}")
print(f"Status: {index_info.status}")

# Connect to index
index = pc.Index("production-search")

# Get index statistics
stats = index.describe_index_stats()
print(f"Total vectors: {stats.total_vector_count}")
print(f"Namespaces: {stats.namespaces}")
print(f"Index fullness: {stats.index_fullness}")

# Delete index (be careful!)
pc.delete_index("test-index")
```
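Index creation is asynchronous, so code that upserts immediately after `create_index` can race the index becoming ready. A generic poll helper handles this; the probe is whatever readiness check your client exposes (for Pinecone, something along the lines of `lambda: pc.describe_index(name).status.ready` — treated as an assumption here, so the demo below uses a fake probe instead):

```python
import time

def wait_until(probe, timeout_s: float = 60.0, interval_s: float = 1.0) -> bool:
    """Poll `probe()` until it returns True or the timeout elapses."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if probe():
            return True
        time.sleep(interval_s)
    return False

# Demo with a fake probe that becomes ready on the third check
state = {"checks": 0}
def fake_probe() -> bool:
    state["checks"] += 1
    return state["checks"] >= 3

print(wait_until(fake_probe, timeout_s=5.0, interval_s=0.01))  # True
```

Returning `False` on timeout (rather than raising) lets callers decide whether a slow index is fatal for their workflow.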
### Configuring Index for Optimal Performance
```python
# Configuration for different use cases
from pinecone import Pinecone, ServerlessSpec, PodSpec

pc = Pinecone(api_key="YOUR_API_KEY")

# 1. High-throughput search (many queries/second)
pc.create_index(
    name="high-throughput",
    dimension=1536,
    metric="cosine",
    spec=PodSpec(
        environment="us-east-1-aws",
        pod_type="p2.x1",  # Higher performance tier
        pods=4,
        replicas=3  # More replicas = higher query throughput
    )
)

# 2. Large-scale storage (billions of vectors)
pc.create_index(
    name="large-scale",
    dimension=1536,
    metric="cosine",
    spec=PodSpec(
        environment="us-east-1-aws",
        pod_type="s1.x1",  # Storage-optimized
        pods=8,
        shards=4  # More shards = more storage capacity
    )
)

# 3. Cost-optimized development
pc.create_index(
    name="dev-environment",
    dimension=1536,
    metric="cosine",
    spec=ServerlessSpec(
        cloud="aws",
        region="us-east-1"
    )  # Serverless = pay only for what you use
)
```
## Vector Operations

### Upserting Vectors (Pinecone)
```python
from pinecone import Pinecone
import uuid

pc = Pinecone(api_key="YOUR_API_KEY")
index = pc.Index("production-search")

# 1. Single vector upsert
vector_id = str(uuid.uuid4())
index.upsert(
    vectors=[
        {
            "id": vector_id,
            "values": [0.1, 0.2, 0.3, ...],  # 1536 dimensions
            "metadata": {
                "title": "Introduction to Vector Databases",
                "category": "education",
                "author": "John Doe",
                "created_at": "2024-01-15",
                "tags": ["ml", "ai", "databases"]
            }
        }
    ],
    namespace="documents"
)

# 2. Batch upsert (efficient for large datasets)
# documents: your list of (doc_id, embedding, metadata) tuples
batch_size = 100
vectors = []
for i, (doc_id, embedding, metadata) in enumerate(documents):
    vectors.append({
        "id": doc_id,
        "values": embedding,
        "metadata": metadata
    })
    # Upsert in batches
    if len(vectors) >= batch_size or i == len(documents) - 1:
        index.upsert(vectors=vectors, namespace="documents")
        print(f"Upserted batch of {len(vectors)} vectors")
        vectors = []

# 3. Parallel upsert for better throughput: async_req=True returns a
# future that is resolved by the client's thread pool
with pc.Index("production-search", pool_threads=30) as index:
    async_results = [
        index.upsert(vectors=batch, namespace="documents", async_req=True)
        for batch in batches  # batches: your list of vector batches
    ]
    # Wait for all upserts to complete
    [result.get() for result in async_results]
```
### Sparse Vector Operations (Pinecone)
```python
# Sparse vectors are useful for keyword-based search (like BM25),
# combined with dense vectors for hybrid search
from pinecone import Pinecone

pc = Pinecone(api_key="YOUR_API_KEY")
index = pc.Index("hybrid-search-index")

# Upsert vector with both dense and sparse components
index.upsert(
    vectors=[
        {
            "id": "doc1",
            "values": [0.1, 0.2, ..., 0.5],  # Dense vector
            "sparse_values": {
                "indices": [10, 45, 123, 234, 678],  # Token IDs
                "values": [0.8, 0.6, 0.9, 0.7, 0.5]  # TF-IDF weights
            },
            "metadata": {"title": "Hybrid Search Document"}
        }
    ],
    namespace="hybrid"
)

# Query with hybrid search
results = index.query(
    vector=[0.1, 0.2, ..., 0.5],  # Dense query vector
    sparse_vector={
        "indices": [10, 45, 123],
        "values": [0.8, 0.7, 0.9]
    },
    top_k=10,
    namespace="hybrid",
    include_metadata=True
)
```
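The `indices`/`values` pairs above are typically derived from token statistics. As a sketch of where they come from, here is a toy encoder that turns term counts into the sparse-vector shape, hashing tokens into index slots — real pipelines use a fixed tokenizer vocabulary and BM25/TF-IDF weighting, so both the hashing and the log weighting here are illustrative only:

```python
import hashlib
import math
from collections import Counter

def to_sparse_vector(text: str, dim: int = 2**20) -> dict:
    """Map token counts to the {'indices': [...], 'values': [...]} form."""
    counts = Counter(text.lower().split())
    sparse: dict[int, float] = {}
    for token, count in counts.items():
        # Stable hash of the token into [0, dim)
        digest = hashlib.md5(token.encode()).hexdigest()
        idx = int(digest, 16) % dim
        sparse[idx] = sparse.get(idx, 0.0) + 1.0 + math.log(count)
    indices = sorted(sparse)
    return {"indices": indices, "values": [sparse[i] for i in indices]}

sv = to_sparse_vector("hybrid search mixes dense and sparse search")
print(len(sv["indices"]))  # One slot per distinct token (barring hash collisions)
```

The key property is that only nonzero slots are stored, which is what lets sparse vectors cover vocabulary-sized dimensions cheaply.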
### Vector Operations (Weaviate)
```python
import weaviate

client = weaviate.connect_to_local()
documents = client.collections.get("Documents")

# 1. Insert single object
doc_uuid = documents.data.insert(
    properties={
        "title": "Vector Database Guide",
        "content": "A comprehensive guide to vector databases...",
        "category": "tutorial",
        "created_at": "2024-01-15T10:00:00Z",
        "tags": ["database", "ml", "ai"]
    }
)
print(f"Inserted: {doc_uuid}")

# 2. Batch insert
# document_list: your iterable of document dicts
with documents.batch.dynamic() as batch:
    for doc in document_list:
        batch.add_object(
            properties={
                "title": doc["title"],
                "content": doc["content"],
                "category": doc["category"],
                "created_at": doc["created_at"],
                "tags": doc["tags"]
            }
        )

# 3. Insert with custom vector
documents.data.insert(
    properties={"title": "Custom Vector Doc", "content": "..."},
    vector=[0.1, 0.2, 0.3, ...]  # Your pre-computed vector
)

# 4. Update object
documents.data.update(
    uuid=doc_uuid,
    properties={"title": "Updated Title"}
)

# 5. Delete object
documents.data.delete_by_id(uuid=doc_uuid)
```
### Vector Operations (Chroma)
```python
import chromadb

client = chromadb.PersistentClient(path="./chroma_db")
collection = client.get_collection("documents")

# 1. Add documents with auto-embedding
collection.add(
    documents=[
        "This is document 1",
        "This is document 2",
        "This is document 3"
    ],
    metadatas=[
        {"category": "tech", "author": "Alice"},
        {"category": "science", "author": "Bob"},
        {"category": "tech", "author": "Charlie"}
    ],
    ids=["doc1", "doc2", "doc3"]
)

# 2. Add with custom embeddings
collection.add(
    embeddings=[
        [0.1, 0.2, 0.3, ...],
        [0.4, 0.5, 0.6, ...]
    ],
    metadatas=[
        {"title": "Doc 1"},
        {"title": "Doc 2"}
    ],
    ids=["custom1", "custom2"]
)

# 3. Update documents
collection.update(
    ids=["doc1"],
    documents=["Updated document content"],
    metadatas=[{"category": "tech", "updated": True}]
)

# 4. Delete documents
collection.delete(ids=["doc1", "doc2"])

# 5. Get documents by IDs
results = collection.get(
    ids=["doc1", "doc2"],
    include=["documents", "metadatas", "embeddings"]
)
```
## Similarity Search

### Basic Similarity Search (Pinecone)
```python
from pinecone import Pinecone
from openai import OpenAI

# Initialize clients
pc = Pinecone(api_key="PINECONE_API_KEY")
openai_client = OpenAI(api_key="OPENAI_API_KEY")
index = pc.Index("production-search")

# 1. Generate query embedding
query_text = "What are the benefits of vector databases?"
response = openai_client.embeddings.create(
    input=query_text,
    model="text-embedding-3-small"
)
query_embedding = response.data[0].embedding

# 2. Search for similar vectors
results = index.query(
    vector=query_embedding,
    top_k=10,
    namespace="documents",
    include_values=False,
    include_metadata=True
)

# 3. Process results
print(f"Found {len(results.matches)} results")
for match in results.matches:
    print(f"ID: {match.id}")
    print(f"Score: {match.score:.4f}")
    print(f"Title: {match.metadata.get('title')}")
    print(f"Category: {match.metadata.get('category')}")
    print("---")
```
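Note that `top_k` always returns the k nearest matches, even when none are actually relevant to the query, so production search usually also applies a minimum-score cutoff before showing results. A small post-filter over the returned matches — the 0.75 threshold is an illustrative value to tune per embedding model and distance metric:

```python
def filter_by_score(matches: list[dict], min_score: float = 0.75) -> list[dict]:
    """Keep only matches at or above the similarity cutoff."""
    return [m for m in matches if m["score"] >= min_score]

matches = [
    {"id": "doc1", "score": 0.91},
    {"id": "doc2", "score": 0.78},
    {"id": "doc3", "score": 0.42},  # A nearest neighbour, but not relevant
]
print([m["id"] for m in filter_by_score(matches)])  # ['doc1', 'doc2']
```

An empty filtered list is a useful signal in RAG pipelines: better to answer "nothing found" than to feed irrelevant context to the model.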
### Search by ID (Query by Example)
```python
# Search using an existing vector as the query
results = index.query(
    id="existing-doc-id",  # Use this document as the query
    top_k=10,
    namespace="documents",
    include_metadata=True
)

# Useful for "find similar items" features
print(f"Documents similar to {results.matches[0].metadata.get('title')}:")
for match in results.matches[1:]:  # Skip first (self)
    print(f"- {match.metadata.get('title')} (score: {match.score:.4f})")
```
### Multi-vector Search (Pinecone)
```python
# The current Pinecone client accepts one query vector per request,
# so issue one query per vector and collect the results
query_embeddings = [
    [0.1, 0.2, ...],  # Query 1
    [0.3, 0.4, ...],  # Query 2
    [0.5, 0.6, ...]   # Query 3
]

all_results = [
    index.query(
        vector=embedding,
        top_k=5,
        namespace="documents",
        include_metadata=True
    )
    for embedding in query_embeddings
]

# Process results for each query
for i, query_results in enumerate(all_results):
    print(f"\nResults for query {i+1}:")
    for match in query_results.matches:
        print(f"- {match.metadata.get('title')} (score: {match.score:.4f})")
```
### Similarity Search (Weaviate)
python
import weaviate
from weaviate.classes.query import MetadataQuery
client = weaviate.connect_to_local()
documents = client.collections.get("Documents")python
import weaviate
from weaviate.classes.query import MetadataQuery
client = weaviate.connect_to_local()
documents = client.collections.get("Documents")1. Near text search (semantic)
1. Near text search (semantic)
response = documents.query.near_text(
query="vector database performance optimization",
limit=10,
return_metadata=MetadataQuery(distance=True, certainty=True)
)
for obj in response.objects:
print(f"Title: {obj.properties['title']}")
print(f"Distance: {obj.metadata.distance:.4f}")
print(f"Certainty: {obj.metadata.certainty:.4f}")
print("---")
response = documents.query.near_text(
query="vector database performance optimization",
limit=10,
return_metadata=MetadataQuery(distance=True, certainty=True)
)
for obj in response.objects:
print(f"Title: {obj.properties['title']}")
print(f"Distance: {obj.metadata.distance:.4f}")
print(f"Certainty: {obj.metadata.certainty:.4f}")
print("---")
2. Near vector search (with custom embedding)
response = documents.query.near_vector(
near_vector=[0.1, 0.2, 0.3, ...],
limit=10
)
3. Near object search (find similar to existing object)
response = documents.query.near_object(
near_object="uuid-of-reference-object",
limit=10
)
Similarity Search (Chroma)
python
import chromadb
client = chromadb.PersistentClient(path="./chroma_db")
collection = client.get_collection("documents")
1. Query with text (auto-embedding)
results = collection.query(
query_texts=["What is machine learning?"],
n_results=10,
include=["documents", "metadatas", "distances"]
)
print(f"Found {len(results['ids'][0])} results")
for i, doc_id in enumerate(results['ids'][0]):
print(f"ID: {doc_id}")
print(f"Distance: {results['distances'][0][i]:.4f}")
print(f"Document: {results['documents'][0][i][:100]}...")
print(f"Metadata: {results['metadatas'][0][i]}")
print("---")
2. Query with custom embedding
results = collection.query(
query_embeddings=[[0.1, 0.2, 0.3, ...]],
n_results=10
)
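Chroma returns distances rather than similarity scores, so lower values are better. When downstream code expects similarities, a small conversion sketch helps; `to_ranked_hits` is an illustrative helper name, and it assumes the collection uses cosine space, where distance = 1 − cosine similarity:

```python
def to_ranked_hits(results, query_index=0):
    """Flatten one query's Chroma results into (id, similarity, metadata) tuples.

    Assumes cosine distance, where distance = 1 - cosine_similarity.
    """
    ids = results["ids"][query_index]
    distances = results["distances"][query_index]
    metadatas = results["metadatas"][query_index]
    return [
        (doc_id, 1.0 - dist, meta)
        for doc_id, dist, meta in zip(ids, distances, metadatas)
    ]
```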
Metadata Filtering
Pinecone Metadata Filters
python
from pinecone import Pinecone
pc = Pinecone(api_key="YOUR_API_KEY")
index = pc.Index("production-search")
1. Equality filter
results = index.query(
vector=query_embedding,
top_k=10,
filter={"category": {"$eq": "education"}},
include_metadata=True
)
2. Inequality filter
results = index.query(
vector=query_embedding,
top_k=10,
filter={"year": {"$ne": 2023}},
include_metadata=True
)
3. Range filters
results = index.query(
vector=query_embedding,
top_k=10,
filter={
"$and": [
{"year": {"$gte": 2020}},
{"year": {"$lte": 2024}}
]
},
include_metadata=True
)
4. In/Not-in filters
results = index.query(
vector=query_embedding,
top_k=10,
filter={
"category": {"$in": ["education", "tutorial", "guide"]}
},
include_metadata=True
)
5. Existence check
results = index.query(
vector=query_embedding,
top_k=10,
filter={"author": {"$exists": True}},
include_metadata=True
)
6. Complex AND/OR queries
results = index.query(
vector=query_embedding,
top_k=10,
filter={
"$and": [
{"category": {"$eq": "education"}},
{
"$or": [
{"year": {"$eq": 2024}},
{"featured": {"$eq": True}}
]
},
{"tags": {"$in": ["ml", "ai"]}}
]
},
include_metadata=True
)
7. Greater than/Less than
results = index.query(
vector=query_embedding,
top_k=10,
filter={
"view_count": {"$gt": 1000},
"rating": {"$gte": 4.5}
},
include_metadata=True
)
Production Metadata Filter Patterns
python
Pattern 1: Time-based filtering (recent content)
from datetime import datetime, timedelta
def search_recent_documents(query_text: str, days: int = 30):
"""Search only documents from last N days."""
cutoff_date = (datetime.now() - timedelta(days=days)).isoformat()
results = index.query(
vector=generate_embedding(query_text),
top_k=10,
filter={
"created_at": {"$gte": cutoff_date}
},
include_metadata=True
)
    return results
Pattern 2: User permission filtering
def search_with_permissions(query_text: str, user_id: str, user_roles: list):
"""Search only documents user has access to."""
results = index.query(
vector=generate_embedding(query_text),
top_k=10,
filter={
"$or": [
{"owner_id": {"$eq": user_id}},
{"shared_with": {"$in": [user_id]}},
{"public": {"$eq": True}},
{"required_roles": {"$in": user_roles}}
]
},
include_metadata=True
)
return results
Pattern 3: Multi-tenant filtering
def search_tenant_documents(query_text: str, tenant_id: str, category: str = None):
"""Search within a specific tenant's data."""
filter_dict = {"tenant_id": {"$eq": tenant_id}}
if category:
filter_dict["category"] = {"$eq": category}
results = index.query(
vector=generate_embedding(query_text),
top_k=10,
filter=filter_dict,
include_metadata=True
)
    return results
Pattern 4: Faceted search
def faceted_search(query_text: str, facets: dict):
"""Search with multiple facet filters."""
filter_conditions = []
for field, values in facets.items():
if isinstance(values, list):
filter_conditions.append({field: {"$in": values}})
else:
filter_conditions.append({field: {"$eq": values}})
results = index.query(
vector=generate_embedding(query_text),
top_k=10,
        filter={"$and": filter_conditions} if filter_conditions else None,
include_metadata=True
)
    return results
Usage
results = faceted_search(
"machine learning tutorials",
facets={
"category": ["education", "tutorial"],
"difficulty": "beginner",
"language": ["english", "spanish"]
}
)
Weaviate Metadata Filtering
python
import weaviate
from weaviate.classes.query import Filter
client = weaviate.connect_to_local()
documents = client.collections.get("Documents")
1. Simple equality filter
response = documents.query.near_text(
query="vector databases",
limit=10,
filters=Filter.by_property("category").equal("education")
)
2. Greater than filter
response = documents.query.near_text(
query="machine learning",
limit=10,
filters=Filter.by_property("year").greater_than(2020)
)
3. Contains any filter
response = documents.query.near_text(
query="AI tutorials",
limit=10,
filters=Filter.by_property("tags").contains_any(["ml", "ai", "deep-learning"])
)
4. Complex AND/OR filters
response = documents.query.near_text(
query="database optimization",
limit=10,
filters=(
Filter.by_property("category").equal("tutorial") &
(Filter.by_property("difficulty").equal("beginner") |
Filter.by_property("featured").equal(True))
)
)
Chroma Metadata Filtering
python
import chromadb
client = chromadb.PersistentClient(path="./chroma_db")
collection = client.get_collection("documents")
1. Simple equality filter
results = collection.query(
query_texts=["vector databases"],
n_results=10,
where={"category": "education"}
)
2. AND conditions
results = collection.query(
query_texts=["machine learning"],
n_results=10,
where={
"$and": [
{"category": "education"},
{"difficulty": "beginner"}
]
}
)
3. OR conditions
results = collection.query(
query_texts=["AI tutorials"],
n_results=10,
where={
"$or": [
{"category": "education"},
{"category": "tutorial"}
]
}
)
4. Greater than/Less than
results = collection.query(
query_texts=["recent content"],
n_results=10,
where={"year": {"$gte": 2023}}
)
5. In operator
results = collection.query(
query_texts=["programming guides"],
n_results=10,
where={"language": {"$in": ["python", "javascript", "go"]}}
)
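When facet values come from user input, it is easier to build the `where` clause programmatically than to hand-write nested operators; a minimal sketch, with `build_where` an illustrative helper name (note that Chroma requires `$and` once more than one condition is present):

```python
def build_where(**conditions):
    """Build a Chroma `where` clause from keyword conditions.

    Scalar values become equality matches; list values use the $in operator.
    """
    clauses = []
    for field, value in conditions.items():
        if isinstance(value, list):
            clauses.append({field: {"$in": value}})
        else:
            clauses.append({field: value})
    if not clauses:
        return None
    if len(clauses) == 1:
        return clauses[0]
    return {"$and": clauses}
```

Usage: `collection.query(query_texts=["guides"], n_results=10, where=build_where(language=["python", "go"], year=2024))`.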
Hybrid Search
Pinecone Hybrid Search (Dense + Sparse)
python
from pinecone import Pinecone
from typing import Dict, List
import re
from collections import Counter
pc = Pinecone(api_key="YOUR_API_KEY")
index = pc.Index("hybrid-search-index")
def create_sparse_vector(text: str, top_k: int = 100) -> Dict:
"""Create sparse vector using simple TF approach."""
# Tokenize
tokens = re.findall(r'\w+', text.lower())
# Calculate term frequencies
tf = Counter(tokens)
    # Create vocabulary mapping (hash-based for brevity; collisions are possible)
vocab = {word: hash(word) % 10000 for word in set(tokens)}
# Get top-k terms
top_terms = tf.most_common(top_k)
# Create sparse vector
indices = [vocab[term] for term, _ in top_terms]
values = [float(freq) / len(tokens) for _, freq in top_terms]
return {
"indices": indices,
"values": values
}
def hybrid_search(query_text: str, top_k: int = 10, alpha: float = 0.5):
    """
    Perform hybrid search combining dense and sparse vectors.
    alpha: weight for dense search (0.0 = sparse only, 1.0 = dense only)
    """
    # Generate dense and sparse query representations
    dense_vector = generate_embedding(query_text)
    sparse_vector = create_sparse_vector(query_text)
    # Apply the convex weighting: scale dense values by alpha and sparse
    # values by (1 - alpha), so the alpha parameter actually takes effect
    weighted_dense = [v * alpha for v in dense_vector]
    weighted_sparse = {
        "indices": sparse_vector["indices"],
        "values": [v * (1 - alpha) for v in sparse_vector["values"]],
    }
    # Hybrid query
    results = index.query(
        vector=weighted_dense,
        sparse_vector=weighted_sparse,
        top_k=top_k,
        include_metadata=True
    )
    return results
Example usage
results = hybrid_search("machine learning vector databases", top_k=10)
for match in results.matches:
print(f"{match.metadata['title']}: {match.score:.4f}")
Weaviate Hybrid Search
python
import weaviate
from weaviate.classes.query import Filter
client = weaviate.connect_to_local()
documents = client.collections.get("Documents")
Hybrid search (combines dense vector + BM25 keyword search)
response = documents.query.hybrid(
query="vector database performance",
limit=10,
alpha=0.5, # 0 = pure keyword, 1 = pure vector, 0.5 = balanced
fusion_type="rankedFusion" # or "relativeScore"
)
for obj in response.objects:
print(f"Title: {obj.properties['title']}")
print(f"Score: {obj.metadata.score}")
print("---")
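The `fusion_type` options differ in how the keyword and vector rankings are merged. `rankedFusion` follows the reciprocal-rank-fusion idea, which can be sketched in a few lines (a simplified illustration, not Weaviate's exact implementation):

```python
def reciprocal_rank_fusion(rankings, k=60):
    """Merge several ranked lists of document IDs into one fused ranking.

    Each document scores sum(1 / (k + rank)) across the lists it appears in;
    k dampens the influence of any single list's top positions.
    """
    scores = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)
```

A document ranked moderately by both searches can therefore outrank one that only a single search ranked highly.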
Hybrid search with filters
response = documents.query.hybrid(
query="machine learning tutorials",
limit=10,
alpha=0.7, # Favor semantic search
filters=Filter.by_property("category").equal("education")
)
Hybrid search with custom vector
response = documents.query.hybrid(
query="custom query",
vector=[0.1, 0.2, 0.3, ...], # Your pre-computed vector
limit=10,
alpha=0.5
)
BM25 + Vector Hybrid (Custom Implementation)
python
from typing import Dict, List

from rank_bm25 import BM25Okapi
class HybridSearchEngine:
"""Custom hybrid search combining BM25 and vector search."""
def __init__(self, index, documents: List[Dict]):
self.index = index
self.documents = documents
# Build BM25 index
tokenized_docs = [doc['content'].lower().split() for doc in documents]
self.bm25 = BM25Okapi(tokenized_docs)
self.doc_ids = [doc['id'] for doc in documents]
def search(self, query: str, top_k: int = 10, alpha: float = 0.5):
"""
Hybrid search with custom score fusion.
alpha: weight for vector search (1-alpha for BM25)
"""
# 1. Vector search
query_embedding = generate_embedding(query)
vector_results = self.index.query(
vector=query_embedding,
top_k=top_k * 2, # Get more candidates
include_metadata=True
)
# 2. BM25 search
tokenized_query = query.lower().split()
bm25_scores = self.bm25.get_scores(tokenized_query)
# 3. Normalize scores
vector_scores = {
m.id: m.score for m in vector_results.matches
}
max_vec_score = max(vector_scores.values()) if vector_scores else 1.0
max_bm25_score = max(bm25_scores) if max(bm25_scores) > 0 else 1.0
# 4. Combine scores
hybrid_scores = {}
all_ids = set(vector_scores.keys()) | set(self.doc_ids)
for doc_id in all_ids:
vec_score = vector_scores.get(doc_id, 0) / max_vec_score
idx = self.doc_ids.index(doc_id) if doc_id in self.doc_ids else -1
bm25_score = bm25_scores[idx] / max_bm25_score if idx >= 0 else 0
hybrid_scores[doc_id] = (alpha * vec_score) + ((1 - alpha) * bm25_score)
# 5. Rank and return top-k
ranked = sorted(hybrid_scores.items(), key=lambda x: x[1], reverse=True)
        return ranked[:top_k]
Usage
engine = HybridSearchEngine(index, documents)
results = engine.search("machine learning databases", top_k=10, alpha=0.7)
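The engine above normalizes each score list by dividing by its maximum. When BM25 and vector scores sit on very different ranges, min-max normalization is a common alternative; a sketch:

```python
def min_max_normalize(scores: dict) -> dict:
    """Scale a dict of scores into [0, 1]; a constant score list maps to all zeros."""
    if not scores:
        return {}
    lo, hi = min(scores.values()), max(scores.values())
    if hi == lo:
        return {doc_id: 0.0 for doc_id in scores}
    return {doc_id: (s - lo) / (hi - lo) for doc_id, s in scores.items()}
```

Applying this to both score dicts before the convex combination keeps alpha's meaning stable across queries.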
Namespace & Collection Management
Pinecone Namespaces
python
from pinecone import Pinecone
pc = Pinecone(api_key="YOUR_API_KEY")
index = pc.Index("production-search")
1. Upsert to specific namespace
index.upsert(
vectors=[
{"id": "doc1", "values": [...], "metadata": {...}}
],
namespace="production"
)
2. Query specific namespace
results = index.query(
vector=[...],
top_k=10,
namespace="production",
include_metadata=True
)
3. Get namespace statistics
stats = index.describe_index_stats()
for namespace, info in stats.namespaces.items():
print(f"Namespace: {namespace}")
print(f" Vector count: {info.vector_count}")
4. Delete all vectors in namespace
index.delete(delete_all=True, namespace="test")
5. Multi-namespace architecture
NAMESPACES = {
"production": "Live user-facing data",
"staging": "Testing before production",
"development": "Development and experiments",
"archive": "Historical data"
}
def upsert_with_environment(vectors, environment="production"):
"""Upsert to appropriate namespace."""
namespace = environment if environment in NAMESPACES else "development"
index.upsert(vectors=vectors, namespace=namespace)
def search_across_namespaces(query_vector, namespaces=("production", "archive")):
"""Search multiple namespaces and combine results."""
all_results = []
for ns in namespaces:
results = index.query(
vector=query_vector,
top_k=10,
namespace=ns,
include_metadata=True
)
for match in results.matches:
match.metadata["source_namespace"] = ns
all_results.append(match)
# Sort by score
all_results.sort(key=lambda x: x.score, reverse=True)
    return all_results[:10]

Weaviate Collections
python
import weaviate
from weaviate.classes.config import Configure
client = weaviate.connect_to_local()
1. Create multiple collections
collections_config = [
{
"name": "Products",
"properties": ["name", "description", "category", "price"]
},
{
"name": "Users",
"properties": ["username", "bio", "interests"]
},
{
"name": "Reviews",
"properties": ["content", "rating", "product_id", "user_id"]
}
]
for config in collections_config:
try:
client.collections.create(
name=config["name"],
vectorizer_config=Configure.Vectorizer.text2vec_openai()
)
except Exception as e:
print(f"Collection {config['name']} exists: {e}")
2. Cross-collection references
client.collections.create(
name="Orders",
references=[
weaviate.classes.config.ReferenceProperty(
name="hasProduct",
target_collection="Products"
),
weaviate.classes.config.ReferenceProperty(
name="byUser",
target_collection="Users"
)
]
)
3. Multi-collection search
def search_all_collections(query: str):
"""Search across multiple collections."""
results = {}
for collection_name in ["Products", "Users", "Reviews"]:
collection = client.collections.get(collection_name)
response = collection.query.near_text(
query=query,
limit=5
)
results[collection_name] = response.objects
    return results
4. Delete collection
client.collections.delete("TestCollection")
Chroma Collections
python
import chromadb
client = chromadb.PersistentClient(path="./chroma_db")
1. Create multiple collections
collections = {
"documents": {
"metadata": {"description": "Document embeddings"},
"embedding_function": None # Use default
},
"images": {
"metadata": {"description": "Image embeddings"},
"embedding_function": None
},
"code": {
"metadata": {"description": "Code snippets"},
"embedding_function": None
}
}
for name, config in collections.items():
collection = client.get_or_create_collection(
name=name,
metadata=config["metadata"]
)
2. List all collections
all_collections = client.list_collections()
for coll in all_collections:
print(f"Collection: {coll.name}")
print(f" Count: {coll.count()}")
print(f" Metadata: {coll.metadata}")
3. Collection-specific operations
docs_collection = client.get_collection("documents")
docs_collection.add(
documents=["Document text..."],
metadatas=[{"type": "article"}],
ids=["doc1"]
)
4. Delete collection
client.delete_collection("test_collection")
5. Multi-collection search
def search_all_collections(query: str, n_results: int = 5):
"""Search across all collections."""
results = {}
for collection in client.list_collections():
try:
collection_results = collection.query(
query_texts=[query],
n_results=n_results
)
results[collection.name] = collection_results
except Exception as e:
print(f"Error searching {collection.name}: {e}")
    return results

Performance & Scaling
Batch Operations Best Practices
python
from pinecone import Pinecone
from typing import List, Dict
import asyncio
from concurrent.futures import ThreadPoolExecutor
import time
pc = Pinecone(api_key="YOUR_API_KEY")
index = pc.Index("production-search")
1. Optimal batch size
OPTIMAL_BATCH_SIZE = 100 # Pinecone recommendation
def batch_upsert(vectors: List[Dict], batch_size: int = OPTIMAL_BATCH_SIZE):
"""Efficiently upsert vectors in batches."""
total_batches = (len(vectors) + batch_size - 1) // batch_size
for i in range(0, len(vectors), batch_size):
batch = vectors[i:i + batch_size]
index.upsert(vectors=batch, namespace="documents")
if (i // batch_size + 1) % 10 == 0:
            print(f"Processed {i // batch_size + 1}/{total_batches} batches")
2. Parallel batch upsert
def parallel_batch_upsert(vectors: List[Dict], num_workers: int = 4):
"""Parallel upsert using thread pool."""
batch_size = 100
batches = [
vectors[i:i + batch_size]
for i in range(0, len(vectors), batch_size)
]
def upsert_batch(batch):
try:
index.upsert(vectors=batch, namespace="documents")
return len(batch)
except Exception as e:
print(f"Error upserting batch: {e}")
return 0
with ThreadPoolExecutor(max_workers=num_workers) as executor:
results = list(executor.map(upsert_batch, batches))
    print(f"Successfully upserted {sum(results)} vectors")
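Batch upserts at this scale also hit transient network and rate-limit errors, so each batch is usually wrapped in retry-with-exponential-backoff. A sketch; the helper name and retry policy are illustrative, not part of the Pinecone SDK:

```python
import random
import time

def upsert_with_retry(index, batch, namespace="documents",
                      max_retries=5, base_delay=1.0):
    """Upsert one batch, retrying with exponential backoff plus jitter."""
    for attempt in range(max_retries):
        try:
            return index.upsert(vectors=batch, namespace=namespace)
        except Exception:
            if attempt == max_retries - 1:
                raise  # Out of retries; surface the error to the caller
            # Backoff: base, 2x base, 4x base, ... plus random jitter
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.25)
            time.sleep(delay)
```

The jitter spreads out retries from concurrent workers so they do not all hammer the API at the same instant.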
3. Rate limiting for API calls
class RateLimiter:
    """Simple rate limiter for API calls."""
    def __init__(self, max_calls: int, time_window: float):
        self.max_calls = max_calls
        self.time_window = time_window
        self.calls = []

    def wait_if_needed(self):
        """Wait if the rate limit would be exceeded."""
        now = time.time()
        # Remove old calls outside the time window
        self.calls = [call_time for call_time in self.calls
                      if now - call_time < self.time_window]
        if len(self.calls) >= self.max_calls:
            sleep_time = self.time_window - (now - self.calls[0])
            if sleep_time > 0:
                time.sleep(sleep_time)
            self.calls = []
        self.calls.append(now)
Usage
rate_limiter = RateLimiter(max_calls=100, time_window=60)  # 100 calls/minute
for batch in batches:
    rate_limiter.wait_if_needed()
    index.upsert(vectors=batch)
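The sliding-window pruning inside `wait_if_needed` can be exercised on its own with synthetic timestamps (`prune` is a hypothetical helper mirroring the list comprehension above):

```python
def prune(calls, now, window):
    # Keep only timestamps that still fall inside the sliding window
    return [t for t in calls if now - t < window]

# At t=60 with a 60s window, the call at t=0 has aged out
print(prune([0.0, 30.0, 59.0], now=60.0, window=60.0))  # [30.0, 59.0]
```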
4. Bulk delete optimization
def bulk_delete_by_filter(filter_dict: Dict, namespace: str = "documents"):
    """Delete vectors matching a filter (more efficient than individual deletes)."""
    # First, get IDs matching the filter
    results = index.query(
        vector=[0] * 1536,  # Dummy vector
        top_k=10000,        # Max allowed
        filter=filter_dict,
        namespace=namespace,
        include_values=False
    )
    ids_to_delete = [match.id for match in results.matches]
    # Delete in batches
    batch_size = 1000
    for i in range(0, len(ids_to_delete), batch_size):
        batch = ids_to_delete[i:i + batch_size]
        index.delete(ids=batch, namespace=namespace)
        print(f"Deleted {len(batch)} vectors")
Query Optimization
1. Minimize data transfer
results = index.query(
    vector=query_vector,
    top_k=10,
    include_values=False,    # Don't return vectors if not needed
    include_metadata=False,  # Don't return metadata if not needed
    namespace="documents"
)
2. Use appropriate top_k

Smaller top_k = faster queries:

results_small = index.query(vector=query_vector, top_k=10)    # Fast
results_large = index.query(vector=query_vector, top_k=1000)  # Slower
3. Filter before vector search when possible

Good: reduces the search space:

results = index.query(
    vector=query_vector,
    top_k=10,
    filter={"category": "education"},  # Reduces candidates
    namespace="documents"
)
4. Batch queries when possible

More efficient than individual queries:

queries = [embedding1, embedding2, embedding3]
results = index.query(
    queries=queries,
    top_k=10,
    namespace="documents"
)
5. Cache frequent queries

import hashlib
import json

def vector_hash(vector: List[float]) -> str:
    """Create a hash of a vector for caching."""
    return hashlib.md5(json.dumps(vector).encode()).hexdigest()

class CachedIndex:
    """Index wrapper with query caching."""
    def __init__(self, index, cache_size: int = 1000):
        self.index = index
        self.cache = {}
        self.cache_size = cache_size

    def query(self, vector: List[float], top_k: int = 10, **kwargs):
        """Query with caching."""
        cache_key = f"{vector_hash(vector)}_{top_k}_{json.dumps(kwargs, sort_keys=True)}"
        if cache_key in self.cache:
            return self.cache[cache_key]
        results = self.index.query(vector=vector, top_k=top_k, **kwargs)
        if len(self.cache) >= self.cache_size:
            # Evict the oldest entry (dicts preserve insertion order)
            self.cache.pop(next(iter(self.cache)))
        self.cache[cache_key] = results
        return results
Usage
cached_index = CachedIndex(index)
results = cached_index.query(query_vector, top_k=10) # Cached on subsequent calls
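The eviction in `CachedIndex` relies on Python dicts preserving insertion order, so popping `next(iter(cache))` removes the oldest entry. A minimal standalone check of that pattern:

```python
cache = {}
cache_size = 2

for key in ["a", "b", "c"]:
    if len(cache) >= cache_size:
        # next(iter(...)) yields the first-inserted (oldest) key
        cache.pop(next(iter(cache)))
    cache[key] = key.upper()

print(list(cache))  # ['b', 'c'] — 'a' was evicted first
```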
Scaling Strategies
1. Index sizing for scale
def calculate_index_requirements(
    num_vectors: int,
    dimension: int,
    metadata_size_per_vector: int = 1024  # bytes
) -> Dict:
    """Estimate storage and monthly cost for an index."""
    # Approximate calculations
    vector_size = dimension * 4  # 4 bytes per float32
    total_vector_storage = num_vectors * vector_size
    total_metadata_storage = num_vectors * metadata_size_per_vector
    total_storage = total_vector_storage + total_metadata_storage
    # Pinecone pricing (approximate; check current rates)
    storage_cost_per_gb_month = 0.095  # Serverless pricing
    total_gb = total_storage / (1024 ** 3)
    monthly_storage_cost = total_gb * storage_cost_per_gb_month
    return {
        "num_vectors": num_vectors,
        "total_storage_gb": round(total_gb, 2),
        "monthly_storage_cost_usd": round(monthly_storage_cost, 2),
        "recommended_pod_type": "s1.x1" if num_vectors > 10_000_000 else "p1.x1"
    }
Example
reqs = calculate_index_requirements(
num_vectors=10_000_000,
dimension=1536
)
print(f"10M vectors storage: {reqs['total_storage_gb']} GB")
print(f"Monthly cost: ${reqs['monthly_storage_cost_usd']}")
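Working the 10M-vector example through by hand confirms the arithmetic: 10M × (1536 dims × 4 bytes + 1024 bytes of metadata) comes to about 66.76 GB, or roughly $6.34/month at the storage rate assumed above:

```python
num_vectors = 10_000_000
dimension = 1536
metadata_bytes = 1024
price_per_gb_month = 0.095  # assumed serverless storage rate from above

total_bytes = num_vectors * (dimension * 4 + metadata_bytes)
total_gb = total_bytes / (1024 ** 3)
monthly_cost = total_gb * price_per_gb_month

print(round(total_gb, 2))      # 66.76
print(round(monthly_cost, 2))  # 6.34
```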
2. Sharding strategy for massive scale
import hashlib

def create_sharded_indexes(
    base_name: str,
    num_shards: int,
    dimension: int,
    metric: str = "cosine"
):
    """Create multiple indexes for horizontal scaling."""
    indexes = []
    for shard_id in range(num_shards):
        index_name = f"{base_name}-shard-{shard_id}"
        pc.create_index(
            name=index_name,
            dimension=dimension,
            metric=metric,
            spec=ServerlessSpec(cloud="aws", region="us-east-1")
        )
        indexes.append(index_name)
    return indexes

def route_to_shard(vector_id: str, num_shards: int) -> int:
    """Determine which shard a vector belongs to (stable across processes)."""
    # Built-in hash() is randomized per process; use a stable digest instead
    digest = hashlib.md5(vector_id.encode()).hexdigest()
    return int(digest, 16) % num_shards

def query_sharded_indexes(query_vector: List[float], indexes: List, top_k: int = 10):
    """Query all shards and merge results."""
    all_results = []
    for index_name in indexes:
        idx = pc.Index(index_name)
        results = idx.query(
            vector=query_vector,
            top_k=top_k,
            include_metadata=True
        )
        all_results.extend(results.matches)
    # Sort by score and return top_k
    all_results.sort(key=lambda x: x.score, reverse=True)
    return all_results[:top_k]
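Routing with a stable digest (rather than the built-in `hash()`, whose string output is randomized per process via PYTHONHASHSEED) keeps vector-to-shard assignment reproducible across restarts. A standalone check of determinism and spread:

```python
import hashlib
from collections import Counter

def stable_shard(vector_id: str, num_shards: int) -> int:
    # md5 of the ID is identical in every process, unlike hash()
    digest = hashlib.md5(vector_id.encode()).hexdigest()
    return int(digest, 16) % num_shards

# Deterministic: the same ID always lands on the same shard
assert stable_shard("doc-42", 4) == stable_shard("doc-42", 4)

# Roughly uniform spread of 1000 IDs across 4 shards
counts = Counter(stable_shard(f"doc-{i}", 4) for i in range(1000))
print(dict(counts))
```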
Production Best Practices
Error Handling & Retries
import time
from typing import Optional, Callable, Any
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class PineconeRetryHandler:
    """Robust error handling for Pinecone operations."""
    def __init__(self, max_retries: int = 3, base_delay: float = 1.0):
        self.max_retries = max_retries
        self.base_delay = base_delay

    def retry_with_backoff(
        self,
        operation: Callable,
        *args,
        **kwargs
    ) -> Optional[Any]:
        """Retry operation with exponential backoff."""
        for attempt in range(self.max_retries):
            try:
                return operation(*args, **kwargs)
            except Exception as e:
                if attempt == self.max_retries - 1:
                    logger.error(f"Operation failed after {self.max_retries} attempts: {e}")
                    raise
                delay = self.base_delay * (2 ** attempt)
                logger.warning(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay}s...")
                time.sleep(delay)
        return None
Usage
retry_handler = PineconeRetryHandler(max_retries=3)
Upsert with retry
def safe_upsert(vectors, namespace="documents"):
    return retry_handler.retry_with_backoff(
        index.upsert,
        vectors=vectors,
        namespace=namespace
    )
Query with retry
def safe_query(vector, top_k=10, **kwargs):
    return retry_handler.retry_with_backoff(
        index.query,
        vector=vector,
        top_k=top_k,
        **kwargs
    )
Example
try:
    results = safe_query(query_vector, top_k=10, include_metadata=True)
except Exception as e:
    logger.error(f"Query failed permanently: {e}")
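With base_delay=1.0 and max_retries=3, the backoff schedule above works out to 1s, 2s, 4s (the last attempt raises instead of sleeping):

```python
base_delay = 1.0
max_retries = 3

# Delay computed before each retry: base_delay * 2**attempt
delays = [base_delay * (2 ** attempt) for attempt in range(max_retries)]
print(delays)  # [1.0, 2.0, 4.0]
```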
Monitoring & Observability
import time
from dataclasses import dataclass
from typing import Callable, Dict, List
from datetime import datetime

@dataclass
class QueryMetrics:
    """Track query performance metrics."""
    query_time: float
    result_count: int
    top_score: float
    timestamp: datetime
    namespace: str
    filter_used: bool

class VectorDBMonitor:
    """Monitor vector database operations."""
    def __init__(self):
        self.metrics: List[QueryMetrics] = []

    def track_query(
        self,
        query_func: Callable,
        *args,
        **kwargs
    ):
        """Track query execution and metrics."""
        start_time = time.time()
        results = query_func(*args, **kwargs)
        elapsed = time.time() - start_time
        metrics = QueryMetrics(
            query_time=elapsed,
            result_count=len(results.matches),
            top_score=results.matches[0].score if results.matches else 0.0,
            timestamp=datetime.now(),
            namespace=kwargs.get('namespace', 'default'),
            filter_used='filter' in kwargs
        )
        self.metrics.append(metrics)
        # Alert on slow queries
        if elapsed > 1.0:  # 1 second threshold
            logger.warning(f"Slow query detected: {elapsed:.2f}s")
        return results

    def get_stats(self) -> Dict:
        """Get aggregate statistics."""
        if not self.metrics:
            return {}
        query_times = sorted(m.query_time for m in self.metrics)
        return {
            "total_queries": len(self.metrics),
            "avg_query_time": sum(query_times) / len(query_times),
            "p95_query_time": query_times[int(len(query_times) * 0.95)],
            "p99_query_time": query_times[int(len(query_times) * 0.99)],
            "avg_results": sum(m.result_count for m in self.metrics) / len(self.metrics),
            "filtered_queries_pct": sum(1 for m in self.metrics if m.filter_used) / len(self.metrics) * 100
        }
Usage
monitor = VectorDBMonitor()
Wrap queries
results = monitor.track_query(
    index.query,
    vector=query_vector,
    top_k=10,
    namespace="documents",
    filter={"category": "education"}
)
Get statistics
stats = monitor.get_stats()
print(f"Average query time: {stats['avg_query_time']:.3f}s")
print(f"P95 query time: {stats['p95_query_time']:.3f}s")
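The p95/p99 figures come from a simple index-into-sorted-list percentile; the formula used in `get_stats` can be checked against a synthetic distribution:

```python
def percentile(values, q: float):
    # Index into the sorted list at fraction q (same formula as get_stats)
    ordered = sorted(values)
    return ordered[int(len(ordered) * q)]

latencies = [i / 1000 for i in range(1, 101)]  # 0.001s .. 0.100s
print(percentile(latencies, 0.95))  # 0.096
print(percentile(latencies, 0.99))  # 0.1
```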
Data Validation
from typing import List, Dict
import numpy as np

class VectorValidator:
    """Validate vectors and metadata before operations."""
    def __init__(self, expected_dimension: int):
        self.expected_dimension = expected_dimension

    def validate_vector(self, vector: List[float]) -> tuple[bool, str]:
        """Validate vector format and content."""
        # Check type
        if not isinstance(vector, (list, np.ndarray)):
            return False, "Vector must be list or numpy array"
        # Check dimension
        if len(vector) != self.expected_dimension:
            return False, f"Expected {self.expected_dimension} dimensions, got {len(vector)}"
        # Check for NaN or Inf
        if any(not np.isfinite(v) for v in vector):
            return False, "Vector contains NaN or Inf values"
        # Check for zero vector
        if all(v == 0 for v in vector):
            return False, "Zero vector not allowed"
        return True, "Valid"

    def validate_metadata(self, metadata: Dict) -> tuple[bool, str]:
        """Validate metadata format."""
        # Check type
        if not isinstance(metadata, dict):
            return False, "Metadata must be dictionary"
        # Check metadata size (Pinecone limit: 40KB)
        metadata_str = str(metadata)
        if len(metadata_str.encode('utf-8')) > 40_000:
            return False, "Metadata exceeds 40KB limit"
        # Check for required fields (customize as needed)
        required_fields = ["title", "category"]
        for field in required_fields:
            if field not in metadata:
                return False, f"Missing required field: {field}"
        return True, "Valid"

    def validate_batch(
        self,
        vectors: List[Dict]
    ) -> tuple[List[Dict], List[str]]:
        """Validate a batch of vectors; return valid items and errors."""
        valid_vectors = []
        errors = []
        for i, item in enumerate(vectors):
            # Validate vector
            is_valid, error = self.validate_vector(item.get('values', []))
            if not is_valid:
                errors.append(f"Vector {i} ({item.get('id', 'unknown')}): {error}")
                continue
            # Validate metadata
            if 'metadata' in item:
                is_valid, error = self.validate_metadata(item['metadata'])
                if not is_valid:
                    errors.append(f"Metadata {i} ({item.get('id', 'unknown')}): {error}")
                    continue
            valid_vectors.append(item)
        return valid_vectors, errors
Usage
validator = VectorValidator(expected_dimension=1536)
Validate single vector
is_valid, error = validator.validate_vector(embedding)
if not is_valid:
    print(f"Invalid vector: {error}")
Validate batch
valid_vectors, errors = validator.validate_batch(vectors_to_upsert)
if errors:
    for error in errors:
        logger.error(error)
Upsert only valid vectors
if valid_vectors:
    index.upsert(vectors=valid_vectors)
Backup & Disaster Recovery
import json
import gzip
from datetime import datetime
from pathlib import Path

class VectorDBBackup:
    """Backup and restore vector database data."""
    def __init__(self, index, backup_dir: str = "./backups"):
        self.index = index
        self.backup_dir = Path(backup_dir)
        self.backup_dir.mkdir(exist_ok=True)

    def backup_namespace(
        self,
        namespace: str = "documents",
        compress: bool = True
    ) -> str:
        """Backup all vectors in a namespace."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"backup_{namespace}_{timestamp}.json"
        if compress:
            filename += ".gz"
        filepath = self.backup_dir / filename
        # Fetch all vectors (in batches)
        all_vectors = []
        batch_size = 100
        # Get all IDs first (they would need to be tracked separately);
        # this is a simplified example
        stats = self.index.describe_index_stats()
        # For an actual implementation, track IDs externally
        # or use fetch with known IDs
        # Save to file
        data = {
            "namespace": namespace,
            "timestamp": timestamp,
            "vector_count": len(all_vectors),
            "vectors": all_vectors
        }
        if compress:
            with gzip.open(filepath, 'wt', encoding='utf-8') as f:
                json.dump(data, f)
        else:
            with open(filepath, 'w') as f:
                json.dump(data, f, indent=2)
        logger.info(f"Backed up {len(all_vectors)} vectors to {filepath}")
        return str(filepath)

    def restore_from_backup(
        self,
        backup_file: str,
        target_namespace: str = None
    ):
        """Restore vectors from a backup file."""
        filepath = Path(backup_file)
        # Load backup
        if filepath.suffix == '.gz':
            with gzip.open(filepath, 'rt', encoding='utf-8') as f:
                data = json.load(f)
        else:
            with open(filepath, 'r') as f:
                data = json.load(f)
        namespace = target_namespace or data['namespace']
        vectors = data['vectors']
        # Restore in batches
        batch_size = 100
        for i in range(0, len(vectors), batch_size):
            batch = vectors[i:i + batch_size]
            self.index.upsert(vectors=batch, namespace=namespace)
            logger.info(f"Restored {len(batch)} vectors")
        logger.info(f"Restored {len(vectors)} vectors to namespace '{namespace}'")
Usage
backup_manager = VectorDBBackup(index)
Backup
backup_file = backup_manager.backup_namespace("production")
Restore
backup_manager.restore_from_backup(backup_file, target_namespace="production-restored")
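The compress-on-write path can be sanity-checked locally with a tiny round trip through gzipped JSON (standalone; no index required):

```python
import gzip
import json
import tempfile
from pathlib import Path

data = {
    "namespace": "documents",
    "vectors": [{"id": "vec-1", "values": [0.1, 0.2, 0.3]}]
}

path = Path(tempfile.mkdtemp()) / "backup_documents.json.gz"
with gzip.open(path, "wt", encoding="utf-8") as f:
    json.dump(data, f)

with gzip.open(path, "rt", encoding="utf-8") as f:
    restored = json.load(f)

print(restored == data)  # True
```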
Cost Optimization
Storage Optimization
1. Reduce metadata size
Bad: Storing full content in metadata
bad_metadata = {
    "title": "Long document title",
    "full_content": "...<entire document>...",  # Wastes space
    "description": "...<long description>...",
    "extra_field_1": "...",
    "extra_field_2": "..."
}
Good: Store only necessary metadata
good_metadata = {
    "title": "Long document title",
    "doc_id": "doc-123",  # Reference to external store
    "category": "education",
    "created_at": "2024-01-15"
}
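Whether a record stays under the 40KB metadata limit can be estimated locally by measuring the JSON-serialized UTF-8 size (a sketch; the server-side accounting may differ slightly):

```python
import json

LIMIT_BYTES = 40_000

def metadata_size(metadata: dict) -> int:
    # Serialized UTF-8 size is a reasonable proxy for stored metadata size
    return len(json.dumps(metadata).encode("utf-8"))

good_metadata = {
    "title": "Long document title",
    "doc_id": "doc-123",
    "category": "education",
    "created_at": "2024-01-15"
}
print(metadata_size(good_metadata) < LIMIT_BYTES)  # True
```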
2. Use selective metadata indexing
Only index fields you'll filter on
pc.create_index(
    name="optimized-index",
    dimension=1536,
    metric="cosine",
    spec=ServerlessSpec(
        cloud="aws",
        region="us-east-1",
        schema={
            "fields": {
                "category": {"filterable": True},     # Need to filter
                "created_at": {"filterable": True},   # Need to filter
                "title": {"filterable": False},       # Just for display
                "description": {"filterable": False}  # Just for display
            }
        }
    )
)
3. Regular cleanup of unused vectors
def cleanup_old_vectors(days_old: int = 90):
    """Delete vectors older than the specified number of days."""
    from datetime import datetime, timedelta
    cutoff_date = (datetime.now() - timedelta(days=days_old)).isoformat()
    # Delete by filter
    index.delete(
        filter={"created_at": {"$lt": cutoff_date}},
        namespace="documents"
    )
4. Compress dimensions for smaller models
text-embedding-3-small: 1536 dimensions
all-MiniLM-L6-v2: 384 dimensions (75% storage reduction)
Trade-off: slightly lower accuracy for significant cost savings
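OpenAI's text-embedding-3 models also accept a `dimensions` parameter for shorter vectors, which their documentation describes as equivalent to truncating the embedding and renormalizing. The local operation looks like this (a standalone sketch; `shorten_embedding` is a hypothetical helper):

```python
import math
import random

def shorten_embedding(vec, dim):
    # Keep the first `dim` components, then restore unit length
    shortened = vec[:dim]
    norm = math.sqrt(sum(v * v for v in shortened))
    return [v / norm for v in shortened]

rng = random.Random(0)
full = [rng.gauss(0, 1) for _ in range(1536)]
small = shorten_embedding(full, 384)
print(len(small))  # 384, and the result is unit-norm again
```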
Query Cost Optimization
1. Batch queries instead of individual
Bad: Multiple individual queries
for query in queries:
    results = index.query(vector=query, top_k=10)  # N API calls
Good: Single batch query
results = index.query(
    queries=query_vectors,  # 1 API call
    top_k=10
)
2. Use appropriate top_k
Larger top_k = more expensive
results = index.query(
    vector=query_vector,
    top_k=10,  # Usually sufficient
    # top_k=1000  # Much more expensive
)
3. Minimize data transfer
results = index.query(
    vector=query_vector,
    top_k=10,
    include_values=False,   # Save bandwidth
    include_metadata=False  # Save bandwidth if not needed
)
4. Use caching for repeated queries
from functools import lru_cache

@lru_cache(maxsize=1000)
def cached_search(query_text: str, top_k: int = 10):
    """Cache search results for identical queries."""
    embedding = generate_embedding(query_text)
    results = index.query(
        vector=embedding,
        top_k=top_k,
        include_metadata=True
    )
    return results
5. Choose serverless vs pods appropriately
Serverless: low/variable traffic (pay per query)
Pods: high consistent traffic (fixed cost)
def choose_deployment_type(
    queries_per_month: int,
    avg_response_time_requirement: float = 100  # ms
) -> str:
    """Recommend a deployment type based on usage."""
    # Rough cost calculations (update with current pricing)
    serverless_cost_per_query = 0.0001  # Example
    pod_cost_per_month = 70  # p1.x1 pod
    serverless_monthly_cost = queries_per_month * serverless_cost_per_query
    if serverless_monthly_cost < pod_cost_per_month:
        return "serverless"
    else:
        return "pods"
queries_per_month: int,
avg_response_time_requirement: float = 100 # ms
) -> str:
"""Recommend deployment type based on usage."""
# Rough cost calculations (update with current pricing)
serverless_cost_per_query = 0.0001 # Example
pod_cost_per_month = 70 # p1.x1 pod
serverless_monthly_cost = queries_per_month * serverless_cost_per_query
if serverless_monthly_cost < pod_cost_per_month:
return "serverless"
else:
return "pods"undefinedCost Monitoring
成本监控
```python
from collections import defaultdict
from typing import Dict, Optional

class CostMonitor:
    """Monitor and estimate vector database costs."""

    def __init__(self):
        self.operations = defaultdict(int)
        self.pricing = {
            "serverless_write_units": 0.0000025,  # per write unit
            "serverless_read_units": 0.00000625,  # per read unit
            "serverless_storage_gb": 0.095,       # per GB per month
            "p1_x1_pod": 0.096,                   # per hour
            "p2_x1_pod": 0.240,                   # per hour
        }

    def track_operation(self, operation_type: str, units: int = 1):
        """Track database operations."""
        self.operations[operation_type] += units

    def estimate_monthly_cost(
        self,
        deployment_type: str,
        storage_gb: float = 0.0,
        pod_type: Optional[str] = None
    ) -> Dict:
        """Estimate monthly costs."""
        costs = {}
        if deployment_type == "serverless":
            # Storage cost
            storage_cost = storage_gb * self.pricing["serverless_storage_gb"]
            # Operation costs
            write_cost = (
                self.operations["upsert"] *
                self.pricing["serverless_write_units"]
            )
            read_cost = (
                self.operations["query"] *
                self.pricing["serverless_read_units"]
            )
            costs = {
                "storage": storage_cost,
                "writes": write_cost,
                "reads": read_cost,
                "total": storage_cost + write_cost + read_cost
            }
        elif deployment_type == "pods":
            # Fixed pod cost
            hours_per_month = 730
            pod_cost = self.pricing.get(f"{pod_type}_pod", 0) * hours_per_month
            costs = {
                "pod": pod_cost,
                "total": pod_cost
            }
        return costs

    def get_cost_report(self) -> str:
        """Generate cost report."""
        report = f"\n{'=' * 50}\n"
        report += "VECTOR DATABASE COST REPORT\n"
        report += f"{'=' * 50}\n\n"
        report += "Operations Summary:\n"
        for operation, count in self.operations.items():
            report += f"  {operation}: {count:,}\n"
        report += f"\n{'=' * 50}\n"
        return report
```

Usage
```python
cost_monitor = CostMonitor()

# Track operations
def monitored_upsert(vectors, **kwargs):
    cost_monitor.track_operation("upsert", len(vectors))
    return index.upsert(vectors=vectors, **kwargs)

def monitored_query(vector, **kwargs):
    cost_monitor.track_operation("query", 1)
    return index.query(vector=vector, **kwargs)

# Get cost estimate
monthly_cost = cost_monitor.estimate_monthly_cost(
    deployment_type="serverless",
    storage_gb=10.5
)
print(f"Estimated monthly cost: ${monthly_cost['total']:.2f}")
print(cost_monitor.get_cost_report())
```

---

Summary
This guide has covered the core tasks of vector database management across Pinecone, Weaviate, and Chroma. Key takeaways:
- Choose the right database: Pinecone for production scale, Weaviate for knowledge graphs, Chroma for local development
- Optimize embeddings: Balance dimension size with accuracy and cost
- Use metadata filtering: Combine vector similarity with structured filtering for powerful search
- Implement hybrid search: Combine dense and sparse vectors for best results
- Scale efficiently: Use batching, caching, and appropriate index configurations
- Monitor and optimize costs: Track usage and choose the right deployment type
For more information:
- Pinecone Documentation: https://docs.pinecone.io
- Weaviate Documentation: https://weaviate.io/developers/weaviate
- Chroma Documentation: https://docs.trychroma.com