api-security-review
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseAPI Security Review Skill
API安全审查指南
Summary
概述
Comprehensive security checklist for API endpoint development. Ensures proper authentication, authorization, input validation, output safety, and security logging are implemented before deployment.
API端点开发的全面安全检查清单。确保在部署前正确实现身份验证、授权、输入验证、输出安全和安全日志记录。
When to Use
使用场景
- Before merging any PR with API changes
- When creating new API endpoints
- When modifying authentication/authorization logic
- During security audits
- Code review of API routes
- 在合并任何包含API变更的PR之前
- 创建新API端点时
- 修改身份验证/授权逻辑时
- 安全审计期间
- API路由的代码审查
Quick Checklist
快速检查清单
Pre-Deployment Security Audit
部署前安全审计
- Authentication: Route requires valid user identity
- Authorization: Ownership/permission checks implemented
- Input Validation: All inputs validated with schema (Zod/Joi/etc.)
- Output Safety: No sensitive data exposed in responses
- Logging: Security events logged appropriately
- Rate Limiting: Protection against abuse configured
- Error Handling: No system information leaked in errors
- 身份验证:路由需要有效的用户身份
- 授权:已实现所有权/权限检查
- 输入验证:所有输入均通过模式验证(Zod/Joi等)
- 输出安全:响应中未暴露敏感数据
- 日志记录:已适当记录安全事件
- 速率限制:已配置防滥用保护
- 错误处理:错误中未泄露系统信息
Authentication
身份验证
Requirements
要求
Every API endpoint must verify the user's identity before processing requests.
每个API端点在处理请求前必须验证用户身份。
Next.js (App Router) with Clerk
搭配Clerk的Next.js(App Router)
typescript
import { auth } from '@clerk/nextjs';
import { NextResponse } from 'next/server';
export async function GET(request: Request) {
// 1. Authenticate request
const { userId } = await auth();
if (!userId) {
return NextResponse.json(
{ error: "Unauthorized" },
{ status: 401 }
);
}
// Continue with authenticated request...
}typescript
import { auth } from '@clerk/nextjs';
import { NextResponse } from 'next/server';
export async function GET(request: Request) {
// 1. 验证请求
const { userId } = await auth();
if (!userId) {
return NextResponse.json(
{ error: "Unauthorized" },
{ status: 401 }
);
}
// 继续处理已验证的请求...
}Express.js with JWT
搭配JWT的Express.js
typescript
import jwt from 'jsonwebtoken';
import { Request, Response, NextFunction } from 'express';
function authenticateToken(req: Request, res: Response, next: NextFunction) {
const authHeader = req.headers['authorization'];
const token = authHeader && authHeader.split(' ')[1];
if (!token) {
return res.sendStatus(401);
}
jwt.verify(token, process.env.JWT_SECRET!, (err, user) => {
if (err) return res.sendStatus(403);
req.user = user;
next();
});
}
app.get('/api/protected', authenticateToken, (req, res) => {
// Request is authenticated
});typescript
import jwt from 'jsonwebtoken';
import { Request, Response, NextFunction } from 'express';
function authenticateToken(req: Request, res: Response, next: NextFunction) {
const authHeader = req.headers['authorization'];
const token = authHeader && authHeader.split(' ')[1];
if (!token) {
return res.sendStatus(401);
}
jwt.verify(token, process.env.JWT_SECRET!, (err, user) => {
if (err) return res.sendStatus(403);
req.user = user;
next();
});
}
app.get('/api/protected', authenticateToken, (req, res) => {
// 请求已通过验证
});FastAPI with OAuth2
搭配OAuth2的FastAPI
python
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
async def get_current_user(token: str = Depends(oauth2_scheme)):
user = await verify_token(token)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return user
@app.get("/api/protected")
async def protected_route(current_user: User = Depends(get_current_user)):
return {"user": current_user.email}python
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
async def get_current_user(token: str = Depends(oauth2_scheme)):
user = await verify_token(token)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return user
@app.get("/api/protected")
async def protected_route(current_user: User = Depends(get_current_user)):
return {"user": current_user.email}Django REST Framework
Django REST Framework
python
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def protected_view(request):
# request.user is authenticated
return Response({'user': request.user.email})python
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def protected_view(request):
# request.user已通过验证
return Response({'user': request.user.email})Authorization
授权
Resource Ownership Verification
资源所有权验证
Authentication proves WHO the user is. Authorization proves the user has permission to access the resource.
身份验证确认用户是谁。授权确认用户有权访问该资源。
Next.js Example
Next.js示例
typescript
import { auth } from '@clerk/nextjs';
import { NextResponse } from 'next/server';
import { db } from '@/lib/db';
import { eq } from 'drizzle-orm';
export async function GET(
request: Request,
{ params }: { params: { id: string } }
) {
// 1. Authenticate
const { userId } = await auth();
if (!userId) {
return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
}
// 2. Fetch resource
const resource = await db.query.resources.findFirst({
where: eq(resources.id, params.id)
});
if (!resource) {
return NextResponse.json({ error: "Not found" }, { status: 404 });
}
// 3. Authorize - Check ownership
if (resource.ownerId !== userId) {
return NextResponse.json({ error: "Forbidden" }, { status: 403 });
}
// 4. Return authorized data
return NextResponse.json(resource);
}typescript
import { auth } from '@clerk/nextjs';
import { NextResponse } from 'next/server';
import { db } from '@/lib/db';
import { eq } from 'drizzle-orm';
export async function GET(
request: Request,
{ params }: { params: { id: string } }
) {
// 1. 身份验证
const { userId } = await auth();
if (!userId) {
return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
}
// 2. 获取资源
const resource = await db.query.resources.findFirst({
where: eq(resources.id, params.id)
});
if (!resource) {
return NextResponse.json({ error: "Not found" }, { status: 404 });
}
// 3. 授权 - 检查所有权
if (resource.ownerId !== userId) {
return NextResponse.json({ error: "Forbidden" }, { status: 403 });
}
// 4. 返回授权数据
return NextResponse.json(resource);
}Role-Based Access Control (RBAC)
基于角色的访问控制(RBAC)
typescript
enum Role {
USER = 'user',
ADMIN = 'admin',
MODERATOR = 'moderator'
}
function requireRole(allowedRoles: Role[]) {
return async (request: Request) => {
const { userId } = await auth();
if (!userId) {
return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
}
const user = await db.query.users.findFirst({
where: eq(users.clerkId, userId)
});
if (!user || !allowedRoles.includes(user.role)) {
return NextResponse.json({ error: "Forbidden" }, { status: 403 });
}
return null; // Authorized
};
}
export async function DELETE(request: Request) {
const authError = await requireRole([Role.ADMIN, Role.MODERATOR])(request);
if (authError) return authError;
// User is authorized as admin or moderator
}typescript
enum Role {
USER = 'user',
ADMIN = 'admin',
MODERATOR = 'moderator'
}
function requireRole(allowedRoles: Role[]) {
return async (request: Request) => {
const { userId } = await auth();
if (!userId) {
return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
}
const user = await db.query.users.findFirst({
where: eq(users.clerkId, userId)
});
if (!user || !allowedRoles.includes(user.role)) {
return NextResponse.json({ error: "Forbidden" }, { status: 403 });
}
return null; // 已授权
};
}
export async function DELETE(request: Request) {
const authError = await requireRole([Role.ADMIN, Role.MODERATOR])(request);
if (authError) return authError;
// 用户已被授权为管理员或版主
}Multi-Tenant Data Isolation
多租户数据隔离
typescript
// CRITICAL: Prevent cross-tenant data leaks
// ❌ WRONG - No tenant check
const orders = await db.query.orders.findMany({
where: eq(orders.userId, userId)
});
// ✅ CORRECT - Tenant isolation
const user = await db.query.users.findFirst({
where: eq(users.clerkId, userId)
});
const orders = await db.query.orders.findMany({
where: and(
eq(orders.userId, userId),
eq(orders.tenantId, user.tenantId) // CRITICAL: tenant boundary
)
});typescript
// 关键:防止跨租户数据泄露
// ❌ 错误 - 无租户检查
const orders = await db.query.orders.findMany({
where: eq(orders.userId, userId)
});
// ✅ 正确 - 租户隔离
const user = await db.query.users.findFirst({
where: eq(users.clerkId, userId)
});
const orders = await db.query.orders.findMany({
where: and(
eq(orders.userId, userId),
eq(orders.tenantId, user.tenantId) // 关键:租户边界
)
});Input Validation
输入验证
Zod Schema Validation (TypeScript)
Zod模式验证(TypeScript)
typescript
import { z } from 'zod';
import { NextResponse } from 'next/server';
const updateUserSchema = z.object({
id: z.string().uuid(),
email: z.string().email().optional(),
age: z.number().int().min(0).max(150).optional(),
role: z.enum(['user', 'admin', 'moderator']).optional(),
});
export async function PATCH(request: Request) {
const { userId } = await auth();
if (!userId) {
return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
}
// Parse and validate input
const body = await request.json();
const result = updateUserSchema.safeParse(body);
if (!result.success) {
return NextResponse.json({
error: "Validation failed",
details: result.error.issues
}, { status: 400 });
}
// Safe to use validated data
const validatedData = result.data;
// ... update logic
}typescript
import { z } from 'zod';
import { NextResponse } from 'next/server';
const updateUserSchema = z.object({
id: z.string().uuid(),
email: z.string().email().optional(),
age: z.number().int().min(0).max(150).optional(),
role: z.enum(['user', 'admin', 'moderator']).optional(),
});
export async function PATCH(request: Request) {
const { userId } = await auth();
if (!userId) {
return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
}
// 解析并验证输入
const body = await request.json();
const result = updateUserSchema.safeParse(body);
if (!result.success) {
return NextResponse.json({
error: "Validation failed",
details: result.error.issues
}, { status: 400 });
}
// 可安全使用已验证的数据
const validatedData = result.data;
// ... 更新逻辑
}Pydantic Validation (Python)
Pydantic验证(Python)
python
from pydantic import BaseModel, EmailStr, Field, validator
from fastapi import HTTPException
class UpdateUser(BaseModel):
id: str = Field(..., regex=r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$')
email: EmailStr | None = None
age: int | None = Field(None, ge=0, le=150)
role: str | None = Field(None, regex=r'^(user|admin|moderator)$')
@validator('email')
def email_must_not_be_disposable(cls, v):
if v and any(domain in v for domain in ['tempmail.com', '10minutemail.com']):
raise ValueError('Disposable email addresses not allowed')
return v
@app.patch("/api/users")
async def update_user(user_data: UpdateUser, current_user: User = Depends(get_current_user)):
# user_data is validated
return {"status": "updated"}python
from pydantic import BaseModel, EmailStr, Field, validator
from fastapi import HTTPException
class UpdateUser(BaseModel):
id: str = Field(..., regex=r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$')
email: EmailStr | None = None
age: int | None = Field(None, ge=0, le=150)
role: str | None = Field(None, regex=r'^(user|admin|moderator)$')
@validator('email')
def email_must_not_be_disposable(cls, v):
if v and any(domain in v for domain in ['tempmail.com', '10minutemail.com']):
raise ValueError('Disposable email addresses not allowed')
return v
@app.patch("/api/users")
async def update_user(user_data: UpdateUser, current_user: User = Depends(get_current_user)):
# user_data已通过验证
return {"status": "updated"}SQL Injection Prevention
SQL注入防护
typescript
// ❌ NEVER: Raw SQL with string interpolation
const userId = request.params.id;
const query = `SELECT * FROM users WHERE id = '${userId}'`; // VULNERABLE!
db.execute(query);
// ✅ ALWAYS: Use ORM or parameterized queries
import { eq } from 'drizzle-orm';
const user = await db.query.users.findFirst({
where: eq(users.id, userId)
});
// ✅ OR: Parameterized raw query
const [user] = await db.execute(
'SELECT * FROM users WHERE id = ?',
[userId]
);typescript
// ❌ 严禁:使用字符串插值的原生SQL
const userId = request.params.id;
const query = `SELECT * FROM users WHERE id = '${userId}'`; // 存在漏洞!
db.execute(query);
// ✅ 推荐:使用ORM或参数化查询
import { eq } from 'drizzle-orm';
const user = await db.query.users.findFirst({
where: eq(users.id, userId)
});
// ✅ 或:参数化原生查询
const [user] = await db.execute(
'SELECT * FROM users WHERE id = ?',
[userId]
);File Upload Validation
文件上传验证
typescript
const MAX_FILE_SIZE = 5 * 1024 * 1024; // 5MB
const ALLOWED_TYPES = ['image/jpeg', 'image/png', 'image/gif'];
export async function POST(request: Request) {
const formData = await request.formData();
const file = formData.get('file') as File;
if (!file) {
return NextResponse.json({ error: "No file provided" }, { status: 400 });
}
// Validate file size
if (file.size > MAX_FILE_SIZE) {
return NextResponse.json({
error: "File too large. Max 5MB"
}, { status: 400 });
}
// Validate file type
if (!ALLOWED_TYPES.includes(file.type)) {
return NextResponse.json({
error: "Invalid file type. Only JPEG, PNG, GIF allowed"
}, { status: 400 });
}
// Process file...
}typescript
const MAX_FILE_SIZE = 5 * 1024 * 1024; // 5MB
const ALLOWED_TYPES = ['image/jpeg', 'image/png', 'image/gif'];
export async function POST(request: Request) {
const formData = await request.formData();
const file = formData.get('file') as File;
if (!file) {
return NextResponse.json({ error: "No file provided" }, { status: 400 });
}
// 验证文件大小
if (file.size > MAX_FILE_SIZE) {
return NextResponse.json({
error: "File too large. Max 5MB"
}, { status: 400 });
}
// 验证文件类型
if (!ALLOWED_TYPES.includes(file.type)) {
return NextResponse.json({
error: "Invalid file type. Only JPEG, PNG, GIF allowed"
}, { status: 400 });
}
// 处理文件...
}Output Safety
输出安全
Remove Sensitive Data from Responses
从响应中移除敏感数据
typescript
// ❌ WRONG - Exposing sensitive fields
const user = await db.query.users.findFirst({
where: eq(users.id, userId)
});
return NextResponse.json(user); // Includes password hash, internal IDs, etc.
// ✅ CORRECT - Explicitly select safe fields
const user = await db.query.users.findFirst({
where: eq(users.id, userId),
columns: {
id: true,
email: true,
name: true,
createdAt: true,
// Exclude: passwordHash, internalNotes, apiKey, etc.
}
});
return NextResponse.json(user);
// ✅ BETTER - Use DTOs
interface PublicUserDTO {
id: string;
email: string;
name: string;
createdAt: Date;
}
function toPublicUser(user: User): PublicUserDTO {
return {
id: user.id,
email: user.email,
name: user.name,
createdAt: user.createdAt
};
}
return NextResponse.json(toPublicUser(user));typescript
// ❌ 错误 - 暴露敏感字段
const user = await db.query.users.findFirst({
where: eq(users.id, userId)
});
return NextResponse.json(user); // 包含密码哈希、内部ID等
// ✅ 正确 - 明确选择安全字段
const user = await db.query.users.findFirst({
where: eq(users.id, userId),
columns: {
id: true,
email: true,
name: true,
createdAt: true,
// 排除:passwordHash、internalNotes、apiKey等
}
});
return NextResponse.json(user);
// ✅ 更佳 - 使用DTO
interface PublicUserDTO {
id: string;
email: string;
name: string;
createdAt: Date;
}
function toPublicUser(user: User): PublicUserDTO {
return {
id: user.id,
email: user.email,
name: user.name,
createdAt: user.createdAt
};
}
return NextResponse.json(toPublicUser(user));Mask PII in Logs
在日志中屏蔽个人可识别信息(PII)
typescript
function sanitizeForLogging(data: any) {
const sanitized = { ...data };
// Mask email
if (sanitized.email) {
const [local, domain] = sanitized.email.split('@');
sanitized.email = `${local.slice(0, 2)}***@${domain}`;
}
// Mask SSN
if (sanitized.ssn) {
sanitized.ssn = `***-**-${sanitized.ssn.slice(-4)}`;
}
// Remove sensitive fields
delete sanitized.passwordHash;
delete sanitized.apiKey;
return sanitized;
}
console.log('User updated:', sanitizeForLogging(user));typescript
function sanitizeForLogging(data: any) {
const sanitized = { ...data };
// 屏蔽邮箱
if (sanitized.email) {
const [local, domain] = sanitized.email.split('@');
sanitized.email = `${local.slice(0, 2)}***@${domain}`;
}
// 屏蔽社保号(SSN)
if (sanitized.ssn) {
sanitized.ssn = `***-**-${sanitized.ssn.slice(-4)}`;
}
// 移除敏感字段
delete sanitized.passwordHash;
delete sanitized.apiKey;
return sanitized;
}
console.log('User updated:', sanitizeForLogging(user));Safe Error Messages
安全错误消息
typescript
// ❌ WRONG - Leaking system information
try {
await db.execute(query);
} catch (error) {
return NextResponse.json({
error: error.message, // Might expose SQL, file paths, etc.
stack: error.stack // NEVER expose in production
}, { status: 500 });
}
// ✅ CORRECT - Generic error with logging
try {
await db.execute(query);
} catch (error) {
console.error('Database error:', error); // Log full error internally
return NextResponse.json({
error: "An error occurred processing your request"
}, { status: 500 });
}typescript
// ❌ 错误 - 泄露系统信息
try {
await db.execute(query);
} catch (error) {
return NextResponse.json({
error: error.message, // 可能暴露SQL、文件路径等
stack: error.stack // 生产环境中严禁暴露
}, { status: 500 });
}
// ✅ 正确 - 通用错误+内部日志
try {
await db.execute(query);
} catch (error) {
console.error('Database error:', error); // 内部记录完整错误
return NextResponse.json({
error: "An error occurred processing your request"
}, { status: 500 });
}Logging
日志记录
Security Event Logging
安全事件日志
typescript
enum SecurityEvent {
AUTH_FAILURE = 'auth_failure',
UNAUTHORIZED_ACCESS = 'unauthorized_access',
PERMISSION_DENIED = 'permission_denied',
RATE_LIMIT_EXCEEDED = 'rate_limit_exceeded',
INVALID_INPUT = 'invalid_input'
}
function logSecurityEvent(event: SecurityEvent, details: any) {
console.log(JSON.stringify({
timestamp: new Date().toISOString(),
event,
userId: details.userId || 'anonymous',
ip: details.ip,
endpoint: details.endpoint,
details: sanitizeForLogging(details)
}));
}
// Usage
export async function GET(request: Request) {
const { userId } = await auth();
if (!userId) {
logSecurityEvent(SecurityEvent.AUTH_FAILURE, {
ip: request.headers.get('x-forwarded-for'),
endpoint: request.url
});
return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
}
}typescript
enum SecurityEvent {
AUTH_FAILURE = 'auth_failure',
UNAUTHORIZED_ACCESS = 'unauthorized_access',
PERMISSION_DENIED = 'permission_denied',
RATE_LIMIT_EXCEEDED = 'rate_limit_exceeded',
INVALID_INPUT = 'invalid_input'
}
function logSecurityEvent(event: SecurityEvent, details: any) {
console.log(JSON.stringify({
timestamp: new Date().toISOString(),
event,
userId: details.userId || 'anonymous',
ip: details.ip,
endpoint: details.endpoint,
details: sanitizeForLogging(details)
}));
}
// 使用示例
export async function GET(request: Request) {
const { userId } = await auth();
if (!userId) {
logSecurityEvent(SecurityEvent.AUTH_FAILURE, {
ip: request.headers.get('x-forwarded-for'),
endpoint: request.url
});
return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
}
}Request Tracing
请求追踪
typescript
import { v4 as uuidv4 } from 'uuid';
export async function middleware(request: Request) {
const requestId = uuidv4();
console.log(JSON.stringify({
requestId,
method: request.method,
url: request.url,
timestamp: new Date().toISOString()
}));
// Pass request ID through headers
const response = await fetch(request.url, {
headers: {
...request.headers,
'X-Request-ID': requestId
}
});
return response;
}typescript
import { v4 as uuidv4 } from 'uuid';
export async function middleware(request: Request) {
const requestId = uuidv4();
console.log(JSON.stringify({
requestId,
method: request.method,
url: request.url,
timestamp: new Date().toISOString()
}));
// 通过请求头传递请求ID
const response = await fetch(request.url, {
headers: {
...request.headers,
'X-Request-ID': requestId
}
});
return response;
}Example Secure Endpoint
示例安全端点
Complete Next.js API Route
完整的Next.js API路由
typescript
import { auth } from '@clerk/nextjs';
import { NextResponse } from 'next/server';
import { db } from '@/lib/db';
import { eq, and } from 'drizzle-orm';
import { z } from 'zod';
import { ratelimit } from '@/lib/ratelimit';
// 1. Input validation schema
const updateResourceSchema = z.object({
name: z.string().min(1).max(100),
description: z.string().max(500).optional(),
isPublic: z.boolean().optional()
});
// 2. DTO for safe output
interface ResourceDTO {
id: string;
name: string;
description: string;
isPublic: boolean;
createdAt: Date;
}
function toResourceDTO(resource: any): ResourceDTO {
return {
id: resource.id,
name: resource.name,
description: resource.description,
isPublic: resource.isPublic,
createdAt: resource.createdAt
// Exclude: ownerId, internalNotes, etc.
};
}
export async function PATCH(
request: Request,
{ params }: { params: { id: string } }
) {
try {
// 3. Rate limiting
const { success } = await ratelimit.limit(request.headers.get('x-forwarded-for') || 'anonymous');
if (!success) {
return NextResponse.json({ error: "Too many requests" }, { status: 429 });
}
// 4. Authentication
const { userId } = await auth();
if (!userId) {
console.log('Auth failure:', { endpoint: request.url });
return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
}
// 5. Input validation
const body = await request.json();
const result = updateResourceSchema.safeParse(body);
if (!result.success) {
return NextResponse.json({
error: "Validation failed",
details: result.error.issues
}, { status: 400 });
}
// 6. Fetch and verify existence
const resource = await db.query.resources.findFirst({
where: eq(resources.id, params.id)
});
if (!resource) {
return NextResponse.json({ error: "Not found" }, { status: 404 });
}
// 7. Authorization check
if (resource.ownerId !== userId) {
console.log('Permission denied:', { userId, resourceId: params.id });
return NextResponse.json({ error: "Forbidden" }, { status: 403 });
}
// 8. Update resource
const [updatedResource] = await db.update(resources)
.set({
...result.data,
updatedAt: new Date()
})
.where(eq(resources.id, params.id))
.returning();
// 9. Return safe response
return NextResponse.json(toResourceDTO(updatedResource));
} catch (error) {
// 10. Safe error handling
console.error('Error updating resource:', error);
return NextResponse.json({
error: "An error occurred"
}, { status: 500 });
}
}typescript
import { auth } from '@clerk/nextjs';
import { NextResponse } from 'next/server';
import { db } from '@/lib/db';
import { eq, and } from 'drizzle-orm';
import { z } from 'zod';
import { ratelimit } from '@/lib/ratelimit';
// 1. 输入验证模式
const updateResourceSchema = z.object({
name: z.string().min(1).max(100),
description: z.string().max(500).optional(),
isPublic: z.boolean().optional()
});
// 2. 用于安全输出的DTO
interface ResourceDTO {
id: string;
name: string;
description: string;
isPublic: boolean;
createdAt: Date;
}
function toResourceDTO(resource: any): ResourceDTO {
return {
id: resource.id,
name: resource.name,
description: resource.description,
isPublic: resource.isPublic,
createdAt: resource.createdAt
// 排除:ownerId、internalNotes等
};
}
export async function PATCH(
request: Request,
{ params }: { params: { id: string } }
) {
try {
// 3. 速率限制
const { success } = await ratelimit.limit(request.headers.get('x-forwarded-for') || 'anonymous');
if (!success) {
return NextResponse.json({ error: "Too many requests" }, { status: 429 });
}
// 4. 身份验证
const { userId } = await auth();
if (!userId) {
console.log('Auth failure:', { endpoint: request.url });
return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
}
// 5. 输入验证
const body = await request.json();
const result = updateResourceSchema.safeParse(body);
if (!result.success) {
return NextResponse.json({
error: "Validation failed",
details: result.error.issues
}, { status: 400 });
}
// 6. 获取并验证资源存在性
const resource = await db.query.resources.findFirst({
where: eq(resources.id, params.id)
});
if (!resource) {
return NextResponse.json({ error: "Not found" }, { status: 404 });
}
// 7. 授权检查
if (resource.ownerId !== userId) {
console.log('Permission denied:', { userId, resourceId: params.id });
return NextResponse.json({ error: "Forbidden" }, { status: 403 });
}
// 8. 更新资源
const [updatedResource] = await db.update(resources)
.set({
...result.data,
updatedAt: new Date()
})
.where(eq(resources.id, params.id))
.returning();
// 9. 返回安全响应
return NextResponse.json(toResourceDTO(updatedResource));
} catch (error) {
// 10. 安全错误处理
console.error('Error updating resource:', error);
return NextResponse.json({
error: "An error occurred"
}, { status: 500 });
}
}Framework Patterns
框架模式
Express.js Middleware Pattern
Express.js中间件模式
typescript
import { Request, Response, NextFunction } from 'express';
import { z } from 'zod';
// Validation middleware factory
function validateSchema(schema: z.ZodSchema) {
return (req: Request, res: Response, next: NextFunction) => {
const result = schema.safeParse(req.body);
if (!result.success) {
return res.status(400).json({
error: 'Validation failed',
details: result.error.issues
});
}
req.body = result.data;
next();
};
}
// Authorization middleware
async function requireOwnership(req: Request, res: Response, next: NextFunction) {
const resource = await db.resources.findById(req.params.id);
if (!resource) {
return res.status(404).json({ error: 'Not found' });
}
if (resource.ownerId !== req.user.id) {
return res.status(403).json({ error: 'Forbidden' });
}
req.resource = resource;
next();
}
// Usage
const updateSchema = z.object({ name: z.string() });
app.patch('/api/resources/:id',
authenticate,
validateSchema(updateSchema),
requireOwnership,
async (req, res) => {
// All checks passed
const updated = await updateResource(req.resource, req.body);
res.json(toDTO(updated));
}
);typescript
import { Request, Response, NextFunction } from 'express';
import { z } from 'zod';
// 验证中间件工厂
function validateSchema(schema: z.ZodSchema) {
return (req: Request, res: Response, next: NextFunction) => {
const result = schema.safeParse(req.body);
if (!result.success) {
return res.status(400).json({
error: 'Validation failed',
details: result.error.issues
});
}
req.body = result.data;
next();
};
}
// 授权中间件
async function requireOwnership(req: Request, res: Response, next: NextFunction) {
const resource = await db.resources.findById(req.params.id);
if (!resource) {
return res.status(404).json({ error: 'Not found' });
}
if (resource.ownerId !== req.user.id) {
return res.status(403).json({ error: 'Forbidden' });
}
req.resource = resource;
next();
}
// 使用示例
const updateSchema = z.object({ name: z.string() });
app.patch('/api/resources/:id',
authenticate,
validateSchema(updateSchema),
requireOwnership,
async (req, res) => {
// 所有检查已通过
const updated = await updateResource(req.resource, req.body);
res.json(toDTO(updated));
}
);FastAPI Dependency Injection
FastAPI依赖注入
python
from fastapi import Depends, HTTPException
from typing import Annotated
async def verify_ownership(
resource_id: str,
current_user: User = Depends(get_current_user)
):
resource = await db.resources.get(resource_id)
if not resource:
raise HTTPException(status_code=404, detail="Not found")
if resource.owner_id != current_user.id:
raise HTTPException(status_code=403, detail="Forbidden")
return resource
@app.patch("/api/resources/{resource_id}")
async def update_resource(
data: UpdateResourceSchema,
resource: Resource = Depends(verify_ownership)
):
# Resource ownership verified
updated = await resource.update(data.dict())
return ResourceDTO.from_orm(updated)python
from fastapi import Depends, HTTPException
from typing import Annotated
async def verify_ownership(
resource_id: str,
current_user: User = Depends(get_current_user)
):
resource = await db.resources.get(resource_id)
if not resource:
raise HTTPException(status_code=404, detail="Not found")
if resource.owner_id != current_user.id:
raise HTTPException(status_code=403, detail="Forbidden")
return resource
@app.patch("/api/resources/{resource_id}")
async def update_resource(
data: UpdateResourceSchema,
resource: Resource = Depends(verify_ownership)
):
# 资源所有权已验证
updated = await resource.update(data.dict())
return ResourceDTO.from_orm(updated)Common Vulnerabilities
常见漏洞
OWASP Top 10 API Security
OWASP API安全Top 10
1. Broken Object Level Authorization (BOLA)
1. 破损的对象级授权(BOLA)
typescript
// ❌ VULNERABLE
export async function GET(request: Request) {
const { userId } = await auth();
const resourceId = new URL(request.url).searchParams.get('id');
// Missing ownership check!
const resource = await db.query.resources.findFirst({
where: eq(resources.id, resourceId)
});
return NextResponse.json(resource);
}
// ✅ FIXED
export async function GET(request: Request) {
const { userId } = await auth();
const resourceId = new URL(request.url).searchParams.get('id');
const resource = await db.query.resources.findFirst({
where: and(
eq(resources.id, resourceId),
eq(resources.ownerId, userId) // Ownership check
)
});
if (!resource) {
return NextResponse.json({ error: "Not found" }, { status: 404 });
}
return NextResponse.json(resource);
}typescript
// ❌ 存在漏洞
export async function GET(request: Request) {
const { userId } = await auth();
const resourceId = new URL(request.url).searchParams.get('id');
// 缺少所有权检查!
const resource = await db.query.resources.findFirst({
where: eq(resources.id, resourceId)
});
return NextResponse.json(resource);
}
// ✅ 修复后
export async function GET(request: Request) {
const { userId } = await auth();
const resourceId = new URL(request.url).searchParams.get('id');
const resource = await db.query.resources.findFirst({
where: and(
eq(resources.id, resourceId),
eq(resources.ownerId, userId) // 所有权检查
)
});
if (!resource) {
return NextResponse.json({ error: "Not found" }, { status: 404 });
}
return NextResponse.json(resource);
}2. Mass Assignment
2. 大量赋值
typescript
// ❌ VULNERABLE - User can set any field
export async function PATCH(request: Request) {
const body = await request.json();
// User could send: { role: 'admin', isVerified: true }
await db.update(users)
.set(body) // DANGEROUS!
.where(eq(users.id, userId));
}
// ✅ FIXED - Explicit allowed fields
const allowedFields = z.object({
name: z.string().optional(),
bio: z.string().optional()
// role, isVerified NOT allowed
});
export async function PATCH(request: Request) {
const body = await request.json();
const validated = allowedFields.parse(body);
await db.update(users)
.set(validated)
.where(eq(users.id, userId));
}typescript
// ❌ 存在漏洞 - 用户可设置任意字段
export async function PATCH(request: Request) {
const body = await request.json();
// 用户可能发送:{ role: 'admin', isVerified: true }
await db.update(users)
.set(body) // 危险!
.where(eq(users.id, userId));
}
// ✅ 修复后 - 明确允许的字段
const allowedFields = z.object({
name: z.string().optional(),
bio: z.string().optional()
// role、isVerified不允许修改
});
export async function PATCH(request: Request) {
const body = await request.json();
const validated = allowedFields.parse(body);
await db.update(users)
.set(validated)
.where(eq(users.id, userId));
}3. Excessive Data Exposure
3. 过度数据暴露
typescript
// ❌ VULNERABLE
return NextResponse.json(user); // All fields exposed
// ✅ FIXED
return NextResponse.json({
id: user.id,
name: user.name,
email: user.email
// passwordHash, resetToken, etc. excluded
});typescript
// ❌ 存在漏洞
return NextResponse.json(user); // 所有字段均暴露
// ✅ 修复后
return NextResponse.json({
id: user.id,
name: user.name,
email: user.email
// 排除passwordHash、resetToken等
});4. Rate Limiting
4. 速率限制
typescript
import { Ratelimit } from '@upstash/ratelimit';
import { Redis } from '@upstash/redis';
const ratelimit = new Ratelimit({
redis: Redis.fromEnv(),
limiter: Ratelimit.slidingWindow(10, '10 s'),
});
export async function POST(request: Request) {
const identifier = request.headers.get('x-forwarded-for') || 'anonymous';
const { success } = await ratelimit.limit(identifier);
if (!success) {
return NextResponse.json({ error: "Too many requests" }, { status: 429 });
}
// Process request...
}typescript
import { Ratelimit } from '@upstash/ratelimit';
import { Redis } from '@upstash/redis';
const ratelimit = new Ratelimit({
redis: Redis.fromEnv(),
limiter: Ratelimit.slidingWindow(10, '10 s'),
});
export async function POST(request: Request) {
const identifier = request.headers.get('x-forwarded-for') || 'anonymous';
const { success } = await ratelimit.limit(identifier);
if (!success) {
return NextResponse.json({ error: "Too many requests" }, { status: 429 });
}
// 处理请求...
}Summary
总结
Security Checklist Template
安全检查清单模板
markdown
undefinedmarkdown
undefinedSecurity Review for [Endpoint Name]
[端点名称]安全审查
Authentication
身份验证
- User identity verified before processing
- Invalid tokens rejected with 401
- Token expiration checked
- 处理前已验证用户身份
- 无效令牌已被拒绝并返回401
- 已检查令牌有效期
Authorization
授权
- Resource ownership verified
- Role/permission checks implemented
- Cross-tenant data isolation enforced
- 已验证资源所有权
- 已实现角色/权限检查
- 已强制实施跨租户数据隔离
Input Validation
输入验证
- All inputs validated with schema (Zod/Pydantic)
- File uploads size/type limited
- SQL injection prevented (using ORM)
- XSS prevention in place
- 所有输入均通过模式验证(Zod/Pydantic)
- 文件上传已限制大小/类型
- 已防止SQL注入(使用ORM)
- 已部署XSS防护
Output Safety
输出安全
- Sensitive fields excluded from responses
- PII masked in logs
- Error messages don't leak system info
- 响应中已排除敏感字段
- 日志中已屏蔽PII
- 错误消息未泄露系统信息
Rate Limiting
速率限制
- Rate limits configured per endpoint
- DDoS protection in place
- 每个端点已配置速率限制
- 已部署DDoS防护
Logging
日志记录
- Failed auth attempts logged
- Permission denials logged
- Request IDs for traceability
Use this skill during code reviews and before deploying API changes to ensure comprehensive security coverage.- 已记录失败的身份验证尝试
- 已记录权限拒绝事件
- 已使用请求ID实现可追踪性
在代码审查和部署API变更前使用本指南,确保全面的安全覆盖。