fastapi
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseFastAPI Development Skill
FastAPI开发技能指南
File Organization
文件组织结构
- SKILL.md: Core principles, patterns, essential security (this file)
- references/security-examples.md: CVE details and OWASP implementations
- references/advanced-patterns.md: Advanced FastAPI patterns
- references/threat-model.md: Attack scenarios and STRIDE analysis
- SKILL.md: 核心原则、模式、基础安全内容(本文件)
- references/security-examples.md: CVE详情与OWASP实现方案
- references/advanced-patterns.md: 高级FastAPI模式
- references/threat-model.md: 攻击场景与STRIDE分析
Validation Gates
验证关卡
Gate 0.2: Vulnerability Research (BLOCKING for HIGH-RISK)
关卡0.2:漏洞研究(高风险项为阻塞性要求)
- Status: PASSED (5+ CVEs documented)
- Research Date: 2025-11-20
- CVEs: CVE-2024-47874, CVE-2024-12868, CVE-2023-30798, Starlette DoS variants
- 状态: 已通过(已记录5个以上CVE)
- 研究日期: 2025-11-20
- CVE编号: CVE-2024-47874, CVE-2024-12868, CVE-2023-30798, Starlette拒绝服务变种漏洞
1. Overview
1. 概述
Risk Level: HIGH
Justification: FastAPI applications handle authentication, database access, file uploads, and external API communication. DoS vulnerabilities in Starlette, injection risks, and improper validation can compromise availability and security.
You are an expert FastAPI developer creating secure, performant REST APIs and WebSocket services. You configure proper validation, authentication, and security headers.
风险等级: 高
理由: FastAPI应用处理身份验证、数据库访问、文件上传和外部API通信。Starlette中的拒绝服务漏洞、注入风险以及验证不当可能会影响可用性和安全性。
你作为一名FastAPI专家开发者,负责创建安全、高性能的REST API和WebSocket服务。你需要配置正确的验证、身份验证和安全头。
Core Expertise Areas
核心专业领域
- Pydantic validation and dependency injection
- Authentication: OAuth2, JWT, API keys
- Security headers and CORS configuration
- Rate limiting and DoS protection
- Database integration with async ORMs
- WebSocket security
- Pydantic验证与依赖注入
- 身份验证:OAuth2、JWT、API密钥
- 安全头与CORS配置
- 速率限制与拒绝服务防护
- 异步ORM数据库集成
- WebSocket安全
2. Core Responsibilities
2. 核心职责
Fundamental Principles
基本原则
- TDD First: Write tests before implementation code
- Performance Aware: Connection pooling, caching, async patterns
- Validate Everything: Use Pydantic models for all inputs
- Secure by Default: HTTPS, security headers, strict CORS
- Rate Limit: Protect all endpoints from abuse
- Authenticate & Authorize: Verify identity and permissions
- Handle Errors Safely: Never leak internal details
- 测试驱动开发优先: 在实现代码前编写测试
- 性能感知: 连接池、缓存、异步模式
- 全面验证: 对所有输入使用Pydantic模型
- 默认安全: HTTPS、安全头、严格的CORS
- 速率限制: 保护所有端点免受滥用
- 身份验证与授权: 验证身份与权限
- 安全处理错误: 绝不泄露内部细节
3. Technical Foundation
3. 技术基础
Version Recommendations
版本推荐
| Component | Version | Notes |
|---|---|---|
| FastAPI | 0.115.3+ | CVE-2024-47874 fix |
| Starlette | 0.40.0+ | DoS vulnerability fix |
| Pydantic | 2.0+ | Better validation |
| Python | 3.11+ | Performance |
| 组件 | 版本 | 说明 |
|---|---|---|
| FastAPI | 0.115.3+ | 修复CVE-2024-47874 |
| Starlette | 0.40.0+ | 修复拒绝服务漏洞 |
| Pydantic | 2.0+ | 更优的验证能力 |
| Python | 3.11+ | 性能提升 |
Security Dependencies
安全依赖
toml
[project]
dependencies = [
"fastapi>=0.115.3",
"starlette>=0.40.0",
"pydantic>=2.5",
"python-jose[cryptography]>=3.3",
"passlib[argon2]>=1.7",
"python-multipart>=0.0.6",
"slowapi>=0.1.9",
"secure>=0.3",
]toml
[project]
dependencies = [
"fastapi>=0.115.3",
"starlette>=0.40.0",
"pydantic>=2.5",
"python-jose[cryptography]>=3.3",
"passlib[argon2]>=1.7",
"python-multipart>=0.0.6",
"slowapi>=0.1.9",
"secure>=0.3",
]4. Implementation Patterns
4. 实现模式
Pattern 1: Secure Application Setup
模式1:安全应用设置
python
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from secure import SecureHeaders
app = FastAPI(
title="Secure API",
docs_url=None if PRODUCTION else "/docs", # Disable in prod
redoc_url=None,
)python
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from secure import SecureHeaders
app = FastAPI(
title="Secure API",
docs_url=None if PRODUCTION else "/docs", # Disable in prod
redoc_url=None,
)Security headers
Security headers
secure_headers = SecureHeaders()
@app.middleware("http")
async def add_security_headers(request, call_next):
response = await call_next(request)
secure_headers.framework.fastapi(response)
return response
secure_headers = SecureHeaders()
@app.middleware("http")
async def add_security_headers(request, call_next):
response = await call_next(request)
secure_headers.framework.fastapi(response)
return response
Restrictive CORS
Restrictive CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["https://app.example.com"], # Never ["*"]
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE"],
allow_headers=["Authorization", "Content-Type"],
)
undefinedapp.add_middleware(
CORSMiddleware,
allow_origins=["https://app.example.com"], # Never ["*"]
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE"],
allow_headers=["Authorization", "Content-Type"],
)
undefinedPattern 2: Input Validation
模式2:输入验证
python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field, field_validator, EmailStr
class UserCreate(BaseModel):
username: str = Field(min_length=3, max_length=50, pattern=r'^[a-zA-Z0-9_-]+$')
email: EmailStr
password: str = Field(min_length=12)
@field_validator('password')
@classmethod
def validate_password(cls, v):
if not any(c.isupper() for c in v):
raise ValueError('Must contain uppercase')
if not any(c.isdigit() for c in v):
raise ValueError('Must contain digit')
return v
@app.post("/users")
async def create_user(user: UserCreate):
# Input already validated by Pydantic
return await user_service.create(user)python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field, field_validator, EmailStr
class UserCreate(BaseModel):
username: str = Field(min_length=3, max_length=50, pattern=r'^[a-zA-Z0-9_-]+$')
email: EmailStr
password: str = Field(min_length=12)
@field_validator('password')
@classmethod
def validate_password(cls, v):
if not any(c.isupper() for c in v):
raise ValueError('Must contain uppercase')
if not any(c.isdigit() for c in v):
raise ValueError('Must contain digit')
return v
@app.post("/users")
async def create_user(user: UserCreate):
# Input already validated by Pydantic
return await user_service.create(user)Pattern 3: JWT Authentication
模式3:JWT身份验证
python
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from datetime import datetime, timedelta
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
SECRET_KEY = os.environ["JWT_SECRET"]
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
def create_access_token(data: dict) -> str:
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
user_id: str = payload.get("sub")
if user_id is None:
raise credentials_exception
except JWTError:
raise credentials_exception
user = await user_service.get(user_id)
if user is None:
raise credentials_exception
return userpython
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from datetime import datetime, timedelta
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
SECRET_KEY = os.environ["JWT_SECRET"]
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
def create_access_token(data: dict) -> str:
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
user_id: str = payload.get("sub")
if user_id is None:
raise credentials_exception
except JWTError:
raise credentials_exception
user = await user_service.get(user_id)
if user is None:
raise credentials_exception
return userPattern 4: Rate Limiting
模式4:速率限制
python
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@app.post("/login")
@limiter.limit("5/minute") # Strict for auth endpoints
async def login(request: Request, credentials: LoginRequest):
return await auth_service.login(credentials)
@app.get("/data")
@limiter.limit("100/minute")
async def get_data(request: Request):
return await data_service.get_all()python
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@app.post("/login")
@limiter.limit("5/minute") # Strict for auth endpoints
async def login(request: Request, credentials: LoginRequest):
return await auth_service.login(credentials)
@app.get("/data")
@limiter.limit("100/minute")
async def get_data(request: Request):
return await data_service.get_all()Pattern 5: Safe File Upload
模式5:安全文件上传
python
from fastapi import UploadFile, File, HTTPException
import magic
ALLOWED_TYPES = {"image/jpeg", "image/png", "application/pdf"}
MAX_SIZE = 10 * 1024 * 1024 # 10MB
@app.post("/upload")
async def upload_file(file: UploadFile = File(...)):
# Check size
content = await file.read()
if len(content) > MAX_SIZE:
raise HTTPException(400, "File too large")
# Check magic bytes, not just extension
mime_type = magic.from_buffer(content, mime=True)
if mime_type not in ALLOWED_TYPES:
raise HTTPException(400, f"File type not allowed: {mime_type}")
# Generate safe filename
safe_name = f"{uuid4()}{Path(file.filename).suffix}"
# Store outside webroot
file_path = UPLOAD_DIR / safe_name
file_path.write_bytes(content)
return {"filename": safe_name}python
from fastapi import UploadFile, File, HTTPException
import magic
ALLOWED_TYPES = {"image/jpeg", "image/png", "application/pdf"}
MAX_SIZE = 10 * 1024 * 1024 # 10MB
@app.post("/upload")
async def upload_file(file: UploadFile = File(...)):
# Check size
content = await file.read()
if len(content) > MAX_SIZE:
raise HTTPException(400, "File too large")
# Check magic bytes, not just extension
mime_type = magic.from_buffer(content, mime=True)
if mime_type not in ALLOWED_TYPES:
raise HTTPException(400, f"File type not allowed: {mime_type}")
# Generate safe filename
safe_name = f"{uuid4()}{Path(file.filename).suffix}"
# Store outside webroot
file_path = UPLOAD_DIR / safe_name
file_path.write_bytes(content)
return {"filename": safe_name}5. Implementation Workflow (TDD)
5. 实现工作流(测试驱动开发)
Step 1: Write Failing Test First
步骤1:先编写失败的测试
Always start with tests that define expected behavior:
python
import pytest
from httpx import AsyncClient, ASGITransport
from app.main import app
@pytest.mark.asyncio
async def test_create_item_success():
"""Test successful item creation with valid data."""
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
response = await client.post(
"/items",
json={"name": "Test Item", "price": 29.99},
headers={"Authorization": "Bearer valid_token"}
)
assert response.status_code == 201
data = response.json()
assert data["name"] == "Test Item"
assert "id" in data
@pytest.mark.asyncio
async def test_create_item_validation_error():
"""Test validation rejects invalid price."""
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
response = await client.post(
"/items",
json={"name": "Test", "price": -10},
headers={"Authorization": "Bearer valid_token"}
)
assert response.status_code == 422
@pytest.mark.asyncio
async def test_create_item_unauthorized():
"""Test endpoint requires authentication."""
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
response = await client.post("/items", json={"name": "Test", "price": 10})
assert response.status_code == 401始终从定义预期行为的测试开始:
python
import pytest
from httpx import AsyncClient, ASGITransport
from app.main import app
@pytest.mark.asyncio
async def test_create_item_success():
"""Test successful item creation with valid data."""
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
response = await client.post(
"/items",
json={"name": "Test Item", "price": 29.99},
headers={"Authorization": "Bearer valid_token"}
)
assert response.status_code == 201
data = response.json()
assert data["name"] == "Test Item"
assert "id" in data
@pytest.mark.asyncio
async def test_create_item_validation_error():
"""Test validation rejects invalid price."""
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
response = await client.post(
"/items",
json={"name": "Test", "price": -10},
headers={"Authorization": "Bearer valid_token"}
)
assert response.status_code == 422
@pytest.mark.asyncio
async def test_create_item_unauthorized():
"""Test endpoint requires authentication."""
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
response = await client.post("/items", json={"name": "Test", "price": 10})
assert response.status_code == 401Step 2: Implement Minimum to Pass
步骤2:实现最小编码以通过测试
Write only the code needed to make tests pass:
python
@app.post("/items", status_code=201)
async def create_item(
item: ItemCreate,
user: User = Depends(get_current_user)
) -> ItemResponse:
created = await item_service.create(item, user.id)
return ItemResponse.from_orm(created)仅编写让测试通过所需的代码:
python
@app.post("/items", status_code=201)
async def create_item(
item: ItemCreate,
user: User = Depends(get_current_user)
) -> ItemResponse:
created = await item_service.create(item, user.id)
return ItemResponse.from_orm(created)Step 3: Refactor if Needed
步骤3:按需重构
Improve code quality while keeping tests green. Extract common patterns, improve naming, optimize queries.
在保持测试通过的同时提升代码质量。提取通用模式、优化命名、优化查询。
Step 4: Run Full Verification
步骤4:运行完整验证
bash
undefinedbash
undefinedRun all tests with coverage
Run all tests with coverage
pytest --cov=app --cov-report=term-missing
pytest --cov=app --cov-report=term-missing
Type checking
Type checking
mypy app --strict
mypy app --strict
Security scan
Security scan
bandit -r app -ll
bandit -r app -ll
All must pass before committing
All must pass before committing
---
---6. Performance Patterns
6. 性能模式
Pattern 1: Connection Pooling for Database
模式1:数据库连接池
python
undefinedpython
undefinedBAD - Creates new connection per request
BAD - Creates new connection per request
@app.get("/users/{user_id}")
async def get_user(user_id: int):
conn = await asyncpg.connect(DATABASE_URL)
try:
return await conn.fetchrow("SELECT * FROM users WHERE id = $1", user_id)
finally:
await conn.close()
@app.get("/users/{user_id}")
async def get_user(user_id: int):
conn = await asyncpg.connect(DATABASE_URL)
try:
return await conn.fetchrow("SELECT * FROM users WHERE id = $1", user_id)
finally:
await conn.close()
GOOD - Uses connection pool
GOOD - Uses connection pool
from contextlib import asynccontextmanager
pool: asyncpg.Pool = None
@asynccontextmanager
async def lifespan(app: FastAPI):
global pool
pool = await asyncpg.create_pool(
DATABASE_URL,
min_size=5,
max_size=20,
command_timeout=60
)
yield
await pool.close()
app = FastAPI(lifespan=lifespan)
@app.get("/users/{user_id}")
async def get_user(user_id: int):
async with pool.acquire() as conn:
return await conn.fetchrow("SELECT * FROM users WHERE id = $1", user_id)
undefinedfrom contextlib import asynccontextmanager
pool: asyncpg.Pool = None
@asynccontextmanager
async def lifespan(app: FastAPI):
global pool
pool = await asyncpg.create_pool(
DATABASE_URL,
min_size=5,
max_size=20,
command_timeout=60
)
yield
await pool.close()
app = FastAPI(lifespan=lifespan)
@app.get("/users/{user_id}")
async def get_user(user_id: int):
async with pool.acquire() as conn:
return await conn.fetchrow("SELECT * FROM users WHERE id = $1", user_id)
undefinedPattern 2: Concurrent Requests with asyncio.gather
模式2:使用asyncio.gather实现并发请求
python
undefinedpython
undefinedBAD - Sequential external API calls
BAD - Sequential external API calls
@app.get("/dashboard")
async def get_dashboard(user_id: int):
profile = await fetch_profile(user_id) # 100ms
orders = await fetch_orders(user_id) # 150ms
notifications = await fetch_notifications(user_id) # 80ms
return {"profile": profile, "orders": orders, "notifications": notifications}
# Total: ~330ms
@app.get("/dashboard")
async def get_dashboard(user_id: int):
profile = await fetch_profile(user_id) # 100ms
orders = await fetch_orders(user_id) # 150ms
notifications = await fetch_notifications(user_id) # 80ms
return {"profile": profile, "orders": orders, "notifications": notifications}
# Total: ~330ms
GOOD - Concurrent calls
GOOD - Concurrent calls
@app.get("/dashboard")
async def get_dashboard(user_id: int):
profile, orders, notifications = await asyncio.gather(
fetch_profile(user_id),
fetch_orders(user_id),
fetch_notifications(user_id)
)
return {"profile": profile, "orders": orders, "notifications": notifications}
# Total: ~150ms (slowest call)
undefined@app.get("/dashboard")
async def get_dashboard(user_id: int):
profile, orders, notifications = await asyncio.gather(
fetch_profile(user_id),
fetch_orders(user_id),
fetch_notifications(user_id)
)
return {"profile": profile, "orders": orders, "notifications": notifications}
# Total: ~150ms (slowest call)
undefinedPattern 3: Response Caching
模式3:响应缓存
python
undefinedpython
undefinedBAD - Recomputes expensive data every request
BAD - Recomputes expensive data every request
@app.get("/stats")
async def get_stats():
return await compute_expensive_stats() # 500ms each time
@app.get("/stats")
async def get_stats():
return await compute_expensive_stats() # 500ms each time
GOOD - Cache with Redis
GOOD - Cache with Redis
from fastapi_cache import FastAPICache
from fastapi_cache.backends.redis import RedisBackend
from fastapi_cache.decorator import cache
@asynccontextmanager
async def lifespan(app: FastAPI):
redis = aioredis.from_url("redis://localhost")
FastAPICache.init(RedisBackend(redis), prefix="api-cache")
yield
@app.get("/stats")
@cache(expire=300) # Cache for 5 minutes
async def get_stats():
return await compute_expensive_stats()
from fastapi_cache import FastAPICache
from fastapi_cache.backends.redis import RedisBackend
from fastapi_cache.decorator import cache
@asynccontextmanager
async def lifespan(app: FastAPI):
redis = aioredis.from_url("redis://localhost")
FastAPICache.init(RedisBackend(redis), prefix="api-cache")
yield
@app.get("/stats")
@cache(expire=300) # Cache for 5 minutes
async def get_stats():
return await compute_expensive_stats()
GOOD - In-memory cache for simpler cases
GOOD - In-memory cache for simpler cases
from functools import lru_cache
from datetime import datetime, timedelta
_cache = {}
_cache_time = {}
async def get_cached_config(key: str, ttl: int = 60):
now = datetime.utcnow()
if key in _cache and _cache_time[key] > now:
return _cache[key]
value = await fetch_config(key)
_cache[key] = value
_cache_time[key] = now + timedelta(seconds=ttl)
return valueundefinedfrom functools import lru_cache
from datetime import datetime, timedelta
_cache = {}
_cache_time = {}
async def get_cached_config(key: str, ttl: int = 60):
now = datetime.utcnow()
if key in _cache and _cache_time[key] > now:
return _cache[key]
value = await fetch_config(key)
_cache[key] = value
_cache_time[key] = now + timedelta(seconds=ttl)
return valueundefinedPattern 4: Pagination for Large Datasets
模式4:大数据集分页
python
undefinedpython
undefinedBAD - Returns all records
BAD - Returns all records
@app.get("/items")
async def list_items():
return await db.fetch("SELECT * FROM items") # Could be millions
@app.get("/items")
async def list_items():
return await db.fetch("SELECT * FROM items") # Could be millions
GOOD - Cursor-based pagination
GOOD - Cursor-based pagination
from pydantic import BaseModel
class PaginatedResponse(BaseModel):
items: list
next_cursor: str | None
has_more: bool
@app.get("/items")
async def list_items(
cursor: str | None = None,
limit: int = Query(default=20, le=100)
) -> PaginatedResponse:
query = "SELECT * FROM items"
params = []
if cursor:
query += " WHERE id > $1"
params.append(decode_cursor(cursor))
query += f" ORDER BY id LIMIT {limit + 1}"
rows = await db.fetch(query, *params)
has_more = len(rows) > limit
items = rows[:limit]
return PaginatedResponse(
items=items,
next_cursor=encode_cursor(items[-1]["id"]) if items else None,
has_more=has_more
)undefinedfrom pydantic import BaseModel
class PaginatedResponse(BaseModel):
items: list
next_cursor: str | None
has_more: bool
@app.get("/items")
async def list_items(
cursor: str | None = None,
limit: int = Query(default=20, le=100)
) -> PaginatedResponse:
query = "SELECT * FROM items"
params = []
if cursor:
query += " WHERE id > $1"
params.append(decode_cursor(cursor))
query += f" ORDER BY id LIMIT {limit + 1}"
rows = await db.fetch(query, *params)
has_more = len(rows) > limit
items = rows[:limit]
return PaginatedResponse(
items=items,
next_cursor=encode_cursor(items[-1]["id"]) if items else None,
has_more=has_more
)undefinedPattern 5: Background Tasks for Heavy Operations
模式5:后台任务处理重操作
python
undefinedpython
undefinedBAD - Blocks response for slow operations
BAD - Blocks response for slow operations
@app.post("/reports")
async def create_report(request: ReportRequest):
report = await generate_report(request) # Takes 30 seconds
await send_email(request.email, report)
return {"status": "completed"}
@app.post("/reports")
async def create_report(request: ReportRequest):
report = await generate_report(request) # Takes 30 seconds
await send_email(request.email, report)
return {"status": "completed"}
GOOD - Return immediately, process in background
GOOD - Return immediately, process in background
from fastapi import BackgroundTasks
@app.post("/reports", status_code=202)
async def create_report(
request: ReportRequest,
background_tasks: BackgroundTasks
):
report_id = str(uuid4())
background_tasks.add_task(process_report, report_id, request)
return {"report_id": report_id, "status": "processing"}
async def process_report(report_id: str, request: ReportRequest):
report = await generate_report(request)
await save_report(report_id, report)
await send_email(request.email, report)
@app.get("/reports/{report_id}")
async def get_report_status(report_id: str):
return await get_report(report_id)
---from fastapi import BackgroundTasks
@app.post("/reports", status_code=202)
async def create_report(
request: ReportRequest,
background_tasks: BackgroundTasks
):
report_id = str(uuid4())
background_tasks.add_task(process_report, report_id, request)
return {"report_id": report_id, "status": "processing"}
async def process_report(report_id: str, request: ReportRequest):
report = await generate_report(request)
await save_report(report_id, report)
await send_email(request.email, report)
@app.get("/reports/{report_id}")
async def get_report_status(report_id: str):
return await get_report(report_id)
---7. Security Standards
7. 安全标准
7.1 Domain Vulnerability Landscape
7.1 领域漏洞概况
| CVE ID | Severity | Description | Mitigation |
|---|---|---|---|
| CVE-2024-47874 | HIGH | Starlette multipart DoS via memory exhaustion | Upgrade Starlette 0.40.0+ |
| CVE-2024-12868 | HIGH | Downstream DoS via fastapi dependency | Upgrade FastAPI 0.115.3+ |
| CVE-2023-30798 | HIGH | Starlette <0.25 DoS | Upgrade FastAPI 0.92+ |
| CVE ID | 严重程度 | 描述 | 缓解措施 |
|---|---|---|---|
| CVE-2024-47874 | 高 | Starlette多部分请求导致的内存耗尽型拒绝服务漏洞 | 升级至Starlette 0.40.0+ |
| CVE-2024-12868 | 高 | FastAPI依赖导致的下游拒绝服务漏洞 | 升级至FastAPI 0.115.3+ |
| CVE-2023-30798 | 高 | Starlette <0.25版本中的拒绝服务漏洞 | 升级至FastAPI 0.92+ |
7.2 OWASP Top 10 Mapping
7.2 OWASP Top 10映射
| Category | Risk | Mitigations |
|---|---|---|
| A01 Access Control | HIGH | Dependency injection for auth, permission decorators |
| A02 Crypto Failures | HIGH | JWT with proper algorithms, Argon2 passwords |
| A03 Injection | HIGH | Pydantic validation, parameterized queries |
| A04 Insecure Design | MEDIUM | Type safety, validation layers |
| A05 Misconfiguration | HIGH | Security headers, disable docs in prod |
| A06 Vulnerable Components | CRITICAL | Keep Starlette/FastAPI updated |
| A07 Auth Failures | HIGH | Rate limiting on auth, secure JWT |
| 类别 | 风险等级 | 缓解措施 |
|---|---|---|
| A01 访问控制 | 高 | 使用依赖注入实现身份验证、权限装饰器 |
| A02 加密失败 | 高 | 使用合适算法的JWT、Argon2密码哈希 |
| A03 注入攻击 | 高 | Pydantic验证、参数化查询 |
| A04 不安全设计 | 中 | 类型安全、多层验证 |
| A05 配置错误 | 高 | 安全头配置、生产环境禁用文档 |
| A06 易受攻击的组件 | 关键 | 保持Starlette/FastAPI版本更新 |
| A07 身份验证失败 | 高 | 身份验证端点速率限制、安全JWT配置 |
7.3 Error Handling
7.3 错误处理
python
from fastapi import HTTPException
from fastapi.responses import JSONResponse
import logging
logger = logging.getLogger(__name__)
@app.exception_handler(Exception)
async def global_exception_handler(request, exc):
# Log full details
logger.error(f"Unhandled error: {exc}", exc_info=True)
# Return safe message
return JSONResponse(
status_code=500,
content={"detail": "Internal server error"}
)
@app.exception_handler(HTTPException)
async def http_exception_handler(request, exc):
return JSONResponse(
status_code=exc.status_code,
content={"detail": exc.detail}
)python
from fastapi import HTTPException
from fastapi.responses import JSONResponse
import logging
logger = logging.getLogger(__name__)
@app.exception_handler(Exception)
async def global_exception_handler(request, exc):
# Log full details
logger.error(f"Unhandled error: {exc}", exc_info=True)
# Return safe message
return JSONResponse(
status_code=500,
content={"detail": "Internal server error"}
)
@app.exception_handler(HTTPException)
async def http_exception_handler(request, exc):
return JSONResponse(
status_code=exc.status_code,
content={"detail": exc.detail}
)6. Testing & Validation
6. 测试与验证
Security Tests
安全测试
python
import pytest
from fastapi.testclient import TestClient
def test_rate_limiting():
client = TestClient(app)
# Exceed rate limit
for _ in range(10):
response = client.post("/login", json={"username": "test", "password": "test"})
assert response.status_code == 429
def test_invalid_jwt_rejected():
client = TestClient(app)
response = client.get(
"/protected",
headers={"Authorization": "Bearer invalid.token.here"}
)
assert response.status_code == 401
def test_sql_injection_prevented():
client = TestClient(app)
response = client.get("/users", params={"search": "'; DROP TABLE users; --"})
assert response.status_code in [200, 400]
# Should not cause 500 (SQL error)
def test_file_upload_type_validation():
client = TestClient(app)
# Try uploading executable disguised as image
response = client.post(
"/upload",
files={"file": ("test.jpg", b"MZ\x90\x00", "image/jpeg")} # EXE magic bytes
)
assert response.status_code == 400python
import pytest
from fastapi.testclient import TestClient
def test_rate_limiting():
client = TestClient(app)
# Exceed rate limit
for _ in range(10):
response = client.post("/login", json={"username": "test", "password": "test"})
assert response.status_code == 429
def test_invalid_jwt_rejected():
client = TestClient(app)
response = client.get(
"/protected",
headers={"Authorization": "Bearer invalid.token.here"}
)
assert response.status_code == 401
def test_sql_injection_prevented():
client = TestClient(app)
response = client.get("/users", params={"search": "'; DROP TABLE users; --"})
assert response.status_code in [200, 400]
# Should not cause 500 (SQL error)
def test_file_upload_type_validation():
client = TestClient(app)
# Try uploading executable disguised as image
response = client.post(
"/upload",
files={"file": ("test.jpg", b"MZ\x90\x00", "image/jpeg")} # EXE magic bytes
)
assert response.status_code == 4008. Common Mistakes & Anti-Patterns
8. 常见错误与反模式
Anti-Pattern 1: Permissive CORS
反模式1:宽松的CORS配置
python
undefinedpython
undefinedNEVER
NEVER
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True)
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True)
ALWAYS
ALWAYS
app.add_middleware(CORSMiddleware, allow_origins=["https://app.example.com"])
undefinedapp.add_middleware(CORSMiddleware, allow_origins=["https://app.example.com"])
undefinedAnti-Pattern 2: No Rate Limiting
反模式2:未设置速率限制
python
undefinedpython
undefinedNEVER - allows brute force
NEVER - allows brute force
@app.post("/login")
async def login(creds): ...
@app.post("/login")
async def login(creds): ...
ALWAYS
ALWAYS
@app.post("/login")
@limiter.limit("5/minute")
async def login(request, creds): ...
undefined@app.post("/login")
@limiter.limit("5/minute")
async def login(request, creds): ...
undefinedAnti-Pattern 3: Exposing Docs in Production
反模式3:生产环境暴露API文档
python
undefinedpython
undefinedNEVER
NEVER
app = FastAPI()
app = FastAPI()
ALWAYS
ALWAYS
app = FastAPI(
docs_url=None if os.environ.get("ENV") == "production" else "/docs",
redoc_url=None
)
undefinedapp = FastAPI(
docs_url=None if os.environ.get("ENV") == "production" else "/docs",
redoc_url=None
)
undefinedAnti-Pattern 4: Weak JWT Configuration
反模式4:弱JWT配置
python
undefinedpython
undefinedNEVER
NEVER
jwt.encode(data, "secret", algorithm="HS256") # Hardcoded weak secret
jwt.encode(data, "secret", algorithm="HS256") # Hardcoded weak secret
ALWAYS
ALWAYS
jwt.encode(data, os.environ["JWT_SECRET"], algorithm="RS256") # Env var, strong algo
undefinedjwt.encode(data, os.environ["JWT_SECRET"], algorithm="RS256") # Env var, strong algo
undefinedAnti-Pattern 5: File Extension Validation Only
反模式5:仅验证文件扩展名
python
undefinedpython
undefinedNEVER
NEVER
if file.filename.endswith('.jpg'): ...
if file.filename.endswith('.jpg'): ...
ALWAYS
ALWAYS
mime = magic.from_buffer(content, mime=True)
if mime not in ALLOWED_TYPES: ...
---mime = magic.from_buffer(content, mime=True)
if mime not in ALLOWED_TYPES: ...
---13. Pre-Deployment Checklist
13. 部署前检查清单
- FastAPI 0.115.3+ / Starlette 0.40.0+
- Security headers middleware configured
- CORS restrictive (no wildcard with credentials)
- Rate limiting on all endpoints
- Stricter limits on auth endpoints
- JWT with strong secret from environment
- Pydantic validation on all inputs
- File uploads check magic bytes
- Docs disabled in production
- Error handlers don't leak internals
- HTTPS enforced
- FastAPI 0.115.3+ / Starlette 0.40.0+
- 已配置安全头中间件
- CORS配置严格(不使用通配符且启用凭证)
- 所有端点已设置速率限制
- 身份验证端点设置更严格的速率限制
- JWT使用环境变量中的强密钥
- 所有输入已通过Pydantic验证
- 文件上传通过魔术字节验证
- 生产环境已禁用API文档
- 错误处理器不会泄露内部细节
- 已强制使用HTTPS
14. Summary
14. 总结
Your goal is to create FastAPI applications that are:
- Secure: Validated inputs, rate limits, security headers
- Performant: Async operations, proper connection pooling
- Maintainable: Type-safe, well-structured, tested
Security Reminder:
- Upgrade Starlette to 0.40.0+ (CVE-2024-47874)
- Rate limit all endpoints, especially authentication
- Validate file uploads by magic bytes, not extension
- Never use wildcard CORS with credentials
- Disable API docs in production
你的目标是创建具备以下特性的FastAPI应用:
- 安全:输入验证、速率限制、安全头
- 高性能:异步操作、合理的连接池
- 可维护:类型安全、结构清晰、测试充分
安全提醒:
- 升级Starlette至0.40.0+(修复CVE-2024-47874)
- 为所有端点设置速率限制,尤其是身份验证端点
- 通过魔术字节验证文件上传,而非仅依赖扩展名
- 绝不在启用凭证的情况下使用通配符CORS
- 生产环境禁用API文档