redis-patterns
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseRedis Patterns
Redis 模式
Quick reference for Redis best practices across common backend use cases.
面向常见后端用例的Redis最佳实践速查手册。
How It Works
工作原理
Redis is an in-memory data structure store that supports strings, hashes, lists, sets, sorted sets, streams, and more. Individual Redis commands are atomic on a single instance; multi-step workflows require Lua scripts, MULTI/EXEC transactions, or explicit synchronization to stay atomic. Data is optionally persisted via RDB snapshots or AOF logs. Clients communicate over TCP using the RESP protocol; connection pools are essential to avoid per-request handshake overhead.
Redis是一款内存数据结构存储,支持字符串、哈希、列表、集合、有序集合、流等多种类型。单个Redis实例上的独立命令是原子性的;多步骤工作流需要Lua脚本、MULTI/EXEC事务或显式同步来保持原子性。数据可通过RDB快照或AOF日志选择性持久化。客户端通过TCP使用RESP协议进行通信;连接池对于避免每次请求的握手开销至关重要。
When to Activate
适用场景
- Adding caching to an application
- Implementing rate limiting or throttling
- Building distributed locks or coordination
- Setting up session or token storage
- Using Pub/Sub or Redis Streams for messaging
- Configuring Redis in production (pooling, eviction, clustering)
- 为应用添加缓存功能
- 实现限流或流量控制
- 构建分布式锁或协调机制
- 设置会话或令牌存储
- 使用Pub/Sub或Redis Streams进行消息传递
- 生产环境中配置Redis(连接池、淘汰策略、集群)
Data Structure Cheat Sheet
数据结构速查表
| Use Case | Structure | Example Key |
|---|---|---|
| Simple cache | String | |
| User session | Hash | |
| Leaderboard | Sorted Set | |
| Unique visitors | Set | |
| Activity feed | List | |
| Event stream | Stream | |
| Counters / rate limits | String (INCR) | |
| Bloom filter / HLL | HyperLogLog | |
| 用例 | 数据结构 | 示例键名 |
|---|---|---|
| 简单缓存 | 字符串 | |
| 用户会话 | 哈希 | |
| 排行榜 | 有序集合 | |
| 独立访客统计 | 集合 | |
| 活动信息流 | 列表 | |
| 事件流 | 流 | |
| 计数器/限流 | 字符串(INCR命令) | |
| 布隆过滤器/基数统计 | HyperLogLog | |
Core Patterns
核心模式
Cache-Aside (Lazy Loading)
缓存旁路(懒加载)
python
import redis
import json
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
def get_product(product_id: int):
cache_key = f"product:{product_id}"
cached = r.get(cache_key)
if cached:
return json.loads(cached)
product = db.query("SELECT * FROM products WHERE id = %s", product_id)
r.setex(cache_key, 3600, json.dumps(product)) # TTL: 1 hour
return productpython
import redis
import json
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
def get_product(product_id: int):
cache_key = f"product:{product_id}"
cached = r.get(cache_key)
if cached:
return json.loads(cached)
product = db.query("SELECT * FROM products WHERE id = %s", product_id)
r.setex(cache_key, 3600, json.dumps(product)) # TTL: 1 hour
return productWrite-Through Cache
写穿透缓存
python
def update_product(product_id: int, data: dict):
# Write to DB first
db.execute("UPDATE products SET ... WHERE id = %s", product_id)
# Immediately update cache
cache_key = f"product:{product_id}"
r.setex(cache_key, 3600, json.dumps(data))python
def update_product(product_id: int, data: dict):
# 先写入数据库
db.execute("UPDATE products SET ... WHERE id = %s", product_id)
# 立即更新缓存
cache_key = f"product:{product_id}"
r.setex(cache_key, 3600, json.dumps(data))Cache Invalidation
缓存失效
python
undefinedpython
undefinedTag-based invalidation — group related keys under a set
基于标签的失效——将相关键分组存储在集合中
def cache_product(product_id: int, category_id: int, data: dict):
key = f"product:{product_id}"
tag = f"tag:category:{category_id}"
pipe = r.pipeline(transaction=True)
pipe.setex(key, 3600, json.dumps(data))
pipe.sadd(tag, key)
pipe.expire(tag, 3600)
pipe.execute()
def invalidate_category(category_id: int):
tag = f"tag:category:{category_id}"
keys = r.smembers(tag)
if keys:
r.delete(*keys)
r.delete(tag)
undefineddef cache_product(product_id: int, category_id: int, data: dict):
key = f"product:{product_id}"
tag = f"tag:category:{category_id}"
pipe = r.pipeline(transaction=True)
pipe.setex(key, 3600, json.dumps(data))
pipe.sadd(tag, key)
pipe.expire(tag, 3600)
pipe.execute()
def invalidate_category(category_id: int):
tag = f"tag:category:{category_id}"
keys = r.smembers(tag)
if keys:
r.delete(*keys)
r.delete(tag)
undefinedSession Storage
会话存储
python
import time
import uuid
def create_session(user_id: int, ttl: int = 86400) -> str:
session_id = str(uuid.uuid4())
key = f"session:{session_id}"
pipe = r.pipeline(transaction=True)
pipe.hset(key, mapping={
"user_id": user_id,
"created_at": int(time.time()),
})
pipe.expire(key, ttl)
pipe.execute()
return session_id
def get_session(session_id: str) -> dict | None:
data = r.hgetall(f"session:{session_id}")
return data if data else None
def delete_session(session_id: str):
r.delete(f"session:{session_id}")python
import time
import uuid
def create_session(user_id: int, ttl: int = 86400) -> str:
session_id = str(uuid.uuid4())
key = f"session:{session_id}"
pipe = r.pipeline(transaction=True)
pipe.hset(key, mapping={
"user_id": user_id,
"created_at": int(time.time()),
})
pipe.expire(key, ttl)
pipe.execute()
return session_id
def get_session(session_id: str) -> dict | None:
data = r.hgetall(f"session:{session_id}")
return data if data else None
def delete_session(session_id: str):
r.delete(f"session:{session_id}")Rate Limiting
限流
Fixed Window (Simple)
固定窗口(简单版)
python
def is_rate_limited(user_id: int, limit: int = 100, window: int = 60) -> bool:
key = f"ratelimit:{user_id}:{int(time.time()) // window}"
pipe = r.pipeline(transaction=True)
pipe.incr(key)
pipe.expire(key, window)
count, _ = pipe.execute()
return count > limitpython
def is_rate_limited(user_id: int, limit: int = 100, window: int = 60) -> bool:
key = f"ratelimit:{user_id}:{int(time.time()) // window}"
pipe = r.pipeline(transaction=True)
pipe.incr(key)
pipe.expire(key, window)
count, _ = pipe.execute()
return count > limitSliding Window (Lua — Atomic)
滑动窗口(Lua脚本——原子性)
lua
-- sliding_window.lua
local key = KEYS[1]
local now = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])
redis.call('ZREMRANGEBYSCORE', key, 0, now - window)
local count = redis.call('ZCARD', key)
if count < limit then
-- Use unique member (now + sequence) to avoid collisions within the same millisecond
local seq_key = key .. ':seq'
local seq = redis.call('INCR', seq_key)
redis.call('EXPIRE', seq_key, math.ceil(window / 1000))
redis.call('ZADD', key, now, now .. '-' .. seq)
redis.call('EXPIRE', key, math.ceil(window / 1000))
return 1
end
return 0python
sliding_window = r.register_script(open('sliding_window.lua').read())
def allow_request(user_id: int) -> bool:
key = f"ratelimit:sliding:{user_id}"
now = int(time.time() * 1000)
return bool(sliding_window(keys=[key], args=[now, 60000, 100]))lua
-- sliding_window.lua
local key = KEYS[1]
local now = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])
redis.call('ZREMRANGEBYSCORE', key, 0, now - window)
local count = redis.call('ZCARD', key)
if count < limit then
-- 使用唯一成员(now + 序列号)避免同一毫秒内的冲突
local seq_key = key .. ':seq'
local seq = redis.call('INCR', seq_key)
redis.call('EXPIRE', seq_key, math.ceil(window / 1000))
redis.call('ZADD', key, now, now .. '-' .. seq)
redis.call('EXPIRE', key, math.ceil(window / 1000))
return 1
end
return 0python
sliding_window = r.register_script(open('sliding_window.lua').read())
def allow_request(user_id: int) -> bool:
key = f"ratelimit:sliding:{user_id}"
now = int(time.time() * 1000)
return bool(sliding_window(keys=[key], args=[now, 60000, 100]))Distributed Locks
分布式锁
Distributed Lock (Single Node — SET NX PX)
分布式锁(单节点——SET NX PX)
python
import uuid
def acquire_lock(resource: str, ttl_ms: int = 5000) -> str | None:
lock_key = f"lock:{resource}"
token = str(uuid.uuid4())
acquired = r.set(lock_key, token, px=ttl_ms, nx=True)
return token if acquired else None
def release_lock(resource: str, token: str) -> bool:
release_script = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
"""
result = r.eval(release_script, 1, f"lock:{resource}", token)
return bool(result)python
import uuid
def acquire_lock(resource: str, ttl_ms: int = 5000) -> str | None:
lock_key = f"lock:{resource}"
token = str(uuid.uuid4())
acquired = r.set(lock_key, token, px=ttl_ms, nx=True)
return token if acquired else None
def release_lock(resource: str, token: str) -> bool:
release_script = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
"""
result = r.eval(release_script, 1, f"lock:{resource}", token)
return bool(result)Usage
使用示例
token = acquire_lock("order:payment:123")
if token:
try:
process_payment()
finally:
release_lock("order:payment:123", token)
> For multi-node setups use the `redlock-py` library which implements the full Redlock algorithm.token = acquire_lock("order:payment:123")
if token:
try:
process_payment()
finally:
release_lock("order:payment:123", token)
> 多节点部署场景下,请使用实现完整Redlock算法的`redlock-py`库。Pub/Sub & Streams
发布/订阅与流
Pub/Sub (Fire-and-Forget)
发布/订阅(即发即弃)
python
undefinedpython
undefinedPublisher
发布者
def publish_event(channel: str, payload: dict):
r.publish(channel, json.dumps(payload))
def publish_event(channel: str, payload: dict):
r.publish(channel, json.dumps(payload))
Subscriber (blocking — run in separate thread/process)
订阅者(阻塞式——在独立线程/进程中运行)
def subscribe_events(channel: str):
pubsub = r.pubsub()
pubsub.subscribe(channel)
for message in pubsub.listen():
if message['type'] == 'message':
handle(json.loads(message['data']))
undefineddef subscribe_events(channel: str):
pubsub = r.pubsub()
pubsub.subscribe(channel)
for message in pubsub.listen():
if message['type'] == 'message':
handle(json.loads(message['data']))
undefinedRedis Streams (Durable Queue)
Redis Streams(持久化队列)
python
undefinedpython
undefinedProducer
生产者
def emit(stream: str, event: dict):
r.xadd(stream, event, maxlen=10000) # Cap stream length
def emit(stream: str, event: dict):
r.xadd(stream, event, maxlen=10000) # 限制流长度
Consumer group — guarantees at-least-once delivery
消费者组——保证至少一次投递
try:
r.xgroup_create('events:orders', 'processor', id='0', mkstream=True)
except Exception:
pass # Group already exists
def consume(stream: str, group: str, consumer: str):
while True:
messages = r.xreadgroup(group, consumer, {stream: '>'}, count=10, block=2000)
for _, entries in (messages or []):
for msg_id, data in entries:
process(data)
r.xack(stream, group, msg_id)
> Prefer **Streams** over Pub/Sub when you need delivery guarantees, consumer groups, or replay.try:
r.xgroup_create('events:orders', 'processor', id='0', mkstream=True)
except Exception:
pass # 组已存在
def consume(stream: str, group: str, consumer: str):
while True:
messages = r.xreadgroup(group, consumer, {stream: '>'}, count=10, block=2000)
for _, entries in (messages or []):
for msg_id, data in entries:
process(data)
r.xack(stream, group, msg_id)
> 当你需要投递保障、消费者组或重放功能时,优先选择**Streams**而非Pub/Sub。Key Design
键设计
Naming Conventions
命名规范
undefinedundefinedPattern: resource:id:field
模式:资源:ID:字段
user:123:profile
order:456:status
cache:product:789
user:123:profile
order:456:status
cache:product:789
Pattern: namespace:resource:id
模式:命名空间:资源:ID
myapp:session:abc123
myapp:ratelimit:user:123
myapp:session:abc123
myapp:ratelimit:user:123
Pattern: resource:date (time-bound keys)
模式:资源:日期(时间绑定键)
stats:pageviews:2024-01-01
undefinedstats:pageviews:2024-01-01
undefinedTTL Strategy
TTL策略
| Data Type | Suggested TTL |
|---|---|
| User session | 24h ( |
| API response cache | 5–15 min |
| Rate limit window | Match window size |
| Short-lived tokens | 5–10 min |
| Leaderboard | 1h–24h |
| Static/reference data | 1h–1 week |
Always set a TTL. Keys without TTL accumulate indefinitely and cause memory pressure.
| 数据类型 | 建议TTL |
|---|---|
| 用户会话 | 24小时 ( |
| API响应缓存 | 5–15分钟 |
| 限流窗口 | 匹配窗口大小 |
| 短期令牌 | 5–10分钟 |
| 排行榜 | 1小时–24小时 |
| 静态/参考数据 | 1小时–1周 |
始终设置TTL。没有TTL的键会无限累积,导致内存压力。
Connection Management
连接管理
Connection Pooling
连接池
python
from redis import ConnectionPool, Redis
pool = ConnectionPool(
host='localhost',
port=6379,
db=0,
max_connections=20,
decode_responses=True,
socket_connect_timeout=2,
socket_timeout=2,
)
r = Redis(connection_pool=pool)python
from redis import ConnectionPool, Redis
pool = ConnectionPool(
host='localhost',
port=6379,
db=0,
max_connections=20,
decode_responses=True,
socket_connect_timeout=2,
socket_timeout=2,
)
r = Redis(connection_pool=pool)Cluster Mode
集群模式
python
from redis.cluster import RedisCluster
r = RedisCluster(
startup_nodes=[{"host": "redis-1", "port": 6379}],
decode_responses=True,
skip_full_coverage_check=True,
)python
from redis.cluster import RedisCluster
r = RedisCluster(
startup_nodes=[{"host": "redis-1", "port": 6379}],
decode_responses=True,
skip_full_coverage_check=True,
)Sentinel (High Availability)
Sentinel(高可用)
python
from redis.sentinel import Sentinel
sentinel = Sentinel(
[('sentinel-1', 26379), ('sentinel-2', 26379)],
socket_timeout=0.5,
)
master = sentinel.master_for('mymaster', decode_responses=True)
replica = sentinel.slave_for('mymaster', decode_responses=True)python
from redis.sentinel import Sentinel
sentinel = Sentinel(
[('sentinel-1', 26379), ('sentinel-2', 26379)],
socket_timeout=0.5,
)
master = sentinel.master_for('mymaster', decode_responses=True)
replica = sentinel.slave_for('mymaster', decode_responses=True)Eviction Policies
淘汰策略
| Policy | Behavior | Best For |
|---|---|---|
| Error on write when full | Queues / critical data |
| Evict least recently used | General cache |
| LRU only among keys with TTL | Mixed data store |
| Evict least frequently used | Skewed access patterns |
| Evict soonest-to-expire | Prioritize long-lived data |
Set via :
redis.confmaxmemory-policy allkeys-lru| 策略 | 行为 | 适用场景 |
|---|---|---|
| 内存满时写入报错 | 队列/关键数据 |
| 淘汰最近最少使用的键 | 通用缓存 |
| 仅淘汰带TTL的键中最近最少使用的 | 混合数据存储 |
| 淘汰最不常使用的键 | 访问模式倾斜的场景 |
| 淘汰即将过期的键 | 优先保留长期数据 |
通过设置:
redis.confmaxmemory-policy allkeys-lruAnti-Patterns
反模式
| Anti-Pattern | Problem | Fix |
|---|---|---|
| Keys with no TTL | Memory grows unbounded | Always set TTL |
| Blocks the server (O(N)) | Use |
| Storing large blobs (>100KB) | Slow serialization, memory pressure | Store reference + fetch from object store |
| Single Redis for everything | No isolation between cache & queue | Use separate DBs or instances |
| Ignoring connection pool limits | Connection exhaustion under load | Size pool to workload |
| Not handling cache miss stampede | Thundering herd on cold start | Use locks or probabilistic early expiry |
| Wipes entire instance | Scope deletes by key pattern |
| 反模式 | 问题 | 解决方案 |
|---|---|---|
| 键未设置TTL | 内存无限增长 | 始终设置TTL |
生产环境使用 | 阻塞服务器(O(N)复杂度) | 使用 |
| 存储大对象(>100KB) | 序列化缓慢,内存压力大 | 存储引用,从对象存储中获取 |
| 单一Redis实例处理所有业务 | 缓存与队列无隔离 | 使用独立数据库或实例 |
| 忽略连接池限制 | 高负载下连接耗尽 | 根据工作负载调整池大小 |
| 未处理缓存击穿 | 冷启动时出现惊群效应 | 使用锁或概率性提前过期 |
随意使用 | 清空整个实例 | 按键模式范围删除 |
Cache Miss Stampede Prevention
缓存击穿预防
python
import threading
_locks: dict[str, threading.Lock] = {}
_locks_mutex = threading.Lock()
def get_with_lock(key: str, fetch_fn, ttl: int = 300):
cached = r.get(key)
if cached:
return json.loads(cached)
with _locks_mutex:
if key not in _locks:
_locks[key] = threading.Lock()
lock = _locks[key]
with lock:
cached = r.get(key) # Re-check after acquiring lock
if cached:
return json.loads(cached)
value = fetch_fn()
r.setex(key, ttl, json.dumps(value))
return valueNote: for multi-process deployments, replace the in-process lock with/acquire_lockfrom the Distributed Locks section above.release_lock
python
import threading
_locks: dict[str, threading.Lock] = {}
_locks_mutex = threading.Lock()
def get_with_lock(key: str, fetch_fn, ttl: int = 300):
cached = r.get(key)
if cached:
return json.loads(cached)
with _locks_mutex:
if key not in _locks:
_locks[key] = threading.Lock()
lock = _locks[key]
with lock:
cached = r.get(key) # 获取锁后再次检查
if cached:
return json.loads(cached)
value = fetch_fn()
r.setex(key, ttl, json.dumps(value))
return value注意:多进程部署场景下,将进程内锁替换为分布式锁章节中的/acquire_lock。release_lock
Examples
示例
Add caching to a Django/Flask API endpoint:
Use cache-aside with and a 5-minute TTL on the response. Key on the request parameters.
setexRate-limit an API by user:
Use fixed-window with for low-traffic endpoints; use sliding-window Lua for accurate per-user throttling.
pipeline(transaction=True)Coordinate a background job across workers:
Use with a TTL that exceeds the expected job duration. Always release in a block.
acquire_lockfinallyFan-out notifications to multiple subscribers:
Use Pub/Sub for fire-and-forget. Switch to Streams if you need guaranteed delivery or replay for late consumers.
为Django/Flask API端点添加缓存:
使用缓存旁路模式,结合设置5分钟TTL,根据请求参数生成键名。
setex按用户限流API:
低流量端点使用固定窗口模式+;需要精确用户限流时使用滑动窗口Lua脚本。
pipeline(transaction=True)跨Worker协调后台任务:
使用,设置超过预期任务时长的TTL。始终在块中释放锁。
acquire_lockfinally向多个订阅者广播通知:
使用Pub/Sub实现即发即弃。如果需要保障投递或为延迟消费者提供重放功能,请切换到Streams。
Quick Reference
速查参考
| Pattern | When to Use |
|---|---|
| Cache-aside | Read-heavy, tolerate slight staleness |
| Write-through | Strong consistency required |
| Distributed lock | Prevent concurrent access to a resource |
| Sliding window rate limit | Accurate per-user throttling |
| Redis Streams | Durable event queue with consumer groups |
| Pub/Sub | Broadcast with no delivery guarantees needed |
| Sorted Set leaderboard | Ranked scoring, pagination |
| HyperLogLog | Approximate unique count at low memory |
| 模式 | 适用场景 |
|---|---|
| 缓存旁路 | 读密集型,可容忍轻微数据过期 |
| 写穿透缓存 | 需要强一致性 |
| 分布式锁 | 防止资源并发访问 |
| 滑动窗口限流 | 精确的用户限流 |
| Redis Streams | 带消费者组的持久化事件队列 |
| Pub/Sub | 无需投递保障的广播场景 |
| 有序集合排行榜 | 排名计分、分页 |
| HyperLogLog | 低内存消耗下的近似独立计数 |
Related
相关资源
- Skill: — relational data patterns
postgres-patterns - Skill: — API and service layer patterns
backend-patterns - Skill: — schema versioning
database-migrations - Skill: — Django cache framework integration
django-patterns - Agent: — full database review workflow
database-reviewer
- Skill: — 关系型数据模式
postgres-patterns - Skill: — API与服务层模式
backend-patterns - Skill: — schema版本控制
database-migrations - Skill: — Django缓存框架集成
django-patterns - Agent: — 完整数据库审查工作流
database-reviewer