vector-database-management

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Vector Database Management

向量数据库管理

Table of Contents

目录

Introduction

简介

Vector databases are specialized systems designed to store, index, and query high-dimensional vector embeddings efficiently. They power modern AI applications including semantic search, recommendation systems, RAG (Retrieval Augmented Generation), and similarity-based matching.
向量数据库是专门设计用于高效存储、索引和查询高维向量嵌入的系统,可为语义搜索、推荐系统、RAG(检索增强生成)和基于相似度的匹配等现代AI应用提供支持。

Key Concepts

核心概念

  • Vector Embeddings: Numerical representations of data (text, images, audio) in high-dimensional space
  • Similarity Search: Finding vectors that are "close" to a query vector using distance metrics
  • Metadata Filtering: Combining vector similarity with structured data filtering
  • Indexing: Optimization structures (HNSW, IVF, etc.) for fast approximate nearest neighbor search
  • 向量嵌入:将数据(文本、图像、音频)转换为高维空间中的数值表示
  • 相似度搜索:使用距离度量找到与查询向量“相近”的向量
  • 元数据过滤:将向量相似度与结构化数据过滤相结合
  • 索引:用于快速近似最近邻搜索的优化结构(如HNSW、IVF等)

Database Comparison

数据库对比

FeaturePineconeWeaviateChroma
DeploymentFully managedManaged or self-hostedSelf-hosted or cloud
Index TypesServerless, PodsHNSWHNSW
Metadata FilteringAdvancedGraphQL-basedSimple
Hybrid SearchSparse-DenseBuilt-inLimited
ScaleMassiveLargeSmall-Medium
Best ForProduction RAGKnowledge graphsLocal development
特性PineconeWeaviateChroma
部署方式全托管托管或自托管自托管或云端
索引类型Serverless、PodsHNSWHNSW
元数据过滤高级基于GraphQL基础
混合搜索稀疏-稠密内置有限
扩容能力大规模大型中小型
最佳适用场景生产环境RAG知识图谱本地开发

Vector Embeddings Fundamentals

向量嵌入基础

Understanding Vector Representations

理解向量表示

Vector embeddings transform unstructured data into numerical arrays that capture semantic meaning:
python
undefined
向量嵌入可将非结构化数据转换为能捕获语义信息的数值数组:
python
undefined

Text to embeddings using OpenAI

Text to embeddings using OpenAI

from openai import OpenAI
client = OpenAI(api_key="YOUR_API_KEY")
def generate_embedding(text: str, model: str = "text-embedding-3-small") -> list[float]: """Generate embeddings from text using OpenAI.""" response = client.embeddings.create( input=text, model=model ) return response.data[0].embedding
from openai import OpenAI
client = OpenAI(api_key="YOUR_API_KEY")
def generate_embedding(text: str, model: str = "text-embedding-3-small") -> list[float]: """Generate embeddings from text using OpenAI.""" response = client.embeddings.create( input=text, model=model ) return response.data[0].embedding

Example usage

Example usage

text = "Vector databases enable semantic search capabilities" embedding = generate_embedding(text) print(f"Embedding dimension: {len(embedding)}") # 1536 dimensions print(f"First 5 values: {embedding[:5]}")
undefined
text = "Vector databases enable semantic search capabilities" embedding = generate_embedding(text) print(f"Embedding dimension: {len(embedding)}") # 1536 dimensions print(f"First 5 values: {embedding[:5]}")
undefined

Popular Embedding Models

热门嵌入模型

python
undefined
python
undefined

1. OpenAI Embeddings (Production-grade)

1. OpenAI Embeddings (Production-grade)

from openai import OpenAI
def openai_embeddings(texts: list[str]) -> list[list[float]]: """Batch generate OpenAI embeddings.""" client = OpenAI(api_key="YOUR_API_KEY") response = client.embeddings.create( input=texts, model="text-embedding-3-large" # 3072 dimensions ) return [item.embedding for item in response.data]
from openai import OpenAI
def openai_embeddings(texts: list[str]) -> list[list[float]]: """Batch generate OpenAI embeddings.""" client = OpenAI(api_key="YOUR_API_KEY") response = client.embeddings.create( input=texts, model="text-embedding-3-large" # 3072 dimensions ) return [item.embedding for item in response.data]

2. Sentence Transformers (Open-source)

2. Sentence Transformers (Open-source)

from sentence_transformers import SentenceTransformer
def sentence_transformer_embeddings(texts: list[str]) -> list[list[float]]: """Generate embeddings using Sentence Transformers.""" model = SentenceTransformer('all-MiniLM-L6-v2') # 384 dimensions embeddings = model.encode(texts) return embeddings.tolist()
from sentence_transformers import SentenceTransformer
def sentence_transformer_embeddings(texts: list[str]) -> list[list[float]]: """Generate embeddings using Sentence Transformers.""" model = SentenceTransformer('all-MiniLM-L6-v2') # 384 dimensions embeddings = model.encode(texts) return embeddings.tolist()

3. Cohere Embeddings

3. Cohere Embeddings

import cohere
def cohere_embeddings(texts: list[str]) -> list[list[float]]: """Generate embeddings using Cohere.""" co = cohere.Client("YOUR_API_KEY") response = co.embed( texts=texts, model="embed-english-v3.0", input_type="search_document" ) return response.embeddings
undefined
import cohere
def cohere_embeddings(texts: list[str]) -> list[list[float]]: """Generate embeddings using Cohere.""" co = cohere.Client("YOUR_API_KEY") response = co.embed( texts=texts, model="embed-english-v3.0", input_type="search_document" ) return response.embeddings
undefined

Embedding Dimensions & Trade-offs

嵌入维度与权衡

python
undefined
python
undefined

Different embedding models for different use cases

Different embedding models for different use cases

EMBEDDING_CONFIGS = { "openai-small": { "model": "text-embedding-3-small", "dimensions": 1536, "cost_per_1m": 0.02, "use_case": "General purpose, cost-effective" }, "openai-large": { "model": "text-embedding-3-large", "dimensions": 3072, "cost_per_1m": 0.13, "use_case": "High accuracy requirements" }, "sentence-transformers": { "model": "all-MiniLM-L6-v2", "dimensions": 384, "cost_per_1m": 0.00, # Open-source "use_case": "Local development, privacy-sensitive" }, "cohere-multilingual": { "model": "embed-multilingual-v3.0", "dimensions": 1024, "cost_per_1m": 0.10, "use_case": "Multi-language applications" } }
undefined
EMBEDDING_CONFIGS = { "openai-small": { "model": "text-embedding-3-small", "dimensions": 1536, "cost_per_1m": 0.02, "use_case": "General purpose, cost-effective" }, "openai-large": { "model": "text-embedding-3-large", "dimensions": 3072, "cost_per_1m": 0.13, "use_case": "High accuracy requirements" }, "sentence-transformers": { "model": "all-MiniLM-L6-v2", "dimensions": 384, "cost_per_1m": 0.00, # Open-source "use_case": "Local development, privacy-sensitive" }, "cohere-multilingual": { "model": "embed-multilingual-v3.0", "dimensions": 1024, "cost_per_1m": 0.10, "use_case": "Multi-language applications" } }
undefined

Database Setup & Configuration

数据库设置与配置

Pinecone Setup

Pinecone 设置

python
undefined
python
undefined

Install Pinecone SDK

Install Pinecone SDK

pip install pinecone-client

pip install pinecone-client

from pinecone import Pinecone, ServerlessSpec
from pinecone import Pinecone, ServerlessSpec

Initialize Pinecone client

Initialize Pinecone client

pc = Pinecone(api_key="YOUR_API_KEY")
pc = Pinecone(api_key="YOUR_API_KEY")

List existing indexes

List existing indexes

indexes = pc.list_indexes() print(f"Existing indexes: {[idx.name for idx in indexes]}")
indexes = pc.list_indexes() print(f"Existing indexes: {[idx.name for idx in indexes]}")

Create serverless index (recommended for production)

Create serverless index (recommended for production)

index_name = "production-search"
if index_name not in [idx.name for idx in pc.list_indexes()]: pc.create_index( name=index_name, dimension=1536, # Match your embedding model metric="cosine", # cosine, dotproduct, or euclidean spec=ServerlessSpec( cloud="aws", region="us-east-1" ), deletion_protection="enabled", # Prevent accidental deletion tags={ "environment": "production", "team": "ml", "project": "semantic-search" } ) print(f"Created index: {index_name}")
index_name = "production-search"
if index_name not in [idx.name for idx in pc.list_indexes()]: pc.create_index( name=index_name, dimension=1536, # Match your embedding model metric="cosine", # cosine, dotproduct, or euclidean spec=ServerlessSpec( cloud="aws", region="us-east-1" ), deletion_protection="enabled", # Prevent accidental deletion tags={ "environment": "production", "team": "ml", "project": "semantic-search" } ) print(f"Created index: {index_name}")

Connect to index

Connect to index

index = pc.Index(index_name)
index = pc.Index(index_name)

Get index stats

Get index stats

stats = index.describe_index_stats() print(f"Index stats: {stats}")
undefined
stats = index.describe_index_stats() print(f"Index stats: {stats}")
undefined

Selective Metadata Indexing (Pinecone)

选择性元数据索引(Pinecone)

python
undefined
python
undefined

Configure which metadata fields to index for filtering

Configure which metadata fields to index for filtering

This optimizes memory usage and query performance

This optimizes memory usage and query performance

from pinecone import Pinecone, ServerlessSpec
pc = Pinecone(api_key="YOUR_API_KEY")
from pinecone import Pinecone, ServerlessSpec
pc = Pinecone(api_key="YOUR_API_KEY")

Create index with metadata configuration

Create index with metadata configuration

pc.create_index( name="optimized-index", dimension=1536, metric="cosine", spec=ServerlessSpec( cloud="aws", region="us-east-1", schema={ "fields": { # Index these fields for filtering "document_id": {"filterable": True}, "category": {"filterable": True}, "created_at": {"filterable": True}, "tags": {"filterable": True}, # Store but don't index (saves memory) "document_title": {"filterable": False}, "document_url": {"filterable": False}, "full_content": {"filterable": False} } } ) )
pc.create_index( name="optimized-index", dimension=1536, metric="cosine", spec=ServerlessSpec( cloud="aws", region="us-east-1", schema={ "fields": { # Index these fields for filtering "document_id": {"filterable": True}, "category": {"filterable": True}, "created_at": {"filterable": True}, "tags": {"filterable": True}, # Store but don't index (saves memory) "document_title": {"filterable": False}, "document_url": {"filterable": False}, "full_content": {"filterable": False} } } ) )

This configuration allows you to:

This configuration allows you to:

1. Filter by document_id, category, created_at, tags

1. Filter by document_id, category, created_at, tags

2. Retrieve document_title, document_url, full_content in results

2. Retrieve document_title, document_url, full_content in results

3. Save memory by not indexing non-filterable fields

3. Save memory by not indexing non-filterable fields

undefined
undefined

Weaviate Setup

Weaviate 设置

python
undefined
python
undefined

Install Weaviate client

Install Weaviate client

pip install weaviate-client

pip install weaviate-client

import weaviate from weaviate.classes.config import Configure, Property, DataType
import weaviate from weaviate.classes.config import Configure, Property, DataType

Connect to Weaviate

Connect to Weaviate

client = weaviate.connect_to_local()
client = weaviate.connect_to_local()

Or connect to Weaviate Cloud

Or connect to Weaviate Cloud

client = weaviate.connect_to_wcs(

client = weaviate.connect_to_wcs(

cluster_url="YOUR_WCS_URL",

cluster_url="YOUR_WCS_URL",

auth_credentials=weaviate.auth.AuthApiKey("YOUR_API_KEY")

auth_credentials=weaviate.auth.AuthApiKey("YOUR_API_KEY")

)

)

Create collection (schema)

Create collection (schema)

try: collection = client.collections.create( name="Documents", vectorizer_config=Configure.Vectorizer.text2vec_openai( model="text-embedding-3-small" ), properties=[ Property(name="title", data_type=DataType.TEXT), Property(name="content", data_type=DataType.TEXT), Property(name="category", data_type=DataType.TEXT), Property(name="created_at", data_type=DataType.DATE), Property(name="tags", data_type=DataType.TEXT_ARRAY) ] ) print(f"Created collection: Documents") except Exception as e: print(f"Collection exists or error: {e}")
try: collection = client.collections.create( name="Documents", vectorizer_config=Configure.Vectorizer.text2vec_openai( model="text-embedding-3-small" ), properties=[ Property(name="title", data_type=DataType.TEXT), Property(name="content", data_type=DataType.TEXT), Property(name="category", data_type=DataType.TEXT), Property(name="created_at", data_type=DataType.DATE), Property(name="tags", data_type=DataType.TEXT_ARRAY) ] ) print(f"Created collection: Documents") except Exception as e: print(f"Collection exists or error: {e}")

Get collection

Get collection

documents = client.collections.get("Documents")
documents = client.collections.get("Documents")

Check collection info

Check collection info

print(documents.config.get())
undefined
print(documents.config.get())
undefined

Chroma Setup

Chroma 设置

python
undefined
python
undefined

Install Chroma

Install Chroma

pip install chromadb

pip install chromadb

import chromadb from chromadb.config import Settings
import chromadb from chromadb.config import Settings

Initialize Chroma client (persistent)

Initialize Chroma client (persistent)

client = chromadb.PersistentClient(path="./chroma_db")
client = chromadb.PersistentClient(path="./chroma_db")

Or use ephemeral (in-memory)

Or use ephemeral (in-memory)

client = chromadb.EphemeralClient()

client = chromadb.EphemeralClient()

Create or get collection

Create or get collection

collection = client.get_or_create_collection( name="documents", metadata={ "description": "Document collection for semantic search", "hnsw:space": "cosine" # cosine, l2, or ip (inner product) } )
collection = client.get_or_create_collection( name="documents", metadata={ "description": "Document collection for semantic search", "hnsw:space": "cosine" # cosine, l2, or ip (inner product) } )

List all collections

List all collections

collections = client.list_collections() print(f"Available collections: {[c.name for c in collections]}")
collections = client.list_collections() print(f"Available collections: {[c.name for c in collections]}")

Get collection info

Get collection info

print(f"Collection count: {collection.count()}")
undefined
print(f"Collection count: {collection.count()}")
undefined

Index Operations

索引操作

Creating Indexes with Different Configurations

创建不同配置的索引

python
from pinecone import Pinecone, ServerlessSpec, PodSpec

pc = Pinecone(api_key="YOUR_API_KEY")
python
from pinecone import Pinecone, ServerlessSpec, PodSpec

pc = Pinecone(api_key="YOUR_API_KEY")

1. Serverless index (auto-scaling, pay-per-use)

1. Serverless index (auto-scaling, pay-per-use)

pc.create_index( name="serverless-index", dimension=1536, metric="cosine", spec=ServerlessSpec( cloud="aws", region="us-east-1" ) )
pc.create_index( name="serverless-index", dimension=1536, metric="cosine", spec=ServerlessSpec( cloud="aws", region="us-east-1" ) )

2. Pod-based index (dedicated resources)

2. Pod-based index (dedicated resources)

pc.create_index( name="pod-index", dimension=1536, metric="dotproduct", spec=PodSpec( environment="us-east-1-aws", pod_type="p1.x1", # Performance tier pods=2, # Number of pods replicas=2, # Replicas for high availability shards=1 ) )
pc.create_index( name="pod-index", dimension=1536, metric="dotproduct", spec=PodSpec( environment="us-east-1-aws", pod_type="p1.x1", # Performance tier pods=2, # Number of pods replicas=2, # Replicas for high availability shards=1 ) )

3. Sparse index (for BM25-like search)

3. Sparse index (for BM25-like search)

pc.create_index( name="sparse-index", dimension=None, # Sparse vectors don't have fixed dimension metric="dotproduct", spec=ServerlessSpec( cloud="aws", region="us-east-1" ) )
undefined
pc.create_index( name="sparse-index", dimension=None, # Sparse vectors don't have fixed dimension metric="dotproduct", spec=ServerlessSpec( cloud="aws", region="us-east-1" ) )
undefined

Index Management Operations

索引管理操作

python
from pinecone import Pinecone

pc = Pinecone(api_key="YOUR_API_KEY")
python
from pinecone import Pinecone

pc = Pinecone(api_key="YOUR_API_KEY")

List all indexes

List all indexes

indexes = pc.list_indexes() for idx in indexes: print(f"Name: {idx.name}, Status: {idx.status.state}, Host: {idx.host}")
indexes = pc.list_indexes() for idx in indexes: print(f"Name: {idx.name}, Status: {idx.status.state}, Host: {idx.host}")

Describe specific index

Describe specific index

index_info = pc.describe_index("production-search") print(f"Dimension: {index_info.dimension}") print(f"Metric: {index_info.metric}") print(f"Status: {index_info.status}")
index_info = pc.describe_index("production-search") print(f"Dimension: {index_info.dimension}") print(f"Metric: {index_info.metric}") print(f"Status: {index_info.status}")

Connect to index

Connect to index

index = pc.Index("production-search")
index = pc.Index("production-search")

Get index statistics

Get index statistics

stats = index.describe_index_stats() print(f"Total vectors: {stats.total_vector_count}") print(f"Namespaces: {stats.namespaces}") print(f"Index fullness: {stats.index_fullness}")
stats = index.describe_index_stats() print(f"Total vectors: {stats.total_vector_count}") print(f"Namespaces: {stats.namespaces}") print(f"Index fullness: {stats.index_fullness}")

Delete index (be careful!)

Delete index (be careful!)

pc.delete_index("test-index")

pc.delete_index("test-index")

undefined
undefined

Configuring Index for Optimal Performance

配置索引以实现最佳性能

python
undefined
python
undefined

Configuration for different use cases

Configuration for different use cases

1. High-throughput search (many queries/second)

1. High-throughput search (many queries/second)

pc.create_index( name="high-throughput", dimension=1536, metric="cosine", spec=PodSpec( environment="us-east-1-aws", pod_type="p2.x1", # Higher performance tier pods=4, replicas=3 # More replicas = higher query throughput ) )
pc.create_index( name="high-throughput", dimension=1536, metric="cosine", spec=PodSpec( environment="us-east-1-aws", pod_type="p2.x1", # Higher performance tier pods=4, replicas=3 # More replicas = higher query throughput ) )

2. Large-scale storage (billions of vectors)

2. Large-scale storage (billions of vectors)

pc.create_index( name="large-scale", dimension=1536, metric="cosine", spec=PodSpec( environment="us-east-1-aws", pod_type="s1.x1", # Storage-optimized pods=8, shards=4 # More shards = more storage capacity ) )
pc.create_index( name="large-scale", dimension=1536, metric="cosine", spec=PodSpec( environment="us-east-1-aws", pod_type="s1.x1", # Storage-optimized pods=8, shards=4 # More shards = more storage capacity ) )

3. Cost-optimized development

3. Cost-optimized development

pc.create_index( name="dev-environment", dimension=1536, metric="cosine", spec=ServerlessSpec( cloud="aws", region="us-east-1" ) # Serverless = pay only for what you use )
undefined
pc.create_index( name="dev-environment", dimension=1536, metric="cosine", spec=ServerlessSpec( cloud="aws", region="us-east-1" ) # Serverless = pay only for what you use )
undefined

Vector Operations

向量操作

Upserting Vectors (Pinecone)

插入/更新向量(Pinecone)

python
from pinecone import Pinecone
import uuid

pc = Pinecone(api_key="YOUR_API_KEY")
index = pc.Index("production-search")
python
from pinecone import Pinecone
import uuid

pc = Pinecone(api_key="YOUR_API_KEY")
index = pc.Index("production-search")

1. Single vector upsert

1. Single vector upsert

vector_id = str(uuid.uuid4()) index.upsert( vectors=[ { "id": vector_id, "values": [0.1, 0.2, 0.3, ...], # 1536 dimensions "metadata": { "title": "Introduction to Vector Databases", "category": "education", "author": "John Doe", "created_at": "2024-01-15", "tags": ["ml", "ai", "databases"] } } ], namespace="documents" )
vector_id = str(uuid.uuid4()) index.upsert( vectors=[ { "id": vector_id, "values": [0.1, 0.2, 0.3, ...], # 1536 dimensions "metadata": { "title": "Introduction to Vector Databases", "category": "education", "author": "John Doe", "created_at": "2024-01-15", "tags": ["ml", "ai", "databases"] } } ], namespace="documents" )

2. Batch upsert (efficient for large datasets)

2. Batch upsert (efficient for large datasets)

batch_size = 100 vectors = []
for i, (doc_id, embedding, metadata) in enumerate(documents): vectors.append({ "id": doc_id, "values": embedding, "metadata": metadata })
# Upsert in batches
if len(vectors) >= batch_size or i == len(documents) - 1:
    index.upsert(vectors=vectors, namespace="documents")
    print(f"Upserted batch of {len(vectors)} vectors")
    vectors = []
batch_size = 100 vectors = []
for i, (doc_id, embedding, metadata) in enumerate(documents): vectors.append({ "id": doc_id, "values": embedding, "metadata": metadata })
# Upsert in batches
if len(vectors) >= batch_size or i == len(documents) - 1:
    index.upsert(vectors=vectors, namespace="documents")
    print(f"Upserted batch of {len(vectors)} vectors")
    vectors = []

3. Upsert with async for better performance

3. Upsert with async for better performance

from pinecone import Pinecone import asyncio
async def upsert_vectors_async(vectors_batch): """Async upsert for parallel processing.""" index.upsert(vectors=vectors_batch, namespace="documents", async_req=True)
from pinecone import Pinecone import asyncio
async def upsert_vectors_async(vectors_batch): """Async upsert for parallel processing.""" index.upsert(vectors=vectors_batch, namespace="documents", async_req=True)

Parallel upsert

Parallel upsert

tasks = [] for batch in batches: tasks.append(upsert_vectors_async(batch)) await asyncio.gather(*tasks)
undefined
tasks = [] for batch in batches: tasks.append(upsert_vectors_async(batch)) await asyncio.gather(*tasks)
undefined

Sparse Vector Operations (Pinecone)

稀疏向量操作(Pinecone)

python
undefined
python
undefined

Sparse vectors are useful for keyword-based search (like BM25)

Sparse vectors are useful for keyword-based search (like BM25)

Combined with dense vectors for hybrid search

Combined with dense vectors for hybrid search

from pinecone import Pinecone
pc = Pinecone(api_key="YOUR_API_KEY") index = pc.Index("hybrid-search-index")
from pinecone import Pinecone
pc = Pinecone(api_key="YOUR_API_KEY") index = pc.Index("hybrid-search-index")

Upsert vector with both dense and sparse components

Upsert vector with both dense and sparse components

index.upsert( vectors=[ { "id": "doc1", "values": [0.1, 0.2, ..., 0.5], # Dense vector "sparse_values": { "indices": [10, 45, 123, 234, 678], # Token IDs "values": [0.8, 0.6, 0.9, 0.7, 0.5] # TF-IDF weights }, "metadata": {"title": "Hybrid Search Document"} } ], namespace="hybrid" )
index.upsert( vectors=[ { "id": "doc1", "values": [0.1, 0.2, ..., 0.5], # Dense vector "sparse_values": { "indices": [10, 45, 123, 234, 678], # Token IDs "values": [0.8, 0.6, 0.9, 0.7, 0.5] # TF-IDF weights }, "metadata": {"title": "Hybrid Search Document"} } ], namespace="hybrid" )

Query with hybrid search

Query with hybrid search

results = index.query( vector=[0.1, 0.2, ..., 0.5], # Dense query vector sparse_vector={ "indices": [10, 45, 123], "values": [0.8, 0.7, 0.9] }, top_k=10, namespace="hybrid", include_metadata=True )
undefined
results = index.query( vector=[0.1, 0.2, ..., 0.5], # Dense query vector sparse_vector={ "indices": [10, 45, 123], "values": [0.8, 0.7, 0.9] }, top_k=10, namespace="hybrid", include_metadata=True )
undefined

Vector Operations (Weaviate)

向量操作(Weaviate)

python
import weaviate
from weaviate.classes.query import MetadataQuery

client = weaviate.connect_to_local()
documents = client.collections.get("Documents")
python
import weaviate
from weaviate.classes.query import MetadataQuery

client = weaviate.connect_to_local()
documents = client.collections.get("Documents")

1. Insert single object

1. Insert single object

doc_uuid = documents.data.insert( properties={ "title": "Vector Database Guide", "content": "A comprehensive guide to vector databases...", "category": "tutorial", "created_at": "2024-01-15T10:00:00Z", "tags": ["database", "ml", "ai"] } ) print(f"Inserted: {doc_uuid}")
doc_uuid = documents.data.insert( properties={ "title": "Vector Database Guide", "content": "A comprehensive guide to vector databases...", "category": "tutorial", "created_at": "2024-01-15T10:00:00Z", "tags": ["database", "ml", "ai"] } ) print(f"Inserted: {doc_uuid}")

2. Batch insert

2. Batch insert

with documents.batch.dynamic() as batch: for doc in document_list: batch.add_object( properties={ "title": doc["title"], "content": doc["content"], "category": doc["category"], "created_at": doc["created_at"], "tags": doc["tags"] } )
with documents.batch.dynamic() as batch: for doc in document_list: batch.add_object( properties={ "title": doc["title"], "content": doc["content"], "category": doc["category"], "created_at": doc["created_at"], "tags": doc["tags"] } )

3. Insert with custom vector

3. Insert with custom vector

documents.data.insert( properties={"title": "Custom Vector Doc", "content": "..."}, vector=[0.1, 0.2, 0.3, ...] # Your pre-computed vector )
documents.data.insert( properties={"title": "Custom Vector Doc", "content": "..."}, vector=[0.1, 0.2, 0.3, ...] # Your pre-computed vector )

4. Update object

4. Update object

documents.data.update( uuid=doc_uuid, properties={"title": "Updated Title"} )
documents.data.update( uuid=doc_uuid, properties={"title": "Updated Title"} )

5. Delete object

5. Delete object

documents.data.delete_by_id(uuid=doc_uuid)
undefined
documents.data.delete_by_id(uuid=doc_uuid)
undefined

Vector Operations (Chroma)

向量操作(Chroma)

python
import chromadb

client = chromadb.PersistentClient(path="./chroma_db")
collection = client.get_collection("documents")
python
import chromadb

client = chromadb.PersistentClient(path="./chroma_db")
collection = client.get_collection("documents")

1. Add documents with auto-embedding

1. Add documents with auto-embedding

collection.add( documents=[ "This is document 1", "This is document 2", "This is document 3" ], metadatas=[ {"category": "tech", "author": "Alice"}, {"category": "science", "author": "Bob"}, {"category": "tech", "author": "Charlie"} ], ids=["doc1", "doc2", "doc3"] )
collection.add( documents=[ "This is document 1", "This is document 2", "This is document 3" ], metadatas=[ {"category": "tech", "author": "Alice"}, {"category": "science", "author": "Bob"}, {"category": "tech", "author": "Charlie"} ], ids=["doc1", "doc2", "doc3"] )

2. Add with custom embeddings

2. Add with custom embeddings

collection.add( embeddings=[ [0.1, 0.2, 0.3, ...], [0.4, 0.5, 0.6, ...] ], metadatas=[ {"title": "Doc 1"}, {"title": "Doc 2"} ], ids=["custom1", "custom2"] )
collection.add( embeddings=[ [0.1, 0.2, 0.3, ...], [0.4, 0.5, 0.6, ...] ], metadatas=[ {"title": "Doc 1"}, {"title": "Doc 2"} ], ids=["custom1", "custom2"] )

3. Update documents

3. Update documents

collection.update( ids=["doc1"], documents=["Updated document content"], metadatas=[{"category": "tech", "updated": True}] )
collection.update( ids=["doc1"], documents=["Updated document content"], metadatas=[{"category": "tech", "updated": True}] )

4. Delete documents

4. Delete documents

collection.delete(ids=["doc1", "doc2"])
collection.delete(ids=["doc1", "doc2"])

5. Get documents by IDs

5. Get documents by IDs

results = collection.get( ids=["doc1", "doc2"], include=["documents", "metadatas", "embeddings"] )
undefined
results = collection.get( ids=["doc1", "doc2"], include=["documents", "metadatas", "embeddings"] )
undefined

Similarity Search

相似度搜索

Basic Similarity Search (Pinecone)

基础相似度搜索(Pinecone)

python
from pinecone import Pinecone
from openai import OpenAI
python
from pinecone import Pinecone
from openai import OpenAI

Initialize clients

Initialize clients

pc = Pinecone(api_key="PINECONE_API_KEY") openai_client = OpenAI(api_key="OPENAI_API_KEY")
index = pc.Index("production-search")
pc = Pinecone(api_key="PINECONE_API_KEY") openai_client = OpenAI(api_key="OPENAI_API_KEY")
index = pc.Index("production-search")

1. Generate query embedding

1. Generate query embedding

query_text = "What are the benefits of vector databases?" response = openai_client.embeddings.create( input=query_text, model="text-embedding-3-small" ) query_embedding = response.data[0].embedding
query_text = "What are the benefits of vector databases?" response = openai_client.embeddings.create( input=query_text, model="text-embedding-3-small" ) query_embedding = response.data[0].embedding

2. Search for similar vectors

2. Search for similar vectors

results = index.query( vector=query_embedding, top_k=10, namespace="documents", include_values=False, include_metadata=True )
results = index.query( vector=query_embedding, top_k=10, namespace="documents", include_values=False, include_metadata=True )

3. Process results

3. Process results

print(f"Found {len(results.matches)} results") for match in results.matches: print(f"ID: {match.id}") print(f"Score: {match.score:.4f}") print(f"Title: {match.metadata.get('title')}") print(f"Category: {match.metadata.get('category')}") print("---")
undefined
print(f"Found {len(results.matches)} results") for match in results.matches: print(f"ID: {match.id}") print(f"Score: {match.score:.4f}") print(f"Title: {match.metadata.get('title')}") print(f"Category: {match.metadata.get('category')}") print("---")
undefined

Search by ID (Query by Example)

按ID搜索(示例查询)

python
undefined
python
undefined

Search using an existing vector as query

Search using an existing vector as query

results = index.query( id="existing-doc-id", # Use this document as the query top_k=10, namespace="documents", include_metadata=True )
results = index.query( id="existing-doc-id", # Use this document as the query top_k=10, namespace="documents", include_metadata=True )

Useful for "find similar items" features

Useful for "find similar items" features

print(f"Documents similar to {results.matches[0].metadata.get('title')}:") for match in results.matches[1:]: # Skip first (self) print(f"- {match.metadata.get('title')} (score: {match.score:.4f})")
undefined
print(f"Documents similar to {results.matches[0].metadata.get('title')}:") for match in results.matches[1:]: # Skip first (self) print(f"- {match.metadata.get('title')} (score: {match.score:.4f})")
undefined

Multi-vector Search (Pinecone)

多向量搜索(Pinecone)

python
undefined
python
undefined

Search multiple query vectors in one request

Search multiple query vectors in one request

query_embeddings = [ [0.1, 0.2, ...], # Query 1 [0.3, 0.4, ...], # Query 2 [0.5, 0.6, ...] # Query 3 ]
results = index.query( queries=query_embeddings, top_k=5, namespace="documents", include_metadata=True )
query_embeddings = [ [0.1, 0.2, ...], # Query 1 [0.3, 0.4, ...], # Query 2 [0.5, 0.6, ...] # Query 3 ]
results = index.query( queries=query_embeddings, top_k=5, namespace="documents", include_metadata=True )

Process results for each query

Process results for each query

for i, query_results in enumerate(results): print(f"\nResults for query {i+1}:") for match in query_results.matches: print(f"- {match.metadata.get('title')} (score: {match.score:.4f})")
undefined
for i, query_results in enumerate(results): print(f"\nResults for query {i+1}:") for match in query_results.matches: print(f"- {match.metadata.get('title')} (score: {match.score:.4f})")
undefined

Similarity Search (Weaviate)

相似度搜索(Weaviate)

python
import weaviate
from weaviate.classes.query import MetadataQuery

client = weaviate.connect_to_local()
documents = client.collections.get("Documents")
python
import weaviate
from weaviate.classes.query import MetadataQuery

client = weaviate.connect_to_local()
documents = client.collections.get("Documents")

1. Near text search (semantic)

1. Near text search (semantic)

response = documents.query.near_text( query="vector database performance optimization", limit=10, return_metadata=MetadataQuery(distance=True, certainty=True) )
for obj in response.objects: print(f"Title: {obj.properties['title']}") print(f"Distance: {obj.metadata.distance:.4f}") print(f"Certainty: {obj.metadata.certainty:.4f}") print("---")
response = documents.query.near_text( query="vector database performance optimization", limit=10, return_metadata=MetadataQuery(distance=True, certainty=True) )
for obj in response.objects: print(f"Title: {obj.properties['title']}") print(f"Distance: {obj.metadata.distance:.4f}") print(f"Certainty: {obj.metadata.certainty:.4f}") print("---")

2. Near vector search (with custom embedding)

2. Near vector search (with custom embedding)

response = documents.query.near_vector( near_vector=[0.1, 0.2, 0.3, ...], limit=10 )
response = documents.query.near_vector( near_vector=[0.1, 0.2, 0.3, ...], limit=10 )

3. Near object search (find similar to existing object)

3. Near object search (find similar to existing object)

response = documents.query.near_object( near_object="uuid-of-reference-object", limit=10 )
undefined
response = documents.query.near_object( near_object="uuid-of-reference-object", limit=10 )
undefined

Similarity Search (Chroma)

相似度搜索(Chroma)

python
import chromadb

client = chromadb.PersistentClient(path="./chroma_db")
collection = client.get_collection("documents")
python
import chromadb

client = chromadb.PersistentClient(path="./chroma_db")
collection = client.get_collection("documents")

1. Query with text (auto-embedding)

1. Query with text (auto-embedding)

results = collection.query( query_texts=["What is machine learning?"], n_results=10, include=["documents", "metadatas", "distances"] )
print(f"Found {len(results['ids'][0])} results") for i, doc_id in enumerate(results['ids'][0]): print(f"ID: {doc_id}") print(f"Distance: {results['distances'][0][i]:.4f}") print(f"Document: {results['documents'][0][i][:100]}...") print(f"Metadata: {results['metadatas'][0][i]}") print("---")
results = collection.query( query_texts=["What is machine learning?"], n_results=10, include=["documents", "metadatas", "distances"] )
print(f"Found {len(results['ids'][0])} results") for i, doc_id in enumerate(results['ids'][0]): print(f"ID: {doc_id}") print(f"Distance: {results['distances'][0][i]:.4f}") print(f"Document: {results['documents'][0][i][:100]}...") print(f"Metadata: {results['metadatas'][0][i]}") print("---")

2. Query with custom embedding

2. Query with custom embedding

results = collection.query( query_embeddings=[[0.1, 0.2, 0.3, ...]], n_results=10 )
undefined
results = collection.query( query_embeddings=[[0.1, 0.2, 0.3, ...]], n_results=10 )
undefined

Metadata Filtering

元数据过滤

Pinecone Metadata Filters

Pinecone 元数据过滤器

python
from pinecone import Pinecone

pc = Pinecone(api_key="YOUR_API_KEY")
index = pc.Index("production-search")
python
from pinecone import Pinecone

pc = Pinecone(api_key="YOUR_API_KEY")
index = pc.Index("production-search")

1. Equality filter

1. Equality filter

results = index.query( vector=query_embedding, top_k=10, filter={"category": {"$eq": "education"}}, include_metadata=True )
results = index.query( vector=query_embedding, top_k=10, filter={"category": {"$eq": "education"}}, include_metadata=True )

2. Inequality filter

2. Inequality filter

results = index.query( vector=query_embedding, top_k=10, filter={"year": {"$ne": 2023}}, include_metadata=True )
results = index.query( vector=query_embedding, top_k=10, filter={"year": {"$ne": 2023}}, include_metadata=True )

3. Range filters

3. Range filters

results = index.query( vector=query_embedding, top_k=10, filter={ "$and": [ {"year": {"$gte": 2020}}, {"year": {"$lte": 2024}} ] }, include_metadata=True )
results = index.query( vector=query_embedding, top_k=10, filter={ "$and": [ {"year": {"$gte": 2020}}, {"year": {"$lte": 2024}} ] }, include_metadata=True )

4. In/Not-in filters

4. In/Not-in filters

results = index.query( vector=query_embedding, top_k=10, filter={ "category": {"$in": ["education", "tutorial", "guide"]} }, include_metadata=True )
results = index.query( vector=query_embedding, top_k=10, filter={ "category": {"$in": ["education", "tutorial", "guide"]} }, include_metadata=True )

5. Existence check

5. Existence check

results = index.query( vector=query_embedding, top_k=10, filter={"author": {"$exists": True}}, include_metadata=True )
results = index.query( vector=query_embedding, top_k=10, filter={"author": {"$exists": True}}, include_metadata=True )

6. Complex AND/OR queries

6. Complex AND/OR queries

results = index.query( vector=query_embedding, top_k=10, filter={ "$and": [ {"category": {"$eq": "education"}}, { "$or": [ {"year": {"$eq": 2024}}, {"featured": {"$eq": True}} ] }, {"tags": {"$in": ["ml", "ai"]}} ] }, include_metadata=True )
results = index.query( vector=query_embedding, top_k=10, filter={ "$and": [ {"category": {"$eq": "education"}}, { "$or": [ {"year": {"$eq": 2024}}, {"featured": {"$eq": True}} ] }, {"tags": {"$in": ["ml", "ai"]}} ] }, include_metadata=True )

7. Greater than/Less than

7. Greater than/Less than

results = index.query( vector=query_embedding, top_k=10, filter={ "view_count": {"$gt": 1000}, "rating": {"$gte": 4.5} }, include_metadata=True )
undefined
results = index.query( vector=query_embedding, top_k=10, filter={ "view_count": {"$gt": 1000}, "rating": {"$gte": 4.5} }, include_metadata=True )
undefined

Production Metadata Filter Patterns

生产环境元数据过滤模式

python
undefined
python
undefined

Pattern 1: Time-based filtering (recent content)

Pattern 1: Time-based filtering (recent content)

from datetime import datetime, timedelta
def search_recent_documents(query_text: str, days: int = 30): """Search only documents from last N days.""" cutoff_date = (datetime.now() - timedelta(days=days)).isoformat()
results = index.query(
    vector=generate_embedding(query_text),
    top_k=10,
    filter={
        "created_at": {"$gte": cutoff_date}
    },
    include_metadata=True
)
return results
from datetime import datetime, timedelta
def search_recent_documents(query_text: str, days: int = 30): """Search only documents from last N days.""" cutoff_date = (datetime.now() - timedelta(days=days)).isoformat()
results = index.query(
    vector=generate_embedding(query_text),
    top_k=10,
    filter={
        "created_at": {"$gte": cutoff_date}
    },
    include_metadata=True
)
return results

Pattern 2: User permission filtering

Pattern 2: User permission filtering

def search_with_permissions(query_text: str, user_id: str, user_roles: list): """Search only documents user has access to.""" results = index.query( vector=generate_embedding(query_text), top_k=10, filter={ "$or": [ {"owner_id": {"$eq": user_id}}, {"shared_with": {"$in": [user_id]}}, {"public": {"$eq": True}}, {"required_roles": {"$in": user_roles}} ] }, include_metadata=True ) return results
def search_with_permissions(query_text: str, user_id: str, user_roles: list): """Search only documents user has access to.""" results = index.query( vector=generate_embedding(query_text), top_k=10, filter={ "$or": [ {"owner_id": {"$eq": user_id}}, {"shared_with": {"$in": [user_id]}}, {"public": {"$eq": True}}, {"required_roles": {"$in": user_roles}} ] }, include_metadata=True ) return results

Pattern 3: Multi-tenant filtering

Pattern 3: Multi-tenant filtering

def search_tenant_documents(query_text: str, tenant_id: str, category: str = None): """Search within a specific tenant's data.""" filter_dict = {"tenant_id": {"$eq": tenant_id}}
if category:
    filter_dict["category"] = {"$eq": category}

results = index.query(
    vector=generate_embedding(query_text),
    top_k=10,
    filter=filter_dict,
    include_metadata=True
)
return results
def search_tenant_documents(query_text: str, tenant_id: str, category: str = None): """Search within a specific tenant's data.""" filter_dict = {"tenant_id": {"$eq": tenant_id}}
if category:
    filter_dict["category"] = {"$eq": category}

results = index.query(
    vector=generate_embedding(query_text),
    top_k=10,
    filter=filter_dict,
    include_metadata=True
)
return results

Pattern 4: Faceted search

Pattern 4: Faceted search

def faceted_search(query_text: str, facets: dict): """Search with multiple facet filters.""" filter_conditions = []
for field, values in facets.items():
    if isinstance(values, list):
        filter_conditions.append({field: {"$in": values}})
    else:
        filter_conditions.append({field: {"$eq": values}})

results = index.query(
    vector=generate_embedding(query_text),
    top_k=10,
    filter={"$and": filter_conditions} if filter_conditions else {},
    include_metadata=True
)
return results
def faceted_search(query_text: str, facets: dict): """Search with multiple facet filters.""" filter_conditions = []
for field, values in facets.items():
    if isinstance(values, list):
        filter_conditions.append({field: {"$in": values}})
    else:
        filter_conditions.append({field: {"$eq": values}})

results = index.query(
    vector=generate_embedding(query_text),
    top_k=10,
    filter={"$and": filter_conditions} if filter_conditions else {},
    include_metadata=True
)
return results

Usage

Usage

results = faceted_search( "machine learning tutorials", facets={ "category": ["education", "tutorial"], "difficulty": "beginner", "language": ["english", "spanish"] } )
undefined
results = faceted_search( "machine learning tutorials", facets={ "category": ["education", "tutorial"], "difficulty": "beginner", "language": ["english", "spanish"] } )
undefined

Weaviate Metadata Filtering

Weaviate 元数据过滤

python
import weaviate
from weaviate.classes.query import Filter

client = weaviate.connect_to_local()
documents = client.collections.get("Documents")
python
import weaviate
from weaviate.classes.query import Filter

client = weaviate.connect_to_local()
documents = client.collections.get("Documents")

1. Simple equality filter

1. Simple equality filter

response = documents.query.near_text( query="vector databases", limit=10, filters=Filter.by_property("category").equal("education") )
response = documents.query.near_text( query="vector databases", limit=10, filters=Filter.by_property("category").equal("education") )

2. Greater than filter

2. Greater than filter

response = documents.query.near_text( query="machine learning", limit=10, filters=Filter.by_property("year").greater_than(2020) )
response = documents.query.near_text( query="machine learning", limit=10, filters=Filter.by_property("year").greater_than(2020) )

3. Contains any filter

3. Contains any filter

response = documents.query.near_text( query="AI tutorials", limit=10, filters=Filter.by_property("tags").contains_any(["ml", "ai", "deep-learning"]) )
response = documents.query.near_text( query="AI tutorials", limit=10, filters=Filter.by_property("tags").contains_any(["ml", "ai", "deep-learning"]) )

4. Complex AND/OR filters

4. Complex AND/OR filters

response = documents.query.near_text( query="database optimization", limit=10, filters=( Filter.by_property("category").equal("tutorial") & (Filter.by_property("difficulty").equal("beginner") | Filter.by_property("featured").equal(True)) ) )
undefined
response = documents.query.near_text( query="database optimization", limit=10, filters=( Filter.by_property("category").equal("tutorial") & (Filter.by_property("difficulty").equal("beginner") | Filter.by_property("featured").equal(True)) ) )
undefined

Chroma Metadata Filtering

Chroma 元数据过滤

python
import chromadb

client = chromadb.PersistentClient(path="./chroma_db")
collection = client.get_collection("documents")
python
import chromadb

client = chromadb.PersistentClient(path="./chroma_db")
collection = client.get_collection("documents")

1. Simple equality filter

1. Simple equality filter

results = collection.query( query_texts=["vector databases"], n_results=10, where={"category": "education"} )
results = collection.query( query_texts=["vector databases"], n_results=10, where={"category": "education"} )

2. AND conditions

2. AND conditions

results = collection.query( query_texts=["machine learning"], n_results=10, where={ "$and": [ {"category": "education"}, {"difficulty": "beginner"} ] } )
results = collection.query( query_texts=["machine learning"], n_results=10, where={ "$and": [ {"category": "education"}, {"difficulty": "beginner"} ] } )

3. OR conditions

3. OR conditions

results = collection.query( query_texts=["AI tutorials"], n_results=10, where={ "$or": [ {"category": "education"}, {"category": "tutorial"} ] } )
results = collection.query( query_texts=["AI tutorials"], n_results=10, where={ "$or": [ {"category": "education"}, {"category": "tutorial"} ] } )

4. Greater than/Less than

4. Greater than/Less than

results = collection.query( query_texts=["recent content"], n_results=10, where={"year": {"$gte": 2023}} )
results = collection.query( query_texts=["recent content"], n_results=10, where={"year": {"$gte": 2023}} )

5. In operator

5. In operator

results = collection.query( query_texts=["programming guides"], n_results=10, where={"language": {"$in": ["python", "javascript", "go"]}} )
undefined
results = collection.query( query_texts=["programming guides"], n_results=10, where={"language": {"$in": ["python", "javascript", "go"]}} )
undefined

Hybrid Search

混合搜索

Pinecone Hybrid Search (Dense + Sparse)

Pinecone 混合搜索(稠密+稀疏)

python
from pinecone import Pinecone
from typing import Dict, List
import re
from collections import Counter

pc = Pinecone(api_key="YOUR_API_KEY")
index = pc.Index("hybrid-search-index")

def create_sparse_vector(text: str, top_k: int = 100) -> Dict:
    """Create sparse vector using simple TF approach."""
    # Tokenize
    tokens = re.findall(r'\w+', text.lower())

    # Calculate term frequencies
    tf = Counter(tokens)

    # Create vocabulary mapping
    vocab = {word: hash(word) % 10000 for word in set(tokens)}

    # Get top-k terms
    top_terms = tf.most_common(top_k)

    # Create sparse vector
    indices = [vocab[term] for term, _ in top_terms]
    values = [float(freq) / len(tokens) for _, freq in top_terms]

    return {
        "indices": indices,
        "values": values
    }

def hybrid_search(query_text: str, top_k: int = 10, alpha: float = 0.5):
    """
    Perform hybrid search combining dense and sparse vectors.
    alpha: weight for dense search (0.0 = sparse only, 1.0 = dense only)
    """
    # Generate dense vector
    dense_vector = generate_embedding(query_text)

    # Generate sparse vector
    sparse_vector = create_sparse_vector(query_text)

    # Hybrid query
    results = index.query(
        vector=dense_vector,
        sparse_vector=sparse_vector,
        top_k=top_k,
        include_metadata=True
    )

    return results
python
from pinecone import Pinecone
from typing import Dict, List
import re
from collections import Counter

pc = Pinecone(api_key="YOUR_API_KEY")
index = pc.Index("hybrid-search-index")

def create_sparse_vector(text: str, top_k: int = 100) -> Dict:
    """Create sparse vector using simple TF approach."""
    # Tokenize
    tokens = re.findall(r'\w+', text.lower())

    # Calculate term frequencies
    tf = Counter(tokens)

    # Create vocabulary mapping
    vocab = {word: hash(word) % 10000 for word in set(tokens)}

    # Get top-k terms
    top_terms = tf.most_common(top_k)

    # Create sparse vector
    indices = [vocab[term] for term, _ in top_terms]
    values = [float(freq) / len(tokens) for _, freq in top_terms]

    return {
        "indices": indices,
        "values": values
    }

def hybrid_search(query_text: str, top_k: int = 10, alpha: float = 0.5):
    """
    Perform hybrid search combining dense and sparse vectors.
    alpha: weight for dense search (0.0 = sparse only, 1.0 = dense only)
    """
    # Generate dense vector
    dense_vector = generate_embedding(query_text)

    # Generate sparse vector
    sparse_vector = create_sparse_vector(query_text)

    # Hybrid query
    results = index.query(
        vector=dense_vector,
        sparse_vector=sparse_vector,
        top_k=top_k,
        include_metadata=True
    )

    return results

Example usage

Example usage

results = hybrid_search("machine learning vector databases", top_k=10) for match in results.matches: print(f"{match.metadata['title']}: {match.score:.4f}")
undefined
results = hybrid_search("machine learning vector databases", top_k=10) for match in results.matches: print(f"{match.metadata['title']}: {match.score:.4f}")
undefined

Weaviate Hybrid Search

Weaviate 混合搜索

python
import weaviate

client = weaviate.connect_to_local()
documents = client.collections.get("Documents")
python
import weaviate

client = weaviate.connect_to_local()
documents = client.collections.get("Documents")

Hybrid search (combines dense vector + BM25 keyword search)

Hybrid search (combines dense vector + BM25 keyword search)

response = documents.query.hybrid( query="vector database performance", limit=10, alpha=0.5, # 0 = pure keyword, 1 = pure vector, 0.5 = balanced fusion_type="rankedFusion" # or "relativeScore" )
for obj in response.objects: print(f"Title: {obj.properties['title']}") print(f"Score: {obj.metadata.score}") print("---")
response = documents.query.hybrid( query="vector database performance", limit=10, alpha=0.5, # 0 = pure keyword, 1 = pure vector, 0.5 = balanced fusion_type="rankedFusion" # or "relativeScore" )
for obj in response.objects: print(f"Title: {obj.properties['title']}") print(f"Score: {obj.metadata.score}") print("---")

Hybrid search with filters

Hybrid search with filters

response = documents.query.hybrid( query="machine learning tutorials", limit=10, alpha=0.7, # Favor semantic search filters=Filter.by_property("category").equal("education") )
response = documents.query.hybrid( query="machine learning tutorials", limit=10, alpha=0.7, # Favor semantic search filters=Filter.by_property("category").equal("education") )

Hybrid search with custom vector

Hybrid search with custom vector

response = documents.query.hybrid( query="custom query", vector=[0.1, 0.2, 0.3, ...], # Your pre-computed vector limit=10, alpha=0.5 )
undefined
response = documents.query.hybrid( query="custom query", vector=[0.1, 0.2, 0.3, ...], # Your pre-computed vector limit=10, alpha=0.5 )
undefined

BM25 + Vector Hybrid (Custom Implementation)

BM25 + 向量混合(自定义实现)

python
from rank_bm25 import BM25Okapi
import numpy as np

class HybridSearchEngine:
    """Custom hybrid search combining BM25 and vector search."""

    def __init__(self, index, documents: List[Dict]):
        self.index = index
        self.documents = documents

        # Build BM25 index
        tokenized_docs = [doc['content'].lower().split() for doc in documents]
        self.bm25 = BM25Okapi(tokenized_docs)
        self.doc_ids = [doc['id'] for doc in documents]

    def search(self, query: str, top_k: int = 10, alpha: float = 0.5):
        """
        Hybrid search with custom score fusion.
        alpha: weight for vector search (1-alpha for BM25)
        """
        # 1. Vector search
        query_embedding = generate_embedding(query)
        vector_results = self.index.query(
            vector=query_embedding,
            top_k=top_k * 2,  # Get more candidates
            include_metadata=True
        )

        # 2. BM25 search
        tokenized_query = query.lower().split()
        bm25_scores = self.bm25.get_scores(tokenized_query)

        # 3. Normalize scores
        vector_scores = {
            m.id: m.score for m in vector_results.matches
        }
        max_vec_score = max(vector_scores.values()) if vector_scores else 1.0

        max_bm25_score = max(bm25_scores) if max(bm25_scores) > 0 else 1.0

        # 4. Combine scores
        hybrid_scores = {}
        all_ids = set(vector_scores.keys()) | set(self.doc_ids)

        for doc_id in all_ids:
            vec_score = vector_scores.get(doc_id, 0) / max_vec_score
            idx = self.doc_ids.index(doc_id) if doc_id in self.doc_ids else -1
            bm25_score = bm25_scores[idx] / max_bm25_score if idx >= 0 else 0

            hybrid_scores[doc_id] = (alpha * vec_score) + ((1 - alpha) * bm25_score)

        # 5. Rank and return top-k
        ranked = sorted(hybrid_scores.items(), key=lambda x: x[1], reverse=True)
        return ranked[:top_k]
python
from rank_bm25 import BM25Okapi
import numpy as np

class HybridSearchEngine:
    """Custom hybrid search combining BM25 and vector search."""

    def __init__(self, index, documents: List[Dict]):
        self.index = index
        self.documents = documents

        # Build BM25 index
        tokenized_docs = [doc['content'].lower().split() for doc in documents]
        self.bm25 = BM25Okapi(tokenized_docs)
        self.doc_ids = [doc['id'] for doc in documents]

    def search(self, query: str, top_k: int = 10, alpha: float = 0.5):
        """
        Hybrid search with custom score fusion.
        alpha: weight for vector search (1-alpha for BM25)
        """
        # 1. Vector search
        query_embedding = generate_embedding(query)
        vector_results = self.index.query(
            vector=query_embedding,
            top_k=top_k * 2,  # Get more candidates
            include_metadata=True
        )

        # 2. BM25 search
        tokenized_query = query.lower().split()
        bm25_scores = self.bm25.get_scores(tokenized_query)

        # 3. Normalize scores
        vector_scores = {
            m.id: m.score for m in vector_results.matches
        }
        max_vec_score = max(vector_scores.values()) if vector_scores else 1.0

        max_bm25_score = max(bm25_scores) if max(bm25_scores) > 0 else 1.0

        # 4. Combine scores
        hybrid_scores = {}
        all_ids = set(vector_scores.keys()) | set(self.doc_ids)

        for doc_id in all_ids:
            vec_score = vector_scores.get(doc_id, 0) / max_vec_score
            idx = self.doc_ids.index(doc_id) if doc_id in self.doc_ids else -1
            bm25_score = bm25_scores[idx] / max_bm25_score if idx >= 0 else 0

            hybrid_scores[doc_id] = (alpha * vec_score) + ((1 - alpha) * bm25_score)

        # 5. Rank and return top-k
        ranked = sorted(hybrid_scores.items(), key=lambda x: x[1], reverse=True)
        return ranked[:top_k]

Usage

Usage

engine = HybridSearchEngine(index, documents) results = engine.search("machine learning databases", top_k=10, alpha=0.7)
undefined
engine = HybridSearchEngine(index, documents) results = engine.search("machine learning databases", top_k=10, alpha=0.7)
undefined

Namespace & Collection Management

命名空间与集合管理

Pinecone Namespaces

Pinecone 命名空间

python
from pinecone import Pinecone

pc = Pinecone(api_key="YOUR_API_KEY")
index = pc.Index("production-search")
python
from pinecone import Pinecone

pc = Pinecone(api_key="YOUR_API_KEY")
index = pc.Index("production-search")

1. Upsert to specific namespace

1. Upsert to specific namespace

index.upsert( vectors=[ {"id": "doc1", "values": [...], "metadata": {...}} ], namespace="production" )
index.upsert( vectors=[ {"id": "doc1", "values": [...], "metadata": {...}} ], namespace="production" )

2. Query specific namespace

2. Query specific namespace

results = index.query( vector=[...], top_k=10, namespace="production", include_metadata=True )
results = index.query( vector=[...], top_k=10, namespace="production", include_metadata=True )

3. Get namespace statistics

3. Get namespace statistics

stats = index.describe_index_stats() for namespace, info in stats.namespaces.items(): print(f"Namespace: {namespace}") print(f" Vector count: {info.vector_count}")
stats = index.describe_index_stats() for namespace, info in stats.namespaces.items(): print(f"Namespace: {namespace}") print(f" Vector count: {info.vector_count}")

4. Delete all vectors in namespace

4. Delete all vectors in namespace

index.delete(delete_all=True, namespace="test")
index.delete(delete_all=True, namespace="test")

5. Multi-namespace architecture

5. Multi-namespace architecture

NAMESPACES = { "production": "Live user-facing data", "staging": "Testing before production", "development": "Development and experiments", "archive": "Historical data" }
def upsert_with_environment(vectors, environment="production"): """Upsert to appropriate namespace.""" namespace = environment if environment in NAMESPACES else "development" index.upsert(vectors=vectors, namespace=namespace)
def search_across_namespaces(query_vector, namespaces=["production", "archive"]): """Search multiple namespaces and combine results.""" all_results = []
for ns in namespaces:
    results = index.query(
        vector=query_vector,
        top_k=10,
        namespace=ns,
        include_metadata=True
    )
    for match in results.matches:
        match.metadata["source_namespace"] = ns
        all_results.append(match)

# Sort by score
all_results.sort(key=lambda x: x.score, reverse=True)
return all_results[:10]
undefined
NAMESPACES = { "production": "Live user-facing data", "staging": "Testing before production", "development": "Development and experiments", "archive": "Historical data" }
def upsert_with_environment(vectors, environment="production"): """Upsert to appropriate namespace.""" namespace = environment if environment in NAMESPACES else "development" index.upsert(vectors=vectors, namespace=namespace)
def search_across_namespaces(query_vector, namespaces=["production", "archive"]): """Search multiple namespaces and combine results.""" all_results = []
for ns in namespaces:
    results = index.query(
        vector=query_vector,
        top_k=10,
        namespace=ns,
        include_metadata=True
    )
    for match in results.matches:
        match.metadata["source_namespace"] = ns
        all_results.append(match)

# Sort by score
all_results.sort(key=lambda x: x.score, reverse=True)
return all_results[:10]
undefined

Weaviate Collections

Weaviate 集合

python
import weaviate
from weaviate.classes.config import Configure

client = weaviate.connect_to_local()
python
import weaviate
from weaviate.classes.config import Configure

client = weaviate.connect_to_local()

1. Create multiple collections

1. Create multiple collections

collections_config = [ { "name": "Products", "properties": ["name", "description", "category", "price"] }, { "name": "Users", "properties": ["username", "bio", "interests"] }, { "name": "Reviews", "properties": ["content", "rating", "product_id", "user_id"] } ]
for config in collections_config: try: client.collections.create( name=config["name"], vectorizer_config=Configure.Vectorizer.text2vec_openai() ) except Exception as e: print(f"Collection {config['name']} exists: {e}")
collections_config = [ { "name": "Products", "properties": ["name", "description", "category", "price"] }, { "name": "Users", "properties": ["username", "bio", "interests"] }, { "name": "Reviews", "properties": ["content", "rating", "product_id", "user_id"] } ]
for config in collections_config: try: client.collections.create( name=config["name"], vectorizer_config=Configure.Vectorizer.text2vec_openai() ) except Exception as e: print(f"Collection {config['name']} exists: {e}")

2. Cross-collection references

2. Cross-collection references

client.collections.create( name="Orders", references=[ weaviate.classes.config.ReferenceProperty( name="hasProduct", target_collection="Products" ), weaviate.classes.config.ReferenceProperty( name="byUser", target_collection="Users" ) ] )
client.collections.create( name="Orders", references=[ weaviate.classes.config.ReferenceProperty( name="hasProduct", target_collection="Products" ), weaviate.classes.config.ReferenceProperty( name="byUser", target_collection="Users" ) ] )

3. Multi-collection search

3. Multi-collection search

def search_all_collections(query: str): """Search across multiple collections.""" results = {}
for collection_name in ["Products", "Users", "Reviews"]:
    collection = client.collections.get(collection_name)
    response = collection.query.near_text(
        query=query,
        limit=5
    )
    results[collection_name] = response.objects

return results
def search_all_collections(query: str): """Search across multiple collections.""" results = {}
for collection_name in ["Products", "Users", "Reviews"]:
    collection = client.collections.get(collection_name)
    response = collection.query.near_text(
        query=query,
        limit=5
    )
    results[collection_name] = response.objects

return results

4. Delete collection

4. Delete collection

client.collections.delete("TestCollection")
undefined
client.collections.delete("TestCollection")
undefined

Chroma Collections

Chroma 集合

python
import chromadb

client = chromadb.PersistentClient(path="./chroma_db")
python
import chromadb

client = chromadb.PersistentClient(path="./chroma_db")

1. Create multiple collections

1. Create multiple collections

collections = { "documents": { "metadata": {"description": "Document embeddings"}, "embedding_function": None # Use default }, "images": { "metadata": {"description": "Image embeddings"}, "embedding_function": None }, "code": { "metadata": {"description": "Code snippets"}, "embedding_function": None } }
for name, config in collections.items(): collection = client.get_or_create_collection( name=name, metadata=config["metadata"] )
collections = { "documents": { "metadata": {"description": "Document embeddings"}, "embedding_function": None # Use default }, "images": { "metadata": {"description": "Image embeddings"}, "embedding_function": None }, "code": { "metadata": {"description": "Code snippets"}, "embedding_function": None } }
for name, config in collections.items(): collection = client.get_or_create_collection( name=name, metadata=config["metadata"] )

2. List all collections

2. List all collections

all_collections = client.list_collections() for coll in all_collections: print(f"Collection: {coll.name}") print(f" Count: {coll.count()}") print(f" Metadata: {coll.metadata}")
all_collections = client.list_collections() for coll in all_collections: print(f"Collection: {coll.name}") print(f" Count: {coll.count()}") print(f" Metadata: {coll.metadata}")

3. Collection-specific operations

3. Collection-specific operations

docs_collection = client.get_collection("documents") docs_collection.add( documents=["Document text..."], metadatas=[{"type": "article"}], ids=["doc1"] )
docs_collection = client.get_collection("documents") docs_collection.add( documents=["Document text..."], metadatas=[{"type": "article"}], ids=["doc1"] )

4. Delete collection

4. Delete collection

client.delete_collection("test_collection")
client.delete_collection("test_collection")

5. Multi-collection search

5. Multi-collection search

def search_all_collections(query: str, n_results: int = 5): """Search across all collections.""" results = {}
for collection in client.list_collections():
    try:
        collection_results = collection.query(
            query_texts=[query],
            n_results=n_results
        )
        results[collection.name] = collection_results
    except Exception as e:
        print(f"Error searching {collection.name}: {e}")

return results
undefined
def search_all_collections(query: str, n_results: int = 5): """Search across all collections.""" results = {}
for collection in client.list_collections():
    try:
        collection_results = collection.query(
            query_texts=[query],
            n_results=n_results
        )
        results[collection.name] = collection_results
    except Exception as e:
        print(f"Error searching {collection.name}: {e}")

return results
undefined

Performance & Scaling

性能与扩容

Batch Operations Best Practices

批量操作最佳实践

python
from pinecone import Pinecone
from typing import List, Dict
import asyncio
from concurrent.futures import ThreadPoolExecutor
import time

pc = Pinecone(api_key="YOUR_API_KEY")
index = pc.Index("production-search")
python
from pinecone import Pinecone
from typing import List, Dict
import asyncio
from concurrent.futures import ThreadPoolExecutor
import time

pc = Pinecone(api_key="YOUR_API_KEY")
index = pc.Index("production-search")

1. Optimal batch size

1. Optimal batch size

OPTIMAL_BATCH_SIZE = 100 # Pinecone recommendation
def batch_upsert(vectors: List[Dict], batch_size: int = OPTIMAL_BATCH_SIZE): """Efficiently upsert vectors in batches.""" total_batches = (len(vectors) + batch_size - 1) // batch_size
for i in range(0, len(vectors), batch_size):
    batch = vectors[i:i + batch_size]
    index.upsert(vectors=batch, namespace="documents")

    if (i // batch_size + 1) % 10 == 0:
        print(f"Processed {i // batch_size + 1}/{total_batches} batches")
OPTIMAL_BATCH_SIZE = 100 # Pinecone recommendation
def batch_upsert(vectors: List[Dict], batch_size: int = OPTIMAL_BATCH_SIZE): """Efficiently upsert vectors in batches.""" total_batches = (len(vectors) + batch_size - 1) // batch_size
for i in range(0, len(vectors), batch_size):
    batch = vectors[i:i + batch_size]
    index.upsert(vectors=batch, namespace="documents")

    if (i // batch_size + 1) % 10 == 0:
        print(f"Processed {i // batch_size + 1}/{total_batches} batches")

2. Parallel batch upsert

2. Parallel batch upsert

def parallel_batch_upsert(vectors: List[Dict], num_workers: int = 4): """Parallel upsert using thread pool.""" batch_size = 100 batches = [ vectors[i:i + batch_size] for i in range(0, len(vectors), batch_size) ]
def upsert_batch(batch):
    try:
        index.upsert(vectors=batch, namespace="documents")
        return len(batch)
    except Exception as e:
        print(f"Error upserting batch: {e}")
        return 0

with ThreadPoolExecutor(max_workers=num_workers) as executor:
    results = list(executor.map(upsert_batch, batches))

print(f"Successfully upserted {sum(results)} vectors")
def parallel_batch_upsert(vectors: List[Dict], num_workers: int = 4): """Parallel upsert using thread pool.""" batch_size = 100 batches = [ vectors[i:i + batch_size] for i in range(0, len(vectors), batch_size) ]
def upsert_batch(batch):
    try:
        index.upsert(vectors=batch, namespace="documents")
        return len(batch)
    except Exception as e:
        print(f"Error upserting batch: {e}")
        return 0

with ThreadPoolExecutor(max_workers=num_workers) as executor:
    results = list(executor.map(upsert_batch, batches))

print(f"Successfully upserted {sum(results)} vectors")

3. Rate limiting for API calls

3. Rate limiting for API calls

class RateLimiter: """Simple rate limiter for API calls."""
def __init__(self, max_calls: int, time_window: float):
    self.max_calls = max_calls
    self.time_window = time_window
    self.calls = []

def wait_if_needed(self):
    """Wait if rate limit would be exceeded."""
    now = time.time()
    # Remove old calls outside time window
    self.calls = [call_time for call_time in self.calls
                  if now - call_time < self.time_window]

    if len(self.calls) >= self.max_calls:
        sleep_time = self.time_window - (now - self.calls[0])
        if sleep_time > 0:
            time.sleep(sleep_time)
            self.calls = []

    self.calls.append(now)
class RateLimiter: """Simple rate limiter for API calls."""
def __init__(self, max_calls: int, time_window: float):
    self.max_calls = max_calls
    self.time_window = time_window
    self.calls = []

def wait_if_needed(self):
    """Wait if rate limit would be exceeded."""
    now = time.time()
    # Remove old calls outside time window
    self.calls = [call_time for call_time in self.calls
                  if now - call_time < self.time_window]

    if len(self.calls) >= self.max_calls:
        sleep_time = self.time_window - (now - self.calls[0])
        if sleep_time > 0:
            time.sleep(sleep_time)
            self.calls = []

    self.calls.append(now)

Usage

Usage

rate_limiter = RateLimiter(max_calls=100, time_window=60) # 100 calls/minute
for batch in batches: rate_limiter.wait_if_needed() index.upsert(vectors=batch)
rate_limiter = RateLimiter(max_calls=100, time_window=60) # 100 calls/minute
for batch in batches: rate_limiter.wait_if_needed() index.upsert(vectors=batch)

4. Bulk delete optimization

4. Bulk delete optimization

def bulk_delete_by_filter(filter_dict: Dict, namespace: str = "documents"): """Delete vectors matching filter (more efficient than individual deletes).""" # First, get IDs matching filter results = index.query( vector=[0] * 1536, # Dummy vector top_k=10000, # Max allowed filter=filter_dict, namespace=namespace, include_values=False )
ids_to_delete = [match.id for match in results.matches]

# Delete in batches
batch_size = 1000
for i in range(0, len(ids_to_delete), batch_size):
    batch = ids_to_delete[i:i + batch_size]
    index.delete(ids=batch, namespace=namespace)
    print(f"Deleted {len(batch)} vectors")
undefined
def bulk_delete_by_filter(filter_dict: Dict, namespace: str = "documents"): """Delete vectors matching filter (more efficient than individual deletes).""" # First, get IDs matching filter results = index.query( vector=[0] * 1536, # Dummy vector top_k=10000, # Max allowed filter=filter_dict, namespace=namespace, include_values=False )
ids_to_delete = [match.id for match in results.matches]

# Delete in batches
batch_size = 1000
for i in range(0, len(ids_to_delete), batch_size):
    batch = ids_to_delete[i:i + batch_size]
    index.delete(ids=batch, namespace=namespace)
    print(f"Deleted {len(batch)} vectors")
undefined

Query Optimization

查询优化

python
undefined
python
undefined

1. Minimize data transfer

1. Minimize data transfer

results = index.query( vector=query_vector, top_k=10, include_values=False, # Don't return vectors if not needed include_metadata=False, # Don't return metadata if not needed namespace="documents" )
results = index.query( vector=query_vector, top_k=10, include_values=False, # Don't return vectors if not needed include_metadata=False, # Don't return metadata if not needed namespace="documents" )

2. Use appropriate top_k

2. Use appropriate top_k

Smaller top_k = faster queries

Smaller top_k = faster queries

results_small = index.query(vector=query_vector, top_k=10) # Fast results_large = index.query(vector=query_vector, top_k=1000) # Slower
results_small = index.query(vector=query_vector, top_k=10) # Fast results_large = index.query(vector=query_vector, top_k=1000) # Slower

3. Filter before vector search when possible

3. Filter before vector search when possible

Good: Reduces search space

Good: Reduces search space

results = index.query( vector=query_vector, top_k=10, filter={"category": "education"}, # Reduces candidates namespace="documents" )
results = index.query( vector=query_vector, top_k=10, filter={"category": "education"}, # Reduces candidates namespace="documents" )

4. Batch queries when possible

4. Batch queries when possible

More efficient than individual queries

More efficient than individual queries

queries = [embedding1, embedding2, embedding3] results = index.query( queries=queries, top_k=10, namespace="documents" )
queries = [embedding1, embedding2, embedding3] results = index.query( queries=queries, top_k=10, namespace="documents" )

5. Cache frequent queries

5. Cache frequent queries

from functools import lru_cache import hashlib import json
def vector_hash(vector: List[float]) -> str: """Create hash of vector for caching.""" return hashlib.md5(json.dumps(vector).encode()).hexdigest()
class CachedIndex: """Wrapper with query caching."""
def __init__(self, index, cache_size: int = 1000):
    self.index = index
    self.cache = {}
    self.cache_size = cache_size

def query(self, vector: List[float], top_k: int = 10, **kwargs):
    """Query with caching."""
    cache_key = f"{vector_hash(vector)}_{top_k}_{json.dumps(kwargs)}"

    if cache_key in self.cache:
        return self.cache[cache_key]

    results = self.index.query(vector=vector, top_k=top_k, **kwargs)

    if len(self.cache) >= self.cache_size:
        # Remove oldest entry
        self.cache.pop(next(iter(self.cache)))

    self.cache[cache_key] = results
    return results
from functools import lru_cache import hashlib import json
def vector_hash(vector: List[float]) -> str: """Create hash of vector for caching.""" return hashlib.md5(json.dumps(vector).encode()).hexdigest()
class CachedIndex: """Wrapper with query caching."""
def __init__(self, index, cache_size: int = 1000):
    self.index = index
    self.cache = {}
    self.cache_size = cache_size

def query(self, vector: List[float], top_k: int = 10, **kwargs):
    """Query with caching."""
    cache_key = f"{vector_hash(vector)}_{top_k}_{json.dumps(kwargs)}"

    if cache_key in self.cache:
        return self.cache[cache_key]

    results = self.index.query(vector=vector, top_k=top_k, **kwargs)

    if len(self.cache) >= self.cache_size:
        # Remove oldest entry
        self.cache.pop(next(iter(self.cache)))

    self.cache[cache_key] = results
    return results

Usage

Usage

cached_index = CachedIndex(index) results = cached_index.query(query_vector, top_k=10) # Cached on subsequent calls
undefined
cached_index = CachedIndex(index) results = cached_index.query(query_vector, top_k=10) # Cached on subsequent calls
undefined

Scaling Strategies

扩容策略

python
undefined
python
undefined

1. Index sizing for scale

1. Index sizing for scale

def calculate_index_requirements( num_vectors: int, dimension: int, metadata_size_per_vector: int = 1024 # bytes ) -> Dict: """Calculate storage and cost for index.""" # Approximate calculations vector_size = dimension * 4 # 4 bytes per float32 total_vector_storage = num_vectors * vector_size total_metadata_storage = num_vectors * metadata_size_per_vector total_storage = total_vector_storage + total_metadata_storage
# Pinecone pricing (approximate)
storage_cost_per_gb_month = 0.095  # Serverless pricing
total_gb = total_storage / (1024 ** 3)
monthly_storage_cost = total_gb * storage_cost_per_gb_month

return {
    "num_vectors": num_vectors,
    "total_storage_gb": round(total_gb, 2),
    "monthly_storage_cost_usd": round(monthly_storage_cost, 2),
    "recommended_pod_type": "s1.x1" if num_vectors > 10_000_000 else "p1.x1"
}
def calculate_index_requirements( num_vectors: int, dimension: int, metadata_size_per_vector: int = 1024 # bytes ) -> Dict: """Calculate storage and cost for index.""" # Approximate calculations vector_size = dimension * 4 # 4 bytes per float32 total_vector_storage = num_vectors * vector_size total_metadata_storage = num_vectors * metadata_size_per_vector total_storage = total_vector_storage + total_metadata_storage
# Pinecone pricing (approximate)
storage_cost_per_gb_month = 0.095  # Serverless pricing
total_gb = total_storage / (1024 ** 3)
monthly_storage_cost = total_gb * storage_cost_per_gb_month

return {
    "num_vectors": num_vectors,
    "total_storage_gb": round(total_gb, 2),
    "monthly_storage_cost_usd": round(monthly_storage_cost, 2),
    "recommended_pod_type": "s1.x1" if num_vectors > 10_000_000 else "p1.x1"
}

Example

Example

reqs = calculate_index_requirements( num_vectors=10_000_000, dimension=1536 ) print(f"10M vectors storage: {reqs['total_storage_gb']} GB") print(f"Monthly cost: ${reqs['monthly_storage_cost_usd']}")
reqs = calculate_index_requirements( num_vectors=10_000_000, dimension=1536 ) print(f"10M vectors storage: {reqs['total_storage_gb']} GB") print(f"Monthly cost: ${reqs['monthly_storage_cost_usd']}")

2. Sharding strategy for massive scale

2. Sharding strategy for massive scale

def create_sharded_indexes( base_name: str, num_shards: int, dimension: int, metric: str = "cosine" ): """Create multiple indexes for horizontal scaling.""" indexes = []
for shard_id in range(num_shards):
    index_name = f"{base_name}-shard-{shard_id}"

    pc.create_index(
        name=index_name,
        dimension=dimension,
        metric=metric,
        spec=ServerlessSpec(cloud="aws", region="us-east-1")
    )

    indexes.append(index_name)

return indexes
def route_to_shard(vector_id: str, num_shards: int) -> int: """Determine which shard a vector belongs to.""" return hash(vector_id) % num_shards
def query_sharded_indexes(query_vector: List[float], indexes: List, top_k: int = 10): """Query all shards and merge results.""" all_results = []
for index_name in indexes:
    idx = pc.Index(index_name)
    results = idx.query(
        vector=query_vector,
        top_k=top_k,
        include_metadata=True
    )
    all_results.extend(results.matches)

# Sort by score and return top_k
all_results.sort(key=lambda x: x.score, reverse=True)
return all_results[:top_k]
undefined
def create_sharded_indexes( base_name: str, num_shards: int, dimension: int, metric: str = "cosine" ): """Create multiple indexes for horizontal scaling.""" indexes = []
for shard_id in range(num_shards):
    index_name = f"{base_name}-shard-{shard_id}"

    pc.create_index(
        name=index_name,
        dimension=dimension,
        metric=metric,
        spec=ServerlessSpec(cloud="aws", region="us-east-1")
    )

    indexes.append(index_name)

return indexes
def route_to_shard(vector_id: str, num_shards: int) -> int: """Determine which shard a vector belongs to.""" return hash(vector_id) % num_shards
def query_sharded_indexes(query_vector: List[float], indexes: List, top_k: int = 10): """Query all shards and merge results.""" all_results = []
for index_name in indexes:
    idx = pc.Index(index_name)
    results = idx.query(
        vector=query_vector,
        top_k=top_k,
        include_metadata=True
    )
    all_results.extend(results.matches)

# Sort by score and return top_k
all_results.sort(key=lambda x: x.score, reverse=True)
return all_results[:top_k]
undefined

Production Best Practices

生产环境最佳实践

Error Handling & Retries

错误处理与重试

python
import time
from typing import Optional, Callable
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class PineconeRetryHandler:
    """Robust error handling for Pinecone operations."""

    def __init__(self, max_retries: int = 3, base_delay: float = 1.0):
        self.max_retries = max_retries
        self.base_delay = base_delay

    def retry_with_backoff(
        self,
        operation: Callable,
        *args,
        **kwargs
    ) -> Optional[any]:
        """Retry operation with exponential backoff."""
        for attempt in range(self.max_retries):
            try:
                return operation(*args, **kwargs)
            except Exception as e:
                if attempt == self.max_retries - 1:
                    logger.error(f"Operation failed after {self.max_retries} attempts: {e}")
                    raise

                delay = self.base_delay * (2 ** attempt)
                logger.warning(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay}s...")
                time.sleep(delay)

        return None
python
import time
from typing import Optional, Callable
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class PineconeRetryHandler:
    """Robust error handling for Pinecone operations."""

    def __init__(self, max_retries: int = 3, base_delay: float = 1.0):
        self.max_retries = max_retries
        self.base_delay = base_delay

    def retry_with_backoff(
        self,
        operation: Callable,
        *args,
        **kwargs
    ) -> Optional[any]:
        """Retry operation with exponential backoff."""
        for attempt in range(self.max_retries):
            try:
                return operation(*args, **kwargs)
            except Exception as e:
                if attempt == self.max_retries - 1:
                    logger.error(f"Operation failed after {self.max_retries} attempts: {e}")
                    raise

                delay = self.base_delay * (2 ** attempt)
                logger.warning(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay}s...")
                time.sleep(delay)

        return None

Usage

Usage

retry_handler = PineconeRetryHandler(max_retries=3)
retry_handler = PineconeRetryHandler(max_retries=3)

Upsert with retry

Upsert with retry

def safe_upsert(vectors, namespace="documents"): return retry_handler.retry_with_backoff( index.upsert, vectors=vectors, namespace=namespace )
def safe_upsert(vectors, namespace="documents"): return retry_handler.retry_with_backoff( index.upsert, vectors=vectors, namespace=namespace )

Query with retry

Query with retry

def safe_query(vector, top_k=10, **kwargs): return retry_handler.retry_with_backoff( index.query, vector=vector, top_k=top_k, **kwargs )
def safe_query(vector, top_k=10, **kwargs): return retry_handler.retry_with_backoff( index.query, vector=vector, top_k=top_k, **kwargs )

Example

Example

try: results = safe_query(query_vector, top_k=10, include_metadata=True) except Exception as e: logger.error(f"Query failed permanently: {e}")
undefined
try: results = safe_query(query_vector, top_k=10, include_metadata=True) except Exception as e: logger.error(f"Query failed permanently: {e}")
undefined

Monitoring & Observability

监控与可观测性

python
import time
from dataclasses import dataclass
from typing import Dict, List
from datetime import datetime

@dataclass
class QueryMetrics:
    """Track query performance metrics."""
    query_time: float
    result_count: int
    top_score: float
    timestamp: datetime
    namespace: str
    filter_used: bool

class VectorDBMonitor:
    """Monitor vector database operations."""

    def __init__(self):
        self.metrics: List[QueryMetrics] = []

    def track_query(
        self,
        query_func: Callable,
        *args,
        **kwargs
    ):
        """Track query execution and metrics."""
        start_time = time.time()
        results = query_func(*args, **kwargs)
        elapsed = time.time() - start_time

        metrics = QueryMetrics(
            query_time=elapsed,
            result_count=len(results.matches),
            top_score=results.matches[0].score if results.matches else 0.0,
            timestamp=datetime.now(),
            namespace=kwargs.get('namespace', 'default'),
            filter_used='filter' in kwargs
        )

        self.metrics.append(metrics)

        # Alert on slow queries
        if elapsed > 1.0:  # 1 second threshold
            logger.warning(f"Slow query detected: {elapsed:.2f}s")

        return results

    def get_stats(self) -> Dict:
        """Get aggregate statistics."""
        if not self.metrics:
            return {}

        query_times = [m.query_time for m in self.metrics]

        return {
            "total_queries": len(self.metrics),
            "avg_query_time": sum(query_times) / len(query_times),
            "p95_query_time": sorted(query_times)[int(len(query_times) * 0.95)],
            "p99_query_time": sorted(query_times)[int(len(query_times) * 0.99)],
            "avg_results": sum(m.result_count for m in self.metrics) / len(self.metrics),
            "filtered_queries_pct": sum(1 for m in self.metrics if m.filter_used) / len(self.metrics) * 100
        }
python
import time
from dataclasses import dataclass
from typing import Dict, List
from datetime import datetime

@dataclass
class QueryMetrics:
    """Track query performance metrics."""
    query_time: float
    result_count: int
    top_score: float
    timestamp: datetime
    namespace: str
    filter_used: bool

class VectorDBMonitor:
    """Monitor vector database operations."""

    def __init__(self):
        self.metrics: List[QueryMetrics] = []

    def track_query(
        self,
        query_func: Callable,
        *args,
        **kwargs
    ):
        """Track query execution and metrics."""
        start_time = time.time()
        results = query_func(*args, **kwargs)
        elapsed = time.time() - start_time

        metrics = QueryMetrics(
            query_time=elapsed,
            result_count=len(results.matches),
            top_score=results.matches[0].score if results.matches else 0.0,
            timestamp=datetime.now(),
            namespace=kwargs.get('namespace', 'default'),
            filter_used='filter' in kwargs
        )

        self.metrics.append(metrics)

        # Alert on slow queries
        if elapsed > 1.0:  # 1 second threshold
            logger.warning(f"Slow query detected: {elapsed:.2f}s")

        return results

    def get_stats(self) -> Dict:
        """Get aggregate statistics."""
        if not self.metrics:
            return {}

        query_times = [m.query_time for m in self.metrics]

        return {
            "total_queries": len(self.metrics),
            "avg_query_time": sum(query_times) / len(query_times),
            "p95_query_time": sorted(query_times)[int(len(query_times) * 0.95)],
            "p99_query_time": sorted(query_times)[int(len(query_times) * 0.99)],
            "avg_results": sum(m.result_count for m in self.metrics) / len(self.metrics),
            "filtered_queries_pct": sum(1 for m in self.metrics if m.filter_used) / len(self.metrics) * 100
        }

Usage

Usage

monitor = VectorDBMonitor()
monitor = VectorDBMonitor()

Wrap queries

Wrap queries

results = monitor.track_query( index.query, vector=query_vector, top_k=10, namespace="documents", filter={"category": "education"} )
results = monitor.track_query( index.query, vector=query_vector, top_k=10, namespace="documents", filter={"category": "education"} )

Get statistics

Get statistics

stats = monitor.get_stats() print(f"Average query time: {stats['avg_query_time']:.3f}s") print(f"P95 query time: {stats['p95_query_time']:.3f}s")
undefined
stats = monitor.get_stats() print(f"Average query time: {stats['avg_query_time']:.3f}s") print(f"P95 query time: {stats['p95_query_time']:.3f}s")
undefined

Data Validation

数据验证

python
from typing import List, Dict
import numpy as np

class VectorValidator:
    """Validate vectors and metadata before operations."""

    def __init__(self, expected_dimension: int):
        self.expected_dimension = expected_dimension

    def validate_vector(self, vector: List[float]) -> tuple[bool, str]:
        """Validate vector format and content."""
        # Check type
        if not isinstance(vector, (list, np.ndarray)):
            return False, "Vector must be list or numpy array"

        # Check dimension
        if len(vector) != self.expected_dimension:
            return False, f"Expected {self.expected_dimension} dimensions, got {len(vector)}"

        # Check for NaN or Inf
        if any(not np.isfinite(v) for v in vector):
            return False, "Vector contains NaN or Inf values"

        # Check for zero vector
        if all(v == 0 for v in vector):
            return False, "Zero vector not allowed"

        return True, "Valid"

    def validate_metadata(self, metadata: Dict) -> tuple[bool, str]:
        """Validate metadata format."""
        # Check type
        if not isinstance(metadata, dict):
            return False, "Metadata must be dictionary"

        # Check metadata size (Pinecone limit: 40KB)
        metadata_str = str(metadata)
        if len(metadata_str.encode('utf-8')) > 40_000:
            return False, "Metadata exceeds 40KB limit"

        # Check for required fields (customize as needed)
        required_fields = ["title", "category"]
        for field in required_fields:
            if field not in metadata:
                return False, f"Missing required field: {field}"

        return True, "Valid"

    def validate_batch(
        self,
        vectors: List[Dict]
    ) -> tuple[List[Dict], List[str]]:
        """Validate batch of vectors, return valid ones and errors."""
        valid_vectors = []
        errors = []

        for i, item in enumerate(vectors):
            # Validate vector
            is_valid, error = self.validate_vector(item.get('values', []))
            if not is_valid:
                errors.append(f"Vector {i} ({item.get('id', 'unknown')}): {error}")
                continue

            # Validate metadata
            if 'metadata' in item:
                is_valid, error = self.validate_metadata(item['metadata'])
                if not is_valid:
                    errors.append(f"Metadata {i} ({item.get('id', 'unknown')}): {error}")
                    continue

            valid_vectors.append(item)

        return valid_vectors, errors
python
from typing import List, Dict
import numpy as np

class VectorValidator:
    """Validate vectors and metadata before operations."""

    def __init__(self, expected_dimension: int):
        self.expected_dimension = expected_dimension

    def validate_vector(self, vector: List[float]) -> tuple[bool, str]:
        """Validate vector format and content."""
        # Check type
        if not isinstance(vector, (list, np.ndarray)):
            return False, "Vector must be list or numpy array"

        # Check dimension
        if len(vector) != self.expected_dimension:
            return False, f"Expected {self.expected_dimension} dimensions, got {len(vector)}"

        # Check for NaN or Inf
        if any(not np.isfinite(v) for v in vector):
            return False, "Vector contains NaN or Inf values"

        # Check for zero vector
        if all(v == 0 for v in vector):
            return False, "Zero vector not allowed"

        return True, "Valid"

    def validate_metadata(self, metadata: Dict) -> tuple[bool, str]:
        """Validate metadata format."""
        # Check type
        if not isinstance(metadata, dict):
            return False, "Metadata must be dictionary"

        # Check metadata size (Pinecone limit: 40KB)
        metadata_str = str(metadata)
        if len(metadata_str.encode('utf-8')) > 40_000:
            return False, "Metadata exceeds 40KB limit"

        # Check for required fields (customize as needed)
        required_fields = ["title", "category"]
        for field in required_fields:
            if field not in metadata:
                return False, f"Missing required field: {field}"

        return True, "Valid"

    def validate_batch(
        self,
        vectors: List[Dict]
    ) -> tuple[List[Dict], List[str]]:
        """Validate batch of vectors, return valid ones and errors."""
        valid_vectors = []
        errors = []

        for i, item in enumerate(vectors):
            # Validate vector
            is_valid, error = self.validate_vector(item.get('values', []))
            if not is_valid:
                errors.append(f"Vector {i} ({item.get('id', 'unknown')}): {error}")
                continue

            # Validate metadata
            if 'metadata' in item:
                is_valid, error = self.validate_metadata(item['metadata'])
                if not is_valid:
                    errors.append(f"Metadata {i} ({item.get('id', 'unknown')}): {error}")
                    continue

            valid_vectors.append(item)

        return valid_vectors, errors

Usage

Usage

validator = VectorValidator(expected_dimension=1536)
validator = VectorValidator(expected_dimension=1536)

Validate single vector

Validate single vector

is_valid, error = validator.validate_vector(embedding) if not is_valid: print(f"Invalid vector: {error}")
is_valid, error = validator.validate_vector(embedding) if not is_valid: print(f"Invalid vector: {error}")

Validate batch

Validate batch

valid_vectors, errors = validator.validate_batch(vectors_to_upsert) if errors: for error in errors: logger.error(error)
valid_vectors, errors = validator.validate_batch(vectors_to_upsert) if errors: for error in errors: logger.error(error)

Upsert only valid vectors

Upsert only valid vectors

if valid_vectors: index.upsert(vectors=valid_vectors)
undefined
if valid_vectors: index.upsert(vectors=valid_vectors)
undefined

Backup & Disaster Recovery

备份与灾难恢复

python
import json
import gzip
from datetime import datetime
from pathlib import Path

class VectorDBBackup:
    """Backup and restore vector database data."""

    def __init__(self, index, backup_dir: str = "./backups"):
        self.index = index
        self.backup_dir = Path(backup_dir)
        self.backup_dir.mkdir(exist_ok=True)

    def backup_namespace(
        self,
        namespace: str = "documents",
        compress: bool = True
    ) -> str:
        """Backup all vectors in a namespace."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"backup_{namespace}_{timestamp}.json"

        if compress:
            filename += ".gz"

        filepath = self.backup_dir / filename

        # Fetch all vectors (in batches)
        all_vectors = []
        batch_size = 100

        # Get all IDs first (would need to be tracked separately)
        # This is a simplified example
        stats = self.index.describe_index_stats()

        # For actual implementation, you'd need to track IDs
        # or use fetch with known IDs

        # Save to file
        data = {
            "namespace": namespace,
            "timestamp": timestamp,
            "vector_count": len(all_vectors),
            "vectors": all_vectors
        }

        if compress:
            with gzip.open(filepath, 'wt', encoding='utf-8') as f:
                json.dump(data, f)
        else:
            with open(filepath, 'w') as f:
                json.dump(data, f, indent=2)

        logger.info(f"Backed up {len(all_vectors)} vectors to {filepath}")
        return str(filepath)

    def restore_from_backup(
        self,
        backup_file: str,
        target_namespace: str = None
    ):
        """Restore vectors from backup file."""
        filepath = Path(backup_file)

        # Load backup
        if filepath.suffix == '.gz':
            with gzip.open(filepath, 'rt', encoding='utf-8') as f:
                data = json.load(f)
        else:
            with open(filepath, 'r') as f:
                data = json.load(f)

        namespace = target_namespace or data['namespace']
        vectors = data['vectors']

        # Restore in batches
        batch_size = 100
        for i in range(0, len(vectors), batch_size):
            batch = vectors[i:i + batch_size]
            self.index.upsert(vectors=batch, namespace=namespace)
            logger.info(f"Restored {len(batch)} vectors")

        logger.info(f"Restored {len(vectors)} vectors to namespace '{namespace}'")
python
import json
import gzip
from datetime import datetime
from pathlib import Path

class VectorDBBackup:
    """Backup and restore vector database data."""

    def __init__(self, index, backup_dir: str = "./backups"):
        self.index = index
        self.backup_dir = Path(backup_dir)
        self.backup_dir.mkdir(exist_ok=True)

    def backup_namespace(
        self,
        namespace: str = "documents",
        compress: bool = True
    ) -> str:
        """Backup all vectors in a namespace."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"backup_{namespace}_{timestamp}.json"

        if compress:
            filename += ".gz"

        filepath = self.backup_dir / filename

        # Fetch all vectors (in batches)
        all_vectors = []
        batch_size = 100

        # Get all IDs first (would need to be tracked separately)
        # This is a simplified example
        stats = self.index.describe_index_stats()

        # For actual implementation, you'd need to track IDs
        # or use fetch with known IDs

        # Save to file
        data = {
            "namespace": namespace,
            "timestamp": timestamp,
            "vector_count": len(all_vectors),
            "vectors": all_vectors
        }

        if compress:
            with gzip.open(filepath, 'wt', encoding='utf-8') as f:
                json.dump(data, f)
        else:
            with open(filepath, 'w') as f:
                json.dump(data, f, indent=2)

        logger.info(f"Backed up {len(all_vectors)} vectors to {filepath}")
        return str(filepath)

    def restore_from_backup(
        self,
        backup_file: str,
        target_namespace: str = None
    ):
        """Restore vectors from backup file."""
        filepath = Path(backup_file)

        # Load backup
        if filepath.suffix == '.gz':
            with gzip.open(filepath, 'rt', encoding='utf-8') as f:
                data = json.load(f)
        else:
            with open(filepath, 'r') as f:
                data = json.load(f)

        namespace = target_namespace or data['namespace']
        vectors = data['vectors']

        # Restore in batches
        batch_size = 100
        for i in range(0, len(vectors), batch_size):
            batch = vectors[i:i + batch_size]
            self.index.upsert(vectors=batch, namespace=namespace)
            logger.info(f"Restored {len(batch)} vectors")

        logger.info(f"Restored {len(vectors)} vectors to namespace '{namespace}'")

Usage

Usage

backup_manager = VectorDBBackup(index)
backup_manager = VectorDBBackup(index)

Backup

Backup

backup_file = backup_manager.backup_namespace("production")
backup_file = backup_manager.backup_namespace("production")

Restore

Restore

backup_manager.restore_from_backup(backup_file, target_namespace="production-restored")
undefined
backup_manager.restore_from_backup(backup_file, target_namespace="production-restored")
undefined

Cost Optimization

成本优化

Storage Optimization

存储优化

python
undefined
python
undefined

1. Reduce metadata size

1. Reduce metadata size

Bad: Storing full content in metadata

Bad: Storing full content in metadata

bad_metadata = { "title": "Long document title", "full_content": "...<entire document>...", # Wastes space "description": "...<long description>...", "extra_field_1": "...", "extra_field_2": "..." }
bad_metadata = { "title": "Long document title", "full_content": "...<entire document>...", # Wastes space "description": "...<long description>...", "extra_field_1": "...", "extra_field_2": "..." }

Good: Store only necessary metadata

Good: Store only necessary metadata

good_metadata = { "title": "Long document title", "doc_id": "doc-123", # Reference to external store "category": "education", "created_at": "2024-01-15" }
good_metadata = { "title": "Long document title", "doc_id": "doc-123", # Reference to external store "category": "education", "created_at": "2024-01-15" }

2. Use selective metadata indexing

2. Use selective metadata indexing

Only index fields you'll filter on

Only index fields you'll filter on

pc.create_index( name="optimized-index", dimension=1536, metric="cosine", spec=ServerlessSpec( cloud="aws", region="us-east-1", schema={ "fields": { "category": {"filterable": True}, # Need to filter "created_at": {"filterable": True}, # Need to filter "title": {"filterable": False}, # Just for display "description": {"filterable": False} # Just for display } } ) )
pc.create_index( name="optimized-index", dimension=1536, metric="cosine", spec=ServerlessSpec( cloud="aws", region="us-east-1", schema={ "fields": { "category": {"filterable": True}, # Need to filter "created_at": {"filterable": True}, # Need to filter "title": {"filterable": False}, # Just for display "description": {"filterable": False} # Just for display } } ) )

3. Regular cleanup of unused vectors

3. Regular cleanup of unused vectors

def cleanup_old_vectors(days_old: int = 90): """Delete vectors older than specified days.""" from datetime import datetime, timedelta
cutoff_date = (datetime.now() - timedelta(days=days_old)).isoformat()

# Delete by filter
index.delete(
    filter={"created_at": {"$lt": cutoff_date}},
    namespace="documents"
)
def cleanup_old_vectors(days_old: int = 90): """Delete vectors older than specified days.""" from datetime import datetime, timedelta
cutoff_date = (datetime.now() - timedelta(days=days_old)).isoformat()

# Delete by filter
index.delete(
    filter={"created_at": {"$lt": cutoff_date}},
    namespace="documents"
)

4. Compress dimensions for smaller models

4. Compress dimensions for smaller models

text-embedding-3-small: 1536 dimensions

text-embedding-3-small: 1536 dimensions

all-MiniLM-L6-v2: 384 dimensions (75% storage reduction)

all-MiniLM-L6-v2: 384 dimensions (75% storage reduction)

Trade-off: slightly lower accuracy for significant cost savings

Trade-off: slightly lower accuracy for significant cost savings

undefined
undefined

Query Cost Optimization

查询成本优化

python
undefined
python
undefined

1. Batch queries instead of individual

1. Batch queries instead of individual

Bad: Multiple individual queries

Bad: Multiple individual queries

for query in queries: results = index.query(vector=query, top_k=10) # N API calls
for query in queries: results = index.query(vector=query, top_k=10) # N API calls

Good: Single batch query

Good: Single batch query

results = index.query( queries=query_vectors, # 1 API call top_k=10 )
results = index.query( queries=query_vectors, # 1 API call top_k=10 )

2. Use appropriate top_k

2. Use appropriate top_k

Larger top_k = more expensive

Larger top_k = more expensive

results = index.query( vector=query_vector, top_k=10, # Usually sufficient # top_k=1000 # Much more expensive )
results = index.query( vector=query_vector, top_k=10, # Usually sufficient # top_k=1000 # Much more expensive )

3. Minimize data transfer

3. Minimize data transfer

results = index.query( vector=query_vector, top_k=10, include_values=False, # Save bandwidth include_metadata=False # Save bandwidth if not needed )
results = index.query( vector=query_vector, top_k=10, include_values=False, # Save bandwidth include_metadata=False # Save bandwidth if not needed )

4. Use caching for repeated queries

4. Use caching for repeated queries

from functools import lru_cache
@lru_cache(maxsize=1000) def cached_search(query_text: str, top_k: int = 10): """Cache search results for identical queries.""" embedding = generate_embedding(query_text) results = index.query( vector=embedding, top_k=top_k, include_metadata=True ) return results
from functools import lru_cache
@lru_cache(maxsize=1000) def cached_search(query_text: str, top_k: int = 10): """Cache search results for identical queries.""" embedding = generate_embedding(query_text) results = index.query( vector=embedding, top_k=top_k, include_metadata=True ) return results

5. Choose serverless vs pods appropriately

5. Choose serverless vs pods appropriately

Serverless: Low/variable traffic (pay per query)

Serverless: Low/variable traffic (pay per query)

Pods: High consistent traffic (fixed cost)

Pods: High consistent traffic (fixed cost)

def choose_deployment_type( queries_per_month: int, avg_response_time_requirement: float = 100 # ms ) -> str: """Recommend deployment type based on usage.""" # Rough cost calculations (update with current pricing) serverless_cost_per_query = 0.0001 # Example pod_cost_per_month = 70 # p1.x1 pod
serverless_monthly_cost = queries_per_month * serverless_cost_per_query

if serverless_monthly_cost < pod_cost_per_month:
    return "serverless"
else:
    return "pods"
undefined
def choose_deployment_type( queries_per_month: int, avg_response_time_requirement: float = 100 # ms ) -> str: """Recommend deployment type based on usage.""" # Rough cost calculations (update with current pricing) serverless_cost_per_query = 0.0001 # Example pod_cost_per_month = 70 # p1.x1 pod
serverless_monthly_cost = queries_per_month * serverless_cost_per_query

if serverless_monthly_cost < pod_cost_per_month:
    return "serverless"
else:
    return "pods"
undefined

Cost Monitoring

成本监控

python
import json
from datetime import datetime, timedelta
from collections import defaultdict

class CostMonitor:
    """Monitor and estimate vector database costs."""

    def __init__(self):
        self.operations = defaultdict(int)
        self.pricing = {
            "serverless_write_units": 0.0000025,  # per write unit
            "serverless_read_units": 0.00000625,  # per read unit
            "serverless_storage_gb": 0.095,  # per GB per month
            "p1_x1_pod": 0.096,  # per hour
            "p2_x1_pod": 0.240,  # per hour
        }

    def track_operation(self, operation_type: str, units: int = 1):
        """Track database operations."""
        self.operations[operation_type] += units

    def estimate_monthly_cost(
        self,
        deployment_type: str,
        storage_gb: float = 0,
        pod_type: str = None
    ) -> Dict:
        """Estimate monthly costs."""
        costs = {}

        if deployment_type == "serverless":
            # Storage cost
            storage_cost = storage_gb * self.pricing["serverless_storage_gb"]

            # Operation costs
            write_cost = (
                self.operations["upsert"] *
                self.pricing["serverless_write_units"]
            )
            read_cost = (
                self.operations["query"] *
                self.pricing["serverless_read_units"]
            )

            costs = {
                "storage": storage_cost,
                "writes": write_cost,
                "reads": read_cost,
                "total": storage_cost + write_cost + read_cost
            }

        elif deployment_type == "pods":
            # Fixed pod cost
            hours_per_month = 730
            pod_cost = self.pricing.get(f"{pod_type}_pod", 0) * hours_per_month

            costs = {
                "pod": pod_cost,
                "total": pod_cost
            }

        return costs

    def get_cost_report(self) -> str:
        """Generate cost report."""
        report = f"\n{'=' * 50}\n"
        report += "VECTOR DATABASE COST REPORT\n"
        report += f"{'=' * 50}\n\n"
        report += "Operations Summary:\n"

        for operation, count in self.operations.items():
            report += f"  {operation}: {count:,}\n"

        report += f"\n{'=' * 50}\n"
        return report
python
import json
from datetime import datetime, timedelta
from collections import defaultdict

class CostMonitor:
    """Monitor and estimate vector database costs."""

    def __init__(self):
        self.operations = defaultdict(int)
        self.pricing = {
            "serverless_write_units": 0.0000025,  # per write unit
            "serverless_read_units": 0.00000625,  # per read unit
            "serverless_storage_gb": 0.095,  # per GB per month
            "p1_x1_pod": 0.096,  # per hour
            "p2_x1_pod": 0.240,  # per hour
        }

    def track_operation(self, operation_type: str, units: int = 1):
        """Track database operations."""
        self.operations[operation_type] += units

    def estimate_monthly_cost(
        self,
        deployment_type: str,
        storage_gb: float = 0,
        pod_type: str = None
    ) -> Dict:
        """Estimate monthly costs."""
        costs = {}

        if deployment_type == "serverless":
            # Storage cost
            storage_cost = storage_gb * self.pricing["serverless_storage_gb"]

            # Operation costs
            write_cost = (
                self.operations["upsert"] *
                self.pricing["serverless_write_units"]
            )
            read_cost = (
                self.operations["query"] *
                self.pricing["serverless_read_units"]
            )

            costs = {
                "storage": storage_cost,
                "writes": write_cost,
                "reads": read_cost,
                "total": storage_cost + write_cost + read_cost
            }

        elif deployment_type == "pods":
            # Fixed pod cost
            hours_per_month = 730
            pod_cost = self.pricing.get(f"{pod_type}_pod", 0) * hours_per_month

            costs = {
                "pod": pod_cost,
                "total": pod_cost
            }

        return costs

    def get_cost_report(self) -> str:
        """Generate cost report."""
        report = f"\n{'=' * 50}\n"
        report += "VECTOR DATABASE COST REPORT\n"
        report += f"{'=' * 50}\n\n"
        report += "Operations Summary:\n"

        for operation, count in self.operations.items():
            report += f"  {operation}: {count:,}\n"

        report += f"\n{'=' * 50}\n"
        return report

Usage

Usage

cost_monitor = CostMonitor()
cost_monitor = CostMonitor()

Track operations

Track operations

def monitored_upsert(vectors, **kwargs): cost_monitor.track_operation("upsert", len(vectors)) return index.upsert(vectors=vectors, **kwargs)
def monitored_query(vector, **kwargs): cost_monitor.track_operation("query", 1) return index.query(vector=vector, **kwargs)
def monitored_upsert(vectors, **kwargs): cost_monitor.track_operation("upsert", len(vectors)) return index.upsert(vectors=vectors, **kwargs)
def monitored_query(vector, **kwargs): cost_monitor.track_operation("query", 1) return index.query(vector=vector, **kwargs)

Get cost estimate

Get cost estimate

monthly_cost = cost_monitor.estimate_monthly_cost( deployment_type="serverless", storage_gb=10.5 )
print(f"Estimated monthly cost: ${monthly_cost['total']:.2f}") print(cost_monitor.get_cost_report())

---
monthly_cost = cost_monitor.estimate_monthly_cost( deployment_type="serverless", storage_gb=10.5 )
print(f"Estimated monthly cost: ${monthly_cost['total']:.2f}") print(cost_monitor.get_cost_report())

---

Summary

总结

This comprehensive guide covers all aspects of vector database management across Pinecone, Weaviate, and Chroma. Key takeaways:
  1. Choose the right database: Pinecone for production scale, Weaviate for knowledge graphs, Chroma for local development
  2. Optimize embeddings: Balance dimension size with accuracy and cost
  3. Use metadata filtering: Combine vector similarity with structured filtering for powerful search
  4. Implement hybrid search: Combine dense and sparse vectors for best results
  5. Scale efficiently: Use batching, caching, and appropriate index configurations
  6. Monitor and optimize costs: Track usage and choose the right deployment type
For more information:
这份综合指南涵盖了Pinecone、Weaviate和Chroma向量数据库管理的所有方面。核心要点:
  1. 选择合适的数据库:Pinecone适用于生产级规模,Weaviate适用于知识图谱,Chroma适用于本地开发
  2. 优化向量嵌入:在维度大小、准确性和成本之间取得平衡
  3. 使用元数据过滤:将向量相似度与结构化过滤相结合,实现强大的搜索功能
  4. 实现混合搜索:结合稠密和稀疏向量以获得最佳结果
  5. 高效扩容:使用批量处理、缓存和合适的索引配置
  6. 监控并优化成本:跟踪使用情况并选择合适的部署类型
更多信息: