# Recommendation System
Production-ready architecture for scalable recommendation systems with feature stores, multi-tier caching, A/B testing, and comprehensive monitoring.
## When to Use This Skill
Load this skill when:
- Building Recommendation APIs: Serving personalized recommendations at scale
- Implementing Caching: Multi-tier caching for sub-millisecond latency
- Running A/B Tests: Experimenting with recommendation algorithms
- Monitoring Quality: Tracking CTR, conversion, diversity, coverage
- Optimizing Performance: Reducing latency, increasing throughput
- Feature Engineering: Managing user/item features with feature stores
## Quick Start: Recommendation API in 5 Steps
```bash
# 1. Install dependencies
pip install fastapi==0.109.0 redis==5.0.0 prometheus-client==0.19.0

# 2. Start Redis (for caching and feature store)
docker run -d -p 6379:6379 redis:alpine

# 3. Create recommendation service: app.py
cat > app.py << 'EOF'
from fastapi import FastAPI
from pydantic import BaseModel
from typing import List
import redis
import json

app = FastAPI()
cache = redis.Redis(host='localhost', port=6379, decode_responses=True)

class RecommendationResponse(BaseModel):
    user_id: str
    items: List[str]
    cached: bool

@app.post("/recommendations", response_model=RecommendationResponse)
async def get_recommendations(user_id: str, n: int = 10):
    # Check cache
    cache_key = f"recs:{user_id}:{n}"
    cached = cache.get(cache_key)
    if cached:
        return RecommendationResponse(
            user_id=user_id,
            items=json.loads(cached),
            cached=True
        )
    # Generate recommendations (simplified)
    items = [f"item_{i}" for i in range(n)]
    # Cache for 5 minutes
    cache.setex(cache_key, 300, json.dumps(items))
    return RecommendationResponse(
        user_id=user_id,
        items=items,
        cached=False
    )

@app.get("/health")
async def health():
    return {"status": "healthy"}
EOF

# 4. Run API
uvicorn app:app --host 0.0.0.0 --port 8000

# 5. Test
curl -X POST "http://localhost:8000/recommendations?user_id=user_123&n=10"
```

**Result**: Working recommendation API with caching in under 5 minutes.
## System Architecture
```
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│ User Events │────▶│   Feature   │────▶│    Model    │
│  (clicks,   │     │    Store    │     │   Serving   │
│  purchases) │     │   (Redis)   │     │             │
└─────────────┘     └─────────────┘     └─────────────┘
                           │                   │
                           ▼                   ▼
                    ┌─────────────┐     ┌─────────────┐
                    │  Training   │     │     API     │
                    │  Pipeline   │     │  (FastAPI)  │
                    └─────────────┘     └─────────────┘
                                               │
                                               ▼
                                        ┌─────────────┐
                                        │ Monitoring  │
                                        │ (Prometheus)│
                                        └─────────────┘
```

## Core Components
### 1. Feature Store
Centralized storage for user and item features:

```python
import redis
import json

class FeatureStore:
    """Fast feature access with Redis caching."""

    def __init__(self, redis_client):
        self.redis = redis_client
        self.ttl = 3600  # 1 hour

    def get_user_features(self, user_id: str) -> dict:
        cache_key = f"user_features:{user_id}"
        cached = self.redis.get(cache_key)
        if cached:
            return json.loads(cached)
        # Fetch from database
        features = fetch_from_db(user_id)
        # Cache
        self.redis.setex(cache_key, self.ttl, json.dumps(features))
        return features
```
### 2. Model Serving
Serve multiple models for A/B testing:

```python
class ModelServing:
    """Serve multiple recommendation models."""

    def __init__(self):
        self.models = {}
        self.default_model = None

    def register_model(self, name: str, model, is_default: bool = False):
        self.models[name] = model
        if is_default:
            self.default_model = name

    def predict(self, user_features: dict, item_features: list, model_name: str = None):
        model = self.models.get(model_name or self.default_model)
        return model.predict(user_features, item_features)
```
### 3. Caching Layer
Multi-tier caching for low latency:

```python
import json

class TieredCache:
    """L1 (memory) -> L2 (Redis) -> L3 (database)."""

    def __init__(self, redis_client):
        self.l1_cache = {}         # In-memory
        self.redis = redis_client  # L2

    def get(self, key: str):
        # L1: In-memory (fastest)
        if key in self.l1_cache:
            return self.l1_cache[key]
        # L2: Redis
        cached = self.redis.get(key)
        if cached:
            value = json.loads(cached)
            self.l1_cache[key] = value  # Promote to L1
            return value
        # L3: Miss (fetch from database)
        return None
```
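As written, `get` promotes entries into the in-process dict but never evicts them, so the L1 tier grows without bound. One way to bound it is an LRU cap plus a write-through `set` — a sketch, not the canonical implementation; it assumes the client exposes Redis-style `get`/`setex`:

```python
import json
from collections import OrderedDict

class BoundedTieredCache:
    """TieredCache variant with an LRU-bounded L1 and a write-through set."""

    def __init__(self, redis_client, l1_max_items: int = 10_000):
        self.l1_cache = OrderedDict()  # insertion order doubles as recency order
        self.l1_max_items = l1_max_items
        self.redis = redis_client      # anything with Redis-style get/setex

    def get(self, key: str):
        if key in self.l1_cache:
            self.l1_cache.move_to_end(key)  # mark as recently used
            return self.l1_cache[key]
        cached = self.redis.get(key)
        if cached is not None:
            value = json.loads(cached)
            self._l1_put(key, value)  # promote to L1
            return value
        return None  # miss: caller falls back to the database

    def set(self, key: str, value, ttl: int = 300):
        # Write-through: only L2 honors the TTL; L1 is bounded by size instead
        self.redis.setex(key, ttl, json.dumps(value))
        self._l1_put(key, value)

    def _l1_put(self, key: str, value):
        self.l1_cache[key] = value
        self.l1_cache.move_to_end(key)
        while len(self.l1_cache) > self.l1_max_items:
            self.l1_cache.popitem(last=False)  # evict least recently used
```

Bounding L1 by entry count rather than TTL keeps the hot set in memory while the Redis TTL still caps staleness.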
## Key Metrics
| Metric | Description | Target |
|---|---|---|
| CTR | Click-through rate | >5% |
| Conversion Rate | Purchases from recs | >2% |
| P95 Latency | 95th percentile response time | <200ms |
| Cache Hit Rate | % served from cache | >80% |
| Coverage | % of catalog recommended | >50% |
| Diversity | Variety in recommendations | >0.7 |
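Unlike latency, coverage and diversity are not observable per request; they have to be computed from logged recommendation lists. A minimal sketch of both (function names and the category-map input are illustrative assumptions):

```python
from itertools import combinations

def coverage(recommended_lists, catalog_size: int) -> float:
    """Fraction of the catalog that appears in at least one served list."""
    distinct = set().union(*recommended_lists) if recommended_lists else set()
    return len(distinct) / catalog_size

def intra_list_diversity(items, category_of: dict) -> float:
    """Share of item pairs within one list drawn from different categories."""
    pairs = list(combinations(items, 2))
    if not pairs:
        return 0.0
    different = sum(1 for a, b in pairs if category_of[a] != category_of[b])
    return different / len(pairs)
```

Against the targets above, coverage should stay above 0.5 and intra-list diversity above 0.7 over a rolling window of served lists.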
## Known Issues Prevention
### 1. Cold Start for New Users
**Problem**: No recommendations for users without history, poor initial experience.

**Solution**: Use a popularity-based fallback:

```python
def get_recommendations(user_id: str, n: int = 10):
    user_features = feature_store.get_user_features(user_id)
    # Check if new user (no purchase history)
    if user_features.get('total_purchases', 0) == 0:
        # Fallback to popular items
        return get_popular_items(n)
    # Personalized recommendations
    return generate_personalized_recs(user_id, n)
```
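`get_popular_items` is left undefined above. One minimal interpretation — a sketch; the `recent_interactions` event log and its shape are assumptions — ranks items by recent engagement counts:

```python
from collections import Counter

def get_popular_items(n: int, recent_interactions: list) -> list:
    """Top-n items by interaction count over a recent window of events."""
    counts = Counter(event["item_id"] for event in recent_interactions)
    return [item_id for item_id, _ in counts.most_common(n)]
```

In production this would be precomputed offline and cached, so the cold-start path stays as cheap as the personalized one.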
### 2. Cache Invalidation on User Actions
**Problem**: A user makes a purchase, but the cache still shows the purchased item in recommendations.

**Solution**: Invalidate the cache on relevant actions. Note that Redis `DEL` does not expand glob patterns, so matching keys must be scanned for first:

```python
INVALIDATING_ACTIONS = {'purchase', 'rating', 'add_to_cart'}

def on_user_action(user_id: str, action: str):
    if action in INVALIDATING_ACTIONS:
        # Scan for every cached list size for this user, then delete each key
        for key in redis_client.scan_iter(f"recs:{user_id}:*"):
            redis_client.delete(key)
        logger.info(f"Invalidated cache for {user_id} due to {action}")
```
### 3. Thundering Herd on Cache Expiry
**Problem**: Many users' caches expire simultaneously, overloading the database and model servers.

**Solution**: Add random jitter to the TTL:

```python
import random

def set_cache(key: str, value: dict, base_ttl: int = 300):
    # Add ±10% jitter
    jitter = random.uniform(-0.1, 0.1) * base_ttl
    ttl = int(base_ttl + jitter)
    redis_client.setex(key, ttl, json.dumps(value))
```
### 4. Poor Diversity = Filter Bubble
**Problem**: Recommendations are too similar; users only see items from the same category.

**Solution**: Implement a diversity constraint:

```python
def rank_with_diversity(items: list, scores: list, n: int = 10):
    selected = []
    category_counts = {}
    for item, score in sorted(zip(items, scores), key=lambda x: -x[1]):
        category = item['category']
        # Limit 3 items per category
        if category_counts.get(category, 0) >= 3:
            continue
        selected.append(item)
        category_counts[category] = category_counts.get(category, 0) + 1
        if len(selected) >= n:
            break
    return selected
```
### 5. No Monitoring = Silent Degradation
**Problem**: Recommendation quality drops and nobody notices until users complain.

**Solution**: Continuous monitoring with alerts. Note that `prometheus_client` metrics require a documentation string as the second argument:

```python
import time
from prometheus_client import Counter, Histogram

recommendation_clicks = Counter(
    'recommendation_clicks_total', 'Clicks on recommended items'
)
recommendation_latency = Histogram(
    'recommendation_latency_seconds', 'Recommendation request latency'
)

@app.post("/recommendations")
async def get_recommendations(user_id: str):
    start = time.time()
    recs = generate_recs(user_id)
    recommendation_latency.observe(time.time() - start)
    return recs

@app.post("/track/click")
async def track_click(user_id: str, item_id: str):
    recommendation_clicks.inc()
    # Alert if CTR drops below 3%
```
### 6. Stale Features = Outdated Recommendations
**Problem**: User preferences change but features don't update, so recommendations become irrelevant.

**Solution**: Set appropriate TTLs and update triggers:

```python
class FeatureStore:
    def __init__(self, redis_client):
        self.redis = redis_client
        # Shorter TTL for frequently changing features
        self.user_ttl = 300   # 5 minutes
        self.item_ttl = 3600  # 1 hour

    def update_on_event(self, user_id: str, event: str):
        # Invalidate on important events
        if event in ['purchase', 'rating']:
            self.redis.delete(f"user_features:{user_id}")
            logger.info(f"Refreshed features for {user_id}")
```
### 7. A/B Test Sample Size Too Small
**Problem**: Declaring a winner too early, before results are statistically significant.

**Solution**: Calculate the required sample size first:

```python
def calculate_sample_size(
    baseline_rate: float,
    min_detectable_effect: float,
    alpha: float = 0.05,
    power: float = 0.8
) -> int:
    """Calculate required sample size per variant."""
    from scipy import stats
    z_alpha = stats.norm.ppf(1 - alpha/2)
    z_beta = stats.norm.ppf(power)
    p1 = baseline_rate
    p2 = baseline_rate * (1 + min_detectable_effect)
    p_avg = (p1 + p2) / 2
    n = (
        (z_alpha + z_beta)**2 * 2 * p_avg * (1 - p_avg) /
        (p2 - p1)**2
    )
    return int(n)

# Example: detect 10% lift with baseline CTR=5%
n_required = calculate_sample_size(
    baseline_rate=0.05,
    min_detectable_effect=0.10
)
print(f"Required sample size: {n_required} per variant")
# Wait until both variants reach this size before concluding
```

## When to Load References
Load reference files for detailed production implementations:

- **Production Architecture**: Load `references/production-architecture.md` for complete FeatureStore, ModelServing, and RecommendationService implementations with batch fetching, caching integration, and FastAPI deployment patterns.
- **Caching Strategies**: Load `references/caching-strategies.md` when implementing multi-tier caching (L1/L2/L3), cache warming, invalidation strategies, probabilistic refresh, or thundering herd prevention.
- **A/B Testing Framework**: Load `references/ab-testing-framework.md` for deterministic variant assignment, Thompson sampling (multi-armed bandits), Bayesian and frequentist significance testing, and experiment tracking.
- **Monitoring & Alerting**: Load `references/monitoring-alerting.md` for Prometheus metrics integration, dashboard endpoints, alert rules, and quality monitoring (diversity, coverage).
## Best Practices
- Feature Precomputation: Compute features offline, serve from cache
- Batch Fetching: Use Redis MGET for multiple users/items
- Cache Aggressively: 5-15 minute TTL for user recommendations
- Fail Gracefully: Return popular items if personalization fails
- Monitor Everything: Track CTR, latency, diversity, coverage
- A/B Test Continuously: Always be experimenting with new algorithms
- Diversity Constraint: Ensure varied recommendations
- Explain Recommendations: Provide reasons ("Highly rated", "Popular")
## Common Patterns
### Recommendation Service
```python
class RecommendationService:
    def __init__(self, feature_store, model_serving, cache):
        self.feature_store = feature_store
        self.model_serving = model_serving
        self.cache = cache

    def get_recommendations(self, user_id: str, n: int = 10):
        # 1. Check cache
        cached = self.cache.get(f"recs:{user_id}:{n}")
        if cached:
            return cached
        # 2. Get features
        user_features = self.feature_store.get_user_features(user_id)
        candidates = self.get_candidates(user_id)
        # 3. Score candidates
        scores = self.model_serving.predict(user_features, candidates)
        # 4. Rank with diversity
        recommendations = self.rank_with_diversity(candidates, scores, n)
        # 5. Cache
        self.cache.set(f"recs:{user_id}:{n}", recommendations, ttl=300)
        return recommendations
```
### A/B Testing
```python
def assign_variant(user_id: str, experiment_id: str) -> str:
    """Deterministic assignment - same user always gets same variant."""
    import hashlib
    hash_input = f"{user_id}:{experiment_id}"
    hash_value = int(hashlib.md5(hash_input.encode()).hexdigest(), 16)
    # 50/50 split
    return 'control' if hash_value % 2 == 0 else 'treatment'

# Usage
variant = assign_variant('user_123', 'rec_algo_v2')
model_name = 'main' if variant == 'control' else 'experimental'
recs = get_recommendations(user_id, model_name=model_name)
```
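Once both variants have reached the sample size computed earlier, the observed CTRs can be compared with a two-proportion z-test. A stdlib-only sketch (the sample-size helper used scipy; `statistics.NormalDist` avoids that dependency here):

```python
from statistics import NormalDist

def ctr_significance(clicks_a: int, views_a: int, clicks_b: int, views_b: int):
    """Two-proportion z-test on CTRs; returns (z, two-sided p-value)."""
    p_a, p_b = clicks_a / views_a, clicks_b / views_b
    p_pool = (clicks_a + clicks_b) / (views_a + views_b)
    se = (p_pool * (1 - p_pool) * (1 / views_a + 1 / views_b)) ** 0.5
    z = (p_b - p_a) / se
    p_value = 2 * (1 - NormalDist().cdf(abs(z)))
    return z, p_value
```

Reject the null hypothesis of equal CTRs only when the p-value falls below the alpha used in the sample-size calculation; the references file covers Bayesian alternatives.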
### Monitoring
```python
from prometheus_client import Counter, Histogram

requests_total = Counter(
    'recommendation_requests_total', 'Total recommendation requests', ['status']
)
latency_seconds = Histogram(
    'recommendation_latency_seconds', 'Recommendation request latency'
)

@app.post("/recommendations")
async def get_recommendations(user_id: str):
    with latency_seconds.time():
        try:
            recs = generate_recs(user_id)
            requests_total.labels(status='success').inc()
            return recs
        except Exception:
            requests_total.labels(status='error').inc()
            raise
```