rest-api-design-patterns
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseREST API Design Patterns
REST API设计模式
A comprehensive skill for designing, implementing, and maintaining RESTful APIs. Master resource modeling, HTTP methods, versioning strategies, pagination, filtering, error handling, and best practices for building scalable, maintainable APIs using FastAPI, Express.js, and modern frameworks.
这是一份关于设计、实现和维护RESTful API的综合指南。掌握资源建模、HTTP方法、版本控制策略、分页、过滤、错误处理,以及使用FastAPI、Express.js和现代框架构建可扩展、可维护API的最佳实践。
When to Use This Skill
何时使用此技能
Use this skill when:
- Designing a new RESTful API from scratch
- Building microservices with HTTP/REST interfaces
- Refactoring existing APIs for better design and consistency
- Implementing CRUD operations with proper HTTP semantics
- Adding versioning to an existing API
- Designing resource relationships and nested endpoints
- Implementing pagination, filtering, and sorting
- Handling errors and validation consistently
- Building hypermedia-driven APIs (HATEOAS)
- Optimizing API performance with caching and compression
- Documenting APIs with OpenAPI/Swagger specifications
- Ensuring API security with authentication and authorization patterns
在以下场景中使用此技能:
- 从零开始设计新的RESTful API
- 构建带有HTTP/REST接口的微服务
- 重构现有API以提升设计一致性
- 使用正确的HTTP语义实现CRUD操作
- 为现有API添加版本控制
- 设计资源关系和嵌套端点
- 实现分页、过滤和排序功能
- 统一处理错误和验证逻辑
- 构建超媒体驱动的API(HATEOAS)
- 通过缓存和压缩优化API性能
- 使用OpenAPI/Swagger规范编写API文档
- 通过认证和授权模式保障API安全
Core REST Principles
REST核心原则
What is REST?
什么是REST?
REST (Representational State Transfer) is an architectural style for distributed systems that emphasizes:
- Resource-Based: Everything is a resource with a unique identifier (URI)
- Standard Methods: Use standard HTTP methods (GET, POST, PUT, DELETE, PATCH)
- Stateless: Each request contains all information needed to process it
- Client-Server: Clear separation between client and server
- Cacheable: Responses can be cached for performance
- Uniform Interface: Consistent patterns across the API
REST(Representational State Transfer,表述性状态转移)是分布式系统的一种架构风格,强调:
- 基于资源:一切都是带有唯一标识符(URI)的资源
- 标准方法:使用标准HTTP方法(GET、POST、PUT、DELETE、PATCH)
- 无状态:每个请求包含处理所需的全部信息
- 客户端-服务端分离:客户端与服务端职责清晰分离
- 可缓存:响应可被缓存以提升性能
- 统一接口:API采用一致的设计模式
REST Maturity Model (Richardson Maturity Model)
REST成熟度模型(Richardson成熟度模型)
Level 0 - The Swamp of POX: Single URI, single HTTP method (usually POST)
- Example: with all operations in POST body
/api
Level 1 - Resources: Multiple URIs, each representing a resource
- Example: ,
/users,/posts/products
Level 2 - HTTP Verbs: Proper use of HTTP methods
- Example: GET , POST
/users/123, PUT/users/users/123
Level 3 - Hypermedia Controls (HATEOAS): API responses include links to related resources
- Example: Response includes
"_links": {"self": "/users/123", "posts": "/users/123/posts"}
Level 0 - POX沼泽:单个URI,单个HTTP方法(通常为POST)
- 示例:,所有操作都在POST请求体中
/api
Level 1 - 资源:多个URI,每个代表一个资源
- 示例:、
/users、/posts/products
Level 2 - HTTP动词:正确使用HTTP方法
- 示例:GET 、POST
/users/123、PUT/users/users/123
Level 3 - 超媒体控制(HATEOAS):API响应中包含关联资源的链接
- 示例:响应包含
"_links": {"self": "/users/123", "posts": "/users/123/posts"}
Resource Modeling
资源建模
Resource Naming Conventions
资源命名规范
1. Use Nouns, Not Verbs
Good:
GET /users
GET /products
POST /orders
Bad:
GET /getUsers
GET /getAllProducts
POST /createOrder2. Use Plural Nouns for Collections
Good:
GET /users # Collection
GET /users/123 # Individual resource
Bad:
GET /user
GET /user/1233. Use Lowercase and Hyphens
Good:
/user-profiles
/order-items
/payment-methods
Bad:
/userProfiles
/OrderItems
/payment_methods4. Hierarchy for Related Resources
Good:
/users/123/posts
/users/123/posts/456
/users/123/posts/456/comments
Avoid Deep Nesting (max 2-3 levels):
/organizations/1/departments/2/teams/3/members/4/tasks/5 # Too deep!1. 使用名词,而非动词
推荐:
GET /users
GET /products
POST /orders
不推荐:
GET /getUsers
GET /getAllProducts
POST /createOrder2. 集合资源使用复数名词
推荐:
GET /users # 集合
GET /users/123 # 单个资源
不推荐:
GET /user
GET /user/1233. 使用小写字母和连字符
推荐:
/user-profiles
/order-items
/payment-methods
不推荐:
/userProfiles
/OrderItems
/payment_methods4. 使用层级结构表示关联资源
推荐:
/users/123/posts
/users/123/posts/456
/users/123/posts/456/comments
避免过深嵌套(最多2-3层):
/organizations/1/departments/2/teams/3/members/4/tasks/5 # 嵌套过深!Resource Design Patterns
资源设计模式
Pattern 1: Collection and Item Resources
模式1:集合与单个资源
Collection Resource:
GET /products # List all products
POST /products # Create new product
Item Resource:
GET /products/123 # Get specific product
PUT /products/123 # Replace product (full update)
PATCH /products/123 # Partial update
DELETE /products/123 # Delete product集合资源:
GET /products # 列出所有产品
POST /products # 创建新产品
单个资源:
GET /products/123 # 获取指定产品
PUT /products/123 # 替换产品(全量更新)
PATCH /products/123 # 部分更新
DELETE /products/123 # 删除产品Pattern 2: Nested Resources (Parent-Child Relationships)
模式2:嵌套资源(父子关系)
undefinedundefinedComments belong to posts
评论属于帖子
GET /posts/42/comments # List comments for post 42
POST /posts/42/comments # Create comment on post 42
GET /posts/42/comments/7 # Get specific comment
DELETE /posts/42/comments/7 # Delete specific comment
GET /posts/42/comments # 列出帖子42的所有评论
POST /posts/42/comments # 在帖子42下创建评论
GET /posts/42/comments/7 # 获取指定评论
DELETE /posts/42/comments/7 # 删除指定评论
Alternative for accessing comments directly
直接访问评论的替代方式
GET /comments/7 # Get comment by ID (if you have it)
undefinedGET /comments/7 # 通过ID获取评论(如果已知ID)
undefinedPattern 3: Filtering Collections (Query Parameters)
模式3:集合过滤(查询参数)
GET /products?category=electronics
GET /products?price_min=100&price_max=500
GET /products?sort=price&order=desc
GET /users?status=active&role=admin
GET /posts?author=123&published=trueGET /products?category=electronics
GET /products?price_min=100&price_max=500
GET /products?sort=price&order=desc
GET /users?status=active&role=admin
GET /posts?author=123&published=truePattern 4: Actions on Resources (Controllers)
模式4:资源操作(控制器)
For operations that don't fit standard CRUD:
POST /users/123/activate # Activate user account
POST /orders/456/cancel # Cancel order
POST /payments/789/refund # Refund payment
POST /documents/321/publish # Publish document
POST /subscriptions/654/renew # Renew subscription针对不适合标准CRUD的操作:
POST /users/123/activate # 激活用户账号
POST /orders/456/cancel # 取消订单
POST /payments/789/refund # 退款
POST /documents/321/publish # 发布文档
POST /subscriptions/654/renew # 续订订阅Pattern 5: Bulk Operations
模式5:批量操作
POST /users/bulk-create # Create multiple users
PATCH /products/bulk-update # Update multiple products
DELETE /orders/bulk-delete # Delete multiple ordersPOST /users/bulk-create # 批量创建用户
PATCH /products/bulk-update # 批量更新产品
DELETE /orders/bulk-delete # 批量删除订单Or using query parameters
或使用查询参数
DELETE /orders?ids=1,2,3,4,5
undefinedDELETE /orders?ids=1,2,3,4,5
undefinedHTTP Methods Deep Dive
HTTP方法深入解析
GET - Retrieve Resources
GET - 获取资源
Characteristics:
- Safe: No side effects
- Idempotent: Multiple identical requests have the same effect
- Cacheable: Responses can be cached
FastAPI Example:
python
from fastapi import FastAPI, HTTPException
from typing import List, Optional
app = FastAPI()特性:
- 安全:无副作用
- 幂等:多次相同请求效果一致
- 可缓存:响应可被缓存
FastAPI示例:
python
from fastapi import FastAPI, HTTPException
from typing import List, Optional
app = FastAPI()Collection endpoint
集合端点
@app.get("/items/")
async def list_items(
skip: int = 0,
limit: int = 10,
category: Optional[str] = None
) -> List[dict]:
"""List items with pagination and filtering."""
# Filter and paginate
items = get_items_from_db(skip=skip, limit=limit, category=category)
return items
@app.get("/items/")
async def list_items(
skip: int = 0,
limit: int = 10,
category: Optional[str] = None
) -> List[dict]:
"""带分页和过滤的物品列表。"""
# 过滤和分页
items = get_items_from_db(skip=skip, limit=limit, category=category)
return items
Individual resource endpoint
单个资源端点
@app.get("/items/{item_id}")
async def get_item(item_id: int) -> dict:
"""Get a specific item by ID."""
item = get_item_from_db(item_id)
if not item:
raise HTTPException(status_code=404, detail="Item not found")
return item
**Express.js Example:**
```javascript
const express = require('express');
const app = express();
// Collection endpoint
app.get('/items', async (req, res) => {
try {
const { skip = 0, limit = 10, category } = req.query;
const items = await getItemsFromDB({ skip, limit, category });
res.json(items);
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});
// Individual resource endpoint
app.get('/items/:id', async (req, res) => {
try {
const item = await getItemFromDB(req.params.id);
if (!item) {
return res.status(404).json({ error: 'Item not found' });
}
res.json(item);
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});@app.get("/items/{item_id}")
async def get_item(item_id: int) -> dict:
"""通过ID获取指定物品。"""
item = get_item_from_db(item_id)
if not item:
raise HTTPException(status_code=404, detail="Item not found")
return item
**Express.js示例:**
```javascript
const express = require('express');
const app = express();
// 集合端点
app.get('/items', async (req, res) => {
try {
const { skip = 0, limit = 10, category } = req.query;
const items = await getItemsFromDB({ skip, limit, category });
res.json(items);
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});
// 单个资源端点
app.get('/items/:id', async (req, res) => {
try {
const item = await getItemFromDB(req.params.id);
if (!item) {
return res.status(404).json({ error: 'Item not found' });
}
res.json(item);
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});POST - Create Resources
POST - 创建资源
Characteristics:
- Not safe: Has side effects (creates resource)
- Not idempotent: Multiple requests create multiple resources
- Response should include header with new resource URI
Location
FastAPI Example:
python
from fastapi import FastAPI, HTTPException, status
from pydantic import BaseModel
class ItemCreate(BaseModel):
name: str
price: float
category: str
description: Optional[str] = None
@app.post("/items/", status_code=status.HTTP_201_CREATED)
async def create_item(item: ItemCreate, response: Response) -> dict:
"""Create a new item."""
# Validate and create
new_item = create_item_in_db(item)
# Set Location header
response.headers["Location"] = f"/items/{new_item.id}"
return new_itemExpress.js Example:
javascript
app.use(express.json());
app.post('/items', async (req, res) => {
try {
const { name, price, category, description } = req.body;
// Validate
if (!name || !price || !category) {
return res.status(400).json({
error: 'Missing required fields: name, price, category'
});
}
// Create resource
const newItem = await createItemInDB({ name, price, category, description });
// Set Location header and return 201
res.location(`/items/${newItem.id}`)
.status(201)
.json(newItem);
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});特性:
- 不安全:有副作用(创建资源)
- 非幂等:多次请求会创建多个资源
- 响应应包含头,指向新资源的URI
Location
FastAPI示例:
python
from fastapi import FastAPI, HTTPException, status
from pydantic import BaseModel
class ItemCreate(BaseModel):
name: str
price: float
category: str
description: Optional[str] = None
@app.post("/items/", status_code=status.HTTP_201_CREATED)
async def create_item(item: ItemCreate, response: Response) -> dict:
"""创建新物品。"""
# 验证并创建
new_item = create_item_in_db(item)
# 设置Location头
response.headers["Location"] = f"/items/{new_item.id}"
return new_itemExpress.js示例:
javascript
app.use(express.json());
app.post('/items', async (req, res) => {
try {
const { name, price, category, description } = req.body;
// 验证
if (!name || !price || !category) {
return res.status(400).json({
error: 'Missing required fields: name, price, category'
});
}
// 创建资源
const newItem = await createItemInDB({ name, price, category, description });
// 设置Location头并返回201
res.location(`/items/${newItem.id}`)
.status(201)
.json(newItem);
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});PUT - Replace Resource
PUT - 替换资源
Characteristics:
- Not safe: Has side effects
- Idempotent: Multiple identical requests have the same effect
- Replaces entire resource (all fields required)
FastAPI Example:
python
class ItemUpdate(BaseModel):
name: str
price: float
category: str
description: str
@app.put("/items/{item_id}")
async def replace_item(item_id: int, item: ItemUpdate) -> dict:
"""Replace an entire item (all fields required)."""
existing_item = get_item_from_db(item_id)
if not existing_item:
raise HTTPException(status_code=404, detail="Item not found")
# Replace entire resource
updated_item = replace_item_in_db(item_id, item)
return updated_itemExpress.js Example:
javascript
app.put('/items/:id', async (req, res) => {
try {
const { name, price, category, description } = req.body;
// All fields required for PUT
if (!name || !price || !category || description === undefined) {
return res.status(400).json({
error: 'PUT requires all fields: name, price, category, description'
});
}
const existingItem = await getItemFromDB(req.params.id);
if (!existingItem) {
return res.status(404).json({ error: 'Item not found' });
}
// Replace entire resource
const updatedItem = await replaceItemInDB(req.params.id, req.body);
res.json(updatedItem);
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});特性:
- 不安全:有副作用
- 幂等:多次相同请求效果一致
- 替换整个资源(需要所有字段)
FastAPI示例:
python
class ItemUpdate(BaseModel):
name: str
price: float
category: str
description: str
@app.put("/items/{item_id}")
async def replace_item(item_id: int, item: ItemUpdate) -> dict:
"""替换整个物品(需要所有字段)。"""
existing_item = get_item_from_db(item_id)
if not existing_item:
raise HTTPException(status_code=404, detail="Item not found")
# 替换整个资源
updated_item = replace_item_in_db(item_id, item)
return updated_itemExpress.js示例:
javascript
app.put('/items/:id', async (req, res) => {
try {
const { name, price, category, description } = req.body;
// PUT需要所有字段
if (!name || !price || !category || description === undefined) {
return res.status(400).json({
error: 'PUT requires all fields: name, price, category, description'
});
}
const existingItem = await getItemFromDB(req.params.id);
if (!existingItem) {
return res.status(404).json({ error: 'Item not found' });
}
// 替换整个资源
const updatedItem = await replaceItemInDB(req.params.id, req.body);
res.json(updatedItem);
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});PATCH - Partial Update
PATCH - 部分更新
Characteristics:
- Not safe: Has side effects
- Idempotent: Multiple identical requests have the same effect
- Updates only specified fields (partial update)
FastAPI Example:
python
class ItemPatch(BaseModel):
name: Optional[str] = None
price: Optional[float] = None
category: Optional[str] = None
description: Optional[str] = None
@app.patch("/items/{item_id}")
async def update_item(item_id: int, item: ItemPatch) -> dict:
"""Partially update an item (only provided fields)."""
existing_item = get_item_from_db(item_id)
if not existing_item:
raise HTTPException(status_code=404, detail="Item not found")
# Update only provided fields
update_data = item.model_dump(exclude_unset=True)
updated_item = update_item_in_db(item_id, update_data)
return updated_itemExpress.js Example:
javascript
app.patch('/items/:id', async (req, res) => {
try {
const existingItem = await getItemFromDB(req.params.id);
if (!existingItem) {
return res.status(404).json({ error: 'Item not found' });
}
// Update only provided fields
const updatedItem = await updateItemInDB(req.params.id, req.body);
res.json(updatedItem);
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});特性:
- 不安全:有副作用
- 幂等:多次相同请求效果一致
- 仅更新指定字段(部分更新)
FastAPI示例:
python
class ItemPatch(BaseModel):
name: Optional[str] = None
price: Optional[float] = None
category: Optional[str] = None
description: Optional[str] = None
@app.patch("/items/{item_id}")
async def update_item(item_id: int, item: ItemPatch) -> dict:
"""部分更新物品(仅更新提供的字段)。"""
existing_item = get_item_from_db(item_id)
if not existing_item:
raise HTTPException(status_code=404, detail="Item not found")
# 仅更新提供的字段
update_data = item.model_dump(exclude_unset=True)
updated_item = update_item_in_db(item_id, update_data)
return updated_itemExpress.js示例:
javascript
app.patch('/items/:id', async (req, res) => {
try {
const existingItem = await getItemFromDB(req.params.id);
if (!existingItem) {
return res.status(404).json({ error: 'Item not found' });
}
// 仅更新提供的字段
const updatedItem = await updateItemInDB(req.params.id, req.body);
res.json(updatedItem);
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});DELETE - Remove Resource
DELETE - 删除资源
Characteristics:
- Not safe: Has side effects
- Idempotent: Multiple identical requests have the same effect
- Returns 204 No Content or 200 OK with response body
FastAPI Example:
python
@app.delete("/items/{item_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_item(item_id: int):
"""Delete an item."""
existing_item = get_item_from_db(item_id)
if not existing_item:
raise HTTPException(status_code=404, detail="Item not found")
delete_item_from_db(item_id)
return None # 204 No Content特性:
- 不安全:有副作用
- 幂等:多次相同请求效果一致
- 返回204 No Content或带响应体的200 OK
FastAPI示例:
python
@app.delete("/items/{item_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_item(item_id: int):
"""删除物品。"""
existing_item = get_item_from_db(item_id)
if not existing_item:
raise HTTPException(status_code=404, detail="Item not found")
delete_item_from_db(item_id)
return None # 204 No ContentAlternative: Return deleted resource
替代方案:返回被删除的资源
@app.delete("/items/{item_id}")
async def delete_item_with_response(item_id: int) -> dict:
"""Delete an item and return it."""
existing_item = get_item_from_db(item_id)
if not existing_item:
raise HTTPException(status_code=404, detail="Item not found")
delete_item_from_db(item_id)
return existing_item # 200 OK with body
**Express.js Example:**
```javascript
// 204 No Content approach
app.delete('/items/:id', async (req, res) => {
try {
const existingItem = await getItemFromDB(req.params.id);
if (!existingItem) {
return res.status(404).json({ error: 'Item not found' });
}
await deleteItemFromDB(req.params.id);
res.status(204).send();
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});
// 200 OK with response body approach
app.delete('/items/:id', async (req, res) => {
try {
const existingItem = await getItemFromDB(req.params.id);
if (!existingItem) {
return res.status(404).json({ error: 'Item not found' });
}
await deleteItemFromDB(req.params.id);
res.json({ message: 'Item deleted successfully', item: existingItem });
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});@app.delete("/items/{item_id}")
async def delete_item_with_response(item_id: int) -> dict:
"""删除物品并返回该资源。"""
existing_item = get_item_from_db(item_id)
if not existing_item:
raise HTTPException(status_code=404, detail="Item not found")
delete_item_from_db(item_id)
return existing_item # 200 OK 带响应体
**Express.js示例:**
```javascript
// 204 No Content 方案
app.delete('/items/:id', async (req, res) => {
try {
const existingItem = await getItemFromDB(req.params.id);
if (!existingItem) {
return res.status(404).json({ error: 'Item not found' });
}
await deleteItemFromDB(req.params.id);
res.status(204).send();
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});
// 200 OK 带响应体方案
app.delete('/items/:id', async (req, res) => {
try {
const existingItem = await getItemFromDB(req.params.id);
if (!existingItem) {
return res.status(404).json({ error: 'Item not found' });
}
await deleteItemFromDB(req.params.id);
res.json({ message: 'Item deleted successfully', item: existingItem });
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});API Versioning Strategies
API版本控制策略
Strategy 1: URI Versioning (Most Common)
策略1:URI版本控制(最常用)
Version in the URI path - clear, explicit, easy to understand.
Pros:
- Explicit and visible
- Easy to route to different code versions
- Browser-friendly
- Simple for documentation
Cons:
- Creates multiple endpoints
- Can lead to code duplication
- URLs change between versions
FastAPI Implementation:
python
from fastapi import FastAPI, APIRouter
app = FastAPI()在URI路径中包含版本 - 清晰、明确、易于理解。
优点:
- 明确可见
- 易于路由到不同代码版本
- 浏览器友好
- 文档编写简单
缺点:
- 创建多个端点
- 可能导致代码重复
- 版本间URL会变化
FastAPI实现:
python
from fastapi import FastAPI, APIRouter
app = FastAPI()Version 1 router
版本1路由
v1_router = APIRouter(prefix="/api/v1")
@v1_router.get("/users")
async def get_users_v1():
return {"users": ["user1", "user2"], "version": "1.0"}
@v1_router.get("/users/{user_id}")
async def get_user_v1(user_id: int):
return {"id": user_id, "name": "John", "version": "1.0"}
v1_router = APIRouter(prefix="/api/v1")
@v1_router.get("/users")
async def get_users_v1():
return {"users": ["user1", "user2"], "version": "1.0"}
@v1_router.get("/users/{user_id}")
async def get_user_v1(user_id: int):
return {"id": user_id, "name": "John", "version": "1.0"}
Version 2 router
版本2路由
v2_router = APIRouter(prefix="/api/v2")
@v2_router.get("/users")
async def get_users_v2(limit: int = 10, offset: int = 0):
"""V2 adds pagination"""
return {
"users": ["user1", "user2"],
"pagination": {"limit": limit, "offset": offset},
"version": "2.0"
}
@v2_router.get("/users/{user_id}")
async def get_user_v2(user_id: int):
"""V2 returns more fields"""
return {
"id": user_id,
"name": "John",
"email": "john@example.com",
"created_at": "2024-01-01",
"version": "2.0"
}
app.include_router(v1_router)
app.include_router(v2_router)
**Express.js Implementation:**
```javascript
const express = require('express');
const app = express();
// Version 1 routes
const v1Router = express.Router();
v1Router.get('/users', (req, res) => {
res.json({ users: ['user1', 'user2'], version: '1.0' });
});
v1Router.get('/users/:id', (req, res) => {
res.json({ id: req.params.id, name: 'John', version: '1.0' });
});
// Version 2 routes
const v2Router = express.Router();
v2Router.get('/users', (req, res) => {
const { limit = 10, offset = 0 } = req.query;
res.json({
users: ['user1', 'user2'],
pagination: { limit, offset },
version: '2.0'
});
});
v2Router.get('/users/:id', (req, res) => {
res.json({
id: req.params.id,
name: 'John',
email: 'john@example.com',
created_at: '2024-01-01',
version: '2.0'
});
});
app.use('/api/v1', v1Router);
app.use('/api/v2', v2Router);v2_router = APIRouter(prefix="/api/v2")
@v2_router.get("/users")
async def get_users_v2(limit: int = 10, offset: int = 0):
"""V2新增分页"""
return {
"users": ["user1", "user2"],
"pagination": {"limit": limit, "offset": offset},
"version": "2.0"
}
@v2_router.get("/users/{user_id}")
async def get_user_v2(user_id: int):
"""V2返回更多字段"""
return {
"id": user_id,
"name": "John",
"email": "john@example.com",
"created_at": "2024-01-01",
"version": "2.0"
}
app.include_router(v1_router)
app.include_router(v2_router)
**Express.js实现:**
```javascript
const express = require('express');
const app = express();
// 版本1路由
const v1Router = express.Router();
v1Router.get('/users', (req, res) => {
res.json({ users: ['user1', 'user2'], version: '1.0' });
});
v1Router.get('/users/:id', (req, res) => {
res.json({ id: req.params.id, name: 'John', version: '1.0' });
});
// 版本2路由
const v2Router = express.Router();
v2Router.get('/users', (req, res) => {
const { limit = 10, offset = 0 } = req.query;
res.json({
users: ['user1', 'user2'],
pagination: { limit, offset },
version: '2.0'
});
});
v2Router.get('/users/:id', (req, res) => {
res.json({
id: req.params.id,
name: 'John',
email: 'john@example.com',
created_at: '2024-01-01',
version: '2.0'
});
});
app.use('/api/v1', v1Router);
app.use('/api/v2', v2Router);Strategy 2: Header Versioning
策略2:Header版本控制
Version specified in custom header or Accept header.
Pros:
- Clean URIs
- No URL pollution
- More "RESTful" (resources have single URI)
Cons:
- Less visible
- Harder to test in browser
- More complex routing logic
FastAPI Implementation:
python
from fastapi import FastAPI, Header, HTTPException
app = FastAPI()
@app.get("/users")
async def get_users(api_version: str = Header(default="1.0", alias="X-API-Version")):
"""Handle multiple versions based on header"""
if api_version == "1.0":
return {"users": ["user1", "user2"], "version": "1.0"}
elif api_version == "2.0":
return {
"users": ["user1", "user2"],
"pagination": {"limit": 10, "offset": 0},
"version": "2.0"
}
else:
raise HTTPException(
status_code=400,
detail=f"Unsupported API version: {api_version}"
)
@app.get("/users/{user_id}")
async def get_user(
user_id: int,
api_version: str = Header(default="1.0", alias="X-API-Version")
):
"""User endpoint with version handling"""
if api_version == "1.0":
return {"id": user_id, "name": "John", "version": "1.0"}
elif api_version == "2.0":
return {
"id": user_id,
"name": "John",
"email": "john@example.com",
"created_at": "2024-01-01",
"version": "2.0"
}
else:
raise HTTPException(
status_code=400,
detail=f"Unsupported API version: {api_version}"
)Express.js Implementation:
javascript
app.get('/users', (req, res) => {
const version = req.get('X-API-Version') || '1.0';
if (version === '1.0') {
res.json({ users: ['user1', 'user2'], version: '1.0' });
} else if (version === '2.0') {
res.json({
users: ['user1', 'user2'],
pagination: { limit: 10, offset: 0 },
version: '2.0'
});
} else {
res.status(400).json({ error: `Unsupported API version: ${version}` });
}
});
app.get('/users/:id', (req, res) => {
const version = req.get('X-API-Version') || '1.0';
if (version === '1.0') {
res.json({ id: req.params.id, name: 'John', version: '1.0' });
} else if (version === '2.0') {
res.json({
id: req.params.id,
name: 'John',
email: 'john@example.com',
created_at: '2024-01-01',
version: '2.0'
});
} else {
res.status(400).json({ error: `Unsupported API version: ${version}` });
}
});在自定义Header或Accept Header中指定版本。
优点:
- URI简洁
- 无URL冗余
- 更符合REST风格(资源只有单个URI)
缺点:
- 可见性低
- 浏览器中测试较难
- 路由逻辑更复杂
FastAPI实现:
python
from fastapi import FastAPI, Header, HTTPException
app = FastAPI()
@app.get("/users")
async def get_users(api_version: str = Header(default="1.0", alias="X-API-Version")):
"""基于Header处理多版本"""
if api_version == "1.0":
return {"users": ["user1", "user2"], "version": "1.0"}
elif api_version == "2.0":
return {
"users": ["user1", "user2"],
"pagination": {"limit": 10, "offset": 0},
"version": "2.0"
}
else:
raise HTTPException(
status_code=400,
detail=f"Unsupported API version: {api_version}"
)
@app.get("/users/{user_id}")
async def get_user(
user_id: int,
api_version: str = Header(default="1.0", alias="X-API-Version")
):
"""带版本处理的用户端点"""
if api_version == "1.0":
return {"id": user_id, "name": "John", "version": "1.0"}
elif api_version == "2.0":
return {
"id": user_id,
"name": "John",
"email": "john@example.com",
"created_at": "2024-01-01",
"version": "2.0"
}
else:
raise HTTPException(
status_code=400,
detail=f"Unsupported API version: {api_version}"
)Express.js实现:
javascript
app.get('/users', (req, res) => {
const version = req.get('X-API-Version') || '1.0';
if (version === '1.0') {
res.json({ users: ['user1', 'user2'], version: '1.0' });
} else if (version === '2.0') {
res.json({
users: ['user1', 'user2'],
pagination: { limit: 10, offset: 0 },
version: '2.0'
});
} else {
res.status(400).json({ error: `Unsupported API version: ${version}` });
}
});
app.get('/users/:id', (req, res) => {
const version = req.get('X-API-Version') || '1.0';
if (version === '1.0') {
res.json({ id: req.params.id, name: 'John', version: '1.0' });
} else if (version === '2.0') {
res.json({
id: req.params.id,
name: 'John',
email: 'john@example.com',
created_at: '2024-01-01',
version: '2.0'
});
} else {
res.status(400).json({ error: `Unsupported API version: ${version}` });
}
});Strategy 3: Content Negotiation (Accept Header)
策略3:内容协商(Accept Header)
Version specified in Accept header with custom media types.
FastAPI Implementation:
python
from fastapi import FastAPI, Request, HTTPException
app = FastAPI()
@app.get("/users")
async def get_users(request: Request):
"""Handle versioning via Accept header"""
accept = request.headers.get("accept", "application/vnd.api.v1+json")
if "vnd.api.v1+json" in accept:
return {"users": ["user1", "user2"], "version": "1.0"}
elif "vnd.api.v2+json" in accept:
return {
"users": ["user1", "user2"],
"pagination": {"limit": 10, "offset": 0},
"version": "2.0"
}
else:
raise HTTPException(
status_code=406,
detail="Not Acceptable: Unsupported media type"
)Express.js Implementation:
javascript
app.get('/users', (req, res) => {
const accept = req.get('Accept') || 'application/vnd.api.v1+json';
if (accept.includes('vnd.api.v1+json')) {
res.type('application/vnd.api.v1+json')
.json({ users: ['user1', 'user2'], version: '1.0' });
} else if (accept.includes('vnd.api.v2+json')) {
res.type('application/vnd.api.v2+json')
.json({
users: ['user1', 'user2'],
pagination: { limit: 10, offset: 0 },
version: '2.0'
});
} else {
res.status(406).json({ error: 'Not Acceptable: Unsupported media type' });
}
});在Accept Header中使用自定义媒体类型指定版本。
FastAPI实现:
python
from fastapi import FastAPI, Request, HTTPException
app = FastAPI()
@app.get("/users")
async def get_users(request: Request):
"""通过Accept Header处理版本控制"""
accept = request.headers.get("accept", "application/vnd.api.v1+json")
if "vnd.api.v1+json" in accept:
return {"users": ["user1", "user2"], "version": "1.0"}
elif "vnd.api.v2+json" in accept:
return {
"users": ["user1", "user2"],
"pagination": {"limit": 10, "offset": 0},
"version": "2.0"
}
else:
raise HTTPException(
status_code=406,
detail="Not Acceptable: Unsupported media type"
)Express.js实现:
javascript
app.get('/users', (req, res) => {
const accept = req.get('Accept') || 'application/vnd.api.v1+json';
if (accept.includes('vnd.api.v1+json')) {
res.type('application/vnd.api.v1+json')
.json({ users: ['user1', 'user2'], version: '1.0' });
} else if (accept.includes('vnd.api.v2+json')) {
res.type('application/vnd.api.v2+json')
.json({
users: ['user1', 'user2'],
pagination: { limit: 10, offset: 0 },
version: '2.0'
});
} else {
res.status(406).json({ error: 'Not Acceptable: Unsupported media type' });
}
});Strategy 4: Query Parameter Versioning
策略4:查询参数版本控制
Version as query parameter (least recommended).
GET /users?version=2.0
GET /users/123?v=2Cons:
- Mixes versioning with filtering
- Harder to cache
- Less clear separation of concerns
版本作为查询参数(最不推荐)。
GET /users?version=2.0
GET /users/123?v=2缺点:
- 版本控制与过滤逻辑混合
- 缓存难度大
- 关注点分离不清晰
Pagination Patterns
分页模式
Pattern 1: Offset-Based Pagination (Traditional)
模式1:基于偏移量的分页(传统方式)
Simple but can have performance issues with large datasets.
FastAPI Implementation:
python
from fastapi import FastAPI, Query
from typing import List
from pydantic import BaseModel
class PaginatedResponse(BaseModel):
items: List[dict]
total: int
limit: int
offset: int
has_more: bool
@app.get("/items", response_model=PaginatedResponse)
async def list_items(
limit: int = Query(default=10, ge=1, le=100),
offset: int = Query(default=0, ge=0)
):
"""Offset-based pagination"""
# Get total count
total = count_items_in_db()
# Get paginated items
items = get_items_from_db(limit=limit, offset=offset)
# Check if there are more items
has_more = (offset + limit) < total
return {
"items": items,
"total": total,
"limit": limit,
"offset": offset,
"has_more": has_more
}Express.js Implementation:
javascript
app.get('/items', async (req, res) => {
try {
const limit = Math.min(parseInt(req.query.limit) || 10, 100);
const offset = parseInt(req.query.offset) || 0;
const total = await countItemsInDB();
const items = await getItemsFromDB({ limit, offset });
const hasMore = (offset + limit) < total;
res.json({
items,
total,
limit,
offset,
has_more: hasMore
});
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});简单但在大数据集下可能存在性能问题。
FastAPI实现:
python
from fastapi import FastAPI, Query
from typing import List
from pydantic import BaseModel
class PaginatedResponse(BaseModel):
items: List[dict]
total: int
limit: int
offset: int
has_more: bool
@app.get("/items", response_model=PaginatedResponse)
async def list_items(
limit: int = Query(default=10, ge=1, le=100),
offset: int = Query(default=0, ge=0)
):
"""基于偏移量的分页"""
# 获取总数
total = count_items_in_db()
# 获取分页后的物品
items = get_items_from_db(limit=limit, offset=offset)
# 检查是否还有更多物品
has_more = (offset + limit) < total
return {
"items": items,
"total": total,
"limit": limit,
"offset": offset,
"has_more": has_more
}Express.js实现:
javascript
app.get('/items', async (req, res) => {
try {
const limit = Math.min(parseInt(req.query.limit) || 10, 100);
const offset = parseInt(req.query.offset) || 0;
const total = await countItemsInDB();
const items = await getItemsFromDB({ limit, offset });
const hasMore = (offset + limit) < total;
res.json({
items,
total,
limit,
offset,
has_more: hasMore
});
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});Pattern 2: Cursor-Based Pagination (Recommended for Large Datasets)
模式2:基于游标(Cursor)的分页(推荐用于大数据集)
More efficient for large datasets, prevents issues with data changes during pagination.
FastAPI Implementation:
python
from typing import Optional
class CursorPaginatedResponse(BaseModel):
items: List[dict]
next_cursor: Optional[str] = None
has_more: bool
@app.get("/items", response_model=CursorPaginatedResponse)
async def list_items_cursor(
limit: int = Query(default=10, ge=1, le=100),
cursor: Optional[str] = None
):
"""Cursor-based pagination"""
# Get items after cursor
items = get_items_after_cursor(cursor=cursor, limit=limit + 1)
# Check if there are more items
has_more = len(items) > limit
# Get next cursor from last item
next_cursor = None
if has_more:
items = items[:limit] # Remove extra item
next_cursor = items[-1]["id"] # Use last item ID as cursor
return {
"items": items,
"next_cursor": next_cursor,
"has_more": has_more
}Express.js Implementation:
javascript
app.get('/items', async (req, res) => {
try {
const limit = Math.min(parseInt(req.query.limit) || 10, 100);
const cursor = req.query.cursor || null;
// Get one extra item to check if there are more
const items = await getItemsAfterCursor({ cursor, limit: limit + 1 });
const hasMore = items.length > limit;
let nextCursor = null;
if (hasMore) {
items.pop(); // Remove extra item
nextCursor = items[items.length - 1].id; // Last item ID as cursor
}
res.json({
items,
next_cursor: nextCursor,
has_more: hasMore
});
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});在大数据集下更高效,避免分页期间数据变化导致的问题。
FastAPI实现:
python
from typing import Optional
class CursorPaginatedResponse(BaseModel):
items: List[dict]
next_cursor: Optional[str] = None
has_more: bool
@app.get("/items", response_model=CursorPaginatedResponse)
async def list_items_cursor(
limit: int = Query(default=10, ge=1, le=100),
cursor: Optional[str] = None
):
"""基于游标的分页"""
# 获取游标之后的物品
items = get_items_after_cursor(cursor=cursor, limit=limit + 1)
# 检查是否还有更多物品
has_more = len(items) > limit
# 从最后一个物品获取下一个游标
next_cursor = None
if has_more:
items = items[:limit] # 移除多余的物品
next_cursor = items[-1]["id"] # 使用最后一个物品的ID作为游标
return {
"items": items,
"next_cursor": next_cursor,
"has_more": has_more
}Express.js实现:
javascript
app.get('/items', async (req, res) => {
try {
const limit = Math.min(parseInt(req.query.limit) || 10, 100);
const cursor = req.query.cursor || null;
// 多获取一个物品以检查是否还有更多
const items = await getItemsAfterCursor({ cursor, limit: limit + 1 });
const hasMore = items.length > limit;
let nextCursor = null;
if (hasMore) {
items.pop(); // 移除多余的物品
nextCursor = items[items.length - 1].id; // 最后一个物品的ID作为游标
}
res.json({
items,
next_cursor: nextCursor,
has_more: hasMore
});
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});Pattern 3: Page-Based Pagination
模式3:基于页码的分页
User-friendly for UIs with page numbers.
FastAPI Implementation:
python
class PagePaginatedResponse(BaseModel):
items: List[dict]
page: int
page_size: int
total_pages: int
total_items: int
@app.get("/items", response_model=PagePaginatedResponse)
async def list_items_pages(
page: int = Query(default=1, ge=1),
page_size: int = Query(default=10, ge=1, le=100)
):
"""Page-based pagination"""
# Calculate offset
offset = (page - 1) * page_size
# Get total count and items
total_items = count_items_in_db()
items = get_items_from_db(limit=page_size, offset=offset)
# Calculate total pages
total_pages = (total_items + page_size - 1) // page_size
return {
"items": items,
"page": page,
"page_size": page_size,
"total_pages": total_pages,
"total_items": total_items
}Express.js Implementation:
javascript
app.get('/items', async (req, res) => {
try {
const page = Math.max(parseInt(req.query.page) || 1, 1);
const pageSize = Math.min(parseInt(req.query.page_size) || 10, 100);
const offset = (page - 1) * pageSize;
const totalItems = await countItemsInDB();
const items = await getItemsFromDB({ limit: pageSize, offset });
const totalPages = Math.ceil(totalItems / pageSize);
res.json({
items,
page,
page_size: pageSize,
total_pages: totalPages,
total_items: totalItems
});
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});对带页码的UI更友好。
FastAPI实现:
python
class PagePaginatedResponse(BaseModel):
items: List[dict]
page: int
page_size: int
total_pages: int
total_items: int
@app.get("/items", response_model=PagePaginatedResponse)
async def list_items_pages(
page: int = Query(default=1, ge=1),
page_size: int = Query(default=10, ge=1, le=100)
):
"""基于页码的分页"""
# 计算偏移量
offset = (page - 1) * page_size
# 获取总数和物品
total_items = count_items_in_db()
items = get_items_from_db(limit=page_size, offset=offset)
# 计算总页数
total_pages = (total_items + page_size - 1) // page_size
return {
"items": items,
"page": page,
"page_size": page_size,
"total_pages": total_pages,
"total_items": total_items
}Express.js实现:
javascript
app.get('/items', async (req, res) => {
try {
const page = Math.max(parseInt(req.query.page) || 1, 1);
const pageSize = Math.min(parseInt(req.query.page_size) || 10, 100);
const offset = (page - 1) * pageSize;
const totalItems = await countItemsInDB();
const items = await getItemsFromDB({ limit: pageSize, offset });
const totalPages = Math.ceil(totalItems / pageSize);
res.json({
items,
page,
page_size: pageSize,
total_pages: totalPages,
total_items: totalItems
});
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});Filtering and Sorting
过滤与排序
Advanced Filtering
高级过滤
FastAPI Implementation:
python
from enum import Enum
from typing import Optional, List
class SortOrder(str, Enum):
asc = "asc"
desc = "desc"
@app.get("/products")
async def list_products(
# Filtering
category: Optional[str] = None,
min_price: Optional[float] = None,
max_price: Optional[float] = None,
in_stock: Optional[bool] = None,
tags: Optional[List[str]] = Query(None),
search: Optional[str] = None,
# Sorting
sort_by: Optional[str] = Query(default="created_at"),
order: SortOrder = SortOrder.desc,
# Pagination
limit: int = Query(default=10, ge=1, le=100),
offset: int = Query(default=0, ge=0)
):
"""Advanced filtering and sorting"""
filters = {
"category": category,
"min_price": min_price,
"max_price": max_price,
"in_stock": in_stock,
"tags": tags,
"search": search
}
# Remove None values
filters = {k: v for k, v in filters.items() if v is not None}
# Query database
products = query_products(
filters=filters,
sort_by=sort_by,
order=order.value,
limit=limit,
offset=offset
)
return {
"products": products,
"filters": filters,
"sort": {"by": sort_by, "order": order.value},
"pagination": {"limit": limit, "offset": offset}
}Express.js Implementation:
javascript
app.get('/products', async (req, res) => {
try {
const {
category,
min_price,
max_price,
in_stock,
tags,
search,
sort_by = 'created_at',
order = 'desc',
limit = 10,
offset = 0
} = req.query;
// Build filters
const filters = {};
if (category) filters.category = category;
if (min_price) filters.min_price = parseFloat(min_price);
if (max_price) filters.max_price = parseFloat(max_price);
if (in_stock !== undefined) filters.in_stock = in_stock === 'true';
if (tags) filters.tags = Array.isArray(tags) ? tags : [tags];
if (search) filters.search = search;
// Query database
const products = await queryProducts({
filters,
sortBy: sort_by,
order,
limit: Math.min(parseInt(limit), 100),
offset: parseInt(offset)
});
res.json({
products,
filters,
sort: { by: sort_by, order },
pagination: { limit, offset }
});
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});FastAPI实现:
python
from enum import Enum
from typing import Optional, List
class SortOrder(str, Enum):
asc = "asc"
desc = "desc"
@app.get("/products")
async def list_products(
# 过滤
category: Optional[str] = None,
min_price: Optional[float] = None,
max_price: Optional[float] = None,
in_stock: Optional[bool] = None,
tags: Optional[List[str]] = Query(None),
search: Optional[str] = None,
# 排序
sort_by: Optional[str] = Query(default="created_at"),
order: SortOrder = SortOrder.desc,
# 分页
limit: int = Query(default=10, ge=1, le=100),
offset: int = Query(default=0, ge=0)
):
"""高级过滤与排序"""
filters = {
"category": category,
"min_price": min_price,
"max_price": max_price,
"in_stock": in_stock,
"tags": tags,
"search": search
}
# 移除None值
filters = {k: v for k, v in filters.items() if v is not None}
# 查询数据库
products = query_products(
filters=filters,
sort_by=sort_by,
order=order.value,
limit=limit,
offset=offset
)
return {
"products": products,
"filters": filters,
"sort": {"by": sort_by, "order": order.value},
"pagination": {"limit": limit, "offset": offset}
}Express.js实现:
javascript
app.get('/products', async (req, res) => {
try {
const {
category,
min_price,
max_price,
in_stock,
tags,
search,
sort_by = 'created_at',
order = 'desc',
limit = 10,
offset = 0
} = req.query;
// 构建过滤条件
const filters = {};
if (category) filters.category = category;
if (min_price) filters.min_price = parseFloat(min_price);
if (max_price) filters.max_price = parseFloat(max_price);
if (in_stock !== undefined) filters.in_stock = in_stock === 'true';
if (tags) filters.tags = Array.isArray(tags) ? tags : [tags];
if (search) filters.search = search;
// 查询数据库
const products = await queryProducts({
filters,
sortBy: sort_by,
order,
limit: Math.min(parseInt(limit), 100),
offset: parseInt(offset)
});
res.json({
products,
filters,
sort: { by: sort_by, order },
pagination: { limit, offset }
});
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});Error Handling Best Practices
错误处理最佳实践
Consistent Error Response Format
统一的错误响应格式
Standard Error Response Structure:
json
{
"error": {
"code": "RESOURCE_NOT_FOUND",
"message": "The requested user was not found",
"details": {
"user_id": "123",
"resource": "user"
},
"timestamp": "2024-01-15T10:30:00Z"
}
}标准错误响应结构:
json
{
"error": {
"code": "RESOURCE_NOT_FOUND",
"message": "请求的用户不存在",
"details": {
"user_id": "123",
"resource": "user"
},
"timestamp": "2024-01-15T10:30:00Z"
}
}HTTP Status Codes
HTTP状态码
Success Codes:
- : Successful GET, PUT, PATCH, DELETE with response
200 OK - : Successful POST creating a resource
201 Created - : Request accepted for async processing
202 Accepted - : Successful DELETE with no response body
204 No Content
Client Error Codes:
- : Invalid request syntax or validation error
400 Bad Request - : Authentication required
401 Unauthorized - : Authenticated but not authorized
403 Forbidden - : Resource doesn't exist
404 Not Found - : HTTP method not supported
405 Method Not Allowed - : Request conflicts with current state
409 Conflict - : Validation failed
422 Unprocessable Entity - : Rate limit exceeded
429 Too Many Requests
Server Error Codes:
- : Generic server error
500 Internal Server Error - : Invalid response from upstream server
502 Bad Gateway - : Server temporarily unavailable
503 Service Unavailable - : Upstream server timeout
504 Gateway Timeout
成功状态码:
- :成功的GET、PUT、PATCH、DELETE请求(带响应体)
200 OK - :成功创建资源的POST请求
201 Created - :请求已接受,将异步处理
202 Accepted - :成功的DELETE请求(无响应体)
204 No Content
客户端错误状态码:
- :请求语法无效或验证错误
400 Bad Request - :需要认证
401 Unauthorized - :已认证但无权限
403 Forbidden - :资源不存在
404 Not Found - :不支持该HTTP方法
405 Method Not Allowed - :请求与当前资源状态冲突
409 Conflict - :验证失败
422 Unprocessable Entity - :超出请求频率限制
429 Too Many Requests
服务端错误状态码:
- :通用服务端错误
500 Internal Server Error - :上游服务器响应无效
502 Bad Gateway - :服务暂时不可用
503 Service Unavailable - :上游服务器超时
504 Gateway Timeout
FastAPI Error Handling
FastAPI错误处理
python
from fastapi import FastAPI, HTTPException, status, Request
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
from datetime import datetime
app = FastAPI()python
from fastapi import FastAPI, HTTPException, status, Request
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
from datetime import datetime
app = FastAPI()Custom exception
自定义异常
class ResourceNotFoundError(Exception):
def init(self, resource: str, resource_id: str):
self.resource = resource
self.resource_id = resource_id
class ResourceNotFoundError(Exception):
def init(self, resource: str, resource_id: str):
self.resource = resource
self.resource_id = resource_id
Global exception handler
全局异常处理器
@app.exception_handler(ResourceNotFoundError)
async def resource_not_found_handler(request: Request, exc: ResourceNotFoundError):
return JSONResponse(
status_code=404,
content={
"error": {
"code": "RESOURCE_NOT_FOUND",
"message": f"The requested {exc.resource} was not found",
"details": {
"resource": exc.resource,
"resource_id": exc.resource_id
},
"timestamp": datetime.utcnow().isoformat() + "Z"
}
}
)
@app.exception_handler(ResourceNotFoundError)
async def resource_not_found_handler(request: Request, exc: ResourceNotFoundError):
return JSONResponse(
status_code=404,
content={
"error": {
"code": "RESOURCE_NOT_FOUND",
"message": f"请求的{exc.resource}不存在",
"details": {
"resource": exc.resource,
"resource_id": exc.resource_id
},
"timestamp": datetime.utcnow().isoformat() + "Z"
}
}
)
Validation error handler
验证错误处理器
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
return JSONResponse(
status_code=422,
content={
"error": {
"code": "VALIDATION_ERROR",
"message": "Request validation failed",
"details": exc.errors(),
"timestamp": datetime.utcnow().isoformat() + "Z"
}
}
)
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
return JSONResponse(
status_code=422,
content={
"error": {
"code": "VALIDATION_ERROR",
"message": "请求验证失败",
"details": exc.errors(),
"timestamp": datetime.utcnow().isoformat() + "Z"
}
}
)
Using exceptions
使用异常
@app.get("/users/{user_id}")
async def get_user(user_id: int):
user = get_user_from_db(user_id)
if not user:
raise ResourceNotFoundError("user", str(user_id))
return user
@app.get("/users/{user_id}")
async def get_user(user_id: int):
user = get_user_from_db(user_id)
if not user:
raise ResourceNotFoundError("user", str(user_id))
return user
Manual error responses
手动返回错误响应
@app.post("/users")
async def create_user(email: str):
if user_exists(email):
raise HTTPException(
status_code=409,
detail={
"code": "DUPLICATE_EMAIL",
"message": "A user with this email already exists",
"details": {"email": email}
}
)
return create_user_in_db(email)
undefined@app.post("/users")
async def create_user(email: str):
if user_exists(email):
raise HTTPException(
status_code=409,
detail={
"code": "DUPLICATE_EMAIL",
"message": "该邮箱已被注册",
"details": {"email": email}
}
)
return create_user_in_db(email)
undefinedExpress.js Error Handling
Express.js错误处理
javascript
const express = require('express');
const app = express();
app.use(express.json());
// Custom error class
class ResourceNotFoundError extends Error {
constructor(resource, resourceId) {
super(`${resource} not found`);
this.name = 'ResourceNotFoundError';
this.resource = resource;
this.resourceId = resourceId;
this.statusCode = 404;
}
}
class ValidationError extends Error {
constructor(message, details) {
super(message);
this.name = 'ValidationError';
this.details = details;
this.statusCode = 422;
}
}
// Routes
app.get('/users/:id', async (req, res, next) => {
try {
const user = await getUserFromDB(req.params.id);
if (!user) {
throw new ResourceNotFoundError('user', req.params.id);
}
res.json(user);
} catch (error) {
next(error);
}
});
app.post('/users', async (req, res, next) => {
try {
const { email } = req.body;
if (!email) {
throw new ValidationError('Validation failed', {
field: 'email',
message: 'Email is required'
});
}
const userExists = await checkUserExists(email);
if (userExists) {
const error = new Error('Duplicate email');
error.statusCode = 409;
error.code = 'DUPLICATE_EMAIL';
error.details = { email };
throw error;
}
const newUser = await createUserInDB(email);
res.status(201).json(newUser);
} catch (error) {
next(error);
}
});
// Global error handler (must be last)
app.use((err, req, res, next) => {
const statusCode = err.statusCode || 500;
const code = err.code || err.name || 'INTERNAL_SERVER_ERROR';
const errorResponse = {
error: {
code,
message: err.message,
timestamp: new Date().toISOString()
}
};
// Add details if available
if (err.details) {
errorResponse.error.details = err.details;
} else if (err.resource && err.resourceId) {
errorResponse.error.details = {
resource: err.resource,
resource_id: err.resourceId
};
}
// Log error for debugging (don't expose in production)
if (process.env.NODE_ENV !== 'production') {
errorResponse.error.stack = err.stack;
}
res.status(statusCode).json(errorResponse);
});javascript
const express = require('express');
const app = express();
app.use(express.json());
// 自定义错误类
class ResourceNotFoundError extends Error {
constructor(resource, resourceId) {
super(`${resource} not found`);
this.name = 'ResourceNotFoundError';
this.resource = resource;
this.resourceId = resourceId;
this.statusCode = 404;
}
}
class ValidationError extends Error {
constructor(message, details) {
super(message);
this.name = 'ValidationError';
this.details = details;
this.statusCode = 422;
}
}
// 路由
app.get('/users/:id', async (req, res, next) => {
try {
const user = await getUserFromDB(req.params.id);
if (!user) {
throw new ResourceNotFoundError('user', req.params.id);
}
res.json(user);
} catch (error) {
next(error);
}
});
app.post('/users', async (req, res, next) => {
try {
const { email } = req.body;
if (!email) {
throw new ValidationError('Validation failed', {
field: 'email',
message: 'Email is required'
});
}
const userExists = await checkUserExists(email);
if (userExists) {
const error = new Error('Duplicate email');
error.statusCode = 409;
error.code = 'DUPLICATE_EMAIL';
error.details = { email };
throw error;
}
const newUser = await createUserInDB(email);
res.status(201).json(newUser);
} catch (error) {
next(error);
}
});
// 全局错误处理器(必须放在最后)
app.use((err, req, res, next) => {
const statusCode = err.statusCode || 500;
const code = err.code || err.name || 'INTERNAL_SERVER_ERROR';
const errorResponse = {
error: {
code,
message: err.message,
timestamp: new Date().toISOString()
}
};
// 如果有详情则添加
if (err.details) {
errorResponse.error.details = err.details;
} else if (err.resource && err.resourceId) {
errorResponse.error.details = {
resource: err.resource,
resource_id: err.resourceId
};
}
// 调试用日志(生产环境不要暴露)
if (process.env.NODE_ENV !== 'production') {
errorResponse.error.stack = err.stack;
}
res.status(statusCode).json(errorResponse);
});HATEOAS and Hypermedia
HATEOAS与超媒体
What is HATEOAS?
什么是HATEOAS?
HATEOAS (Hypermedia as the Engine of Application State) means including links to related resources in API responses.
Benefits:
- Self-documenting API
- Client doesn't need to construct URLs
- Easier API evolution
- Better discoverability
HATEOAS(Hypermedia as the Engine of Application State,超媒体作为应用状态引擎)指在API响应中包含关联资源的链接。
优点:
- API自文档化
- 客户端无需构造URL
- API演进更简单
- 资源可发现性更好
FastAPI HATEOAS Implementation
FastAPI HATEOAS实现
python
from fastapi import FastAPI
from pydantic import BaseModel, HttpUrl
from typing import List, Optional
class Link(BaseModel):
rel: str
href: str
method: str = "GET"
class UserResponse(BaseModel):
id: int
name: str
email: str
links: List[Link]
class UserListResponse(BaseModel):
users: List[UserResponse]
links: List[Link]
app = FastAPI()
def build_user_links(user_id: int, base_url: str = "http://api.example.com") -> List[Link]:
"""Build HATEOAS links for a user"""
return [
Link(rel="self", href=f"{base_url}/users/{user_id}", method="GET"),
Link(rel="update", href=f"{base_url}/users/{user_id}", method="PUT"),
Link(rel="delete", href=f"{base_url}/users/{user_id}", method="DELETE"),
Link(rel="posts", href=f"{base_url}/users/{user_id}/posts", method="GET"),
Link(rel="create_post", href=f"{base_url}/users/{user_id}/posts", method="POST")
]
@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int):
"""Get user with HATEOAS links"""
user = get_user_from_db(user_id)
return {
"id": user.id,
"name": user.name,
"email": user.email,
"links": build_user_links(user.id)
}
@app.get("/users", response_model=UserListResponse)
async def list_users():
"""List users with HATEOAS links"""
users = get_users_from_db()
users_with_links = [
{
"id": user.id,
"name": user.name,
"email": user.email,
"links": build_user_links(user.id)
}
for user in users
]
collection_links = [
Link(rel="self", href="http://api.example.com/users", method="GET"),
Link(rel="create", href="http://api.example.com/users", method="POST")
]
return {
"users": users_with_links,
"links": collection_links
}python
from fastapi import FastAPI
from pydantic import BaseModel, HttpUrl
from typing import List, Optional
class Link(BaseModel):
rel: str
href: str
method: str = "GET"
class UserResponse(BaseModel):
id: int
name: str
email: str
links: List[Link]
class UserListResponse(BaseModel):
users: List[UserResponse]
links: List[Link]
app = FastAPI()
def build_user_links(user_id: int, base_url: str = "http://api.example.com") -> List[Link]:
"""为用户构建HATEOAS链接"""
return [
Link(rel="self", href=f"{base_url}/users/{user_id}", method="GET"),
Link(rel="update", href=f"{base_url}/users/{user_id}", method="PUT"),
Link(rel="delete", href=f"{base_url}/users/{user_id}", method="DELETE"),
Link(rel="posts", href=f"{base_url}/users/{user_id}/posts", method="GET"),
Link(rel="create_post", href=f"{base_url}/users/{user_id}/posts", method="POST")
]
@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int):
"""获取带HATEOAS链接的用户"""
user = get_user_from_db(user_id)
return {
"id": user.id,
"name": user.name,
"email": user.email,
"links": build_user_links(user.id)
}
@app.get("/users", response_model=UserListResponse)
async def list_users():
"""列出带HATEOAS链接的用户"""
users = get_users_from_db()
users_with_links = [
{
"id": user.id,
"name": user.name,
"email": user.email,
"links": build_user_links(user.id)
}
for user in users
]
collection_links = [
Link(rel="self", href="http://api.example.com/users", method="GET"),
Link(rel="create", href="http://api.example.com/users", method="POST")
]
return {
"users": users_with_links,
"links": collection_links
}Express.js HATEOAS Implementation
Express.js HATEOAS实现
javascript
app.get('/users/:id', async (req, res) => {
try {
const user = await getUserFromDB(req.params.id);
if (!user) {
return res.status(404).json({ error: 'User not found' });
}
const baseUrl = `${req.protocol}://${req.get('host')}`;
res.json({
id: user.id,
name: user.name,
email: user.email,
_links: {
self: { href: `${baseUrl}/users/${user.id}`, method: 'GET' },
update: { href: `${baseUrl}/users/${user.id}`, method: 'PUT' },
delete: { href: `${baseUrl}/users/${user.id}`, method: 'DELETE' },
posts: { href: `${baseUrl}/users/${user.id}/posts`, method: 'GET' },
create_post: { href: `${baseUrl}/users/${user.id}/posts`, method: 'POST' }
}
});
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});
app.get('/users', async (req, res) => {
try {
const users = await getUsersFromDB();
const baseUrl = `${req.protocol}://${req.get('host')}`;
const usersWithLinks = users.map(user => ({
id: user.id,
name: user.name,
email: user.email,
_links: {
self: { href: `${baseUrl}/users/${user.id}`, method: 'GET' },
update: { href: `${baseUrl}/users/${user.id}`, method: 'PUT' },
delete: { href: `${baseUrl}/users/${user.id}`, method: 'DELETE' }
}
}));
res.json({
users: usersWithLinks,
_links: {
self: { href: `${baseUrl}/users`, method: 'GET' },
create: { href: `${baseUrl}/users`, method: 'POST' }
}
});
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});javascript
app.get('/users/:id', async (req, res) => {
try {
const user = await getUserFromDB(req.params.id);
if (!user) {
return res.status(404).json({ error: 'User not found' });
}
const baseUrl = `${req.protocol}://${req.get('host')}`;
res.json({
id: user.id,
name: user.name,
email: user.email,
_links: {
self: { href: `${baseUrl}/users/${user.id}`, method: 'GET' },
update: { href: `${baseUrl}/users/${user.id}`, method: 'PUT' },
delete: { href: `${baseUrl}/users/${user.id}`, method: 'DELETE' },
posts: { href: `${baseUrl}/users/${user.id}/posts`, method: 'GET' },
create_post: { href: `${baseUrl}/users/${user.id}/posts`, method: 'POST' }
}
});
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});
app.get('/users', async (req, res) => {
try {
const users = await getUsersFromDB();
const baseUrl = `${req.protocol}://${req.get('host')}`;
const usersWithLinks = users.map(user => ({
id: user.id,
name: user.name,
email: user.email,
_links: {
self: { href: `${baseUrl}/users/${user.id}`, method: 'GET' },
update: { href: `${baseUrl}/users/${user.id}`, method: 'PUT' },
delete: { href: `${baseUrl}/users/${user.id}`, method: 'DELETE' }
}
}));
res.json({
users: usersWithLinks,
_links: {
self: { href: `${baseUrl}/users`, method: 'GET' },
create: { href: `${baseUrl}/users`, method: 'POST' }
}
});
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});Performance Optimization
性能优化
Caching with ETags
使用ETags缓存
FastAPI Implementation:
python
from fastapi import FastAPI, Request, Response
from fastapi.responses import JSONResponse
import hashlib
app = FastAPI()
def generate_etag(data: dict) -> str:
"""Generate ETag from response data"""
content = str(data).encode('utf-8')
return hashlib.md5(content).hexdigest()
@app.get("/users/{user_id}")
async def get_user_cached(user_id: int, request: Request):
"""Get user with ETag caching"""
user = get_user_from_db(user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
# Generate ETag
etag = generate_etag(user)
# Check If-None-Match header
if_none_match = request.headers.get("if-none-match")
if if_none_match == etag:
return Response(status_code=304) # Not Modified
# Return with ETag header
return JSONResponse(
content=user,
headers={"ETag": etag, "Cache-Control": "max-age=300"}
)Express.js Implementation:
javascript
const crypto = require('crypto');
function generateETag(data) {
const content = JSON.stringify(data);
return crypto.createHash('md5').update(content).digest('hex');
}
app.get('/users/:id', async (req, res) => {
try {
const user = await getUserFromDB(req.params.id);
if (!user) {
return res.status(404).json({ error: 'User not found' });
}
const etag = generateETag(user);
// Check If-None-Match header
if (req.get('If-None-Match') === etag) {
return res.status(304).send(); // Not Modified
}
res.set('ETag', etag)
.set('Cache-Control', 'max-age=300')
.json(user);
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});FastAPI实现:
python
from fastapi import FastAPI, Request, Response
from fastapi.responses import JSONResponse
import hashlib
app = FastAPI()
def generate_etag(data: dict) -> str:
"""从响应数据生成ETag"""
content = str(data).encode('utf-8')
return hashlib.md5(content).hexdigest()
@app.get("/users/{user_id}")
async def get_user_cached(user_id: int, request: Request):
"""带ETag缓存的用户获取接口"""
user = get_user_from_db(user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
# 生成ETag
etag = generate_etag(user)
# 检查If-None-Match Header
if_none_match = request.headers.get("if-none-match")
if if_none_match == etag:
return Response(status_code=304) # 未修改
# 返回带ETag Header的响应
return JSONResponse(
content=user,
headers={"ETag": etag, "Cache-Control": "max-age=300"}
)Express.js实现:
javascript
const crypto = require('crypto');
function generateETag(data) {
const content = JSON.stringify(data);
return crypto.createHash('md5').update(content).digest('hex');
}
app.get('/users/:id', async (req, res) => {
try {
const user = await getUserFromDB(req.params.id);
if (!user) {
return res.status(404).json({ error: 'User not found' });
}
const etag = generateETag(user);
// 检查If-None-Match Header
if (req.get('If-None-Match') === etag) {
return res.status(304).send(); // 未修改
}
res.set('ETag', etag)
.set('Cache-Control', 'max-age=300')
.json(user);
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});Rate Limiting
请求频率限制
Express.js Implementation:
javascript
const rateLimit = require('express-rate-limit');
// Create rate limiter
const apiLimiter = rateLimit({
windowMs: 15 * 60 * 1000, // 15 minutes
max: 100, // Limit each IP to 100 requests per windowMs
message: {
error: {
code: 'RATE_LIMIT_EXCEEDED',
message: 'Too many requests, please try again later'
}
},
standardHeaders: true, // Return rate limit info in headers
legacyHeaders: false
});
// Apply to all routes
app.use('/api/', apiLimiter);
// Or create specific limiters
const createAccountLimiter = rateLimit({
windowMs: 60 * 60 * 1000, // 1 hour
max: 5, // 5 requests per hour
message: 'Too many accounts created, please try again later'
});
app.post('/api/users', createAccountLimiter, async (req, res) => {
// Create user
});Express.js实现:
javascript
const rateLimit = require('express-rate-limit');
// 创建频率限制器
const apiLimiter = rateLimit({
windowMs: 15 * 60 * 1000, // 15分钟
max: 100, // 每个IP在窗口内最多100次请求
message: {
error: {
code: 'RATE_LIMIT_EXCEEDED',
message: '请求过于频繁,请稍后再试'
}
},
standardHeaders: true, // 在Header中返回频率限制信息
legacyHeaders: false
});
// 应用到所有路由
app.use('/api/', apiLimiter);
// 或创建特定的频率限制器
const createAccountLimiter = rateLimit({
windowMs: 60 * 60 * 1000, // 1小时
max: 5, // 每小时最多5次请求
message: '创建账号过于频繁,请稍后再试'
});
app.post('/api/users', createAccountLimiter, async (req, res) => {
// 创建用户
});Compression
压缩
FastAPI Implementation:
python
from fastapi import FastAPI
from fastapi.middleware.gzip import GZipMiddleware
app = FastAPI()FastAPI实现:
python
from fastapi import FastAPI
from fastapi.middleware.gzip import GZipMiddleware
app = FastAPI()Add compression middleware
添加压缩中间件
app.add_middleware(GZipMiddleware, minimum_size=1000)
@app.get("/large-data")
async def get_large_data():
"""This response will be compressed if > 1000 bytes"""
return {"data": [{"id": i, "value": f"item_{i}"} for i in range(1000)]}
**Express.js Implementation:**
```javascript
const compression = require('compression');
// Add compression middleware
app.use(compression({
threshold: 1024, // Only compress responses > 1KB
level: 6 // Compression level (0-9)
}));
app.get('/large-data', (req, res) => {
const data = Array.from({ length: 1000 }, (_, i) => ({
id: i,
value: `item_${i}`
}));
res.json({ data });
});app.add_middleware(GZipMiddleware, minimum_size=1000)
@app.get("/large-data")
async def get_large_data():
"""响应大小超过1000字节时会被压缩"""
return {"data": [{"id": i, "value": f"item_{i}"} for i in range(1000)]}
**Express.js实现:**
```javascript
const compression = require('compression');
// 添加压缩中间件
app.use(compression({
threshold: 1024, // 仅压缩大于1KB的响应
level: 6 // 压缩级别(0-9)
}));
app.get('/large-data', (req, res) => {
const data = Array.from({ length: 1000 }, (_, i) => ({
id: i,
value: `item_${i}`
}));
res.json({ data });
});Security Best Practices
安全最佳实践
Authentication Patterns
认证模式
JWT Authentication in FastAPI:
python
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt
from datetime import datetime, timedelta
app = FastAPI()
security = HTTPBearer()
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
def create_access_token(data: dict, expires_delta: timedelta = None):
"""Create JWT token"""
to_encode = data.copy()
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""Verify JWT token"""
token = credentials.credentials
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return payload
except jwt.ExpiredSignatureError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token has expired"
)
except jwt.JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials"
)
@app.post("/login")
async def login(email: str, password: str):
"""Login and get access token"""
user = authenticate_user(email, password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password"
)
access_token = create_access_token(
data={"sub": user.email, "user_id": user.id}
)
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/users/me")
async def get_current_user(payload: dict = Depends(verify_token)):
"""Protected endpoint - requires authentication"""
user_id = payload.get("user_id")
user = get_user_from_db(user_id)
return userJWT Authentication in Express.js:
javascript
const jwt = require('jsonwebtoken');
const express = require('express');
const app = express();
const SECRET_KEY = 'your-secret-key';
function createAccessToken(data, expiresIn = '15m') {
return jwt.sign(data, SECRET_KEY, { expiresIn });
}
function verifyToken(req, res, next) {
const authHeader = req.get('Authorization');
if (!authHeader || !authHeader.startsWith('Bearer ')) {
return res.status(401).json({ error: 'Missing or invalid authorization header' });
}
const token = authHeader.substring(7);
try {
const payload = jwt.verify(token, SECRET_KEY);
req.user = payload;
next();
} catch (error) {
if (error.name === 'TokenExpiredError') {
return res.status(401).json({ error: 'Token has expired' });
}
return res.status(401).json({ error: 'Invalid token' });
}
}
app.post('/login', async (req, res) => {
const { email, password } = req.body;
const user = await authenticateUser(email, password);
if (!user) {
return res.status(401).json({ error: 'Incorrect email or password' });
}
const accessToken = createAccessToken({
sub: user.email,
user_id: user.id
});
res.json({ access_token: accessToken, token_type: 'bearer' });
});
app.get('/users/me', verifyToken, async (req, res) => {
const user = await getUserFromDB(req.user.user_id);
res.json(user);
});FastAPI中的JWT认证:
python
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt
from datetime import datetime, timedelta
app = FastAPI()
security = HTTPBearer()
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
def create_access_token(data: dict, expires_delta: timedelta = None):
"""创建JWT令牌"""
to_encode = data.copy()
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""验证JWT令牌"""
token = credentials.credentials
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return payload
except jwt.ExpiredSignatureError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="令牌已过期"
)
except jwt.JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="凭证验证失败"
)
@app.post("/login")
async def login(email: str, password: str):
"""登录并获取访问令牌"""
user = authenticate_user(email, password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="邮箱或密码错误"
)
access_token = create_access_token(
data={"sub": user.email, "user_id": user.id}
)
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/users/me")
async def get_current_user(payload: dict = Depends(verify_token)):
"""受保护的端点 - 需要认证"""
user_id = payload.get("user_id")
user = get_user_from_db(user_id)
return userExpress.js中的JWT认证:
javascript
const jwt = require('jsonwebtoken');
const express = require('express');
const app = express();
const SECRET_KEY = 'your-secret-key';
function createAccessToken(data, expiresIn = '15m') {
return jwt.sign(data, SECRET_KEY, { expiresIn });
}
function verifyToken(req, res, next) {
const authHeader = req.get('Authorization');
if (!authHeader || !authHeader.startsWith('Bearer ')) {
return res.status(401).json({ error: 'Missing or invalid authorization header' });
}
const token = authHeader.substring(7);
try {
const payload = jwt.verify(token, SECRET_KEY);
req.user = payload;
next();
} catch (error) {
if (error.name === 'TokenExpiredError') {
return res.status(401).json({ error: '令牌已过期' });
}
return res.status(401).json({ error: '无效令牌' });
}
}
app.post('/login', async (req, res) => {
const { email, password } = req.body;
const user = await authenticateUser(email, password);
if (!user) {
return res.status(401).json({ error: '邮箱或密码错误' });
}
const accessToken = createAccessToken({
sub: user.email,
user_id: user.id
});
res.json({ access_token: accessToken, token_type: 'bearer' });
});
app.get('/users/me', verifyToken, async (req, res) => {
const user = await getUserFromDB(req.user.user_id);
res.json(user);
});Input Validation and Sanitization
输入验证与清理
FastAPI Validation:
python
from fastapi import FastAPI
from pydantic import BaseModel, EmailStr, Field, validator
from typing import Optional
class UserCreate(BaseModel):
email: EmailStr
username: str = Field(..., min_length=3, max_length=50, pattern="^[a-zA-Z0-9_-]+$")
password: str = Field(..., min_length=8)
age: Optional[int] = Field(None, ge=0, le=150)
@validator('password')
def password_strength(cls, v):
"""Validate password strength"""
if not any(char.isdigit() for char in v):
raise ValueError('Password must contain at least one digit')
if not any(char.isupper() for char in v):
raise ValueError('Password must contain at least one uppercase letter')
return v
@app.post("/users")
async def create_user(user: UserCreate):
"""Automatically validates input"""
# Input is already validated by Pydantic
hashed_password = hash_password(user.password)
new_user = create_user_in_db(user.email, user.username, hashed_password)
return new_userFastAPI验证:
python
from fastapi import FastAPI
from pydantic import BaseModel, EmailStr, Field, validator
from typing import Optional
class UserCreate(BaseModel):
email: EmailStr
username: str = Field(..., min_length=3, max_length=50, pattern="^[a-zA-Z0-9_-]+$")
password: str = Field(..., min_length=8)
age: Optional[int] = Field(None, ge=0, le=150)
@validator('password')
def password_strength(cls, v):
"""验证密码强度"""
if not any(char.isdigit() for char in v):
raise ValueError('密码必须包含至少一个数字')
if not any(char.isupper() for char in v):
raise ValueError('密码必须包含至少一个大写字母')
return v
@app.post("/users")
async def create_user(user: UserCreate):
"""自动验证输入"""
# 输入已被Pydantic验证
hashed_password = hash_password(user.password)
new_user = create_user_in_db(user.email, user.username, hashed_password)
return new_userAPI Documentation
API文档
OpenAPI/Swagger with FastAPI
FastAPI中的OpenAPI/Swagger
FastAPI automatically generates OpenAPI documentation:
python
from fastapi import FastAPI
from pydantic import BaseModel, Field
app = FastAPI(
title="My API",
description="Comprehensive API for managing resources",
version="1.0.0",
docs_url="/api/docs",
redoc_url="/api/redoc"
)
class Item(BaseModel):
"""Item model with rich documentation"""
name: str = Field(..., description="The name of the item", example="Widget")
price: float = Field(..., description="Price in USD", example=19.99, gt=0)
description: str = Field(None, description="Optional item description")
model_config = {
"json_schema_extra": {
"examples": [
{
"name": "Super Widget",
"price": 29.99,
"description": "An amazing widget"
}
]
}
}
@app.post(
"/items",
response_model=Item,
status_code=201,
summary="Create a new item",
description="Create a new item with name, price, and optional description",
response_description="The created item",
tags=["items"]
)
async def create_item(item: Item):
"""
Create a new item with all the information:
- **name**: The item name (required)
- **price**: The item price in USD (required, must be positive)
- **description**: Optional description of the item
"""
return itemFastAPI会自动生成OpenAPI文档:
python
from fastapi import FastAPI
from pydantic import BaseModel, Field
app = FastAPI(
title="My API",
description="管理资源的综合API",
version="1.0.0",
docs_url="/api/docs",
redoc_url="/api/redoc"
)
class Item(BaseModel):
"""带有详细文档的物品模型"""
name: str = Field(..., description="物品名称", example="Widget")
price: float = Field(..., description="美元价格", example=19.99, gt=0)
description: str = Field(None, description="可选的物品描述")
model_config = {
"json_schema_extra": {
"examples": [
{
"name": "Super Widget",
"price": 29.99,
"description": "An amazing widget"
}
]
}
}
@app.post(
"/items",
response_model=Item,
status_code=201,
summary="创建新物品",
description="使用名称、价格和可选描述创建新物品",
response_description="已创建的物品",
tags=["items"]
)
async def create_item(item: Item):
"""
创建包含所有信息的新物品:
- **name**: 物品名称(必填)
- **price**: 物品的美元价格(必填,必须为正数)
- **description**: 可选的物品描述
"""
return itemBest Practices Summary
最佳实践总结
API Design Principles
API设计原则
- Use nouns for resources, not verbs
- Use plural nouns for collections
- Use HTTP methods correctly (GET, POST, PUT, PATCH, DELETE)
- Use proper HTTP status codes
- Version your API (URI versioning recommended)
- Support pagination for collections
- Allow filtering and sorting with query parameters
- Return consistent error responses
- Use HATEOAS for better discoverability
- Document your API with OpenAPI/Swagger
- 资源使用名词而非动词
- 集合资源使用复数名词
- 正确使用HTTP方法(GET、POST、PUT、PATCH、DELETE)
- 使用正确的HTTP状态码
- 为API添加版本控制(推荐使用URI版本控制)
- 集合资源支持分页
- 允许通过查询参数进行过滤和排序
- 返回统一的错误响应
- 使用HATEOAS提升可发现性
- 使用OpenAPI/Swagger编写API文档
Security Principles
安全原则
- Always use HTTPS in production
- Implement authentication (JWT, OAuth, API keys)
- Validate all inputs thoroughly
- Use rate limiting to prevent abuse
- Sanitize outputs to prevent XSS
- Implement CORS correctly
- Use security headers (CSP, X-Frame-Options, etc.)
- Log security events for monitoring
- Keep dependencies updated
- Never expose sensitive data in responses
- 生产环境始终使用HTTPS
- 实现认证机制(JWT、OAuth、API密钥)
- 彻底验证所有输入
- 使用频率限制防止滥用
- 清理输出以防止XSS攻击
- 正确实现CORS
- 使用安全Header(CSP、X-Frame-Options等)
- 记录安全事件以便监控
- 保持依赖库更新
- 响应中绝不暴露敏感数据
Performance Principles
性能原则
- Use caching (ETags, Cache-Control headers)
- Implement compression for large responses
- Use pagination for large datasets
- Optimize database queries (indexes, N+1 prevention)
- Use async/await for I/O operations
- Implement connection pooling
- Monitor API performance (response times, error rates)
- Use CDNs for static content
- Implement proper logging without blocking
- Load test your API regularly
Skill Version: 1.0.0
Last Updated: October 2025
Skill Category: API Design, Backend Development, REST Architecture
Compatible With: FastAPI, Express.js, Node.js, Python, HTTP Frameworks
- 使用缓存(ETags、Cache-Control Headers)
- 对大响应进行压缩
- 大数据集使用分页
- 优化数据库查询(索引、避免N+1问题)
- I/O操作使用async/await
- 实现连接池
- 监控API性能(响应时间、错误率)
- 静态内容使用CDN
- 实现非阻塞的正确日志
- 定期对API进行负载测试
技能版本: 1.0.0
最后更新: 2025年10月
技能分类: API设计、后端开发、REST架构
兼容框架: FastAPI、Express.js、Node.js、Python、HTTP框架