pagination

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

API Pagination

API 分页

Return large datasets efficiently without killing your database.
高效返回大型数据集,同时避免给数据库造成过大压力。

When to Use This Skill

何时使用该技能

  • List endpoints returning many items
  • Infinite scroll UIs
  • Data export features
  • Any endpoint that could return 100+ items
  • 需返回大量数据的列表接口
  • 无限滚动UI
  • 数据导出功能
  • 任何可能返回100条以上数据的接口

Pagination Strategies

分页策略

Offset Pagination (Simple)

偏移量分页(简单型)

GET /users?page=2&limit=20
Pros: Simple, supports "jump to page" Cons: Slow on large datasets, inconsistent with concurrent writes
GET /users?page=2&limit=20
优点:实现简单,支持“跳转到指定页”功能 缺点:在大型数据集上性能较慢,并发写入时数据结果不一致

Cursor Pagination (Recommended)

游标分页(推荐)

GET /users?cursor=eyJpZCI6MTIzfQ&limit=20
Pros: Fast, consistent, works with real-time data Cons: No "jump to page", slightly more complex
GET /users?cursor=eyJpZCI6MTIzfQ&limit=20
优点:速度快、结果一致,适用于实时数据场景 缺点:不支持“跳转到指定页”,实现复杂度略高

TypeScript Implementation

TypeScript 实现

Cursor Pagination

游标分页

typescript
// pagination.ts
interface PaginationParams {
  cursor?: string;
  limit?: number;
  direction?: 'forward' | 'backward';
}

interface PaginatedResult<T> {
  data: T[];
  pagination: {
    hasMore: boolean;
    nextCursor: string | null;
    prevCursor: string | null;
    total?: number;
  };
}

function encodeCursor(data: Record<string, unknown>): string {
  return Buffer.from(JSON.stringify(data)).toString('base64url');
}

function decodeCursor(cursor: string): Record<string, unknown> {
  return JSON.parse(Buffer.from(cursor, 'base64url').toString());
}

async function paginate<T extends { id: string; createdAt: Date }>(
  query: (where: any, orderBy: any, take: number) => Promise<T[]>,
  params: PaginationParams,
  defaultLimit = 20,
  maxLimit = 100
): Promise<PaginatedResult<T>> {
  const limit = Math.min(params.limit || defaultLimit, maxLimit);
  const direction = params.direction || 'forward';

  let where: any = {};
  let orderBy: any = { createdAt: 'desc', id: 'desc' };

  if (params.cursor) {
    const decoded = decodeCursor(params.cursor);
    
    if (direction === 'forward') {
      where = {
        OR: [
          { createdAt: { lt: decoded.createdAt } },
          { createdAt: decoded.createdAt, id: { lt: decoded.id } },
        ],
      };
    } else {
      where = {
        OR: [
          { createdAt: { gt: decoded.createdAt } },
          { createdAt: decoded.createdAt, id: { gt: decoded.id } },
        ],
      };
      orderBy = { createdAt: 'asc', id: 'asc' };
    }
  }

  // Fetch one extra to check if there's more
  const items = await query(where, orderBy, limit + 1);
  const hasMore = items.length > limit;
  const data = hasMore ? items.slice(0, limit) : items;

  // Reverse if going backward
  if (direction === 'backward') {
    data.reverse();
  }

  return {
    data,
    pagination: {
      hasMore,
      nextCursor: data.length > 0
        ? encodeCursor({ createdAt: data[data.length - 1].createdAt, id: data[data.length - 1].id })
        : null,
      prevCursor: data.length > 0
        ? encodeCursor({ createdAt: data[0].createdAt, id: data[0].id })
        : null,
    },
  };
}

export { paginate, PaginationParams, PaginatedResult, encodeCursor, decodeCursor };
typescript
// pagination.ts
interface PaginationParams {
  cursor?: string;
  limit?: number;
  direction?: 'forward' | 'backward';
}

interface PaginatedResult<T> {
  data: T[];
  pagination: {
    hasMore: boolean;
    nextCursor: string | null;
    prevCursor: string | null;
    total?: number;
  };
}

function encodeCursor(data: Record<string, unknown>): string {
  return Buffer.from(JSON.stringify(data)).toString('base64url');
}

function decodeCursor(cursor: string): Record<string, unknown> {
  return JSON.parse(Buffer.from(cursor, 'base64url').toString());
}

async function paginate<T extends { id: string; createdAt: Date }>(
  query: (where: any, orderBy: any, take: number) => Promise<T[]>,
  params: PaginationParams,
  defaultLimit = 20,
  maxLimit = 100
): Promise<PaginatedResult<T>> {
  const limit = Math.min(params.limit || defaultLimit, maxLimit);
  const direction = params.direction || 'forward';

  let where: any = {};
  let orderBy: any = { createdAt: 'desc', id: 'desc' };

  if (params.cursor) {
    const decoded = decodeCursor(params.cursor);
    
    if (direction === 'forward') {
      where = {
        OR: [
          { createdAt: { lt: decoded.createdAt } },
          { createdAt: decoded.createdAt, id: { lt: decoded.id } },
        ],
      };
    } else {
      where = {
        OR: [
          { createdAt: { gt: decoded.createdAt } },
          { createdAt: decoded.createdAt, id: { gt: decoded.id } },
        ],
      };
      orderBy = { createdAt: 'asc', id: 'asc' };
    }
  }

  // 多获取一条数据,用于判断是否还有更多内容
  const items = await query(where, orderBy, limit + 1);
  const hasMore = items.length > limit;
  const data = hasMore ? items.slice(0, limit) : items;

  // 如果是向后翻页,反转数据顺序
  if (direction === 'backward') {
    data.reverse();
  }

  return {
    data,
    pagination: {
      hasMore,
      nextCursor: data.length > 0
        ? encodeCursor({ createdAt: data[data.length - 1].createdAt, id: data[data.length - 1].id })
        : null,
      prevCursor: data.length > 0
        ? encodeCursor({ createdAt: data[0].createdAt, id: data[0].id })
        : null,
    },
  };
}

export { paginate, PaginationParams, PaginatedResult, encodeCursor, decodeCursor };

Express Route

Express 路由

typescript
// users-route.ts
import { paginate } from './pagination';

router.get('/users', async (req, res) => {
  const { cursor, limit } = req.query;

  const result = await paginate(
    (where, orderBy, take) =>
      db.users.findMany({ where, orderBy, take }),
    { cursor: cursor as string, limit: Number(limit) || 20 }
  );

  res.json(result);
});
typescript
// users-route.ts
import { paginate } from './pagination';

router.get('/users', async (req, res) => {
  const { cursor, limit } = req.query;

  const result = await paginate(
    (where, orderBy, take) =>
      db.users.findMany({ where, orderBy, take }),
    { cursor: cursor as string, limit: Number(limit) || 20 }
  );

  res.json(result);
});

Response Format

响应格式

json
{
  "data": [
    { "id": "user_123", "name": "Alice", "createdAt": "2024-01-15T10:00:00Z" },
    { "id": "user_122", "name": "Bob", "createdAt": "2024-01-14T09:00:00Z" }
  ],
  "pagination": {
    "hasMore": true,
    "nextCursor": "eyJjcmVhdGVkQXQiOiIyMDI0LTAxLTE0VDA5OjAwOjAwWiIsImlkIjoidXNlcl8xMjIifQ",
    "prevCursor": "eyJjcmVhdGVkQXQiOiIyMDI0LTAxLTE1VDEwOjAwOjAwWiIsImlkIjoidXNlcl8xMjMifQ"
  }
}
json
{
  "data": [
    { "id": "user_123", "name": "Alice", "createdAt": "2024-01-15T10:00:00Z" },
    { "id": "user_122", "name": "Bob", "createdAt": "2024-01-14T09:00:00Z" }
  ],
  "pagination": {
    "hasMore": true,
    "nextCursor": "eyJjcmVhdGVkQXQiOiIyMDI0LTAxLTE0VDA5OjAwOjAwWiIsImlkIjoidXNlcl8xMjIifQ",
    "prevCursor": "eyJjcmVhdGVkQXQiOiIyMDI0LTAxLTE1VDEwOjAwOjAwWiIsImlkIjoidXNlcl8xMjMifQ"
  }
}

Python Implementation

Python 实现

python
undefined
python
undefined

pagination.py

pagination.py

import base64 import json from dataclasses import dataclass from typing import TypeVar, Generic, Callable, Optional
T = TypeVar('T')
@dataclass class PaginatedResult(Generic[T]): data: list[T] has_more: bool next_cursor: Optional[str] prev_cursor: Optional[str]
def encode_cursor(data: dict) -> str: return base64.urlsafe_b64encode(json.dumps(data).encode()).decode()
def decode_cursor(cursor: str) -> dict: return json.loads(base64.urlsafe_b64decode(cursor).decode())
async def paginate( query_fn: Callable, cursor: Optional[str] = None, limit: int = 20, max_limit: int = 100, ) -> PaginatedResult: limit = min(limit, max_limit)
filters = {}
if cursor:
    decoded = decode_cursor(cursor)
    filters = {"created_at__lt": decoded["created_at"]}

items = await query_fn(filters, limit + 1)
has_more = len(items) > limit
data = items[:limit] if has_more else items

return PaginatedResult(
    data=data,
    has_more=has_more,
    next_cursor=encode_cursor({"created_at": data[-1].created_at.isoformat()}) if data else None,
    prev_cursor=encode_cursor({"created_at": data[0].created_at.isoformat()}) if data else None,
)
undefined
import base64 import json from dataclasses import dataclass from typing import TypeVar, Generic, Callable, Optional
T = TypeVar('T')
@dataclass class PaginatedResult(Generic[T]): data: list[T] has_more: bool next_cursor: Optional[str] prev_cursor: Optional[str]
def encode_cursor(data: dict) -> str: return base64.urlsafe_b64encode(json.dumps(data).encode()).decode()
def decode_cursor(cursor: str) -> dict: return json.loads(base64.urlsafe_b64decode(cursor).decode())
async def paginate( query_fn: Callable, cursor: Optional[str] = None, limit: int = 20, max_limit: int = 100, ) -> PaginatedResult: limit = min(limit, max_limit)
filters = {}
if cursor:
    decoded = decode_cursor(cursor)
    filters = {"created_at__lt": decoded["created_at"]}

items = await query_fn(filters, limit + 1)
has_more = len(items) > limit
data = items[:limit] if has_more else items

return PaginatedResult(
    data=data,
    has_more=has_more,
    next_cursor=encode_cursor({"created_at": data[-1].created_at.isoformat()}) if data else None,
    prev_cursor=encode_cursor({"created_at": data[0].created_at.isoformat()}) if data else None,
)
undefined

FastAPI Route

FastAPI 路由

python
@router.get("/users")
async def list_users(cursor: str = None, limit: int = 20):
    async def query(filters, take):
        return await db.users.find_many(
            where=filters,
            order_by={"created_at": "desc"},
            take=take,
        )

    result = await paginate(query, cursor=cursor, limit=limit)
    return {
        "data": result.data,
        "pagination": {
            "hasMore": result.has_more,
            "nextCursor": result.next_cursor,
        },
    }
python
@router.get("/users")
async def list_users(cursor: str = None, limit: int = 20):
    async def query(filters, take):
        return await db.users.find_many(
            where=filters,
            order_by={"created_at": "desc"},
            take=take,
        )

    result = await paginate(query, cursor=cursor, limit=limit)
    return {
        "data": result.data,
        "pagination": {
            "hasMore": result.has_more,
            "nextCursor": result.next_cursor,
        },
    }

Database Optimization

数据库优化

sql
-- Essential index for cursor pagination
CREATE INDEX idx_users_pagination ON users(created_at DESC, id DESC);

-- For filtered pagination
CREATE INDEX idx_users_org_pagination ON users(organization_id, created_at DESC, id DESC);
sql
-- 游标分页必备索引
CREATE INDEX idx_users_pagination ON users(created_at DESC, id DESC);

-- 带过滤条件的分页索引
CREATE INDEX idx_users_org_pagination ON users(organization_id, created_at DESC, id DESC);

Frontend Integration

前端集成

typescript
// useInfiniteQuery with cursor pagination
function useUsers() {
  return useInfiniteQuery({
    queryKey: ['users'],
    queryFn: ({ pageParam }) =>
      fetch(`/api/users?cursor=${pageParam || ''}`).then(r => r.json()),
    getNextPageParam: (lastPage) =>
      lastPage.pagination.hasMore ? lastPage.pagination.nextCursor : undefined,
  });
}

// Usage
const { data, fetchNextPage, hasNextPage, isFetchingNextPage } = useUsers();

const allUsers = data?.pages.flatMap(page => page.data) ?? [];
typescript
// 结合useInfiniteQuery实现游标分页
function useUsers() {
  return useInfiniteQuery({
    queryKey: ['users'],
    queryFn: ({ pageParam }) =>
      fetch(`/api/users?cursor=${pageParam || ''}`).then(r => r.json()),
    getNextPageParam: (lastPage) =>
      lastPage.pagination.hasMore ? lastPage.pagination.nextCursor : undefined,
  });
}

// 使用示例
const { data, fetchNextPage, hasNextPage, isFetchingNextPage } = useUsers();

const allUsers = data?.pages.flatMap(page => page.data) ?? [];

Best Practices

最佳实践

  1. Always use stable sort - Include ID in sort to handle ties
  2. Index your sort columns - Pagination is only fast with proper indexes
  3. Limit the limit - Cap maximum page size (100 is reasonable)
  4. Use cursor for real-time data - Offset breaks with concurrent writes
  5. Include total count sparingly - COUNT(*) is expensive on large tables
  1. 始终使用稳定排序 - 排序条件中加入ID以处理相同时间戳的情况
  2. 为排序字段创建索引 - 只有配置合适的索引,分页才能保证高效
  3. 限制最大分页大小 - 合理设置单页数据上限(100条较为合适)
  4. 实时数据场景使用游标分页 - 偏移量分页在并发写入时会出现数据异常
  5. 谨慎返回总数据量 - 对大型表执行COUNT(*)操作性能开销极大

Common Mistakes

常见误区

  • Using OFFSET on large tables (scans all skipped rows)
  • Not including ID in cursor (unstable with same timestamps)
  • Missing index on sort columns
  • Returning total count on every request
  • Not handling deleted items between pages
  • 在大型表上使用OFFSET(会扫描所有被跳过的行)
  • 游标中未包含ID(相同时间戳下排序结果不稳定)
  • 未给排序字段创建索引
  • 每次请求都返回总数据量
  • 未处理分页间数据被删除的情况