redis-state-management
Redis State Management
A comprehensive skill for mastering Redis state management patterns in distributed systems. This skill covers caching strategies, session management, pub/sub messaging, distributed locks, data structures, and production-ready patterns using redis-py.
When to Use This Skill
Use this skill when:
- Implementing high-performance caching layers for web applications
- Managing user sessions in distributed environments
- Building real-time messaging and event distribution systems
- Coordinating distributed processes with locks and synchronization
- Storing and querying structured data with Redis data structures
- Optimizing application performance with Redis
- Scaling applications horizontally with shared state
- Implementing rate limiting, counters, and analytics
- Building microservices with Redis as a communication layer
- Managing temporary data with automatic expiration (TTL)
- Implementing leaderboards, queues, and real-time features
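Several of the use cases above (rate limiting, counters) reduce to a couple of atomic Redis commands. A minimal fixed-window rate limiter sketch — `allow_request` and its parameters are illustrative names, and `client` is assumed to be any redis-py style client:

```python
def allow_request(client, user_id: str, limit: int = 100, window: int = 60) -> bool:
    """Fixed-window rate limiter: allow at most `limit` requests per `window` seconds."""
    key = f"ratelimit:{user_id}"
    count = client.incr(key)        # INCR is atomic, so this is safe under concurrency
    if count == 1:
        client.expire(key, window)  # first hit starts the window
    return count <= limit
```

A sliding-window or token-bucket variant (often built on sorted sets or Lua) avoids the burst at window boundaries, at the cost of more bookkeeping.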
Core Concepts
Redis Fundamentals
Redis (Remote Dictionary Server) is an in-memory data structure store used as:
- Database: Persistent key-value storage
- Cache: High-speed data layer
- Message Broker: Pub/sub and stream messaging
- Session Store: Distributed session management
Key Characteristics:
- In-memory storage (microsecond latency)
- Optional persistence (RDB snapshots, AOF logs)
- Rich data structures beyond key-value
- Atomic operations on complex data types
- Built-in replication and clustering
- Pub/sub messaging support
- Lua scripting for complex operations
- Pipelining for batch operations
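One characteristic above, pipelining, is worth a concrete illustration: commands are queued client-side and flushed in one network round trip. A minimal sketch — `bulk_set` is a hypothetical helper, and `client` is assumed to be a redis-py style client:

```python
def bulk_set(client, items: dict) -> list:
    """Queue many SET commands and send them in a single network round trip."""
    pipe = client.pipeline()
    for key, value in items.items():
        pipe.set(key, value)      # queued locally; nothing is sent yet
    return pipe.execute()         # one round trip; returns per-command replies
```

For N keys this costs one round trip instead of N, which is usually the dominant saving for small values.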
Redis Data Structures
Redis provides multiple data types for different use cases:

- Strings: Simple key-value pairs, binary safe
  - Use for: Cache values, counters, flags, JSON objects
  - Max size: 512 MB
  - Commands: SET, GET, INCR, APPEND
- Hashes: Field-value maps (objects)
  - Use for: User profiles, configuration objects, small entities
  - Efficient for storing objects with multiple fields
  - Commands: HSET, HGET, HMGET, HINCRBY
- Lists: Ordered collections (linked lists)
  - Use for: Queues, activity feeds, recent items
  - Operations at head/tail are O(1)
  - Commands: LPUSH, RPUSH, LPOP, RPOP, LRANGE
- Sets: Unordered unique collections
  - Use for: Tags, unique visitors, relationships
  - Set operations: union, intersection, difference
  - Commands: SADD, SMEMBERS, SISMEMBER, SINTER
- Sorted Sets: Ordered sets with scores
  - Use for: Leaderboards, time-series, priority queues
  - Range queries by score or rank
  - Commands: ZADD, ZRANGE, ZRANGEBYSCORE, ZRANK
- Streams: Append-only logs with consumer groups
  - Use for: Event sourcing, activity logs, message queues
  - Built-in consumer group support
  - Commands: XADD, XREAD, XREADGROUP
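To make the type-to-use-case mapping above concrete, here is a small leaderboard sketch on a sorted set. `record_score` and `top_players` are illustrative names, and `client` is assumed to be a redis-py style client created with `decode_responses=True`:

```python
def record_score(client, board: str, player: str, points: float) -> None:
    """ZINCRBY adds points to a member's score, creating the member if missing."""
    client.zincrby(board, points, player)

def top_players(client, board: str, n: int = 3) -> list:
    """ZREVRANGE returns the n highest-scoring members with their scores."""
    return client.zrevrange(board, 0, n - 1, withscores=True)
```

Both calls are O(log N), so the leaderboard stays fast even with millions of members.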
Connection Management
Connection Pools:
Redis connections are expensive to create. Always use connection pools:

```python
import redis

# Connection pool (recommended)
pool = redis.ConnectionPool(host='localhost', port=6379, db=0, max_connections=10)
r = redis.Redis(connection_pool=pool)

# Direct connection (avoid in production)
r = redis.Redis(host='localhost', port=6379, db=0)
```

**Best Practices:**
- Use connection pools for all applications
- Set appropriate max_connections based on workload
- Enable decode_responses=True for string data
- Configure socket_timeout and socket_keepalive
- Handle connection errors with retries

Data Persistence
Redis offers two persistence mechanisms:
RDB (Redis Database): Point-in-time snapshots
- Compact binary format
- Fast restart times
- Lower disk I/O
- Potential data loss between snapshots
AOF (Append-Only File): Log of write operations
- Better durability (fsync policies)
- Larger files, slower restarts
- Can be automatically rewritten/compacted
- Minimal data loss potential
Hybrid Approach: RDB + AOF for best of both worlds
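The trade-off above can be adjusted at runtime with CONFIG SET. A sketch of enabling the hybrid approach — the schedule and fsync values are example assumptions, and durable production settings belong in redis.conf:

```python
def enable_hybrid_persistence(client) -> None:
    """Turn on RDB snapshots plus an append-only file (example values)."""
    # RDB: snapshot after 900s if >=1 change, or 300s if >=10 changes
    client.config_set("save", "900 1 300 10")
    # AOF: log every write operation
    client.config_set("appendonly", "yes")
    # fsync once per second: bounded data loss, good throughput
    client.config_set("appendfsync", "everysec")
```

`appendfsync always` trades throughput for at-most-one-command loss; `no` leaves fsync timing to the OS.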
RESP 3 Protocol
Redis Serialization Protocol version 3 offers:
- Client-side caching support
- Better data type support
- Push notifications
- Performance improvements
```python
import redis
from redis.cache import CacheConfig

# Enable RESP3 with client-side caching
r = redis.Redis(host='localhost', port=6379, protocol=3,
                cache_config=CacheConfig())
```

Caching Strategies
Cache-Aside (Lazy Loading)
Pattern: Application checks cache first, loads from database on miss

```python
import redis
import json
from typing import Optional, Dict, Any

r = redis.Redis(decode_responses=True)

def get_user(user_id: int) -> Optional[Dict[str, Any]]:
    """Cache-aside pattern for user data."""
    cache_key = f"user:{user_id}"
    # Try cache first
    cached_data = r.get(cache_key)
    if cached_data:
        return json.loads(cached_data)
    # Cache miss - load from database
    user_data = database.get_user(user_id)  # Your DB query
    if user_data:
        # Store in cache with 1 hour TTL
        r.setex(cache_key, 3600, json.dumps(user_data))
    return user_data
```

Advantages:
- Only requested data is cached (efficient memory usage)
- Cache failures don't break the application
- Simple to implement

Disadvantages:
- Cache miss penalty (latency spike)
- Thundering herd on popular items
- Stale data until cache expiration
Write-Through Cache
Pattern: Write to cache and database simultaneously

```python
def update_user(user_id: int, user_data: Dict[str, Any]) -> bool:
    """Write-through pattern for user updates."""
    cache_key = f"user:{user_id}"
    # Write to database first
    success = database.update_user(user_id, user_data)
    if success:
        # Update cache immediately
        r.setex(cache_key, 3600, json.dumps(user_data))
    return success
```

Advantages:
- Cache always consistent with database
- No read penalty for recently written data

Disadvantages:
- Write latency increases
- Unused data may be cached
- Extra cache write overhead
Write-Behind (Write-Back) Cache
Pattern: Write to cache immediately, sync to database asynchronously

```python
import redis
import json
from queue import Queue
from threading import Thread
from typing import Dict, Any

r = redis.Redis(decode_responses=True)
write_queue = Queue()

def async_writer():
    """Background worker to sync cache to database."""
    while True:
        user_id, user_data = write_queue.get()
        try:
            database.update_user(user_id, user_data)
        except Exception as e:
            # Log error, potentially retry
            print(f"Failed to write user {user_id}: {e}")
        finally:
            write_queue.task_done()

# Start background writer
Thread(target=async_writer, daemon=True).start()

def update_user_fast(user_id: int, user_data: Dict[str, Any]):
    """Write-behind pattern for fast writes."""
    cache_key = f"user:{user_id}"
    # Write to cache immediately (fast)
    r.setex(cache_key, 3600, json.dumps(user_data))
    # Queue database write (async)
    write_queue.put((user_id, user_data))
```

**Advantages:**
- Minimal write latency
- Can batch database writes
- Handles write spikes

**Disadvantages:**
- Risk of data loss if cache fails
- Complex error handling
- Consistency challenges

Cache Invalidation Strategies
Time-based Expiration (TTL):

```python
# Set key with expiration
r.setex("session:abc123", 1800, session_data)  # 30 minutes

# Or set TTL on existing key
r.expire("user:profile:123", 3600)  # 1 hour

# Check remaining TTL
ttl = r.ttl("user:profile:123")
```

**Event-based Invalidation:**

```python
def update_product(product_id: int, product_data: dict):
    """Invalidate cache on update."""
    # Update database
    database.update_product(product_id, product_data)
    # Invalidate related caches
    r.delete(f"product:{product_id}")
    r.delete(f"product_list:category:{product_data['category']}")
    r.delete("products:featured")
```

Pattern-based Invalidation:

```python
# Delete all keys matching pattern
def invalidate_user_cache(user_id: int):
    """Invalidate all cache entries for a user."""
    pattern = f"user:{user_id}:*"
    # Find and delete matching keys
    for key in r.scan_iter(match=pattern, count=100):
        r.delete(key)
```

Cache Stampede Prevention
Problem: Multiple requests simultaneously miss cache and query database

Solution 1: Probabilistic Early Expiration

```python
import time
import random

def get_with_early_expiration(key: str, ttl: int = 3600, beta: float = 1.0):
    """Prevent stampede with probabilistic early recomputation."""
    value = r.get(key)
    if value is None:
        # Cache miss - compute and cache
        value = compute_value(key)
        r.setex(key, ttl, value)
        return value
    # Check if we should recompute early
    current_time = time.time()
    delta = current_time - float(r.get(f"{key}:timestamp") or 0)
    expiry = ttl * random.random() * beta
    if delta > expiry:
        # Recompute in background
        value = compute_value(key)
        r.setex(key, ttl, value)
        r.set(f"{key}:timestamp", current_time)
    return value
```

Solution 2: Locking

```python
from contextlib import contextmanager

@contextmanager
def cache_lock(key: str, timeout: int = 10):
    """Acquire lock for cache computation."""
    lock_key = f"{key}:lock"
    identifier = str(time.time())
    # Try to acquire lock
    if r.set(lock_key, identifier, nx=True, ex=timeout):
        try:
            yield True
        finally:
            # Release lock
            if r.get(lock_key) == identifier:
                r.delete(lock_key)
    else:
        yield False

def get_with_lock(key: str):
    """Use lock to prevent stampede."""
    value = r.get(key)
    if value is None:
        with cache_lock(key) as acquired:
            if acquired:
                # We got the lock - compute value
                value = compute_value(key)
                r.setex(key, 3600, value)
            else:
                # Someone else is computing - wait and retry
                time.sleep(0.1)
                value = r.get(key) or compute_value(key)
    return value
```

Session Management
Distributed Session Storage
Basic Session Management:

```python
import redis
import json
import uuid
from datetime import datetime

r = redis.Redis(decode_responses=True)

class SessionManager:
    def __init__(self, ttl: int = 1800):
        """Session manager with Redis backend.

        Args:
            ttl: Session timeout in seconds (default 30 minutes)
        """
        self.ttl = ttl

    def create_session(self, user_id: int, data: dict = None) -> str:
        """Create new session and return session ID."""
        session_id = str(uuid.uuid4())
        session_key = f"session:{session_id}"
        session_data = {
            "user_id": user_id,
            "created_at": datetime.utcnow().isoformat(),
            "data": data or {}
        }
        r.setex(session_key, self.ttl, json.dumps(session_data))
        return session_id

    def get_session(self, session_id: str) -> dict:
        """Retrieve session data and refresh TTL."""
        session_key = f"session:{session_id}"
        session_data = r.get(session_key)
        if session_data:
            # Refresh TTL on access (sliding expiration)
            r.expire(session_key, self.ttl)
            return json.loads(session_data)
        return None

    def update_session(self, session_id: str, data: dict) -> bool:
        """Update session data."""
        session_key = f"session:{session_id}"
        session_data = self.get_session(session_id)
        if session_data:
            session_data["data"].update(data)
            r.setex(session_key, self.ttl, json.dumps(session_data))
            return True
        return False

    def delete_session(self, session_id: str) -> bool:
        """Delete session (logout)."""
        session_key = f"session:{session_id}"
        return r.delete(session_key) > 0
```

Session with Hash Storage
More efficient for session objects:

```python
class HashSessionManager:
    """Session manager using Redis hashes for better performance."""

    def __init__(self, ttl: int = 1800):
        self.ttl = ttl

    def create_session(self, user_id: int, **kwargs) -> str:
        """Create session using hash."""
        session_id = str(uuid.uuid4())
        session_key = f"session:{session_id}"
        # Store as hash for efficient field access
        session_fields = {
            "user_id": str(user_id),
            "created_at": datetime.utcnow().isoformat(),
            **{k: str(v) for k, v in kwargs.items()}
        }
        r.hset(session_key, mapping=session_fields)
        r.expire(session_key, self.ttl)
        return session_id

    def get_field(self, session_id: str, field: str) -> str:
        """Get single session field efficiently."""
        session_key = f"session:{session_id}"
        value = r.hget(session_key, field)
        if value:
            r.expire(session_key, self.ttl)  # Refresh TTL
        return value

    def set_field(self, session_id: str, field: str, value: str) -> bool:
        """Update single session field."""
        session_key = f"session:{session_id}"
        if r.exists(session_key):
            r.hset(session_key, field, value)
            r.expire(session_key, self.ttl)
            return True
        return False

    def get_all(self, session_id: str) -> dict:
        """Get all session fields."""
        session_key = f"session:{session_id}"
        data = r.hgetall(session_key)
        if data:
            r.expire(session_key, self.ttl)
        return data
```

User Activity Tracking
```python
def track_user_activity(user_id: int, action: str):
    """Track user activity with automatic expiration."""
    activity_key = f"user:{user_id}:activity"
    timestamp = datetime.utcnow().isoformat()
    # Add activity to list
    r.lpush(activity_key, json.dumps({"action": action, "timestamp": timestamp}))
    # Keep only last 100 activities
    r.ltrim(activity_key, 0, 99)
    # Set expiration (30 days)
    r.expire(activity_key, 2592000)

def get_recent_activity(user_id: int, limit: int = 10) -> list:
    """Get recent user activities."""
    activity_key = f"user:{user_id}:activity"
    activities = r.lrange(activity_key, 0, limit - 1)
    return [json.loads(a) for a in activities]
```

Pub/Sub Patterns
Basic Publisher/Subscriber
Publisher:

```python
import redis
import json
from datetime import datetime

r = redis.Redis(decode_responses=True)

def publish_event(channel: str, message: dict):
    """Publish event to channel."""
    r.publish(channel, json.dumps(message))

# Example usage
publish_event("notifications", {
    "type": "user_signup",
    "user_id": 12345,
    "timestamp": datetime.utcnow().isoformat()
})
```

**Subscriber:**

```python
import redis
import json

def handle_message(message):
    """Process received message."""
    data = json.loads(message['data'])
    print(f"Received: {data}")

# Initialize pubsub
r = redis.Redis(decode_responses=True)
p = r.pubsub()

# Subscribe to channels
p.subscribe('notifications', 'alerts')

# Listen for messages
for message in p.listen():
    if message['type'] == 'message':
        handle_message(message)
```

Pattern-Based Subscriptions
```python
# Subscribe to multiple channels with glob-style patterns
p = r.pubsub()
p.psubscribe('user:*', 'notification:*')

# Get messages from pattern subscriptions
for message in p.listen():
    if message['type'] == 'pmessage':
        channel = message['channel']
        pattern = message['pattern']
        data = message['data']
        print(f"Pattern {pattern} matched {channel}: {data}")
```

Async Pub/Sub with Background Thread
```python
import redis
import time

r = redis.Redis(decode_responses=True)
p = r.pubsub()

def message_handler(message):
    """Handle messages in background thread."""
    print(f"Handler received: {message['data']}")

# Subscribe with handler
p.subscribe(**{'notifications': message_handler, 'alerts': message_handler})

# Run in background thread
thread = p.run_in_thread(sleep_time=0.001)

# Publish some messages
r.publish('notifications', 'Hello!')
r.publish('alerts', 'Warning!')
time.sleep(1)

# Stop background thread
thread.stop()
```

Async Pub/Sub with asyncio
```python
import asyncio
import redis.asyncio as redis

async def reader(channel: redis.client.PubSub):
    """Async message reader."""
    while True:
        message = await channel.get_message(ignore_subscribe_messages=True, timeout=None)
        if message is not None:
            print(f"Received: {message}")
            # Stop on specific message
            if message["data"].decode() == "STOP":
                break

async def pubsub_example():
    """Async pub/sub example."""
    r = redis.from_url("redis://localhost")
    async with r.pubsub() as pubsub:
        # Subscribe to channels
        await pubsub.subscribe("channel:1", "channel:2")
        # Create reader task
        reader_task = asyncio.create_task(reader(pubsub))
        # Publish messages
        await r.publish("channel:1", "Hello")
        await r.publish("channel:2", "World")
        await r.publish("channel:1", "STOP")
        # Wait for reader to finish
        await reader_task
    await r.close()

# Run async example
asyncio.run(pubsub_example())
```

Sharded Pub/Sub (Redis 7.0+)
```python
from redis.cluster import RedisCluster, ClusterNode

# Connect to cluster
rc = RedisCluster(startup_nodes=[
    ClusterNode('localhost', 6379),
    ClusterNode('localhost', 6380)
])

# Create sharded pubsub
p = rc.pubsub()
p.ssubscribe('foo')

# Get message from specific node
message = p.get_sharded_message(target_node=ClusterNode('localhost', 6379))
```

Distributed Locks
Simple Lock Implementation
```python
import redis
import time
import uuid

class RedisLock:
    """Simple distributed lock using Redis."""

    def __init__(self, redis_client: redis.Redis, key: str, timeout: int = 10):
        self.redis = redis_client
        self.key = f"lock:{key}"
        self.timeout = timeout
        self.identifier = str(uuid.uuid4())

    def acquire(self, blocking: bool = True, timeout: float = None) -> bool:
        """Acquire lock."""
        end_time = time.time() + (timeout or self.timeout)
        while True:
            # Try to set lock with NX (only if not exists) and EX (expiration)
            if self.redis.set(self.key, self.identifier, nx=True, ex=self.timeout):
                return True
            if not blocking:
                return False
            if timeout and time.time() > end_time:
                return False
            # Wait before retry
            time.sleep(0.01)

    def release(self) -> bool:
        """Release lock only if we own it."""
        # Use Lua script for atomic check-and-delete
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        result = self.redis.eval(lua_script, 1, self.key, self.identifier)
        return result == 1

    def __enter__(self):
        """Context manager support."""
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager cleanup."""
        self.release()

# Usage example
r = redis.Redis()
lock = RedisLock(r, "resource:123", timeout=5)

with lock:
    # Critical section - only one process at a time
    print("Processing resource 123")
    process_resource()
```

Advanced Lock with Auto-Renewal
```python
import threading

class RenewableLock:
    """Distributed lock with automatic renewal."""

    def __init__(self, redis_client: redis.Redis, key: str, timeout: int = 10):
        self.redis = redis_client
        self.key = f"lock:{key}"
        self.timeout = timeout
        self.identifier = str(uuid.uuid4())
        self.renewal_thread = None
        self.stop_renewal = threading.Event()

    def _renew_lock(self):
        """Background task to renew lock."""
        while not self.stop_renewal.is_set():
            time.sleep(self.timeout / 3)  # Renew at 1/3 of timeout
            # Renew only if we still own the lock
            lua_script = """
            if redis.call("get", KEYS[1]) == ARGV[1] then
                return redis.call("expire", KEYS[1], ARGV[2])
            else
                return 0
            end
            """
            result = self.redis.eval(lua_script, 1, self.key,
                                     self.identifier, self.timeout)
            if result == 0:
                # We lost the lock
                self.stop_renewal.set()

    def acquire(self, blocking: bool = True, timeout: float = None) -> bool:
        """Acquire lock and start auto-renewal."""
        if self.redis.set(self.key, self.identifier, nx=True, ex=self.timeout):
            # Start renewal thread
            self.stop_renewal.clear()
            self.renewal_thread = threading.Thread(target=self._renew_lock, daemon=True)
            self.renewal_thread.start()
            return True
        return False

    def release(self) -> bool:
        """Release lock and stop renewal."""
        self.stop_renewal.set()
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        result = self.redis.eval(lua_script, 1, self.key, self.identifier)
        return result == 1
```

Redlock Algorithm (Multiple Redis Instances)
```python
class Redlock:
    """Redlock algorithm for distributed locking across multiple Redis instances."""

    def __init__(self, redis_instances: list):
        """
        Args:
            redis_instances: List of Redis client connections
        """
        self.instances = redis_instances
        self.quorum = len(redis_instances) // 2 + 1

    def acquire(self, resource: str, ttl: int = 10000) -> tuple:
        """
        Acquire the lock across multiple Redis instances.

        Returns:
            (success: bool, lock_identifier: str)
        """
        identifier = str(uuid.uuid4())
        start_time = int(time.time() * 1000)

        # Try to acquire the lock on every instance
        acquired = 0
        for instance in self.instances:
            try:
                if instance.set(f"lock:{resource}", identifier,
                                nx=True, px=ttl):
                    acquired += 1
            except Exception:
                pass

        # Calculate elapsed time
        elapsed = int(time.time() * 1000) - start_time
        validity_time = ttl - elapsed - 100  # drift compensation

        # Check whether we reached quorum
        if acquired >= self.quorum and validity_time > 0:
            return True, identifier
        else:
            # Release any locks we did acquire if quorum was not reached
            self._release_all(resource, identifier)
            return False, None

    def _release_all(self, resource: str, identifier: str):
        """Release the lock on all instances."""
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        for instance in self.instances:
            try:
                instance.eval(lua_script, 1, f"lock:{resource}", identifier)
            except Exception:
                pass
```
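The quorum and validity arithmetic used by `acquire` can be sanity-checked in isolation. A minimal sketch — the helper names are illustrative, not part of redis-py:

```python
def redlock_quorum(instance_count: int) -> int:
    """Majority of instances needed for an acquisition to count as successful."""
    return instance_count // 2 + 1

def redlock_validity(ttl_ms: int, elapsed_ms: int, drift_ms: int = 100) -> int:
    """How long (ms) the lock can safely be relied on after acquisition."""
    return ttl_ms - elapsed_ms - drift_ms
```

With five instances a client needs three acquisitions; if acquiring took so long that the validity time is non-positive, the lock must be treated as not held.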
Data Structures and Operations
Working with Hashes
```python
# User profile storage
def save_user_profile(user_id: int, profile: dict):
    """Save a user profile as a hash."""
    key = f"user:profile:{user_id}"
    r.hset(key, mapping=profile)
    r.expire(key, 86400)  # 24-hour TTL

def get_user_profile(user_id: int) -> dict:
    """Get the complete user profile."""
    key = f"user:profile:{user_id}"
    return r.hgetall(key)

def update_user_field(user_id: int, field: str, value: str):
    """Update a single profile field."""
    key = f"user:profile:{user_id}"
    r.hset(key, field, value)

# Example usage
save_user_profile(123, {
    "username": "alice",
    "email": "alice@example.com",
    "age": "30"
})

# Atomic increment
r.hincrby("user:profile:123", "login_count", 1)
```

Working with Lists
```python
# Job queue implementation
def enqueue_job(queue_name: str, job_data: dict):
    """Add a job to the queue."""
    key = f"queue:{queue_name}"
    r.rpush(key, json.dumps(job_data))

def dequeue_job(queue_name: str, timeout: int = 0) -> dict:
    """Get a job from the queue (optionally blocking)."""
    key = f"queue:{queue_name}"
    if timeout > 0:
        # Blocking pop with timeout
        result = r.blpop(key, timeout=timeout)
        if result:
            _, job_data = result
            return json.loads(job_data)
    else:
        # Non-blocking pop
        job_data = r.lpop(key)
        if job_data:
            return json.loads(job_data)
    return None

# Activity feed
def add_to_feed(user_id: int, activity: dict):
    """Add an activity to the user's feed."""
    key = f"feed:{user_id}"
    r.lpush(key, json.dumps(activity))
    r.ltrim(key, 0, 99)  # Keep only the latest 100 items
    r.expire(key, 604800)  # 7 days

def get_feed(user_id: int, start: int = 0, end: int = 19) -> list:
    """Get the user's feed with pagination."""
    key = f"feed:{user_id}"
    items = r.lrange(key, start, end)
    return [json.loads(item) for item in items]
```

Working with Sets
```python
# Tags and relationships
def add_tags(item_id: int, tags: list):
    """Add tags to an item."""
    key = f"item:{item_id}:tags"
    r.sadd(key, *tags)

def get_tags(item_id: int) -> set:
    """Get all tags for an item."""
    key = f"item:{item_id}:tags"
    return r.smembers(key)

def find_items_with_all_tags(tags: list) -> set:
    """Find items having all of the specified tags.

    Assumes a reverse index is maintained alongside the item sets:
    each tag:{tag}:items set holds the IDs of items carrying that tag.
    """
    keys = [f"tag:{tag}:items" for tag in tags]
    return r.sinter(*keys)

# Online users tracking
def user_online(user_id: int):
    """Mark a user as online."""
    r.sadd("users:online", user_id)
    # Refresh a heartbeat key with a 60s TTL
    # (EXPIRE alone on a key that was never set would be a no-op)
    r.setex(f"user:{user_id}:heartbeat", 60, "1")

def user_offline(user_id: int):
    """Mark a user as offline."""
    r.srem("users:online", user_id)

def get_online_users() -> set:
    """Get all online users."""
    return r.smembers("users:online")

def get_online_count() -> int:
    """Get the count of online users."""
    return r.scard("users:online")
```

Working with Sorted Sets
```python
# Leaderboard implementation
def update_score(leaderboard: str, user_id: int, score: float):
    """Update a user's score in the leaderboard."""
    key = f"leaderboard:{leaderboard}"
    r.zadd(key, {user_id: score})

def get_leaderboard(leaderboard: str, start: int = 0, end: int = 9) -> list:
    """Get top players (descending order)."""
    key = f"leaderboard:{leaderboard}"
    # ZREVRANGE for descending order (highest scores first)
    return r.zrevrange(key, start, end, withscores=True)

def get_user_rank(leaderboard: str, user_id: int) -> int:
    """Get a user's rank (0-indexed)."""
    key = f"leaderboard:{leaderboard}"
    # ZREVRANK for descending rank
    rank = r.zrevrank(key, user_id)
    return rank if rank is not None else -1

def get_user_score(leaderboard: str, user_id: int) -> float:
    """Get a user's score."""
    key = f"leaderboard:{leaderboard}"
    score = r.zscore(key, user_id)
    return score if score is not None else 0.0

def get_score_range(leaderboard: str, min_score: float, max_score: float) -> list:
    """Get users within a score range."""
    key = f"leaderboard:{leaderboard}"
    return r.zrangebyscore(key, min_score, max_score, withscores=True)
```
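Since `get_leaderboard` takes zero-based inclusive indices, paginating it is pure arithmetic; a small helper (the name is illustrative) maps a 1-based page number onto the ZREVRANGE window:

```python
def page_to_range(page: int, page_size: int = 10) -> tuple:
    """Convert a 1-based page number to (start, end) indices for ZREVRANGE."""
    if page < 1:
        raise ValueError("page numbers are 1-based")
    start = (page - 1) * page_size
    end = start + page_size - 1
    return start, end
```

Page 1 of a 10-entry leaderboard view is indices (0, 9), page 3 with 20 entries per page is (40, 59).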
Time-based sorted set (activity stream)
```python
def add_activity(user_id: int, activity: str):
    """Add a timestamped activity."""
    key = f"user:{user_id}:activities"
    timestamp = time.time()
    r.zadd(key, {activity: timestamp})
    # Keep only the last 24 hours
    cutoff = timestamp - 86400
    r.zremrangebyscore(key, '-inf', cutoff)

def get_recent_activities(user_id: int, count: int = 10) -> list:
    """Get recent activities."""
    key = f"user:{user_id}:activities"
    # Get the most recent entries (highest timestamps)
    return r.zrevrange(key, 0, count - 1, withscores=True)
```

Working with Streams
```python
# Event stream
def add_event(stream_key: str, event_data: dict) -> str:
    """Add an event to the stream."""
    # Returns an auto-generated ID (timestamp-sequence)
    event_id = r.xadd(stream_key, event_data)
    return event_id

def read_events(stream_key: str, count: int = 10, start_id: str = '0') -> list:
    """Read events from the stream."""
    events = r.xread({stream_key: start_id}, count=count)
    # events format: [(stream_name, [(id, data), (id, data), ...])]
    if events:
        _, event_list = events[0]
        return event_list
    return []
```

Consumer groups
```python
def create_consumer_group(stream_key: str, group_name: str):
    """Create a consumer group for the stream."""
    try:
        r.xgroup_create(name=stream_key, groupname=group_name, id='0')
    except redis.ResponseError as e:
        if "BUSYGROUP" not in str(e):
            raise

def read_from_group(stream_key: str, group_name: str,
                    consumer_name: str, count: int = 10) -> list:
    """Read events as a consumer in the group."""
    # Read new messages with '>'
    events = r.xreadgroup(
        groupname=group_name,
        consumername=consumer_name,
        streams={stream_key: '>'},
        count=count,
        block=5000  # 5-second timeout
    )
    if events:
        _, event_list = events[0]
        return event_list
    return []

def acknowledge_event(stream_key: str, group_name: str, event_id: str):
    """Acknowledge a processed event."""
    r.xack(stream_key, group_name, event_id)
```
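Stream IDs like `1526919030474-55` order first by millisecond timestamp, then by sequence number. A small parser (an illustrative helper, not part of redis-py) makes that ordering explicit for client-side comparisons:

```python
def parse_stream_id(event_id: str) -> tuple:
    """Split a Redis stream ID 'ms-seq' into a sortable (ms, seq) tuple."""
    ms, _, seq = event_id.partition("-")
    return int(ms), int(seq or 0)
```

Tuples compare lexicographically, so `parse_stream_id` values sort exactly as Redis orders the corresponding entries.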
Example: Processing events with consumer group
```python
def process_events(stream_key: str, group_name: str, consumer_name: str):
    """Process events from the stream."""
    create_consumer_group(stream_key, group_name)
    while True:
        events = read_from_group(stream_key, group_name, consumer_name, count=10)
        for event_id, event_data in events:
            try:
                # Process the event (process_event is the application's handler)
                process_event(event_data)
                # Acknowledge successful processing
                acknowledge_event(stream_key, group_name, event_id)
            except Exception as e:
                print(f"Failed to process event {event_id}: {e}")
                # The event remains unacknowledged, so it can be retried
```

Performance Optimization
Pipelining for Batch Operations
```python
# Without pipelining (slow - one round trip per command)
for i in range(1000):
    r.set(f"key:{i}", f"value:{i}")

# With pipelining (fast - a single round trip)
pipe = r.pipeline()
for i in range(1000):
    pipe.set(f"key:{i}", f"value:{i}")
results = pipe.execute()

# Pipelining with reads
pipe = r.pipeline()
for i in range(100):
    pipe.get(f"key:{i}")
values = pipe.execute()
```
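For very large batches it is common to flush the pipeline in chunks so a single round trip does not buffer an unbounded number of commands and replies. A sketch under that assumption — the chunk size and helper names are illustrative:

```python
from itertools import islice

def chunked(iterable, size):
    """Yield successive lists of at most `size` items from the iterable."""
    it = iter(iterable)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch

def set_many_chunked(redis_client, items: dict, chunk_size: int = 500):
    """SET many key/value pairs, executing the pipeline every chunk_size commands."""
    for batch in chunked(items.items(), chunk_size):
        pipe = redis_client.pipeline()
        for key, value in batch:
            pipe.set(key, value)
        pipe.execute()
```

The chunk size trades round trips against client and server buffer memory; a few hundred to a few thousand commands per flush is a common range.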
```python
# Builder pattern with a pipeline
class DataLoader:
    def __init__(self):
        self.pipeline = r.pipeline()

    def add_user(self, user_id: int, user_data: dict):
        """Queue user data."""
        self.pipeline.hset(f"user:{user_id}", mapping=user_data)
        return self

    def add_to_set(self, set_name: str, value: str):
        """Queue a set addition."""
        self.pipeline.sadd(set_name, value)
        return self

    def execute(self):
        """Execute all pipelined commands."""
        return self.pipeline.execute()
```
Usage
```python
loader = DataLoader()
results = (loader
           .add_user(1, {"name": "Alice", "email": "alice@example.com"})
           .add_user(2, {"name": "Bob", "email": "bob@example.com"})
           .add_to_set("active_users", "1")
           .add_to_set("active_users", "2")
           .execute())
```

Transactions with WATCH
```python
# Optimistic locking with WATCH
def transfer_credits(from_user: int, to_user: int, amount: int) -> bool:
    """Transfer credits between users with optimistic locking."""
    with r.pipeline() as pipe:
        while True:
            try:
                # Watch the keys we're going to modify
                pipe.watch(f"user:{from_user}:credits", f"user:{to_user}:credits")
                # Get current values (after WATCH the pipeline executes immediately)
                from_credits = int(pipe.get(f"user:{from_user}:credits") or 0)
                to_credits = int(pipe.get(f"user:{to_user}:credits") or 0)
                # Check whether the transfer is possible
                if from_credits < amount:
                    pipe.unwatch()
                    return False
                # Start the transaction
                pipe.multi()
                pipe.set(f"user:{from_user}:credits", from_credits - amount)
                pipe.set(f"user:{to_user}:credits", to_credits + amount)
                # Execute the transaction
                pipe.execute()
                return True
            except redis.WatchError:
                # A key was modified by another client - retry
                continue
```

Lua scripts for atomic operations
```python
increment_script = """
local current = redis.call('GET', KEYS[1])
if not current then
    current = 0
end
local new_val = tonumber(current) + tonumber(ARGV[1])
redis.call('SET', KEYS[1], new_val)
return new_val
"""

# Register and use the Lua script
increment = r.register_script(increment_script)
new_value = increment(keys=['counter:views'], args=[1])
```
Lua Scripts for Complex Operations

```python
# Rate limiting with Lua
rate_limit_script = """
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local current = redis.call('INCR', key)
if current == 1 then
    redis.call('EXPIRE', key, window)
end
if current > limit then
    return 0
else
    return 1
end
"""
rate_limiter = r.register_script(rate_limit_script)

def is_allowed(user_id: int, limit: int = 100, window: int = 60) -> bool:
    """Check whether a user is within the rate limit."""
    key = f"rate_limit:{user_id}"
    result = rate_limiter(keys=[key], args=[limit, window])
    return result == 1
```
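The same fixed-window semantics can be sketched in plain Python to make the logic easy to test; this is an in-memory stand-in for illustration, not a replacement for the atomic Lua version:

```python
import time

class FixedWindowLimiter:
    """In-memory fixed-window rate limiter mirroring the Lua script's semantics."""

    def __init__(self, limit: int, window: float, clock=time.monotonic):
        self.limit = limit
        self.window = window
        self.clock = clock
        self.counters = {}  # key -> (window_start, count)

    def is_allowed(self, key: str) -> bool:
        now = self.clock()
        start, count = self.counters.get(key, (now, 0))
        if now - start >= self.window:
            # Window expired - start a fresh one
            start, count = now, 0
        count += 1
        self.counters[key] = (start, count)
        return count <= self.limit
```

Like the Lua script, the first request in a window starts the clock and each subsequent request increments the counter until the limit is hit.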
Get-or-set pattern with Lua
```python
get_or_set_script = """
local value = redis.call('GET', KEYS[1])
if value then
    return value
else
    redis.call('SET', KEYS[1], ARGV[1])
    redis.call('EXPIRE', KEYS[1], ARGV[2])
    return ARGV[1]
end
"""
get_or_set = r.register_script(get_or_set_script)

def get_or_compute(key: str, compute_fn, ttl: int = 3600):
    """Get a value from the cache, or compute and cache it."""
    value = get_or_set(keys=[key], args=["__COMPUTING__", ttl])
    if value == "__COMPUTING__":
        # We set the placeholder - compute the real value
        computed = compute_fn()
        r.setex(key, ttl, computed)
        return computed
    return value
```

Production Patterns
High Availability with Sentinel

```python
from redis.sentinel import Sentinel

# Connect to Sentinel
sentinel = Sentinel([
    ('sentinel1', 26379),
    ('sentinel2', 26379),
    ('sentinel3', 26379)
], socket_timeout=0.5)

# Get a master connection
master = sentinel.master_for('mymaster', socket_timeout=0.5)

# Get a replica connection (for read-only operations)
replica = sentinel.slave_for('mymaster', socket_timeout=0.5)

# Use the master for writes
master.set('key', 'value')

# Use the replica for reads (optional, for load distribution)
value = replica.get('key')
```

Async Redis with asyncio
```python
import asyncio
import redis.asyncio as redis

async def async_redis_operations():
    """Async Redis operations example."""
    # Create an async client (from_url is synchronous; no await needed)
    r = redis.from_url("redis://localhost")
    try:
        # Async operations
        await r.set("async_key", "async_value")
        value = await r.get("async_key")
        print(f"Value: {value}")

        # Async pipeline: commands are queued synchronously,
        # only execute() is awaited
        async with r.pipeline(transaction=True) as pipe:
            pipe.set("key1", "value1")
            pipe.set("key2", "value2")
            pipe.get("key1")
            results = await pipe.execute()
            print(f"Pipeline results: {results}")
    finally:
        await r.close()
```
```python
# Run the async operations
asyncio.run(async_redis_operations())
```

Connection Pool Configuration
```python
import socket

# Production-ready connection pool
pool = redis.ConnectionPool(
    host='localhost',
    port=6379,
    db=0,
    max_connections=50,        # Max pool size
    socket_timeout=5,          # Socket timeout
    socket_connect_timeout=5,  # Connection timeout
    socket_keepalive=True,     # Keep the TCP connection alive
    socket_keepalive_options={
        socket.TCP_KEEPIDLE: 60,
        socket.TCP_KEEPINTVL: 10,
        socket.TCP_KEEPCNT: 3
    },
    retry_on_timeout=True,     # Retry on timeout
    health_check_interval=30,  # Health check every 30s
    decode_responses=True      # Auto-decode bytes to strings
)

r = redis.Redis(connection_pool=pool)
```

Error Handling and Resilience
```python
import time

import redis
from redis.exceptions import ConnectionError, TimeoutError

class ResilientRedisClient:
    """Redis client with retry logic and exponential backoff."""

    def __init__(self, max_retries: int = 3, backoff: float = 0.1):
        self.redis = redis.Redis(
            host='localhost',
            port=6379,
            socket_timeout=5,
            retry_on_timeout=True
        )
        self.max_retries = max_retries
        self.backoff = backoff

    def get_with_retry(self, key: str, default=None):
        """Get a value with exponential-backoff retries."""
        for attempt in range(self.max_retries):
            try:
                return self.redis.get(key) or default
            except (ConnectionError, TimeoutError) as e:
                if attempt == self.max_retries - 1:
                    # Log the error and return the default
                    print(f"Redis error after {self.max_retries} attempts: {e}")
                    return default
                # Exponential backoff
                wait_time = self.backoff * (2 ** attempt)
                time.sleep(wait_time)

    def set_with_retry(self, key: str, value: str, ttl: int = None) -> bool:
        """Set a value with retry logic."""
        for attempt in range(self.max_retries):
            try:
                if ttl:
                    return self.redis.setex(key, ttl, value)
                else:
                    return self.redis.set(key, value)
            except (ConnectionError, TimeoutError) as e:
                if attempt == self.max_retries - 1:
                    print(f"Redis error after {self.max_retries} attempts: {e}")
                    return False
                wait_time = self.backoff * (2 ** attempt)
                time.sleep(wait_time)
```
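The backoff schedule used above is easy to verify on its own; a pure function (the name is illustrative) that returns the wait applied before each retry:

```python
def backoff_schedule(max_retries: int, base: float = 0.1) -> list:
    """Waits before each retry: base * 2**attempt, doubling each time."""
    return [base * (2 ** attempt) for attempt in range(max_retries)]
```

With the defaults, three retries wait 0.1s, 0.2s, and 0.4s respectively.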
Monitoring and Metrics
```python
def get_redis_info(section: str = None) -> dict:
    """Get Redis server information."""
    return r.info(section=section)

def monitor_memory_usage():
    """Monitor Redis memory usage."""
    info = r.info('memory')
    used_memory = info['used_memory_human']
    peak_memory = info['used_memory_peak_human']
    memory_fragmentation = info['mem_fragmentation_ratio']
    print(f"Used Memory: {used_memory}")
    print(f"Peak Memory: {peak_memory}")
    print(f"Fragmentation Ratio: {memory_fragmentation}")
    return info

def monitor_stats():
    """Monitor Redis statistics."""
    info = r.info('stats')
    total_connections = info['total_connections_received']
    total_commands = info['total_commands_processed']
    ops_per_sec = info['instantaneous_ops_per_sec']
    print(f"Total Connections: {total_connections}")
    print(f"Total Commands: {total_commands}")
    print(f"Ops/sec: {ops_per_sec}")
    return info

def get_slow_log(count: int = 10):
    """Get the slow query log."""
    slow_log = r.slowlog_get(count)
    for entry in slow_log:
        print(f"Command: {entry['command']}")
        print(f"Duration: {entry['duration']} microseconds")
        print(f"Time: {entry['start_time']}")
        print("---")
    return slow_log
```
Best Practices
Key Naming Conventions
Use consistent, hierarchical naming:

```python
# Good naming patterns
user:123:profile              # User profile data
user:123:sessions:abc         # User session
cache:product:456             # Cached product
queue:emails:pending          # Email queue
lock:resource:789             # Resource lock
counter:api:requests:daily    # Daily API request counter
leaderboard:global:score      # Global leaderboard

# Avoid
u123                          # Too cryptic
user_profile_123              # Underscores are less common than colons
123:user                      # Wrong hierarchy
```
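A small helper can enforce the convention programmatically; the function name is illustrative:

```python
def make_key(*parts) -> str:
    """Join key parts with ':', most general namespace first."""
    if not parts:
        raise ValueError("at least one key part is required")
    return ":".join(str(p) for p in parts)
```

For example, `make_key("user", 123, "profile")` produces `"user:123:profile"`, keeping every call site on the same scheme.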
Memory Management

```python
# Set a TTL on all temporary data
r.setex("temp:data", 3600, value)  # Expires in 1 hour

# Limit collection sizes
r.lpush("activity_log", entry)
r.ltrim("activity_log", 0, 999)  # Keep only 1000 items

# Use appropriate data structures:
# a hash is more memory-efficient than multiple keys
r.hset("user:123", mapping={"name": "Alice", "email": "alice@example.com"})
# vs
r.set("user:123:name", "Alice")
r.set("user:123:email", "alice@example.com")

# Monitor memory usage
if r.info('memory')['used_memory'] > threshold:
    # Implement eviction or cleanup
    cleanup_old_data()
```

Security
安全
python
undefinedpython
undefinedUse authentication
使用身份验证
r = redis.Redis(
host='localhost',
port=6379,
password='your-secure-password',
username='your-username' # Redis 6+
)
r = redis.Redis(
host='localhost',
port=6379,
password='your-secure-password',
username='your-username' # Redis 6+
)
Use SSL/TLS for production
生产环境使用SSL/TLS
pool = redis.ConnectionPool(
host='redis.example.com',
port=6380,
connection_class=redis.SSLConnection,
ssl_cert_reqs='required',
ssl_ca_certs='/path/to/ca-cert.pem'
)
pool = redis.ConnectionPool(
host='redis.example.com',
port=6380,
connection_class=redis.SSLConnection,
ssl_cert_reqs='required',
ssl_ca_certs='/path/to/ca-cert.pem'
)
Credential provider pattern
凭证提供者模式
from redis import UsernamePasswordCredentialProvider
creds_provider = UsernamePasswordCredentialProvider("username", "password")
r = redis.Redis(
host="localhost",
port=6379,
credential_provider=creds_provider
)
undefinedfrom redis import UsernamePasswordCredentialProvider
creds_provider = UsernamePasswordCredentialProvider("username", "password")
r = redis.Redis(
host="localhost",
port=6379,
credential_provider=creds_provider
)
Testing
测试
python
import fakeredis
import pytest
@pytest.fixture
def redis_client():
"""Provide fake Redis client for testing."""
return fakeredis.FakeRedis(decode_responses=True)
def test_caching(redis_client):
"""Test caching logic."""
# Test cache miss
assert redis_client.get("test_key") is None
# Test cache set
redis_client.setex("test_key", 60, "test_value")
assert redis_client.get("test_key") == "test_value"
# Test expiration
assert redis_client.ttl("test_key") <= 60
def test_session_management(redis_client):
"""Test session operations."""
session_manager = SessionManager(redis_client)
# Create session
session_id = session_manager.create_session(user_id=123)
assert session_id is not None
# Get session
session = session_manager.get_session(session_id)
assert session['user_id'] == 123
# Delete session
assert session_manager.delete_session(session_id) is True
assert session_manager.get_session(session_id) is None
python
import fakeredis
import pytest
@pytest.fixture
def redis_client():
"""为测试提供模拟Redis客户端。"""
return fakeredis.FakeRedis(decode_responses=True)
def test_caching(redis_client):
"""测试缓存逻辑。"""
# 测试缓存未命中
assert redis_client.get("test_key") is None
# 测试缓存设置
redis_client.setex("test_key", 60, "test_value")
assert redis_client.get("test_key") == "test_value"
# 测试过期
assert redis_client.ttl("test_key") <= 60
def test_session_management(redis_client):
"""测试会话操作。"""
session_manager = SessionManager(redis_client)
# 创建会话
session_id = session_manager.create_session(user_id=123)
assert session_id is not None
# 获取会话
session = session_manager.get_session(session_id)
assert session['user_id'] == 123
# 删除会话
assert session_manager.delete_session(session_id) is True
assert session_manager.get_session(session_id) is None
Examples
示例
Example 1: User Session Management with Redis
示例1:基于Redis的用户会话管理
python
import redis
import json
import uuid
from datetime import datetime, timedelta
class UserSessionManager:
"""Complete user session management with Redis."""
def __init__(self, redis_client: redis.Redis, ttl: int = 1800):
self.redis = redis_client
self.ttl = ttl
def create_session(self, user_id: int, user_data: dict = None) -> str:
"""Create new user session."""
session_id = str(uuid.uuid4())
session_key = f"session:{session_id}"
session_data = {
"user_id": user_id,
"created_at": datetime.utcnow().isoformat(),
"last_accessed": datetime.utcnow().isoformat(),
"data": user_data or {}
}
# Store session with TTL
self.redis.setex(session_key, self.ttl, json.dumps(session_data))
# Track user's active sessions
self.redis.sadd(f"user:{user_id}:sessions", session_id)
return session_id
def get_session(self, session_id: str) -> dict:
"""Get session and refresh TTL."""
session_key = f"session:{session_id}"
session_data = self.redis.get(session_key)
if session_data:
session = json.loads(session_data)
session['last_accessed'] = datetime.utcnow().isoformat()
# Refresh TTL
self.redis.setex(session_key, self.ttl, json.dumps(session))
return session
return None
def delete_session(self, session_id: str) -> bool:
"""Delete session."""
session = self.get_session(session_id)
if not session:
return False
user_id = session['user_id']
# Remove session
self.redis.delete(f"session:{session_id}")
# Remove from user's session set
self.redis.srem(f"user:{user_id}:sessions", session_id)
return True
def delete_all_user_sessions(self, user_id: int):
"""Delete all sessions for a user."""
sessions_key = f"user:{user_id}:sessions"
session_ids = self.redis.smembers(sessions_key)
for session_id in session_ids:
self.redis.delete(f"session:{session_id}")
self.redis.delete(sessions_key)
def get_user_sessions(self, user_id: int) -> list:
"""Get all active sessions for a user."""
sessions_key = f"user:{user_id}:sessions"
session_ids = self.redis.smembers(sessions_key)
sessions = []
for session_id in session_ids:
session = self.get_session(session_id)
if session:
session['session_id'] = session_id
sessions.append(session)
return sessions
python
import redis
import json
import uuid
from datetime import datetime, timedelta
class UserSessionManager:
"""基于Redis的完整用户会话管理器。"""
def __init__(self, redis_client: redis.Redis, ttl: int = 1800):
self.redis = redis_client
self.ttl = ttl
def create_session(self, user_id: int, user_data: dict = None) -> str:
"""创建新用户会话。"""
session_id = str(uuid.uuid4())
session_key = f"session:{session_id}"
session_data = {
"user_id": user_id,
"created_at": datetime.utcnow().isoformat(),
"last_accessed": datetime.utcnow().isoformat(),
"data": user_data or {}
}
# 存储会话并设置TTL
self.redis.setex(session_key, self.ttl, json.dumps(session_data))
# 跟踪用户的活跃会话
self.redis.sadd(f"user:{user_id}:sessions", session_id)
return session_id
def get_session(self, session_id: str) -> dict:
"""获取会话并刷新TTL。"""
session_key = f"session:{session_id}"
session_data = self.redis.get(session_key)
if session_data:
session = json.loads(session_data)
session['last_accessed'] = datetime.utcnow().isoformat()
# 刷新TTL
self.redis.setex(session_key, self.ttl, json.dumps(session))
return session
return None
def delete_session(self, session_id: str) -> bool:
"""删除会话。"""
session = self.get_session(session_id)
if not session:
return False
user_id = session['user_id']
# 删除会话
self.redis.delete(f"session:{session_id}")
# 从用户会话集合中移除
self.redis.srem(f"user:{user_id}:sessions", session_id)
return True
def delete_all_user_sessions(self, user_id: int):
"""删除用户的所有会话。"""
sessions_key = f"user:{user_id}:sessions"
session_ids = self.redis.smembers(sessions_key)
for session_id in session_ids:
self.redis.delete(f"session:{session_id}")
self.redis.delete(sessions_key)
def get_user_sessions(self, user_id: int) -> list:
"""获取用户的所有活跃会话。"""
sessions_key = f"user:{user_id}:sessions"
session_ids = self.redis.smembers(sessions_key)
sessions = []
for session_id in session_ids:
session = self.get_session(session_id)
if session:
session['session_id'] = session_id
sessions.append(session)
return sessions
Usage
使用
r = redis.Redis(decode_responses=True)
session_mgr = UserSessionManager(r)
r = redis.Redis(decode_responses=True)
session_mgr = UserSessionManager(r)
Create session
创建会话
session_id = session_mgr.create_session(
user_id=123,
user_data={"role": "admin", "permissions": ["read", "write"]}
)
session_id = session_mgr.create_session(
user_id=123,
user_data={"role": "admin", "permissions": ["read", "write"]}
)
Get session
获取会话
session = session_mgr.get_session(session_id)
print(f"User ID: {session['user_id']}")
session = session_mgr.get_session(session_id)
print(f"用户ID: {session['user_id']}")
List all user sessions
列出用户所有会话
sessions = session_mgr.get_user_sessions(123)
print(f"Active sessions: {len(sessions)}")
sessions = session_mgr.get_user_sessions(123)
print(f"活跃会话数: {len(sessions)}")
Logout (delete session)
登出(删除会话)
session_mgr.delete_session(session_id)
session_mgr.delete_session(session_id)
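Note that `get_session` above re-serializes and rewrites the session on every read just to refresh the TTL. A common optimization (a sketch, not part of the manager above) is to refresh only once less than half the TTL remains, using a plain EXPIRE instead of a full rewrite; Redis TTL returns -2 for a missing key and -1 for a key without expiry, so negative values must not trigger a refresh:

```python
def should_refresh(ttl_remaining: int, full_ttl: int) -> bool:
    """Refresh the session TTL only when under half of it remains."""
    return 0 <= ttl_remaining < full_ttl / 2
```

Inside `get_session` this would become `if should_refresh(self.redis.ttl(session_key), self.ttl): self.redis.expire(session_key, self.ttl)`; keep the full rewrite if `last_accessed` must stay precise.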
Example 2: Real-Time Leaderboard
示例2:实时排行榜
python
import redis
import time
class Leaderboard:
"""Real-time leaderboard using Redis sorted sets."""
def __init__(self, redis_client: redis.Redis, name: str):
self.redis = redis_client
self.key = f"leaderboard:{name}"
def add_score(self, player_id: str, score: float):
"""Add or update player score."""
self.redis.zadd(self.key, {player_id: score})
def increment_score(self, player_id: str, increment: float):
"""Increment player score."""
self.redis.zincrby(self.key, increment, player_id)
def get_top(self, count: int = 10) -> list:
"""Get top players."""
# ZREVRANGE for highest scores first
players = self.redis.zrevrange(self.key, 0, count - 1, withscores=True)
return [
{
"rank": idx + 1,
"player_id": player_id,
"score": score
}
for idx, (player_id, score) in enumerate(players)
]
def get_rank(self, player_id: str) -> dict:
"""Get player rank and score."""
score = self.redis.zscore(self.key, player_id)
if score is None:
return None
# ZREVRANK for rank (0-indexed, highest first)
rank = self.redis.zrevrank(self.key, player_id)
return {
"player_id": player_id,
"rank": rank + 1 if rank is not None else None,
"score": score
}
def get_around(self, player_id: str, count: int = 5) -> list:
"""Get players around a specific player."""
rank = self.redis.zrevrank(self.key, player_id)
if rank is None:
return []
# Get players before and after
start = max(0, rank - count)
end = rank + count
players = self.redis.zrevrange(self.key, start, end, withscores=True)
return [
{
"rank": start + idx + 1,
"player_id": pid,
"score": score,
"is_current": pid == player_id
}
for idx, (pid, score) in enumerate(players)
]
def get_total_players(self) -> int:
"""Get total number of players."""
return self.redis.zcard(self.key)
def remove_player(self, player_id: str) -> bool:
"""Remove player from leaderboard."""
return self.redis.zrem(self.key, player_id) > 0
python
import redis
import time
class Leaderboard:
"""基于Redis有序集合的实时排行榜。"""
def __init__(self, redis_client: redis.Redis, name: str):
self.redis = redis_client
self.key = f"leaderboard:{name}"
def add_score(self, player_id: str, score: float):
"""添加或更新玩家分数。"""
self.redis.zadd(self.key, {player_id: score})
def increment_score(self, player_id: str, increment: float):
"""递增玩家分数。"""
self.redis.zincrby(self.key, increment, player_id)
def get_top(self, count: int = 10) -> list:
"""获取排行榜前N名。"""
# ZREVRANGE用于降序排列(分数从高到低)
players = self.redis.zrevrange(self.key, 0, count - 1, withscores=True)
return [
{
"rank": idx + 1,
"player_id": player_id,
"score": score
}
for idx, (player_id, score) in enumerate(players)
]
def get_rank(self, player_id: str) -> dict:
"""获取玩家排名与分数。"""
score = self.redis.zscore(self.key, player_id)
if score is None:
return None
# ZREVRANK用于降序排名
rank = self.redis.zrevrank(self.key, player_id)
return {
"player_id": player_id,
"rank": rank + 1 if rank is not None else None,
"score": score
}
def get_around(self, player_id: str, count: int = 5) -> list:
"""获取指定玩家附近的玩家。"""
rank = self.redis.zrevrank(self.key, player_id)
if rank is None:
return []
# 获取前后的玩家
start = max(0, rank - count)
end = rank + count
players = self.redis.zrevrange(self.key, start, end, withscores=True)
return [
{
"rank": start + idx + 1,
"player_id": pid,
"score": score,
"is_current": pid == player_id
}
for idx, (pid, score) in enumerate(players)
]
def get_total_players(self) -> int:
"""获取总玩家数。"""
return self.redis.zcard(self.key)
def remove_player(self, player_id: str) -> bool:
"""从排行榜移除玩家。"""
return self.redis.zrem(self.key, player_id) > 0
Usage
使用
r = redis.Redis(decode_responses=True)
leaderboard = Leaderboard(r, "global")
r = redis.Redis(decode_responses=True)
leaderboard = Leaderboard(r, "global")
Add scores
添加分数
leaderboard.add_score("alice", 1500)
leaderboard.add_score("bob", 2000)
leaderboard.add_score("charlie", 1800)
leaderboard.increment_score("alice", 200) # alice now at 1700
leaderboard.add_score("alice", 1500)
leaderboard.add_score("bob", 2000)
leaderboard.add_score("charlie", 1800)
leaderboard.increment_score("alice", 200) # alice现在1700分
Get top 10
获取前10名
top_players = leaderboard.get_top(10)
for player in top_players:
print(f"#{player['rank']}: {player['player_id']} - {player['score']}")
top_players = leaderboard.get_top(10)
for player in top_players:
print(f"#{player['rank']}: {player['player_id']} - {player['score']}")
Get player rank
获取玩家排名
alice_stats = leaderboard.get_rank("alice")
print(f"Alice is rank {alice_stats['rank']} with {alice_stats['score']} points")
alice_stats = leaderboard.get_rank("alice")
print(f"Alice排名 {alice_stats['rank']},分数 {alice_stats['score']}")
Get players around alice
获取Alice附近的玩家
nearby = leaderboard.get_around("alice", count=2)
for player in nearby:
marker = " <-- YOU" if player['is_current'] else ""
print(f"#{player['rank']}: {player['player_id']} - {player['score']}{marker}")
nearby = leaderboard.get_around("alice", count=2)
for player in nearby:
marker = " <-- 你" if player['is_current'] else ""
print(f"#{player['rank']}: {player['player_id']} - {player['score']}{marker}")
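`get_rank` can be extended with a percentile using the 0-indexed rank ZREVRANK already returns plus the ZCARD total; the arithmetic is a pure function (a sketch, the helper name is illustrative):

```python
def percentile(rank: int, total: int) -> float:
    """Share of players at or below this score, given a 0-indexed descending rank."""
    if total == 0:
        return 0.0
    return (total - rank) / total
```

The top player (rank 0 of 4) is at the 100th percentile; the last (rank 3 of 4) is at the 25th.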
Example 3: Distributed Rate Limiter
示例3:分布式速率限制器
python
import redis
import time
class RateLimiter:
"""Distributed rate limiter using Redis."""
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
# Lua script for atomic rate limiting
self.rate_limit_script = self.redis.register_script("""
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local current = redis.call('INCR', key)
if current == 1 then
redis.call('EXPIRE', key, window)
end
if current > limit then
return {0, limit, current - 1}
else
return {1, limit, current}
end
""")
def check_rate_limit(self, identifier: str, limit: int, window: int) -> dict:
"""
Check if request is within rate limit.
Args:
identifier: User ID, IP address, or API key
limit: Maximum requests allowed
window: Time window in seconds
Returns:
dict with allowed (bool), limit, current, remaining
"""
key = f"rate_limit:{identifier}:{int(time.time() // window)}"
allowed, max_limit, current = self.rate_limit_script(
keys=[key],
args=[limit, window]
)
return {
"allowed": bool(allowed),
"limit": max_limit,
"current": current,
"remaining": max(0, max_limit - current),
"reset_at": (int(time.time() // window) + 1) * window
}
def sliding_window_check(self, identifier: str, limit: int, window: int) -> dict:
"""
Sliding window rate limiter using sorted sets.
More accurate but slightly more expensive.
"""
key = f"rate_limit:sliding:{identifier}"
now = time.time()
window_start = now - window
# Remove old entries
self.redis.zremrangebyscore(key, 0, window_start)
# Count current requests (this check-then-add is not atomic; use a Lua script if strict enforcement under concurrency is required)
current = self.redis.zcard(key)
if current < limit:
# Add new request
self.redis.zadd(key, {str(now): now})
self.redis.expire(key, window)
return {
"allowed": True,
"limit": limit,
"current": current + 1,
"remaining": limit - current - 1
}
else:
return {
"allowed": False,
"limit": limit,
"current": current,
"remaining": 0
}
python
import redis
import time
class RateLimiter:
"""基于Redis的分布式速率限制器。"""
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
# 用于原子速率限制的Lua脚本
self.rate_limit_script = self.redis.register_script("""
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local current = redis.call('INCR', key)
if current == 1 then
redis.call('EXPIRE', key, window)
end
if current > limit then
return {0, limit, current - 1}
else
return {1, limit, current}
end
""")
def check_rate_limit(self, identifier: str, limit: int, window: int) -> dict:
"""
检查请求是否在速率限制内。
参数:
identifier: 用户ID、IP地址或API密钥
limit: 允许的最大请求数
window: 时间窗口(秒)
返回:
包含allowed(布尔值)、limit、current、remaining的字典
"""
key = f"rate_limit:{identifier}:{int(time.time() // window)}"
allowed, max_limit, current = self.rate_limit_script(
keys=[key],
args=[limit, window]
)
return {
"allowed": bool(allowed),
"limit": max_limit,
"current": current,
"remaining": max(0, max_limit - current),
"reset_at": (int(time.time() // window) + 1) * window
}
def sliding_window_check(self, identifier: str, limit: int, window: int) -> dict:
"""
基于有序集合的滑动窗口速率限制器。
精度更高但开销略大。
"""
key = f"rate_limit:sliding:{identifier}"
now = time.time()
window_start = now - window
# 删除旧条目
self.redis.zremrangebyscore(key, 0, window_start)
# 统计当前请求数(此处先检查后添加并非原子操作;如需严格的并发限制,可改用Lua脚本)
current = self.redis.zcard(key)
if current < limit:
# 添加新请求
self.redis.zadd(key, {str(now): now})
self.redis.expire(key, window)
return {
"allowed": True,
"limit": limit,
"current": current + 1,
"remaining": limit - current - 1
}
else:
return {
"allowed": False,
"limit": limit,
"current": current,
"remaining": 0
}
Usage
使用
r = redis.Redis(decode_responses=True)
limiter = RateLimiter(r)
r = redis.Redis(decode_responses=True)
limiter = RateLimiter(r)
API rate limiting: 100 requests per minute
API速率限制:每分钟100次请求
user_id = "user_123"
result = limiter.check_rate_limit(user_id, limit=100, window=60)
if result["allowed"]:
print(f"Request allowed. {result['remaining']} requests remaining.")
# Process request
else:
print(f"Rate limit exceeded. Try again at {result['reset_at']}")
# Return 429 Too Many Requests
user_id = "user_123"
result = limiter.check_rate_limit(user_id, limit=100, window=60)
if result["allowed"]:
print(f"请求允许。剩余请求数: {result['remaining']}")
# 处理请求
else:
print(f"超出速率限制。请在 {result['reset_at']} 后重试")
# 返回429 Too Many Requests
More accurate sliding window
更精确的滑动窗口
result = limiter.sliding_window_check(user_id, limit=100, window=60)
result = limiter.sliding_window_check(user_id, limit=100, window=60)
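The fixed-window limiter buckets requests by `int(time // window)`, so its key scheme and reset time reduce to simple arithmetic (a sketch mirroring the expressions used in `check_rate_limit` above). This bucketing is also why the sliding variant exists: a fixed window permits up to twice the limit in a burst straddling a window boundary.

```python
def window_key(identifier: str, now: float, window: int) -> str:
    """All requests inside the same fixed window share one counter key."""
    return f"rate_limit:{identifier}:{int(now // window)}"

def reset_at(now: float, window: int) -> int:
    """Epoch second at which the current fixed window rolls over."""
    return (int(now // window) + 1) * window
```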
Example 4: Distributed Job Queue
示例4:分布式任务队列
python
import redis
import json
import time
import uuid
from typing import Optional, Callable
class JobQueue:
"""Distributed job queue with Redis."""
def __init__(self, redis_client: redis.Redis, queue_name: str = "default"):
self.redis = redis_client
self.queue_name = queue_name
self.queue_key = f"queue:{queue_name}"
self.processing_key = f"queue:{queue_name}:processing"
def enqueue(self, job_type: str, payload: dict, priority: int = 0) -> str:
"""
Add job to queue.
Args:
job_type: Type of job (for routing to workers)
payload: Job data
priority: Higher priority = processed first (0 = normal)
Returns:
job_id
"""
job_id = str(uuid.uuid4())
job_data = {
"id": job_id,
"type": job_type,
"payload": payload,
"enqueued_at": time.time(),
"attempts": 0
}
# Add to queue (use ZADD for priority queue)
score = -priority # Negative for higher priority first
self.redis.zadd(self.queue_key, {json.dumps(job_data): score})
return job_id
def dequeue(self, timeout: int = 0) -> Optional[dict]:
"""
Get next job from queue.
Args:
timeout: Block for this many seconds (0 = no blocking)
Returns:
Job data or None
"""
# Get highest priority job (lowest score)
jobs = self.redis.zrange(self.queue_key, 0, 0)
if not jobs:
if timeout > 0:
time.sleep(min(timeout, 1))
return self.dequeue(timeout - 1)
return None
job_json = jobs[0]
# Move to processing set atomically
pipe = self.redis.pipeline()
pipe.zrem(self.queue_key, job_json)
pipe.zadd(self.processing_key, {job_json: time.time()})
pipe.execute()
job_data = json.loads(job_json)
job_data['attempts'] += 1
job_data['_raw'] = job_json  # keep the exact stored JSON so complete/retry can remove it
return job_data
def complete(self, job_data: dict):
"""Mark job as completed."""
# Remove the exact stored member from processing
self.redis.zrem(self.processing_key, job_data['_raw'])
def retry(self, job_data: dict, delay: int = 0):
"""Retry failed job, preserving its incremented attempt count."""
# Remove the exact stored member from processing
self.redis.zrem(self.processing_key, job_data['_raw'])
# Re-enqueue with delay (note: sleeping here blocks the worker)
if delay > 0:
time.sleep(delay)
job_json = json.dumps({k: v for k, v in job_data.items() if k != '_raw'})
self.redis.zadd(self.queue_key, {job_json: 0})
def get_stats(self) -> dict:
"""Get queue statistics."""
return {
"queued": self.redis.zcard(self.queue_key),
"processing": self.redis.zcard(self.processing_key)
}
python
import redis
import json
import time
import uuid
from typing import Optional, Callable
class JobQueue:
"""基于Redis的分布式任务队列。"""
def __init__(self, redis_client: redis.Redis, queue_name: str = "default"):
self.redis = redis_client
self.queue_name = queue_name
self.queue_key = f"queue:{queue_name}"
self.processing_key = f"queue:{queue_name}:processing"
def enqueue(self, job_type: str, payload: dict, priority: int = 0) -> str:
"""
将任务加入队列。
参数:
job_type: 任务类型(用于路由到对应工作器)
payload: 任务数据
priority: 优先级越高越先处理(0为普通)
返回:
job_id
"""
job_id = str(uuid.uuid4())
job_data = {
"id": job_id,
"type": job_type,
"payload": payload,
"enqueued_at": time.time(),
"attempts": 0
}
# 加入队列(使用ZADD实现优先级队列)
score = -priority # 负数实现优先级高的先处理
self.redis.zadd(self.queue_key, {json.dumps(job_data): score})
return job_id
def dequeue(self, timeout: int = 0) -> Optional[dict]:
"""
从队列获取下一个任务。
参数:
timeout: 阻塞等待时间(秒,0为非阻塞)
返回:
任务数据或None
"""
# 获取最高优先级任务(分数最低)
jobs = self.redis.zrange(self.queue_key, 0, 0)
if not jobs:
if timeout > 0:
time.sleep(min(timeout, 1))
return self.dequeue(timeout - 1)
return None
job_json = jobs[0]
# 原子性地移至处理中集合
pipe = self.redis.pipeline()
pipe.zrem(self.queue_key, job_json)
pipe.zadd(self.processing_key, {job_json: time.time()})
pipe.execute()
job_data = json.loads(job_json)
job_data['attempts'] += 1
job_data['_raw'] = job_json  # 保留存储时的原始JSON,供complete/retry精确移除
return job_data
def complete(self, job_data: dict):
"""标记任务为已完成。"""
# 按存储时的原始成员从处理中集合移除
self.redis.zrem(self.processing_key, job_data['_raw'])
def retry(self, job_data: dict, delay: int = 0):
"""重试失败的任务,保留已递增的尝试次数。"""
# 按存储时的原始成员从处理中集合移除
self.redis.zrem(self.processing_key, job_data['_raw'])
# 延迟后重新加入队列(注意:此处sleep会阻塞工作器)
if delay > 0:
time.sleep(delay)
job_json = json.dumps({k: v for k, v in job_data.items() if k != '_raw'})
self.redis.zadd(self.queue_key, {job_json: 0})
def get_stats(self) -> dict:
"""获取队列统计信息。"""
return {
"queued": self.redis.zcard(self.queue_key),
"processing": self.redis.zcard(self.processing_key)
}
Worker example
工作器示例
class Worker:
"""Job worker."""
def __init__(self, queue: JobQueue, handlers: dict):
self.queue = queue
self.handlers = handlers
def process_jobs(self):
"""Process jobs from queue."""
print("Worker started. Waiting for jobs...")
while True:
job = self.queue.dequeue(timeout=5)
if job:
print(f"Processing job {job['id']} (type: {job['type']})")
try:
# Get handler for job type
handler = self.handlers.get(job['type'])
if handler:
handler(job['payload'])
self.queue.complete(job)
print(f"Job {job['id']} completed")
else:
print(f"No handler for job type: {job['type']}")
self.queue.complete(job)
except Exception as e:
print(f"Job {job['id']} failed: {e}")
if job['attempts'] < 3:
# Retry with exponential backoff
delay = 2 ** job['attempts']
print(f"Retrying in {delay}s...")
self.queue.retry(job, delay=delay)
else:
print(f"Job {job['id']} failed permanently")
self.queue.complete(job)
class Worker:
"""任务工作器。"""
def __init__(self, queue: JobQueue, handlers: dict):
self.queue = queue
self.handlers = handlers
def process_jobs(self):
"""处理队列中的任务。"""
print("工作器已启动。等待任务...")
while True:
job = self.queue.dequeue(timeout=5)
if job:
print(f"处理任务 {job['id']}(类型: {job['type']})")
try:
# 获取对应任务类型的处理器
handler = self.handlers.get(job['type'])
if handler:
handler(job['payload'])
self.queue.complete(job)
print(f"任务 {job['id']} 已完成")
else:
print(f"无对应任务类型的处理器: {job['type']}")
self.queue.complete(job)
except Exception as e:
print(f"任务 {job['id']} 处理失败: {e}")
if job['attempts'] < 3:
# 指数退避重试
delay = 2 ** job['attempts']
print(f"{delay}秒后重试...")
self.queue.retry(job, delay=delay)
else:
print(f"任务 {job['id']} 永久失败")
self.queue.complete(job)
Usage
使用
r = redis.Redis(decode_responses=True)
queue = JobQueue(r, "email_queue")
r = redis.Redis(decode_responses=True)
queue = JobQueue(r, "email_queue")
Enqueue jobs
加入任务
job_id = queue.enqueue("send_email", {
"to": "user@example.com",
"subject": "Welcome!",
"body": "Thanks for signing up."
}, priority=1)
job_id = queue.enqueue("send_email", {
"to": "user@example.com",
"subject": "欢迎!",
"body": "感谢注册。"
}, priority=1)
Define handlers
定义处理器
def send_email_handler(payload):
print(f"Sending email to {payload['to']}")
# Email sending logic here
time.sleep(1) # Simulate work
handlers = {
"send_email": send_email_handler
}
def send_email_handler(payload):
print(f"向 {payload['to']} 发送邮件")
# 邮件发送逻辑
time.sleep(1) # 模拟工作
handlers = {
"send_email": send_email_handler
}
Start worker
启动工作器
worker = Worker(queue, handlers)
worker = Worker(queue, handlers)
worker.process_jobs() # This blocks - run in separate process
worker.process_jobs() # 阻塞运行 - 在独立进程中执行
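The worker retries with delay `2 ** attempts`, and `dequeue` increments `attempts` before each run, so a job that keeps failing waits 2s, then 4s, then is dropped on the third attempt. A helper making that schedule explicit (a sketch using the same constants as the worker above):

```python
def backoff_delays(max_attempts: int = 3, base: int = 2) -> list:
    """Delays applied before each retry: base**1, base**2, ... up to attempt max_attempts - 1."""
    return [base ** attempt for attempt in range(1, max_attempts)]
```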
Example 5: Real-Time Event Streaming
示例5:实时事件流
python
import redis
import json
import time
from typing import Callable, Optional
class EventStream:
"""Real-time event streaming with Redis Streams."""
def __init__(self, redis_client: redis.Redis, stream_name: str):
self.redis = redis_client
self.stream_name = stream_name
def publish(self, event_type: str, data: dict) -> str:
"""Publish event to stream."""
event = {
"type": event_type,
"data": json.dumps(data),
"timestamp": time.time()
}
# Add to stream (returns auto-generated ID)
event_id = self.redis.xadd(self.stream_name, event, maxlen=10000)
return event_id
def read_events(self, last_id: str = '0', count: int = 10) -> list:
"""Read events from stream."""
events = self.redis.xread(
{self.stream_name: last_id},
count=count,
block=1000 # 1 second timeout
)
if not events:
return []
_, event_list = events[0]
return [
{
"id": event_id,
"type": event_data[b'type'].decode(),
"data": json.loads(event_data[b'data'].decode()),
"timestamp": float(event_data[b'timestamp'])
}
for event_id, event_data in event_list
]
def create_consumer_group(self, group_name: str):
"""Create consumer group for parallel processing."""
try:
self.redis.xgroup_create(
name=self.stream_name,
groupname=group_name,
id='0',
mkstream=True
)
except redis.ResponseError as e:
if "BUSYGROUP" not in str(e):
raise
def consume_events(self, group_name: str, consumer_name: str,
count: int = 10) -> list:
"""Consume events as part of consumer group."""
events = self.redis.xreadgroup(
groupname=group_name,
consumername=consumer_name,
streams={self.stream_name: '>'},
count=count,
block=5000
)
if not events:
return []
_, event_list = events[0]
return [
{
"id": event_id,
"type": event_data[b'type'].decode(),
"data": json.loads(event_data[b'data'].decode()),
"timestamp": float(event_data[b'timestamp'])
}
for event_id, event_data in event_list
]
def acknowledge(self, group_name: str, event_id: str):
"""Acknowledge processed event."""
self.redis.xack(self.stream_name, group_name, event_id)
def get_pending(self, group_name: str) -> list:
"""Get pending (unacknowledged) events."""
pending = self.redis.xpending_range(
name=self.stream_name,
groupname=group_name,
min='-',
max='+',
count=100
)
return pending
python
import redis
import json
import time
from typing import Callable, Optional
class EventStream:
"""基于Redis Streams的实时事件流。"""
def __init__(self, redis_client: redis.Redis, stream_name: str):
self.redis = redis_client
self.stream_name = stream_name
def publish(self, event_type: str, data: dict) -> str:
"""向流发布事件。"""
event = {
"type": event_type,
"data": json.dumps(data),
"timestamp": time.time()
}
# 加入流(返回自动生成的ID)
event_id = self.redis.xadd(self.stream_name, event, maxlen=10000)
return event_id
def read_events(self, last_id: str = '0', count: int = 10) -> list:
"""从流中读取事件。"""
events = self.redis.xread(
{self.stream_name: last_id},
count=count,
block=1000 # 1秒超时
)
if not events:
return []
_, event_list = events[0]
return [
{
"id": event_id,
"type": event_data[b'type'].decode(),
"data": json.loads(event_data[b'data'].decode()),
"timestamp": float(event_data[b'timestamp'])
}
for event_id, event_data in event_list
]
def create_consumer_group(self, group_name: str):
"""创建消费者组以实现并行处理。"""
try:
self.redis.xgroup_create(
name=self.stream_name,
groupname=group_name,
id='0',
mkstream=True
)
except redis.ResponseError as e:
if "BUSYGROUP" not in str(e):
raise
def consume_events(self, group_name: str, consumer_name: str,
count: int = 10) -> list:
"""以组内消费者身份消费事件。"""
events = self.redis.xreadgroup(
groupname=group_name,
consumername=consumer_name,
streams={self.stream_name: '>'},
count=count,
block=5000
)
if not events:
return []
_, event_list = events[0]
return [
{
"id": event_id,
"type": event_data[b'type'].decode(),
"data": json.loads(event_data[b'data'].decode()),
"timestamp": float(event_data[b'timestamp'])
}
for event_id, event_data in event_list
]
def acknowledge(self, group_name: str, event_id: str):
"""确认已处理的事件。"""
self.redis.xack(self.stream_name, group_name, event_id)
def get_pending(self, group_name: str) -> list:
"""获取未确认的事件。"""
pending = self.redis.xpending_range(
name=self.stream_name,
groupname=group_name,
min='-',
max='+',
count=100
)
return pending
Usage Example: Activity Feed
使用示例:活动流
r = redis.Redis()
activity_stream = EventStream(r, "user_activity")
r = redis.Redis()
activity_stream = EventStream(r, "user_activity")
Publish events
发布事件
activity_stream.publish("user_signup", {
"user_id": 123,
"email": "alice@example.com"
})
activity_stream.publish("post_created", {
"user_id": 123,
"post_id": 456,
"title": "My First Post"
})
activity_stream.publish("user_signup", {
"user_id": 123,
"email": "alice@example.com"
})
activity_stream.publish("post_created", {
"user_id": 123,
"post_id": 456,
"title": "我的第一篇帖子"
})
Read events (simple consumer)
读取事件(简单消费者)
last_id = '0'
while True:
events = activity_stream.read_events(last_id, count=10)
for event in events:
print(f"Event: {event['type']}")
print(f"Data: {event['data']}")
last_id = event['id']
if not events:
break
last_id = '0'
while True:
events = activity_stream.read_events(last_id, count=10)
for event in events:
print(f"事件: {event['type']}")
print(f"数据: {event['data']}")
last_id = event['id']
if not events:
break
Consumer group example
消费者组示例
activity_stream.create_consumer_group("processors")
activity_stream.create_consumer_group("processors")
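The `last_id` the read loops track is a Redis stream entry ID of the form `<ms-timestamp>-<sequence>`, which makes ordering and age checks easy to compute client-side (a small sketch):

```python
def parse_stream_id(entry_id: str) -> tuple:
    """Split a stream entry ID into (millisecond timestamp, sequence number)."""
    ms, seq = entry_id.split("-")
    return int(ms), int(seq)
```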
Worker consuming events
工作器消费事件
while True:
events = activity_stream.consume_events(
group_name="processors",
consumer_name="worker-1",
count=10
)
for event in events:
try:
# Process event
process_event(event)
# Acknowledge
activity_stream.acknowledge("processors", event['id'])
except Exception as e:
print(f"Failed to process event {event['id']}: {e}")
# Event remains unacknowledged for retry
while True:
events = activity_stream.consume_events(
group_name="processors",
consumer_name="worker-1",
count=10
)
for event in events:
try:
# 处理事件
process_event(event)
# 确认处理
activity_stream.acknowledge("processors", event['id'])
except Exception as e:
print(f"处理事件 {event['id']} 失败: {e}")
# 事件保持未确认状态以便重试
Example 6: Cache-Aside Pattern with Multi-Level Caching
示例6:带多级缓存的缓存旁路模式
python
import redis
import json
import hashlib
import time
from typing import Optional, Any, Callable
class MultiLevelCache:
"""Multi-level caching with Redis and local cache."""
def __init__(self, redis_client: redis.Redis,
local_cache_size: int = 100,
local_ttl: int = 60,
redis_ttl: int = 3600):
self.redis = redis_client
self.local_cache = {}
self.local_cache_size = local_cache_size
self.local_ttl = local_ttl
self.redis_ttl = redis_ttl
def _make_key(self, namespace: str, key: str) -> str:
"""Generate cache key."""
        return f"cache:{namespace}:{key}"

    def get(self, namespace: str, key: str,
            compute_fn: Optional[Callable] = None) -> Optional[Any]:
        """
        Get value from cache with fallback to compute function.
        Lookup order: Local cache → Redis → Compute function
        """
        cache_key = self._make_key(namespace, key)

        # Level 1: Local cache
        if cache_key in self.local_cache:
            entry = self.local_cache[cache_key]
            if time.time() < entry['expires_at']:
                return entry['value']
            else:
                del self.local_cache[cache_key]

        # Level 2: Redis cache
        redis_value = self.redis.get(cache_key)
        if redis_value:
            value = json.loads(redis_value)
            # Populate local cache
            self._set_local(cache_key, value)
            return value

        # Level 3: Compute function
        if compute_fn:
            value = compute_fn()
            if value is not None:
                self.set(namespace, key, value)
            return value

        return None

    def set(self, namespace: str, key: str, value: Any):
        """Set value in both cache levels."""
        cache_key = self._make_key(namespace, key)
        serialized = json.dumps(value)

        # Set in Redis
        self.redis.setex(cache_key, self.redis_ttl, serialized)

        # Set in local cache
        self._set_local(cache_key, value)

    def _set_local(self, key: str, value: Any):
        """Set value in local cache, evicting the oldest entry at capacity."""
        # Oldest-expiry eviction (FIFO-like when all entries share one TTL)
        if len(self.local_cache) >= self.local_cache_size:
            oldest_key = min(
                self.local_cache.keys(),
                key=lambda k: self.local_cache[k]['expires_at']
            )
            del self.local_cache[oldest_key]

        self.local_cache[key] = {
            'value': value,
            'expires_at': time.time() + self.local_ttl
        }

    def delete(self, namespace: str, key: str):
        """Delete from all cache levels."""
        cache_key = self._make_key(namespace, key)

        # Delete from Redis
        self.redis.delete(cache_key)

        # Delete from local cache
        if cache_key in self.local_cache:
            del self.local_cache[cache_key]

    def invalidate_namespace(self, namespace: str):
        """Invalidate all keys in a namespace."""
        pattern = f"cache:{namespace}:*"

        # Delete from Redis (SCAN avoids blocking the server the way KEYS would)
        for key in self.redis.scan_iter(match=pattern, count=100):
            self.redis.delete(key)

        # Delete from local cache
        to_delete = [
            k for k in self.local_cache.keys()
            if k.startswith(f"cache:{namespace}:")
        ]
        for k in to_delete:
            del self.local_cache[k]
```
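A subtle point in `_set_local`: evicting the entry with the smallest `expires_at` is oldest-insert (FIFO-like) eviction when every entry shares one TTL, not true LRU, because reads never refresh the timestamp. The policy can be exercised in isolation; the sketch below uses a plain dict and a hypothetical `evict_if_full` helper, with no Redis involved:

```python
import time

def evict_if_full(local_cache: dict, capacity: int) -> None:
    # Same policy as MultiLevelCache._set_local: when at capacity,
    # drop the entry with the smallest 'expires_at' (the oldest insert
    # when every entry shares one TTL) -- FIFO-like, not true LRU.
    if len(local_cache) >= capacity:
        oldest = min(local_cache, key=lambda k: local_cache[k]['expires_at'])
        del local_cache[oldest]

cache = {}
for i, key in enumerate(["a", "b", "c"]):
    evict_if_full(cache, capacity=2)
    # +i keeps insertion order strictly increasing even on fast clocks
    cache[key] = {'value': i, 'expires_at': time.time() + 60 + i}
# "a" is evicted when "c" arrives, leaving {"b", "c"}
```

Under a real LRU, a read of "a" just before inserting "c" would have protected it; here it would not.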
Usage
```python
r = redis.Redis(decode_responses=True)
cache = MultiLevelCache(r)

def get_user(user_id: int) -> dict:
    """Get user with multi-level caching."""
    return cache.get(
        namespace="users",
        key=str(user_id),
        compute_fn=lambda: database.query_user(user_id)  # 'database' is your data-access layer
    )

# First call: queries the database and caches the result
user = get_user(123)

# Second call: returns from the local cache (fastest)
user = get_user(123)

def update_user(user_id: int, data: dict):
    database.update_user(user_id, data)
    # Invalidate cache
    cache.delete("users", str(user_id))

# Invalidate all user caches
cache.invalidate_namespace("users")
```
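Because `set` serializes with `json.dumps`, values containing non-JSON types (datetime, Decimal, set) raise `TypeError` before ever reaching Redis. One way to widen coverage is a `default` hook; the sketch below is an illustration, not part of the class above, and `_json_default` is a hypothetical helper:

```python
import json
from datetime import datetime, date
from decimal import Decimal

def _json_default(obj):
    # Hypothetical helper: widen json.dumps to a few common types.
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    if isinstance(obj, Decimal):
        return float(obj)
    if isinstance(obj, set):
        return sorted(obj)  # sets have no JSON form; store as a sorted list
    raise TypeError(f"not JSON-serializable: {type(obj)!r}")

payload = {"id": 123, "tags": {"b", "a"}, "since": date(2025, 1, 1)}
serialized = json.dumps(payload, default=_json_default)
```

Note the trade-off: the round trip is lossy (a set comes back as a list, a date as a string), so callers must tolerate the JSON-level types on read.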
Example 7: Geo-Location with Redis
```python
import redis
from typing import Optional

class GeoLocation:
    """Geo-spatial indexing and queries with Redis."""

    def __init__(self, redis_client: redis.Redis, index_name: str):
        self.redis = redis_client
        self.key = f"geo:{index_name}"

    def add_location(self, location_id: str, longitude: float, latitude: float):
        """Add a location to the geo index."""
        # redis-py 4.x+ expects a (lon, lat, member) sequence
        self.redis.geoadd(self.key, (longitude, latitude, location_id))

    def add_locations(self, locations: list):
        """Batch add locations.

        Args:
            locations: List of (location_id, longitude, latitude) tuples
        """
        values = [
            item for loc in locations
            for item in (loc[1], loc[2], loc[0])
        ]
        self.redis.geoadd(self.key, values)

    def get_position(self, location_id: str) -> Optional[tuple]:
        """Get the coordinates of a location."""
        result = self.redis.geopos(self.key, location_id)
        if result and result[0]:
            return result[0]  # (longitude, latitude)
        return None

    def find_nearby(self, longitude: float, latitude: float,
                    radius: float, unit: str = 'km', count: int = None) -> list:
        """
        Find locations within a radius.

        Args:
            longitude: Center longitude
            latitude: Center latitude
            radius: Search radius
            unit: Distance unit ('m', 'km', 'mi', 'ft')
            count: Maximum results
        """
        args = {
            'longitude': longitude,
            'latitude': latitude,
            'radius': radius,
            'unit': unit,
            'withdist': True,
            'withcoord': True,
            'sort': 'ASC'
        }
        if count:
            args['count'] = count
        results = self.redis.georadius(self.key, **args)
        return [
            {
                'location_id': location_id,
                'distance': distance,
                'coordinates': (lon, lat)
            }
            for location_id, distance, (lon, lat) in results
        ]

    def find_nearby_member(self, location_id: str, radius: float,
                           unit: str = 'km', count: int = None) -> list:
        """Find locations near an existing member."""
        args = {
            'member': location_id,
            'radius': radius,
            'unit': unit,
            'withdist': True,
            'sort': 'ASC'
        }
        if count:
            args['count'] = count
        results = self.redis.georadiusbymember(self.key, **args)
        return [
            {
                'location_id': loc_id,
                'distance': distance
            }
            for loc_id, distance in results
            if loc_id != location_id  # Exclude self
        ]

    def distance_between(self, location_id1: str, location_id2: str,
                         unit: str = 'km') -> float:
        """Calculate the distance between two locations."""
        return self.redis.geodist(self.key, location_id1, location_id2, unit)
```

Note: GEORADIUS and GEORADIUSBYMEMBER are deprecated as of Redis 6.2 in favor of GEOSEARCH; they still work, but new code may prefer redis-py's `geosearch`.
Usage Example: Restaurant finder
```python
r = redis.Redis(decode_responses=True)
restaurants = GeoLocation(r, "restaurants")

# Add restaurants
restaurants.add_locations([
    ("rest1", -122.4194, 37.7749),  # San Francisco
    ("rest2", -122.4068, 37.7849),
    ("rest3", -122.4312, 37.7652),
])

# Find restaurants near coordinates
nearby = restaurants.find_nearby(
    longitude=-122.4194,
    latitude=37.7749,
    radius=5,
    unit='km',
    count=10
)
for restaurant in nearby:
    print(f"{restaurant['location_id']}: {restaurant['distance']:.2f} km away")

# Find restaurants near a specific restaurant
similar = restaurants.find_nearby_member("rest1", radius=2, unit='km')

# Get distance between two restaurants
distance = restaurants.distance_between("rest1", "rest2", unit='km')
print(f"Distance: {distance:.2f} km")
```
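GEODIST returns great-circle distances computed on a sphere of radius roughly 6372.8 km. When validating geo data, a client-side haversine check is handy because it needs no server round trip; `haversine_km` below is an illustrative helper, not a redis-py API:

```python
import math

def haversine_km(lon1: float, lat1: float, lon2: float, lat2: float) -> float:
    """Great-circle distance in km on a sphere of radius 6372.8 km
    (close to the radius Redis uses internally for GEODIST)."""
    lon1, lat1, lon2, lat2 = map(math.radians, (lon1, lat1, lon2, lat2))
    dlon, dlat = lon2 - lon1, lat2 - lat1
    a = (math.sin(dlat / 2) ** 2
         + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2)
    return 2 * 6372.8 * math.asin(math.sqrt(a))

# rest1 vs rest2 from the example above: about 1.5-1.6 km apart,
# which should agree with GEODIST to within small rounding error
d = haversine_km(-122.4194, 37.7749, -122.4068, 37.7849)
```

If the client-side figure and GEODIST disagree by more than a few meters, the usual culprit is swapped longitude/latitude arguments, since GEOADD takes longitude first.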
Quick Reference
Common Operations
Connection
```python
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
```
Strings
```python
r.set('key', 'value')
r.setex('key', 3600, 'value')  # With TTL
r.get('key')
r.incr('counter')
```
Hashes
```python
r.hset('user:123', 'name', 'Alice')
r.hset('user:123', mapping={'name': 'Alice', 'age': 30})
r.hget('user:123', 'name')
r.hgetall('user:123')
```
Lists
```python
r.lpush('queue', 'item')
r.rpush('queue', 'item')
r.lpop('queue')
r.lrange('queue', 0, -1)
```
Sets
```python
r.sadd('tags', 'python', 'redis')
r.smembers('tags')
r.sismember('tags', 'python')
```
Sorted Sets
```python
r.zadd('leaderboard', {'alice': 100, 'bob': 200})
r.zrange('leaderboard', 0, -1, withscores=True)
r.zrank('leaderboard', 'alice')
```
Expiration
```python
r.expire('key', 3600)
r.ttl('key')
```
Pipelining
```python
pipe = r.pipeline()
pipe.set('key1', 'value1')
pipe.set('key2', 'value2')
results = pipe.execute()
```
Time Complexity
- GET, SET: O(1)
- HGET, HSET: O(1)
- LPUSH, RPUSH, LPOP, RPOP: O(1)
- SADD, SREM, SISMEMBER: O(1)
- ZADD, ZREM: O(log(N))
- ZRANGE, ZREVRANGE: O(log(N)+M), where M is the number of elements returned
- SCAN, SSCAN, HSCAN, ZSCAN: O(1) per call; a full iteration over N keys is O(N)
Skill Version: 1.0.0
Last Updated: October 2025
Skill Category: State Management, Distributed Systems, Performance Optimization
Compatible With: redis-py, Redis 6.0+, Redis 7.0+