database-patterns

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Database Patterns

数据库设计模式

Core Principles

核心原则

  • PostgreSQL Primary — Relational data, transactions, complex queries
  • Redis Secondary — Caching, sessions, real-time data
  • Index-First Design — Design queries before indexes
  • JSONB Sparingly — Structured data prefers columns
  • Cache-Aside Default — Read-through, write-around
  • Tiered Storage — Hot/Warm/Cold data separation
  • No backwards compatibility — Migrate data, don't keep legacy schemas

  • PostgreSQL 主库 — 关系型数据、事务处理、复杂查询
  • Redis 辅库 — 缓存、会话、实时数据
  • 索引优先设计 — 先设计查询逻辑再创建索引
  • 谨慎使用JSONB — 结构化数据优先使用列存储
  • 默认采用旁路缓存 — 读穿、写绕策略
  • 分层存储 — 热/温/冷数据分离
  • 不兼容旧版本 — 迁移数据,不保留遗留模式

PostgreSQL

PostgreSQL

Data Type Selection

数据类型选择

Use CaseTypeAvoid
Primary Key
UUID
/
BIGSERIAL
INT
(range limits)
Timestamps
TIMESTAMPTZ
TIMESTAMP
(no timezone)
Money
NUMERIC(19,4)
FLOAT
(precision loss)
Status
TEXT
+ CHECK
INT
(unreadable)
Semi-structured
JSONB
JSON
(no indexing)
Full-text
TSVECTOR
LIKE '%..%'
使用场景类型避免使用
主键
UUID
/
BIGSERIAL
INT
(存在范围限制)
时间戳
TIMESTAMPTZ
TIMESTAMP
(无时区信息)
金额
NUMERIC(19,4)
FLOAT
(精度丢失)
状态字段
TEXT
+ CHECK约束
INT
(可读性差)
半结构化数据
JSONB
JSON
(不支持索引)
全文检索
TSVECTOR
LIKE '%..%'
(性能差)

Schema Design

模式设计

sql
-- Use UUID for distributed-friendly IDs
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";

CREATE TABLE users (
  id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
  email TEXT UNIQUE NOT NULL,
  name TEXT NOT NULL,
  status TEXT NOT NULL DEFAULT 'active'
    CHECK (status IN ('active', 'inactive', 'suspended')),
  metadata JSONB DEFAULT '{}',
  created_at TIMESTAMPTZ DEFAULT NOW(),
  updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- Updated timestamp trigger
CREATE OR REPLACE FUNCTION update_updated_at()
RETURNS TRIGGER AS $$
BEGIN
  NEW.updated_at = NOW();
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER users_updated_at
  BEFORE UPDATE ON users
  FOR EACH ROW
  EXECUTE FUNCTION update_updated_at();
sql
-- 使用UUID实现分布式友好的ID
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";

CREATE TABLE users (
  id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
  email TEXT UNIQUE NOT NULL,
  name TEXT NOT NULL,
  status TEXT NOT NULL DEFAULT 'active'
    CHECK (status IN ('active', 'inactive', 'suspended')),
  metadata JSONB DEFAULT '{}',
  created_at TIMESTAMPTZ DEFAULT NOW(),
  updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- 更新时间戳触发器
CREATE OR REPLACE FUNCTION update_updated_at()
RETURNS TRIGGER AS $$
BEGIN
  NEW.updated_at = NOW();
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER users_updated_at
  BEFORE UPDATE ON users
  FOR EACH ROW
  EXECUTE FUNCTION update_updated_at();

Indexing Strategy

索引策略

sql
-- B-Tree: Equality, range, sorting (default)
CREATE INDEX idx_users_email ON users(email);

-- Composite: Leftmost prefix rule
-- Supports: (user_id), (user_id, created_at)
-- Does NOT support: (created_at) alone
CREATE INDEX idx_orders_user_date ON orders(user_id, created_at DESC);

-- Partial: Reduce index size
CREATE INDEX idx_active_users ON users(email)
  WHERE status = 'active';

-- GIN for JSONB: Containment queries
CREATE INDEX idx_metadata ON users USING GIN (metadata jsonb_path_ops);

-- Expression: Specific JSONB field
CREATE INDEX idx_user_role ON users ((metadata->>'role'));

-- Full-text search
CREATE INDEX idx_search ON products USING GIN (to_tsvector('english', name || ' ' || description));
sql
-- B-树:等值查询、范围查询、排序(默认类型)
CREATE INDEX idx_users_email ON users(email);

-- 复合索引:最左前缀规则
-- 支持查询:(user_id), (user_id, created_at)
-- 不支持单独查询:(created_at)
CREATE INDEX idx_orders_user_date ON orders(user_id, created_at DESC);

-- 部分索引:减小索引体积
CREATE INDEX idx_active_users ON users(email)
  WHERE status = 'active';

-- GIN索引(JSONB):包含查询
CREATE INDEX idx_metadata ON users USING GIN (metadata jsonb_path_ops);

-- 表达式索引:针对JSONB特定字段
CREATE INDEX idx_user_role ON users ((metadata->>'role'));

-- 全文检索索引
CREATE INDEX idx_search ON products USING GIN (to_tsvector('english', name || ' ' || description));

JSONB Usage

JSONB 使用方式

sql
-- Good: Dynamic attributes, rarely queried fields
CREATE TABLE products (
  id UUID PRIMARY KEY,
  name TEXT NOT NULL,
  price NUMERIC(19,4) NOT NULL,
  category TEXT NOT NULL,           -- Extracted: frequently queried
  attributes JSONB DEFAULT '{}'     -- Dynamic: color, size, specs
);

-- Query with containment
SELECT * FROM products
WHERE category = 'electronics'              -- B-Tree index
  AND attributes @> '{"brand": "Apple"}';   -- GIN index

-- Query specific field
SELECT * FROM products
WHERE attributes->>'color' = 'black';       -- Expression index

-- Update JSONB field
UPDATE products
SET attributes = attributes || '{"featured": true}'
WHERE id = '...';
sql
-- 合理用法:动态属性、极少查询的字段
CREATE TABLE products (
  id UUID PRIMARY KEY,
  name TEXT NOT NULL,
  price NUMERIC(19,4) NOT NULL,
  category TEXT NOT NULL,           -- 提取为独立列:高频查询字段
  attributes JSONB DEFAULT '{}'     -- 动态字段:颜色、尺寸、规格
);

-- 包含查询
SELECT * FROM products
WHERE category = 'electronics'              -- B-树索引
  AND attributes @> '{"brand": "Apple"}';   -- GIN索引

-- 特定字段查询
SELECT * FROM products
WHERE attributes->>'color' = 'black';       -- 表达式索引

-- 更新JSONB字段
UPDATE products
SET attributes = attributes || '{"featured": true}'
WHERE id = '...';

Query Optimization

查询优化

sql
-- Always use EXPLAIN ANALYZE
EXPLAIN ANALYZE
SELECT u.*, COUNT(o.id) as order_count
FROM users u
LEFT JOIN orders o ON o.user_id = u.id
WHERE u.status = 'active'
GROUP BY u.id
ORDER BY u.created_at DESC
LIMIT 20;

-- Watch for:
-- ❌ Seq Scan on large tables → Add index
-- ❌ Sort → Use index for ordering
-- ❌ Nested Loop with many rows → Consider JOIN order
-- ❌ Hash Join on huge tables → Add indexes
sql
-- 始终使用EXPLAIN ANALYZE分析查询
EXPLAIN ANALYZE
SELECT u.*, COUNT(o.id) as order_count
FROM users u
LEFT JOIN orders o ON o.user_id = u.id
WHERE u.status = 'active'
GROUP BY u.id
ORDER BY u.created_at DESC
LIMIT 20;

-- 需要关注的问题:
-- ❌ 大表全表扫描 → 添加索引
-- ❌ 排序操作 → 使用索引排序
-- ❌ 大量数据的嵌套循环 → 调整JOIN顺序
-- ❌ 超大表的哈希连接 → 添加索引

Connection Pooling

连接池配置

typescript
// PgBouncer or built-in pool
import { Pool } from 'pg';

const pool = new Pool({
  max: 20,                      // Max connections
  idleTimeoutMillis: 30000,     // Close idle connections
  connectionTimeoutMillis: 2000, // Fail fast
});

// Connection count formula:
// connections = (cores * 2) + effective_spindle_count
// Usually 10-30 is enough

typescript
// 使用PgBouncer或内置连接池
import { Pool } from 'pg';

const pool = new Pool({
  max: 20,                      // 最大连接数
  idleTimeoutMillis: 30000,     // 空闲连接关闭超时
  connectionTimeoutMillis: 2000, // 连接超时时间(快速失败)
});

// 连接数计算公式:
// connections = (CPU核心数 * 2) + 有效磁盘数
// 通常10-30个连接足够

Redis

Redis

Data Structure Selection

数据结构选择

Use CaseStructureExample
Cache objectsString
user:123
→ JSON
CountersString + INCR
views:article:456
SessionsHash
session:abc
→ {userId, ...}
LeaderboardsSorted Set
scores
→ {userId: score}
QueuesList/Stream
tasks
→ LPUSH/RPOP
Unique setsSet
online_users
Real-timePub/Sub/StreamNotifications
使用场景数据结构示例
缓存对象String
user:123
→ JSON格式
计数器String + INCR
views:article:456
会话存储Hash
session:abc
→ {userId, ...}
排行榜Sorted Set
scores
→ {userId: score}
队列List/Stream
tasks
→ LPUSH/RPOP
唯一集合Set
online_users
实时场景Pub/Sub/Stream通知功能

Key Naming

键命名规范

undefined
undefined

Format: <entity>:<id>:<attribute>

格式:<实体>:<ID>:<属性>

user:123:profile user:123:settings order:456:items session:abc123
user:123:profile user:123:settings order:456:items session:abc123

Use colons for hierarchy

使用冒号实现层级结构

Enables pattern matching with SCAN

支持SCAN命令的模式匹配

SCAN 0 MATCH "user:*:profile" COUNT 100
undefined
SCAN 0 MATCH "user:*:profile" COUNT 100
undefined

TTL Strategy

TTL策略

typescript
const TTL = {
  SESSION: 24 * 60 * 60,      // 24 hours
  CACHE: 15 * 60,             // 15 minutes
  RATE_LIMIT: 60,             // 1 minute
  LOCK: 30,                   // 30 seconds
};

// Set with TTL
await redis.set(`cache:user:${id}`, JSON.stringify(user), 'EX', TTL.CACHE);

// Check TTL
const remaining = await redis.ttl(`cache:user:${id}`);

typescript
const TTL = {
  SESSION: 24 * 60 * 60,      // 24小时
  CACHE: 15 * 60,             // 15分钟
  RATE_LIMIT: 60,             // 1分钟
  LOCK: 30,                   // 30秒
};

// 设置带TTL的缓存
await redis.set(`cache:user:${id}`, JSON.stringify(user), 'EX', TTL.CACHE);

// 检查剩余TTL
const remaining = await redis.ttl(`cache:user:${id}`);

Caching Patterns

缓存模式

Cache-Aside (Lazy Loading)

旁路缓存(懒加载)

typescript
async function getUser(id: string): Promise<User> {
  const cacheKey = `user:${id}`;

  // 1. Check cache
  const cached = await redis.get(cacheKey);
  if (cached) {
    return JSON.parse(cached);
  }

  // 2. Cache miss → Query database
  const user = await db.user.findUnique({ where: { id } });
  if (!user) {
    throw new NotFoundError('User not found');
  }

  // 3. Populate cache
  await redis.set(cacheKey, JSON.stringify(user), 'EX', 900);

  return user;
}
typescript
async function getUser(id: string): Promise<User> {
  const cacheKey = `user:${id}`;

  // 1. 检查缓存
  const cached = await redis.get(cacheKey);
  if (cached) {
    return JSON.parse(cached);
  }

  // 2. 缓存未命中 → 查询数据库
  const user = await db.user.findUnique({ where: { id } });
  if (!user) {
    throw new NotFoundError('用户不存在');
  }

  // 3. 写入缓存
  await redis.set(cacheKey, JSON.stringify(user), 'EX', 900);

  return user;
}

Write-Through

写穿缓存

typescript
async function updateUser(id: string, data: UpdateInput): Promise<User> {
  // 1. Update database
  const user = await db.user.update({
    where: { id },
    data,
  });

  // 2. Update cache immediately
  await redis.set(`user:${id}`, JSON.stringify(user), 'EX', 900);

  return user;
}
typescript
async function updateUser(id: string, data: UpdateInput): Promise<User> {
  // 1. 更新数据库
  const user = await db.user.update({
    where: { id },
    data,
  });

  // 2. 立即更新缓存
  await redis.set(`user:${id}`, JSON.stringify(user), 'EX', 900);

  return user;
}

Cache Invalidation

缓存失效

typescript
async function deleteUser(id: string): Promise<void> {
  // 1. Delete from database
  await db.user.delete({ where: { id } });

  // 2. Invalidate cache
  await redis.del(`user:${id}`);

  // 3. Invalidate related caches
  const keys = await redis.keys(`user:${id}:*`);
  if (keys.length > 0) {
    await redis.del(...keys);
  }
}
typescript
async function deleteUser(id: string): Promise<void> {
  // 1. 从数据库删除
  await db.user.delete({ where: { id } });

  // 2. 失效缓存
  await redis.del(`user:${id}`);

  // 3. 失效相关缓存
  const keys = await redis.keys(`user:${id}:*`);
  if (keys.length > 0) {
    await redis.del(...keys);
  }
}

Cache Stampede Prevention

缓存击穿防护

typescript
async function getUserWithLock(id: string): Promise<User> {
  const cacheKey = `user:${id}`;
  const lockKey = `lock:user:${id}`;

  // Check cache
  const cached = await redis.get(cacheKey);
  if (cached) {
    return JSON.parse(cached);
  }

  // Try to acquire lock
  const acquired = await redis.set(lockKey, '1', 'EX', 10, 'NX');

  if (!acquired) {
    // Another process is loading, wait and retry
    await sleep(100);
    return getUserWithLock(id);
  }

  try {
    // Double-check cache (another process might have populated it)
    const rechecked = await redis.get(cacheKey);
    if (rechecked) {
      return JSON.parse(rechecked);
    }

    // Load from database
    const user = await db.user.findUnique({ where: { id } });
    await redis.set(cacheKey, JSON.stringify(user), 'EX', 900);
    return user;
  } finally {
    await redis.del(lockKey);
  }
}
typescript
async function getUserWithLock(id: string): Promise<User> {
  const cacheKey = `user:${id}`;
  const lockKey = `lock:user:${id}`;

  // 检查缓存
  const cached = await redis.get(cacheKey);
  if (cached) {
    return JSON.parse(cached);
  }

  // 尝试获取锁
  const acquired = await redis.set(lockKey, '1', 'EX', 10, 'NX');

  if (!acquired) {
    // 其他进程正在加载,等待后重试
    await sleep(100);
    return getUserWithLock(id);
  }

  try {
    // 再次检查缓存(可能其他进程已写入)
    const rechecked = await redis.get(cacheKey);
    if (rechecked) {
      return JSON.parse(rechecked);
    }

    // 从数据库加载
    const user = await db.user.findUnique({ where: { id } });
    await redis.set(cacheKey, JSON.stringify(user), 'EX', 900);
    return user;
  } finally {
    await redis.del(lockKey);
  }
}

Cache Penetration Prevention

缓存穿透防护

typescript
async function getUserSafe(id: string): Promise<User | null> {
  const cacheKey = `user:${id}`;

  const cached = await redis.get(cacheKey);

  // Check for cached null
  if (cached === 'NULL') {
    return null;
  }

  if (cached) {
    return JSON.parse(cached);
  }

  const user = await db.user.findUnique({ where: { id } });

  if (!user) {
    // Cache null with short TTL
    await redis.set(cacheKey, 'NULL', 'EX', 60);
    return null;
  }

  await redis.set(cacheKey, JSON.stringify(user), 'EX', 900);
  return user;
}

typescript
async function getUserSafe(id: string): Promise<User | null> {
  const cacheKey = `user:${id}`;

  const cached = await redis.get(cacheKey);

  // 检查缓存的空值标记
  if (cached === 'NULL') {
    return null;
  }

  if (cached) {
    return JSON.parse(cached);
  }

  const user = await db.user.findUnique({ where: { id } });

  if (!user) {
    // 缓存空值并设置短TTL
    await redis.set(cacheKey, 'NULL', 'EX', 60);
    return null;
  }

  await redis.set(cacheKey, JSON.stringify(user), 'EX', 900);
  return user;
}

Tiered Storage

分层存储

┌─────────────────────────────────────────────────┐
│                   Application                    │
└─────────────────────────────────────────────────┘
        ┌───────────────┼───────────────┐
        ▼               ▼               ▼
   ┌─────────┐    ┌─────────┐    ┌─────────┐
   │  Redis  │    │ Postgres │    │ Archive │
   │  (Hot)  │    │  (Warm)  │    │  (Cold) │
   └─────────┘    └─────────┘    └─────────┘

   < 1ms          ~10ms           ~100ms+
   Active data    Recent data     Historical
   Memory         SSD             Object storage
┌─────────────────────────────────────────────────┐
│                   应用层                        │
└─────────────────────────────────────────────────┘
        ┌───────────────┼───────────────┐
        ▼               ▼               ▼
   ┌─────────┐    ┌─────────┐    ┌─────────┐
   │  Redis  │    │ Postgres │    │ 归档存储 │
   │  (热数据)│    │  (温数据)│    │  (冷数据)│
   └─────────┘    └─────────┘    └─────────┘

   < 1ms          ~10ms           ~100ms+
   活跃数据        近期数据         历史数据
   内存存储        SSD存储         对象存储

Partitioning for Cold Data

冷数据分区

sql
-- Partition by date range
CREATE TABLE orders (
  id UUID NOT NULL,
  user_id UUID NOT NULL,
  total NUMERIC(19,4) NOT NULL,
  created_at TIMESTAMPTZ NOT NULL
) PARTITION BY RANGE (created_at);

-- Create partitions
CREATE TABLE orders_2025_q1 PARTITION OF orders
  FOR VALUES FROM ('2025-01-01') TO ('2025-04-01');

CREATE TABLE orders_2025_q2 PARTITION OF orders
  FOR VALUES FROM ('2025-04-01') TO ('2025-07-01');

-- Archive old data
CREATE TABLE orders_archive (LIKE orders INCLUDING ALL);

-- Move old data to archive
WITH moved AS (
  DELETE FROM orders
  WHERE created_at < NOW() - INTERVAL '1 year'
  RETURNING *
)
INSERT INTO orders_archive SELECT * FROM moved;

sql
-- 按日期范围分区
CREATE TABLE orders (
  id UUID NOT NULL,
  user_id UUID NOT NULL,
  total NUMERIC(19,4) NOT NULL,
  created_at TIMESTAMPTZ NOT NULL
) PARTITION BY RANGE (created_at);

-- 创建分区
CREATE TABLE orders_2025_q1 PARTITION OF orders
  FOR VALUES FROM ('2025-01-01') TO ('2025-04-01');

CREATE TABLE orders_2025_q2 PARTITION OF orders
  FOR VALUES FROM ('2025-04-01') TO ('2025-07-01');

-- 归档旧数据
CREATE TABLE orders_archive (LIKE orders INCLUDING ALL);

-- 将旧数据迁移到归档表
WITH moved AS (
  DELETE FROM orders
  WHERE created_at < NOW() - INTERVAL '1 year'
  RETURNING *
)
INSERT INTO orders_archive SELECT * FROM moved;

Transactions

事务处理

ACID Compliance

ACID合规性

typescript
// Use transactions for multi-table operations
async function transferFunds(fromId: string, toId: string, amount: number) {
  await db.$transaction(async (tx) => {
    // Deduct from source
    const from = await tx.account.update({
      where: { id: fromId },
      data: { balance: { decrement: amount } },
    });

    if (from.balance < 0) {
      throw new Error('Insufficient funds');
    }

    // Add to destination
    await tx.account.update({
      where: { id: toId },
      data: { balance: { increment: amount } },
    });
  });
}
typescript
-- 多表操作使用事务
async function transferFunds(fromId: string, toId: string, amount: number) {
  await db.$transaction(async (tx) => {
    // 从转出账户扣款
    const from = await tx.account.update({
      where: { id: fromId },
      data: { balance: { decrement: amount } },
    });

    if (from.balance < 0) {
      throw new Error('余额不足');
    }

    // 转入目标账户
    await tx.account.update({
      where: { id: toId },
      data: { balance: { increment: amount } },
    });
  });
}

Optimistic Locking

乐观锁

sql
-- Add version column
ALTER TABLE products ADD COLUMN version INT DEFAULT 1;

-- Update with version check
UPDATE products
SET
  stock = stock - 1,
  version = version + 1
WHERE id = $1 AND version = $2
RETURNING *;

-- If no rows returned, concurrent modification occurred

sql
-- 添加版本号列
ALTER TABLE products ADD COLUMN version INT DEFAULT 1;

-- 带版本检查的更新
UPDATE products
SET
  stock = stock - 1,
  version = version + 1
WHERE id = $1 AND version = $2
RETURNING *;

-- 如果无返回行,说明发生了并发修改

Checklist

检查清单

markdown
undefined
markdown
undefined

Schema

模式设计

  • UUID or BIGSERIAL for primary keys
  • TIMESTAMPTZ for all timestamps
  • NUMERIC for money, not FLOAT
  • CHECK constraints for enums
  • Foreign keys with ON DELETE
  • 主键使用UUID或BIGSERIAL
  • 所有时间戳使用TIMESTAMPTZ
  • 金额使用NUMERIC而非FLOAT
  • 枚举类型使用CHECK约束
  • 外键配置ON DELETE规则

Indexing

索引设计

  • Index for each WHERE clause pattern
  • Composite indexes match query order
  • GIN index for JSONB containment
  • EXPLAIN ANALYZE for slow queries
  • 为每个WHERE子句模式创建索引
  • 复合索引与查询顺序匹配
  • JSONB包含查询使用GIN索引
  • 慢查询使用EXPLAIN ANALYZE分析

Caching

缓存策略

  • Cache-aside as default pattern
  • TTL on all cached data
  • Cache invalidation on writes
  • Stampede/penetration protection
  • 默认采用旁路缓存模式
  • 所有缓存数据设置TTL
  • 写入操作时失效缓存
  • 配置击穿/穿透防护

Operations

运维配置

  • Connection pooling configured
  • Slow query logging enabled
  • Backup and recovery tested
  • Partition strategy for growth

---
  • 配置连接池
  • 启用慢查询日志
  • 测试备份与恢复流程
  • 配置数据分区策略以支持扩容

---

See Also

参考链接

  • reference/postgresql.md — PostgreSQL deep dive
  • reference/redis.md — Redis patterns
  • reference/caching.md — Caching strategies
  • reference/postgresql.md — PostgreSQL深度解析
  • reference/redis.md — Redis设计模式
  • reference/caching.md — 缓存策略详解