pagination
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseAPI Pagination
API 分页
Return large datasets efficiently without killing your database.
高效返回大型数据集,同时避免给数据库造成过大压力。
When to Use This Skill
何时使用该技能
- List endpoints returning many items
- Infinite scroll UIs
- Data export features
- Any endpoint that could return 100+ items
- 需返回大量数据的列表接口
- 无限滚动UI
- 数据导出功能
- 任何可能返回100条以上数据的接口
Pagination Strategies
分页策略
Offset Pagination (Simple)
偏移量分页(简单型)
GET /users?page=2&limit=20Pros: Simple, supports "jump to page"
Cons: Slow on large datasets, inconsistent with concurrent writes
GET /users?page=2&limit=20优点:实现简单,支持“跳转到指定页”功能
缺点:在大型数据集上性能较慢,并发写入时数据结果不一致
Cursor Pagination (Recommended)
游标分页(推荐)
GET /users?cursor=eyJpZCI6MTIzfQ&limit=20Pros: Fast, consistent, works with real-time data
Cons: No "jump to page", slightly more complex
GET /users?cursor=eyJpZCI6MTIzfQ&limit=20优点:速度快、结果一致,适用于实时数据场景
缺点:不支持“跳转到指定页”,实现复杂度略高
TypeScript Implementation
TypeScript 实现
Cursor Pagination
游标分页
typescript
// pagination.ts
interface PaginationParams {
cursor?: string;
limit?: number;
direction?: 'forward' | 'backward';
}
interface PaginatedResult<T> {
data: T[];
pagination: {
hasMore: boolean;
nextCursor: string | null;
prevCursor: string | null;
total?: number;
};
}
function encodeCursor(data: Record<string, unknown>): string {
return Buffer.from(JSON.stringify(data)).toString('base64url');
}
function decodeCursor(cursor: string): Record<string, unknown> {
return JSON.parse(Buffer.from(cursor, 'base64url').toString());
}
async function paginate<T extends { id: string; createdAt: Date }>(
query: (where: any, orderBy: any, take: number) => Promise<T[]>,
params: PaginationParams,
defaultLimit = 20,
maxLimit = 100
): Promise<PaginatedResult<T>> {
const limit = Math.min(params.limit || defaultLimit, maxLimit);
const direction = params.direction || 'forward';
let where: any = {};
let orderBy: any = { createdAt: 'desc', id: 'desc' };
if (params.cursor) {
const decoded = decodeCursor(params.cursor);
if (direction === 'forward') {
where = {
OR: [
{ createdAt: { lt: decoded.createdAt } },
{ createdAt: decoded.createdAt, id: { lt: decoded.id } },
],
};
} else {
where = {
OR: [
{ createdAt: { gt: decoded.createdAt } },
{ createdAt: decoded.createdAt, id: { gt: decoded.id } },
],
};
orderBy = { createdAt: 'asc', id: 'asc' };
}
}
// Fetch one extra to check if there's more
const items = await query(where, orderBy, limit + 1);
const hasMore = items.length > limit;
const data = hasMore ? items.slice(0, limit) : items;
// Reverse if going backward
if (direction === 'backward') {
data.reverse();
}
return {
data,
pagination: {
hasMore,
nextCursor: data.length > 0
? encodeCursor({ createdAt: data[data.length - 1].createdAt, id: data[data.length - 1].id })
: null,
prevCursor: data.length > 0
? encodeCursor({ createdAt: data[0].createdAt, id: data[0].id })
: null,
},
};
}
export { paginate, PaginationParams, PaginatedResult, encodeCursor, decodeCursor };typescript
// pagination.ts
interface PaginationParams {
cursor?: string;
limit?: number;
direction?: 'forward' | 'backward';
}
interface PaginatedResult<T> {
data: T[];
pagination: {
hasMore: boolean;
nextCursor: string | null;
prevCursor: string | null;
total?: number;
};
}
function encodeCursor(data: Record<string, unknown>): string {
return Buffer.from(JSON.stringify(data)).toString('base64url');
}
function decodeCursor(cursor: string): Record<string, unknown> {
return JSON.parse(Buffer.from(cursor, 'base64url').toString());
}
async function paginate<T extends { id: string; createdAt: Date }>(
query: (where: any, orderBy: any, take: number) => Promise<T[]>,
params: PaginationParams,
defaultLimit = 20,
maxLimit = 100
): Promise<PaginatedResult<T>> {
const limit = Math.min(params.limit || defaultLimit, maxLimit);
const direction = params.direction || 'forward';
let where: any = {};
let orderBy: any = { createdAt: 'desc', id: 'desc' };
if (params.cursor) {
const decoded = decodeCursor(params.cursor);
if (direction === 'forward') {
where = {
OR: [
{ createdAt: { lt: decoded.createdAt } },
{ createdAt: decoded.createdAt, id: { lt: decoded.id } },
],
};
} else {
where = {
OR: [
{ createdAt: { gt: decoded.createdAt } },
{ createdAt: decoded.createdAt, id: { gt: decoded.id } },
],
};
orderBy = { createdAt: 'asc', id: 'asc' };
}
}
// 多获取一条数据,用于判断是否还有更多内容
const items = await query(where, orderBy, limit + 1);
const hasMore = items.length > limit;
const data = hasMore ? items.slice(0, limit) : items;
// 如果是向后翻页,反转数据顺序
if (direction === 'backward') {
data.reverse();
}
return {
data,
pagination: {
hasMore,
nextCursor: data.length > 0
? encodeCursor({ createdAt: data[data.length - 1].createdAt, id: data[data.length - 1].id })
: null,
prevCursor: data.length > 0
? encodeCursor({ createdAt: data[0].createdAt, id: data[0].id })
: null,
},
};
}
export { paginate, PaginationParams, PaginatedResult, encodeCursor, decodeCursor };Express Route
Express 路由
typescript
// users-route.ts
import { paginate } from './pagination';
router.get('/users', async (req, res) => {
const { cursor, limit } = req.query;
const result = await paginate(
(where, orderBy, take) =>
db.users.findMany({ where, orderBy, take }),
{ cursor: cursor as string, limit: Number(limit) || 20 }
);
res.json(result);
});typescript
// users-route.ts
import { paginate } from './pagination';
router.get('/users', async (req, res) => {
const { cursor, limit } = req.query;
const result = await paginate(
(where, orderBy, take) =>
db.users.findMany({ where, orderBy, take }),
{ cursor: cursor as string, limit: Number(limit) || 20 }
);
res.json(result);
});Response Format
响应格式
json
{
"data": [
{ "id": "user_123", "name": "Alice", "createdAt": "2024-01-15T10:00:00Z" },
{ "id": "user_122", "name": "Bob", "createdAt": "2024-01-14T09:00:00Z" }
],
"pagination": {
"hasMore": true,
"nextCursor": "eyJjcmVhdGVkQXQiOiIyMDI0LTAxLTE0VDA5OjAwOjAwWiIsImlkIjoidXNlcl8xMjIifQ",
"prevCursor": "eyJjcmVhdGVkQXQiOiIyMDI0LTAxLTE1VDEwOjAwOjAwWiIsImlkIjoidXNlcl8xMjMifQ"
}
}json
{
"data": [
{ "id": "user_123", "name": "Alice", "createdAt": "2024-01-15T10:00:00Z" },
{ "id": "user_122", "name": "Bob", "createdAt": "2024-01-14T09:00:00Z" }
],
"pagination": {
"hasMore": true,
"nextCursor": "eyJjcmVhdGVkQXQiOiIyMDI0LTAxLTE0VDA5OjAwOjAwWiIsImlkIjoidXNlcl8xMjIifQ",
"prevCursor": "eyJjcmVhdGVkQXQiOiIyMDI0LTAxLTE1VDEwOjAwOjAwWiIsImlkIjoidXNlcl8xMjMifQ"
}
}Python Implementation
Python 实现
python
undefinedpython
undefinedpagination.py
pagination.py
import base64
import json
from dataclasses import dataclass
from typing import TypeVar, Generic, Callable, Optional
T = TypeVar('T')
@dataclass
class PaginatedResult(Generic[T]):
data: list[T]
has_more: bool
next_cursor: Optional[str]
prev_cursor: Optional[str]
def encode_cursor(data: dict) -> str:
return base64.urlsafe_b64encode(json.dumps(data).encode()).decode()
def decode_cursor(cursor: str) -> dict:
return json.loads(base64.urlsafe_b64decode(cursor).decode())
async def paginate(
query_fn: Callable,
cursor: Optional[str] = None,
limit: int = 20,
max_limit: int = 100,
) -> PaginatedResult:
limit = min(limit, max_limit)
filters = {}
if cursor:
decoded = decode_cursor(cursor)
filters = {"created_at__lt": decoded["created_at"]}
items = await query_fn(filters, limit + 1)
has_more = len(items) > limit
data = items[:limit] if has_more else items
return PaginatedResult(
data=data,
has_more=has_more,
next_cursor=encode_cursor({"created_at": data[-1].created_at.isoformat()}) if data else None,
prev_cursor=encode_cursor({"created_at": data[0].created_at.isoformat()}) if data else None,
)undefinedimport base64
import json
from dataclasses import dataclass
from typing import TypeVar, Generic, Callable, Optional
T = TypeVar('T')
@dataclass
class PaginatedResult(Generic[T]):
data: list[T]
has_more: bool
next_cursor: Optional[str]
prev_cursor: Optional[str]
def encode_cursor(data: dict) -> str:
return base64.urlsafe_b64encode(json.dumps(data).encode()).decode()
def decode_cursor(cursor: str) -> dict:
return json.loads(base64.urlsafe_b64decode(cursor).decode())
async def paginate(
query_fn: Callable,
cursor: Optional[str] = None,
limit: int = 20,
max_limit: int = 100,
) -> PaginatedResult:
limit = min(limit, max_limit)
filters = {}
if cursor:
decoded = decode_cursor(cursor)
filters = {"created_at__lt": decoded["created_at"]}
items = await query_fn(filters, limit + 1)
has_more = len(items) > limit
data = items[:limit] if has_more else items
return PaginatedResult(
data=data,
has_more=has_more,
next_cursor=encode_cursor({"created_at": data[-1].created_at.isoformat()}) if data else None,
prev_cursor=encode_cursor({"created_at": data[0].created_at.isoformat()}) if data else None,
)undefinedFastAPI Route
FastAPI 路由
python
@router.get("/users")
async def list_users(cursor: str = None, limit: int = 20):
async def query(filters, take):
return await db.users.find_many(
where=filters,
order_by={"created_at": "desc"},
take=take,
)
result = await paginate(query, cursor=cursor, limit=limit)
return {
"data": result.data,
"pagination": {
"hasMore": result.has_more,
"nextCursor": result.next_cursor,
},
}python
@router.get("/users")
async def list_users(cursor: str = None, limit: int = 20):
async def query(filters, take):
return await db.users.find_many(
where=filters,
order_by={"created_at": "desc"},
take=take,
)
result = await paginate(query, cursor=cursor, limit=limit)
return {
"data": result.data,
"pagination": {
"hasMore": result.has_more,
"nextCursor": result.next_cursor,
},
}Database Optimization
数据库优化
sql
-- Essential index for cursor pagination
CREATE INDEX idx_users_pagination ON users(created_at DESC, id DESC);
-- For filtered pagination
CREATE INDEX idx_users_org_pagination ON users(organization_id, created_at DESC, id DESC);sql
-- 游标分页必备索引
CREATE INDEX idx_users_pagination ON users(created_at DESC, id DESC);
-- 带过滤条件的分页索引
CREATE INDEX idx_users_org_pagination ON users(organization_id, created_at DESC, id DESC);Frontend Integration
前端集成
typescript
// useInfiniteQuery with cursor pagination
function useUsers() {
return useInfiniteQuery({
queryKey: ['users'],
queryFn: ({ pageParam }) =>
fetch(`/api/users?cursor=${pageParam || ''}`).then(r => r.json()),
getNextPageParam: (lastPage) =>
lastPage.pagination.hasMore ? lastPage.pagination.nextCursor : undefined,
});
}
// Usage
const { data, fetchNextPage, hasNextPage, isFetchingNextPage } = useUsers();
const allUsers = data?.pages.flatMap(page => page.data) ?? [];typescript
// 结合useInfiniteQuery实现游标分页
function useUsers() {
return useInfiniteQuery({
queryKey: ['users'],
queryFn: ({ pageParam }) =>
fetch(`/api/users?cursor=${pageParam || ''}`).then(r => r.json()),
getNextPageParam: (lastPage) =>
lastPage.pagination.hasMore ? lastPage.pagination.nextCursor : undefined,
});
}
// 使用示例
const { data, fetchNextPage, hasNextPage, isFetchingNextPage } = useUsers();
const allUsers = data?.pages.flatMap(page => page.data) ?? [];Best Practices
最佳实践
- Always use stable sort - Include ID in sort to handle ties
- Index your sort columns - Pagination is only fast with proper indexes
- Limit the limit - Cap maximum page size (100 is reasonable)
- Use cursor for real-time data - Offset breaks with concurrent writes
- Include total count sparingly - COUNT(*) is expensive on large tables
- 始终使用稳定排序 - 排序条件中加入ID以处理相同时间戳的情况
- 为排序字段创建索引 - 只有配置合适的索引,分页才能保证高效
- 限制最大分页大小 - 合理设置单页数据上限(100条较为合适)
- 实时数据场景使用游标分页 - 偏移量分页在并发写入时会出现数据异常
- 谨慎返回总数据量 - 对大型表执行COUNT(*)操作性能开销极大
Common Mistakes
常见误区
- Using OFFSET on large tables (scans all skipped rows)
- Not including ID in cursor (unstable with same timestamps)
- Missing index on sort columns
- Returning total count on every request
- Not handling deleted items between pages
- 在大型表上使用OFFSET(会扫描所有被跳过的行)
- 游标中未包含ID(相同时间戳下排序结果不稳定)
- 未给排序字段创建索引
- 每次请求都返回总数据量
- 未处理分页间数据被删除的情况