redis-patterns

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Redis Patterns

Redis 模式

Quick reference for Redis best practices across common backend use cases.
面向常见后端用例的Redis最佳实践速查手册。

How It Works

工作原理

Redis is an in-memory data structure store that supports strings, hashes, lists, sets, sorted sets, streams, and more. Individual Redis commands are atomic on a single instance; multi-step workflows require Lua scripts, MULTI/EXEC transactions, or explicit synchronization to stay atomic. Data is optionally persisted via RDB snapshots or AOF logs. Clients communicate over TCP using the RESP protocol; connection pools are essential to avoid per-request handshake overhead.
Redis是一款内存数据结构存储,支持字符串、哈希、列表、集合、有序集合、流等多种类型。单个Redis实例上的独立命令是原子性的;多步骤工作流需要Lua脚本、MULTI/EXEC事务或显式同步来保持原子性。数据可通过RDB快照或AOF日志选择性持久化。客户端通过TCP使用RESP协议进行通信;连接池对于避免每次请求的握手开销至关重要。

When to Activate

适用场景

  • Adding caching to an application
  • Implementing rate limiting or throttling
  • Building distributed locks or coordination
  • Setting up session or token storage
  • Using Pub/Sub or Redis Streams for messaging
  • Configuring Redis in production (pooling, eviction, clustering)
  • 为应用添加缓存功能
  • 实现限流或流量控制
  • 构建分布式锁或协调机制
  • 设置会话或令牌存储
  • 使用Pub/Sub或Redis Streams进行消息传递
  • 生产环境中配置Redis(连接池、淘汰策略、集群)

Data Structure Cheat Sheet

数据结构速查表

Use CaseStructureExample Key
Simple cacheString
product:123
User sessionHash
session:abc
LeaderboardSorted Set
scores:weekly
Unique visitorsSet
visitors:2024-01-01
Activity feedList
feed:user:456
Event streamStream
events:orders
Counters / rate limitsString (INCR)
ratelimit:user:123
Bloom filter / HLLHyperLogLog
hll:pageviews
用例数据结构示例键名
简单缓存字符串
product:123
用户会话哈希
session:abc
排行榜有序集合
scores:weekly
独立访客统计集合
visitors:2024-01-01
活动信息流列表
feed:user:456
事件流
events:orders
计数器/限流字符串(INCR命令)
ratelimit:user:123
布隆过滤器/基数统计HyperLogLog
hll:pageviews

Core Patterns

核心模式

Cache-Aside (Lazy Loading)

缓存旁路(懒加载)

python
import redis
import json

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

def get_product(product_id: int):
    cache_key = f"product:{product_id}"
    cached = r.get(cache_key)

    if cached:
        return json.loads(cached)

    product = db.query("SELECT * FROM products WHERE id = %s", product_id)
    r.setex(cache_key, 3600, json.dumps(product))  # TTL: 1 hour
    return product
python
import redis
import json

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

def get_product(product_id: int):
    cache_key = f"product:{product_id}"
    cached = r.get(cache_key)

    if cached:
        return json.loads(cached)

    product = db.query("SELECT * FROM products WHERE id = %s", product_id)
    r.setex(cache_key, 3600, json.dumps(product))  # TTL: 1 hour
    return product

Write-Through Cache

写穿透缓存

python
def update_product(product_id: int, data: dict):
    # Write to DB first
    db.execute("UPDATE products SET ... WHERE id = %s", product_id)

    # Immediately update cache
    cache_key = f"product:{product_id}"
    r.setex(cache_key, 3600, json.dumps(data))
python
def update_product(product_id: int, data: dict):
    # 先写入数据库
    db.execute("UPDATE products SET ... WHERE id = %s", product_id)

    # 立即更新缓存
    cache_key = f"product:{product_id}"
    r.setex(cache_key, 3600, json.dumps(data))

Cache Invalidation

缓存失效

python
undefined
python
undefined

Tag-based invalidation — group related keys under a set

基于标签的失效——将相关键分组存储在集合中

def cache_product(product_id: int, category_id: int, data: dict): key = f"product:{product_id}" tag = f"tag:category:{category_id}" pipe = r.pipeline(transaction=True) pipe.setex(key, 3600, json.dumps(data)) pipe.sadd(tag, key) pipe.expire(tag, 3600) pipe.execute()
def invalidate_category(category_id: int): tag = f"tag:category:{category_id}" keys = r.smembers(tag) if keys: r.delete(*keys) r.delete(tag)
undefined
def cache_product(product_id: int, category_id: int, data: dict): key = f"product:{product_id}" tag = f"tag:category:{category_id}" pipe = r.pipeline(transaction=True) pipe.setex(key, 3600, json.dumps(data)) pipe.sadd(tag, key) pipe.expire(tag, 3600) pipe.execute()
def invalidate_category(category_id: int): tag = f"tag:category:{category_id}" keys = r.smembers(tag) if keys: r.delete(*keys) r.delete(tag)
undefined

Session Storage

会话存储

python
import time
import uuid

def create_session(user_id: int, ttl: int = 86400) -> str:
    session_id = str(uuid.uuid4())
    key = f"session:{session_id}"
    pipe = r.pipeline(transaction=True)
    pipe.hset(key, mapping={
        "user_id": user_id,
        "created_at": int(time.time()),
    })
    pipe.expire(key, ttl)
    pipe.execute()
    return session_id

def get_session(session_id: str) -> dict | None:
    data = r.hgetall(f"session:{session_id}")
    return data if data else None

def delete_session(session_id: str):
    r.delete(f"session:{session_id}")
python
import time
import uuid

def create_session(user_id: int, ttl: int = 86400) -> str:
    session_id = str(uuid.uuid4())
    key = f"session:{session_id}"
    pipe = r.pipeline(transaction=True)
    pipe.hset(key, mapping={
        "user_id": user_id,
        "created_at": int(time.time()),
    })
    pipe.expire(key, ttl)
    pipe.execute()
    return session_id

def get_session(session_id: str) -> dict | None:
    data = r.hgetall(f"session:{session_id}")
    return data if data else None

def delete_session(session_id: str):
    r.delete(f"session:{session_id}")

Rate Limiting

限流

Fixed Window (Simple)

固定窗口(简单版)

python
def is_rate_limited(user_id: int, limit: int = 100, window: int = 60) -> bool:
    key = f"ratelimit:{user_id}:{int(time.time()) // window}"
    pipe = r.pipeline(transaction=True)
    pipe.incr(key)
    pipe.expire(key, window)
    count, _ = pipe.execute()
    return count > limit
python
def is_rate_limited(user_id: int, limit: int = 100, window: int = 60) -> bool:
    key = f"ratelimit:{user_id}:{int(time.time()) // window}"
    pipe = r.pipeline(transaction=True)
    pipe.incr(key)
    pipe.expire(key, window)
    count, _ = pipe.execute()
    return count > limit

Sliding Window (Lua — Atomic)

滑动窗口(Lua脚本——原子性)

lua
-- sliding_window.lua
local key = KEYS[1]
local now = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])

redis.call('ZREMRANGEBYSCORE', key, 0, now - window)
local count = redis.call('ZCARD', key)

if count < limit then
    -- Use unique member (now + sequence) to avoid collisions within the same millisecond
    local seq_key = key .. ':seq'
    local seq = redis.call('INCR', seq_key)
    redis.call('EXPIRE', seq_key, math.ceil(window / 1000))
    redis.call('ZADD', key, now, now .. '-' .. seq)
    redis.call('EXPIRE', key, math.ceil(window / 1000))
    return 1
end
return 0
python
sliding_window = r.register_script(open('sliding_window.lua').read())

def allow_request(user_id: int) -> bool:
    key = f"ratelimit:sliding:{user_id}"
    now = int(time.time() * 1000)
    return bool(sliding_window(keys=[key], args=[now, 60000, 100]))
lua
-- sliding_window.lua
local key = KEYS[1]
local now = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])

redis.call('ZREMRANGEBYSCORE', key, 0, now - window)
local count = redis.call('ZCARD', key)

if count < limit then
    -- 使用唯一成员(now + 序列号)避免同一毫秒内的冲突
    local seq_key = key .. ':seq'
    local seq = redis.call('INCR', seq_key)
    redis.call('EXPIRE', seq_key, math.ceil(window / 1000))
    redis.call('ZADD', key, now, now .. '-' .. seq)
    redis.call('EXPIRE', key, math.ceil(window / 1000))
    return 1
end
return 0
python
sliding_window = r.register_script(open('sliding_window.lua').read())

def allow_request(user_id: int) -> bool:
    key = f"ratelimit:sliding:{user_id}"
    now = int(time.time() * 1000)
    return bool(sliding_window(keys=[key], args=[now, 60000, 100]))

Distributed Locks

分布式锁

Distributed Lock (Single Node — SET NX PX)

分布式锁(单节点——SET NX PX)

python
import uuid

def acquire_lock(resource: str, ttl_ms: int = 5000) -> str | None:
    lock_key = f"lock:{resource}"
    token = str(uuid.uuid4())
    acquired = r.set(lock_key, token, px=ttl_ms, nx=True)
    return token if acquired else None

def release_lock(resource: str, token: str) -> bool:
    release_script = """
    if redis.call('get', KEYS[1]) == ARGV[1] then
        return redis.call('del', KEYS[1])
    else
        return 0
    end
    """
    result = r.eval(release_script, 1, f"lock:{resource}", token)
    return bool(result)
python
import uuid

def acquire_lock(resource: str, ttl_ms: int = 5000) -> str | None:
    lock_key = f"lock:{resource}"
    token = str(uuid.uuid4())
    acquired = r.set(lock_key, token, px=ttl_ms, nx=True)
    return token if acquired else None

def release_lock(resource: str, token: str) -> bool:
    release_script = """
    if redis.call('get', KEYS[1]) == ARGV[1] then
        return redis.call('del', KEYS[1])
    else
        return 0
    end
    """
    result = r.eval(release_script, 1, f"lock:{resource}", token)
    return bool(result)

Usage

使用示例

token = acquire_lock("order:payment:123") if token: try: process_payment() finally: release_lock("order:payment:123", token)

> For multi-node setups use the `redlock-py` library which implements the full Redlock algorithm.
token = acquire_lock("order:payment:123") if token: try: process_payment() finally: release_lock("order:payment:123", token)

> 多节点部署场景下,请使用实现完整Redlock算法的`redlock-py`库。

Pub/Sub & Streams

发布/订阅与流

Pub/Sub (Fire-and-Forget)

发布/订阅(即发即弃)

python
undefined
python
undefined

Publisher

发布者

def publish_event(channel: str, payload: dict): r.publish(channel, json.dumps(payload))
def publish_event(channel: str, payload: dict): r.publish(channel, json.dumps(payload))

Subscriber (blocking — run in separate thread/process)

订阅者(阻塞式——在独立线程/进程中运行)

def subscribe_events(channel: str): pubsub = r.pubsub() pubsub.subscribe(channel) for message in pubsub.listen(): if message['type'] == 'message': handle(json.loads(message['data']))
undefined
def subscribe_events(channel: str): pubsub = r.pubsub() pubsub.subscribe(channel) for message in pubsub.listen(): if message['type'] == 'message': handle(json.loads(message['data']))
undefined

Redis Streams (Durable Queue)

Redis Streams(持久化队列)

python
undefined
python
undefined

Producer

生产者

def emit(stream: str, event: dict): r.xadd(stream, event, maxlen=10000) # Cap stream length
def emit(stream: str, event: dict): r.xadd(stream, event, maxlen=10000) # 限制流长度

Consumer group — guarantees at-least-once delivery

消费者组——保证至少一次投递

try: r.xgroup_create('events:orders', 'processor', id='0', mkstream=True) except Exception: pass # Group already exists
def consume(stream: str, group: str, consumer: str): while True: messages = r.xreadgroup(group, consumer, {stream: '>'}, count=10, block=2000) for _, entries in (messages or []): for msg_id, data in entries: process(data) r.xack(stream, group, msg_id)

> Prefer **Streams** over Pub/Sub when you need delivery guarantees, consumer groups, or replay.
try: r.xgroup_create('events:orders', 'processor', id='0', mkstream=True) except Exception: pass # 组已存在
def consume(stream: str, group: str, consumer: str): while True: messages = r.xreadgroup(group, consumer, {stream: '>'}, count=10, block=2000) for _, entries in (messages or []): for msg_id, data in entries: process(data) r.xack(stream, group, msg_id)

> 当你需要投递保障、消费者组或重放功能时,优先选择**Streams**而非Pub/Sub。

Key Design

键设计

Naming Conventions

命名规范

undefined
undefined

Pattern: resource:id:field

模式:资源:ID:字段

user:123:profile order:456:status cache:product:789
user:123:profile order:456:status cache:product:789

Pattern: namespace:resource:id

模式:命名空间:资源:ID

myapp:session:abc123 myapp:ratelimit:user:123
myapp:session:abc123 myapp:ratelimit:user:123

Pattern: resource:date (time-bound keys)

模式:资源:日期(时间绑定键)

stats:pageviews:2024-01-01
undefined
stats:pageviews:2024-01-01
undefined

TTL Strategy

TTL策略

Data TypeSuggested TTL
User session24h (
86400
)
API response cache5–15 min
Rate limit windowMatch window size
Short-lived tokens5–10 min
Leaderboard1h–24h
Static/reference data1h–1 week
Always set a TTL. Keys without TTL accumulate indefinitely and cause memory pressure.
数据类型建议TTL
用户会话24小时 (
86400
)
API响应缓存5–15分钟
限流窗口匹配窗口大小
短期令牌5–10分钟
排行榜1小时–24小时
静态/参考数据1小时–1周
始终设置TTL。没有TTL的键会无限累积,导致内存压力。

Connection Management

连接管理

Connection Pooling

连接池

python
from redis import ConnectionPool, Redis

pool = ConnectionPool(
    host='localhost',
    port=6379,
    db=0,
    max_connections=20,
    decode_responses=True,
    socket_connect_timeout=2,
    socket_timeout=2,
)

r = Redis(connection_pool=pool)
python
from redis import ConnectionPool, Redis

pool = ConnectionPool(
    host='localhost',
    port=6379,
    db=0,
    max_connections=20,
    decode_responses=True,
    socket_connect_timeout=2,
    socket_timeout=2,
)

r = Redis(connection_pool=pool)

Cluster Mode

集群模式

python
from redis.cluster import RedisCluster

r = RedisCluster(
    startup_nodes=[{"host": "redis-1", "port": 6379}],
    decode_responses=True,
    skip_full_coverage_check=True,
)
python
from redis.cluster import RedisCluster

r = RedisCluster(
    startup_nodes=[{"host": "redis-1", "port": 6379}],
    decode_responses=True,
    skip_full_coverage_check=True,
)

Sentinel (High Availability)

Sentinel(高可用)

python
from redis.sentinel import Sentinel

sentinel = Sentinel(
    [('sentinel-1', 26379), ('sentinel-2', 26379)],
    socket_timeout=0.5,
)
master = sentinel.master_for('mymaster', decode_responses=True)
replica = sentinel.slave_for('mymaster', decode_responses=True)
python
from redis.sentinel import Sentinel

sentinel = Sentinel(
    [('sentinel-1', 26379), ('sentinel-2', 26379)],
    socket_timeout=0.5,
)
master = sentinel.master_for('mymaster', decode_responses=True)
replica = sentinel.slave_for('mymaster', decode_responses=True)

Eviction Policies

淘汰策略

PolicyBehaviorBest For
noeviction
Error on write when fullQueues / critical data
allkeys-lru
Evict least recently usedGeneral cache
volatile-lru
LRU only among keys with TTLMixed data store
allkeys-lfu
Evict least frequently usedSkewed access patterns
volatile-ttl
Evict soonest-to-expirePrioritize long-lived data
Set via
redis.conf
:
maxmemory-policy allkeys-lru
策略行为适用场景
noeviction
内存满时写入报错队列/关键数据
allkeys-lru
淘汰最近最少使用的键通用缓存
volatile-lru
仅淘汰带TTL的键中最近最少使用的混合数据存储
allkeys-lfu
淘汰最不常使用的键访问模式倾斜的场景
volatile-ttl
淘汰即将过期的键优先保留长期数据
通过
redis.conf
设置:
maxmemory-policy allkeys-lru

Anti-Patterns

反模式

Anti-PatternProblemFix
Keys with no TTLMemory grows unboundedAlways set TTL
KEYS *
in production
Blocks the server (O(N))Use
SCAN
cursor
Storing large blobs (>100KB)Slow serialization, memory pressureStore reference + fetch from object store
Single Redis for everythingNo isolation between cache & queueUse separate DBs or instances
Ignoring connection pool limitsConnection exhaustion under loadSize pool to workload
Not handling cache miss stampedeThundering herd on cold startUse locks or probabilistic early expiry
FLUSHALL
without thought
Wipes entire instanceScope deletes by key pattern
反模式问题解决方案
键未设置TTL内存无限增长始终设置TTL
生产环境使用
KEYS *
阻塞服务器(O(N)复杂度)使用
SCAN
游标
存储大对象(>100KB)序列化缓慢,内存压力大存储引用,从对象存储中获取
单一Redis实例处理所有业务缓存与队列无隔离使用独立数据库或实例
忽略连接池限制高负载下连接耗尽根据工作负载调整池大小
未处理缓存击穿冷启动时出现惊群效应使用锁或概率性提前过期
随意使用
FLUSHALL
清空整个实例按键模式范围删除

Cache Miss Stampede Prevention

缓存击穿预防

python
import threading

_locks: dict[str, threading.Lock] = {}
_locks_mutex = threading.Lock()

def get_with_lock(key: str, fetch_fn, ttl: int = 300):
    cached = r.get(key)
    if cached:
        return json.loads(cached)

    with _locks_mutex:
        if key not in _locks:
            _locks[key] = threading.Lock()
        lock = _locks[key]
    with lock:
        cached = r.get(key)  # Re-check after acquiring lock
        if cached:
            return json.loads(cached)
        value = fetch_fn()
        r.setex(key, ttl, json.dumps(value))
        return value
Note: for multi-process deployments, replace the in-process lock with
acquire_lock
/
release_lock
from the Distributed Locks section above.
python
import threading

_locks: dict[str, threading.Lock] = {}
_locks_mutex = threading.Lock()

def get_with_lock(key: str, fetch_fn, ttl: int = 300):
    cached = r.get(key)
    if cached:
        return json.loads(cached)

    with _locks_mutex:
        if key not in _locks:
            _locks[key] = threading.Lock()
        lock = _locks[key]
    with lock:
        cached = r.get(key)  # 获取锁后再次检查
        if cached:
            return json.loads(cached)
        value = fetch_fn()
        r.setex(key, ttl, json.dumps(value))
        return value
注意:多进程部署场景下,将进程内锁替换为分布式锁章节中的
acquire_lock
/
release_lock

Examples

示例

Add caching to a Django/Flask API endpoint: Use cache-aside with
setex
and a 5-minute TTL on the response. Key on the request parameters.
Rate-limit an API by user: Use fixed-window with
pipeline(transaction=True)
for low-traffic endpoints; use sliding-window Lua for accurate per-user throttling.
Coordinate a background job across workers: Use
acquire_lock
with a TTL that exceeds the expected job duration. Always release in a
finally
block.
Fan-out notifications to multiple subscribers: Use Pub/Sub for fire-and-forget. Switch to Streams if you need guaranteed delivery or replay for late consumers.
为Django/Flask API端点添加缓存: 使用缓存旁路模式,结合
setex
设置5分钟TTL,根据请求参数生成键名。
按用户限流API: 低流量端点使用固定窗口模式+
pipeline(transaction=True)
;需要精确用户限流时使用滑动窗口Lua脚本。
跨Worker协调后台任务: 使用
acquire_lock
,设置超过预期任务时长的TTL。始终在
finally
块中释放锁。
向多个订阅者广播通知: 使用Pub/Sub实现即发即弃。如果需要保障投递或为延迟消费者提供重放功能,请切换到Streams。

Quick Reference

速查参考

PatternWhen to Use
Cache-asideRead-heavy, tolerate slight staleness
Write-throughStrong consistency required
Distributed lockPrevent concurrent access to a resource
Sliding window rate limitAccurate per-user throttling
Redis StreamsDurable event queue with consumer groups
Pub/SubBroadcast with no delivery guarantees needed
Sorted Set leaderboardRanked scoring, pagination
HyperLogLogApproximate unique count at low memory
模式适用场景
缓存旁路读密集型,可容忍轻微数据过期
写穿透缓存需要强一致性
分布式锁防止资源并发访问
滑动窗口限流精确的用户限流
Redis Streams带消费者组的持久化事件队列
Pub/Sub无需投递保障的广播场景
有序集合排行榜排名计分、分页
HyperLogLog低内存消耗下的近似独立计数

Related

相关资源

  • Skill:
    postgres-patterns
    — relational data patterns
  • Skill:
    backend-patterns
    — API and service layer patterns
  • Skill:
    database-migrations
    — schema versioning
  • Skill:
    django-patterns
    — Django cache framework integration
  • Agent:
    database-reviewer
    — full database review workflow
  • Skill:
    postgres-patterns
    — 关系型数据模式
  • Skill:
    backend-patterns
    — API与服务层模式
  • Skill:
    database-migrations
    — schema版本控制
  • Skill:
    django-patterns
    — Django缓存框架集成
  • Agent:
    database-reviewer
    — 完整数据库审查工作流