auth-security
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseAuthentication & Authorization for FastAPI
FastAPI 认证与授权
Overview
概述
FastAPI provides built-in security utilities based on OpenAPI standards. Use OAuth2 with Password flow + JWT tokens as the standard pattern for API authentication. Combine with bcrypt for password hashing and role-based access control (RBAC) for authorization.
Key packages:
bash
uv add "python-jose[cryptography]" passlib[bcrypt] python-multipartFastAPI 提供基于OpenAPI标准的内置安全工具。推荐使用OAuth2密码流 + JWT令牌作为API认证的标准方案,结合bcrypt进行密码哈希,并通过基于角色的访问控制(RBAC)实现授权。
关键依赖包:
bash
uv add "python-jose[cryptography]" passlib[bcrypt] python-multipartor with PyJWT instead of python-jose:
或使用PyJWT替代python-jose:
uv add PyJWT[crypto] passlib[bcrypt] python-multipart
- `python-jose` or `PyJWT` -- JWT token creation and verification
- `passlib[bcrypt]` -- secure password hashing
- `python-multipart` -- required for OAuth2 form data parsinguv add PyJWT[crypto] passlib[bcrypt] python-multipart
- `python-jose` 或 `PyJWT` -- JWT令牌的创建与验证
- `passlib[bcrypt]` -- 安全的密码哈希处理
- `python-multipart` -- 解析OAuth2表单数据所需Password Hashing
密码哈希
Never store plaintext passwords. Use bcrypt through passlib:
python
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def hash_password(password: str) -> str:
return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)切勿存储明文密码。通过passlib使用bcrypt:
python
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def hash_password(password: str) -> str:
return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)JWT Token Management
JWT令牌管理
Token Creation
令牌创建
python
from datetime import datetime, timedelta, timezone
from jose import jwt # or: import jwt (PyJWT)
from pydantic import BaseModel
class TokenConfig(BaseModel):
secret_key: str = "your-secret-key" # Use env variable in production
algorithm: str = "HS256"
access_token_expire_minutes: int = 30
refresh_token_expire_days: int = 7
token_config = TokenConfig()
def create_access_token(
data: dict,
expires_delta: timedelta | None = None,
) -> str:
to_encode = data.copy()
expire = datetime.now(timezone.utc) + (
expires_delta
or timedelta(minutes=token_config.access_token_expire_minutes)
)
to_encode.update({"exp": expire, "type": "access"})
return jwt.encode(
to_encode,
token_config.secret_key,
algorithm=token_config.algorithm,
)
def create_refresh_token(data: dict) -> str:
to_encode = data.copy()
expire = datetime.now(timezone.utc) + timedelta(
days=token_config.refresh_token_expire_days
)
to_encode.update({"exp": expire, "type": "refresh"})
return jwt.encode(
to_encode,
token_config.secret_key,
algorithm=token_config.algorithm,
)python
from datetime import datetime, timedelta, timezone
from jose import jwt # 或:import jwt (PyJWT)
from pydantic import BaseModel
class TokenConfig(BaseModel):
secret_key: str = "your-secret-key" # 生产环境请使用环境变量
algorithm: str = "HS256"
access_token_expire_minutes: int = 30
refresh_token_expire_days: int = 7
token_config = TokenConfig()
def create_access_token(
data: dict,
expires_delta: timedelta | None = None,
) -> str:
to_encode = data.copy()
expire = datetime.now(timezone.utc) + (
expires_delta
or timedelta(minutes=token_config.access_token_expire_minutes)
)
to_encode.update({"exp": expire, "type": "access"})
return jwt.encode(
to_encode,
token_config.secret_key,
algorithm=token_config.algorithm,
)
def create_refresh_token(data: dict) -> str:
to_encode = data.copy()
expire = datetime.now(timezone.utc) + timedelta(
days=token_config.refresh_token_expire_days
)
to_encode.update({"exp": expire, "type": "refresh"})
return jwt.encode(
to_encode,
token_config.secret_key,
algorithm=token_config.algorithm,
)Token Verification
令牌验证
python
from jose import JWTError, jwt
from fastapi import HTTPException, status
def verify_token(token: str, expected_type: str = "access") -> dict:
try:
payload = jwt.decode(
token,
token_config.secret_key,
algorithms=[token_config.algorithm],
)
if payload.get("type") != expected_type:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token type",
)
return payload
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)python
from jose import JWTError, jwt
from fastapi import HTTPException, status
def verify_token(token: str, expected_type: str = "access") -> dict:
try:
payload = jwt.decode(
token,
token_config.secret_key,
algorithms=[token_config.algorithm],
)
if payload.get("type") != expected_type:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无效的令牌类型",
)
return payload
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="凭证验证失败",
headers={"WWW-Authenticate": "Bearer"},
)FastAPI Security Dependencies
FastAPI安全依赖项
OAuth2 Password Bearer
OAuth2密码承载
python
from fastapi import Depends
from fastapi.security import OAuth2PasswordBearer
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
async def get_current_user(
token: str = Depends(oauth2_scheme),
db: AsyncSession = Depends(get_db),
) -> User:
payload = verify_token(token)
user_id = payload.get("sub")
if user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token payload",
)
user = await db.get(User, int(user_id))
if user is None or not user.is_active:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found or inactive",
)
return user
async def get_current_active_user(
user: User = Depends(get_current_user),
) -> User:
if not user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
return userpython
from fastapi import Depends
from fastapi.security import OAuth2PasswordBearer
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
async def get_current_user(
token: str = Depends(oauth2_scheme),
db: AsyncSession = Depends(get_db),
) -> User:
payload = verify_token(token)
user_id = payload.get("sub")
if user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无效的令牌负载",
)
user = await db.get(User, int(user_id))
if user is None or not user.is_active:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户不存在或已禁用",
)
return user
async def get_current_active_user(
user: User = Depends(get_current_user),
) -> User:
if not user.is_active:
raise HTTPException(status_code=400, detail="用户已禁用")
return userLogin Endpoint
登录端点
python
from fastapi import APIRouter
from fastapi.security import OAuth2PasswordRequestForm
router = APIRouter(prefix="/auth", tags=["auth"])
@router.post("/login")
async def login(
form_data: OAuth2PasswordRequestForm = Depends(),
db: AsyncSession = Depends(get_db),
):
user = await authenticate_user(db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token = create_access_token(data={"sub": str(user.id)})
refresh_token = create_refresh_token(data={"sub": str(user.id)})
return {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "bearer",
}
async def authenticate_user(
db: AsyncSession, email: str, password: str
) -> User | None:
stmt = select(User).where(User.email == email)
result = await db.execute(stmt)
user = result.scalar_one_or_none()
if user and verify_password(password, user.hashed_password):
return user
return Nonepython
from fastapi import APIRouter
from fastapi.security import OAuth2PasswordRequestForm
router = APIRouter(prefix="/auth", tags=["auth"])
@router.post("/login")
async def login(
form_data: OAuth2PasswordRequestForm = Depends(),
db: AsyncSession = Depends(get_db),
):
user = await authenticate_user(db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="邮箱或密码错误",
headers={"WWW-Authenticate": "Bearer"},
)
access_token = create_access_token(data={"sub": str(user.id)})
refresh_token = create_refresh_token(data={"sub": str(user.id)})
return {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "bearer",
}
async def authenticate_user(
db: AsyncSession, email: str, password: str
) -> User | None:
stmt = select(User).where(User.email == email)
result = await db.execute(stmt)
user = result.scalar_one_or_none()
if user and verify_password(password, user.hashed_password):
return user
return NoneToken Refresh
令牌刷新
python
@router.post("/refresh")
async def refresh_token(
refresh_token: str,
db: AsyncSession = Depends(get_db),
):
payload = verify_token(refresh_token, expected_type="refresh")
user_id = payload.get("sub")
user = await db.get(User, int(user_id))
if not user or not user.is_active:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid refresh token",
)
new_access_token = create_access_token(data={"sub": str(user.id)})
return {"access_token": new_access_token, "token_type": "bearer"}python
@router.post("/refresh")
async def refresh_token(
refresh_token: str,
db: AsyncSession = Depends(get_db),
):
payload = verify_token(refresh_token, expected_type="refresh")
user_id = payload.get("sub")
user = await db.get(User, int(user_id))
if not user or not user.is_active:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无效的刷新令牌",
)
new_access_token = create_access_token(data={"sub": str(user.id)})
return {"access_token": new_access_token, "token_type": "bearer"}Protecting Routes
路由保护
python
from fastapi import APIRouter, Depends
router = APIRouter(prefix="/users", tags=["users"])python
from fastapi import APIRouter, Depends
router = APIRouter(prefix="/users", tags=["users"])Require authentication
需要认证
@router.get("/me")
async def read_current_user(
current_user: User = Depends(get_current_user),
):
return current_user
@router.get("/me")
async def read_current_user(
current_user: User = Depends(get_current_user),
):
return current_user
Protect all routes in a router
保护路由中的所有端点
protected_router = APIRouter(
prefix="/admin",
tags=["admin"],
dependencies=[Depends(get_current_user)],
)
undefinedprotected_router = APIRouter(
prefix="/admin",
tags=["admin"],
dependencies=[Depends(get_current_user)],
)
undefinedRole-Based Access Control (RBAC)
基于角色的访问控制(RBAC)
Role Model
角色模型
python
from enum import StrEnum
class Role(StrEnum):
USER = "user"
ADMIN = "admin"
MODERATOR = "moderator"
class User(TimestampMixin, Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
email: Mapped[str] = mapped_column(String(255), unique=True)
role: Mapped[str] = mapped_column(String(20), default=Role.USER)
# ...python
from enum import StrEnum
class Role(StrEnum):
USER = "user"
ADMIN = "admin"
MODERATOR = "moderator"
class User(TimestampMixin, Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
email: Mapped[str] = mapped_column(String(255), unique=True)
role: Mapped[str] = mapped_column(String(20), default=Role.USER)
# ...Role Checker Dependency
角色检查依赖项
python
from fastapi import Depends, HTTPException, status
class RoleChecker:
def __init__(self, allowed_roles: list[Role]):
self.allowed_roles = allowed_roles
async def __call__(
self, user: User = Depends(get_current_user)
) -> User:
if user.role not in self.allowed_roles:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Insufficient permissions",
)
return userpython
from fastapi import Depends, HTTPException, status
class RoleChecker:
def __init__(self, allowed_roles: list[Role]):
self.allowed_roles = allowed_roles
async def __call__(
self, user: User = Depends(get_current_user)
) -> User:
if user.role not in self.allowed_roles:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="权限不足",
)
return userUsage as dependency
依赖项使用示例
allow_admin = RoleChecker([Role.ADMIN])
allow_moderator = RoleChecker([Role.ADMIN, Role.MODERATOR])
@router.delete("/{user_id}")
async def delete_user(
user_id: int,
current_user: User = Depends(allow_admin),
db: AsyncSession = Depends(get_db),
):
# Only admins can reach this
...
undefinedallow_admin = RoleChecker([Role.ADMIN])
allow_moderator = RoleChecker([Role.ADMIN, Role.MODERATOR])
@router.delete("/{user_id}")
async def delete_user(
user_id: int,
current_user: User = Depends(allow_admin),
db: AsyncSession = Depends(get_db),
):
# 仅管理员可访问此端点
...
undefinedAPI Key Authentication
API密钥认证
For service-to-service or simple API key auth:
python
from fastapi import Security
from fastapi.security import APIKeyHeader
api_key_header = APIKeyHeader(name="X-API-Key")
async def verify_api_key(
api_key: str = Security(api_key_header),
) -> str:
if api_key != settings.api_key:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid API key",
)
return api_key
@router.get("/external-data", dependencies=[Depends(verify_api_key)])
async def get_external_data():
...适用于服务间通信或简单API密钥认证场景:
python
from fastapi import Security
from fastapi.security import APIKeyHeader
api_key_header = APIKeyHeader(name="X-API-Key")
async def verify_api_key(
api_key: str = Security(api_key_header),
) -> str:
if api_key != settings.api_key:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="无效的API密钥",
)
return api_key
@router.get("/external-data", dependencies=[Depends(verify_api_key)])
async def get_external_data():
...Security Best Practices
安全最佳实践
- Use environment variables for secrets -- never hardcode , API keys, or database credentials.
secret_key - Set token expiry short for access tokens (15-30 min) and longer for refresh tokens (7-30 days).
- Use HTTPS in production -- tokens sent over HTTP can be intercepted.
- Validate token type -- prevent refresh tokens from being used as access tokens.
- Rate limit auth endpoints -- prevent brute-force attacks on login.
- Hash passwords with bcrypt -- never use MD5, SHA-256, or other fast hashes for passwords.
- Return generic error messages -- "Incorrect email or password" not "User not found" vs "Wrong password".
- Log authentication events -- track login attempts, failures, and token refreshes.
- Invalidate tokens on password change -- include a token version or claim.
iat - Use for cleaner dependency injection:
Annotated
python
from typing import Annotated
CurrentUser = Annotated[User, Depends(get_current_user)]
AdminUser = Annotated[User, Depends(allow_admin)]
@router.get("/me")
async def read_me(user: CurrentUser):
return user- 使用环境变量存储敏感信息 -- 切勿硬编码、API密钥或数据库凭证。
secret_key - 设置合理的令牌过期时间 -- 访问令牌有效期设为15-30分钟,刷新令牌设为7-30天。
- 生产环境使用HTTPS -- HTTP传输令牌可能被拦截。
- 验证令牌类型 -- 禁止将刷新令牌用作访问令牌。
- 对认证端点限流 -- 防止登录接口遭受暴力破解攻击。
- 使用bcrypt哈希密码 -- 切勿使用MD5、SHA-256等快速哈希算法处理密码。
- 返回通用错误信息 -- 例如返回“邮箱或密码错误”,而非区分“用户不存在”或“密码错误”。
- 记录认证事件 -- 跟踪登录尝试、失败及令牌刷新操作。
- 密码变更时失效令牌 -- 可通过令牌版本或声明实现。
iat - 使用简化依赖注入:
Annotated
python
from typing import Annotated
CurrentUser = Annotated[User, Depends(get_current_user)]
AdminUser = Annotated[User, Depends(allow_admin)]
@router.get("/me")
async def read_me(user: CurrentUser):
return userCross-References
交叉参考
- For Pydantic request/response models, consult the skill.
pydantic - For database models and sessions, consult the skill.
database - For FastAPI routing and middleware, consult the skill.
app-scaffolding - For CORS and security middleware, consult the references.
app-scaffolding - For testing auth flows, consult the skill.
test-runner
- 关于Pydantic请求/响应模型,请参考技能文档。
pydantic - 关于数据库模型与会话,请参考技能文档。
database - 关于FastAPI路由与中间件,请参考技能文档。
app-scaffolding - 关于CORS与安全中间件,请参考相关文档。
app-scaffolding - 关于认证流程测试,请参考技能文档。
test-runner