auth-security

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Authentication & Authorization for FastAPI

FastAPI 认证与授权

Overview

概述

FastAPI provides built-in security utilities based on OpenAPI standards. Use OAuth2 with Password flow + JWT tokens as the standard pattern for API authentication. Combine with bcrypt for password hashing and role-based access control (RBAC) for authorization.
Key packages:
bash
uv add "python-jose[cryptography]" passlib[bcrypt] python-multipart
FastAPI 提供基于OpenAPI标准的内置安全工具。推荐使用OAuth2密码流 + JWT令牌作为API认证的标准方案,结合bcrypt进行密码哈希,并通过基于角色的访问控制(RBAC)实现授权。
关键依赖包:
bash
uv add "python-jose[cryptography]" passlib[bcrypt] python-multipart

or with PyJWT instead of python-jose:

或使用PyJWT替代python-jose:

uv add PyJWT[crypto] passlib[bcrypt] python-multipart

- `python-jose` or `PyJWT` -- JWT token creation and verification
- `passlib[bcrypt]` -- secure password hashing
- `python-multipart` -- required for OAuth2 form data parsing
uv add PyJWT[crypto] passlib[bcrypt] python-multipart

- `python-jose` 或 `PyJWT` -- JWT令牌的创建与验证
- `passlib[bcrypt]` -- 安全的密码哈希处理
- `python-multipart` -- 解析OAuth2表单数据所需

Password Hashing

密码哈希

Never store plaintext passwords. Use bcrypt through passlib:
python
from passlib.context import CryptContext

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def hash_password(password: str) -> str:
    return pwd_context.hash(password)


def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)
切勿存储明文密码。通过passlib使用bcrypt:
python
from passlib.context import CryptContext

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def hash_password(password: str) -> str:
    return pwd_context.hash(password)


def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)

JWT Token Management

JWT令牌管理

Token Creation

令牌创建

python
from datetime import datetime, timedelta, timezone

from jose import jwt  # or: import jwt (PyJWT)
from pydantic import BaseModel


class TokenConfig(BaseModel):
    secret_key: str = "your-secret-key"  # Use env variable in production
    algorithm: str = "HS256"
    access_token_expire_minutes: int = 30
    refresh_token_expire_days: int = 7


token_config = TokenConfig()


def create_access_token(
    data: dict,
    expires_delta: timedelta | None = None,
) -> str:
    to_encode = data.copy()
    expire = datetime.now(timezone.utc) + (
        expires_delta
        or timedelta(minutes=token_config.access_token_expire_minutes)
    )
    to_encode.update({"exp": expire, "type": "access"})
    return jwt.encode(
        to_encode,
        token_config.secret_key,
        algorithm=token_config.algorithm,
    )


def create_refresh_token(data: dict) -> str:
    to_encode = data.copy()
    expire = datetime.now(timezone.utc) + timedelta(
        days=token_config.refresh_token_expire_days
    )
    to_encode.update({"exp": expire, "type": "refresh"})
    return jwt.encode(
        to_encode,
        token_config.secret_key,
        algorithm=token_config.algorithm,
    )
python
from datetime import datetime, timedelta, timezone

from jose import jwt  # 或:import jwt (PyJWT)
from pydantic import BaseModel


class TokenConfig(BaseModel):
    secret_key: str = "your-secret-key"  # 生产环境请使用环境变量
    algorithm: str = "HS256"
    access_token_expire_minutes: int = 30
    refresh_token_expire_days: int = 7


token_config = TokenConfig()


def create_access_token(
    data: dict,
    expires_delta: timedelta | None = None,
) -> str:
    to_encode = data.copy()
    expire = datetime.now(timezone.utc) + (
        expires_delta
        or timedelta(minutes=token_config.access_token_expire_minutes)
    )
    to_encode.update({"exp": expire, "type": "access"})
    return jwt.encode(
        to_encode,
        token_config.secret_key,
        algorithm=token_config.algorithm,
    )


def create_refresh_token(data: dict) -> str:
    to_encode = data.copy()
    expire = datetime.now(timezone.utc) + timedelta(
        days=token_config.refresh_token_expire_days
    )
    to_encode.update({"exp": expire, "type": "refresh"})
    return jwt.encode(
        to_encode,
        token_config.secret_key,
        algorithm=token_config.algorithm,
    )

Token Verification

令牌验证

python
from jose import JWTError, jwt
from fastapi import HTTPException, status


def verify_token(token: str, expected_type: str = "access") -> dict:
    try:
        payload = jwt.decode(
            token,
            token_config.secret_key,
            algorithms=[token_config.algorithm],
        )
        if payload.get("type") != expected_type:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid token type",
            )
        return payload
    except JWTError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
python
from jose import JWTError, jwt
from fastapi import HTTPException, status


def verify_token(token: str, expected_type: str = "access") -> dict:
    try:
        payload = jwt.decode(
            token,
            token_config.secret_key,
            algorithms=[token_config.algorithm],
        )
        if payload.get("type") != expected_type:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="无效的令牌类型",
            )
        return payload
    except JWTError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="凭证验证失败",
            headers={"WWW-Authenticate": "Bearer"},
        )

FastAPI Security Dependencies

FastAPI安全依赖项

OAuth2 Password Bearer

OAuth2密码承载

python
from fastapi import Depends
from fastapi.security import OAuth2PasswordBearer

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")


async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db),
) -> User:
    payload = verify_token(token)
    user_id = payload.get("sub")
    if user_id is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token payload",
        )
    user = await db.get(User, int(user_id))
    if user is None or not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User not found or inactive",
        )
    return user


async def get_current_active_user(
    user: User = Depends(get_current_user),
) -> User:
    if not user.is_active:
        raise HTTPException(status_code=400, detail="Inactive user")
    return user
python
from fastapi import Depends
from fastapi.security import OAuth2PasswordBearer

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")


async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db),
) -> User:
    payload = verify_token(token)
    user_id = payload.get("sub")
    if user_id is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="无效的令牌负载",
        )
    user = await db.get(User, int(user_id))
    if user is None or not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="用户不存在或已禁用",
        )
    return user


async def get_current_active_user(
    user: User = Depends(get_current_user),
) -> User:
    if not user.is_active:
        raise HTTPException(status_code=400, detail="用户已禁用")
    return user

Login Endpoint

登录端点

python
from fastapi import APIRouter
from fastapi.security import OAuth2PasswordRequestForm

router = APIRouter(prefix="/auth", tags=["auth"])


@router.post("/login")
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: AsyncSession = Depends(get_db),
):
    user = await authenticate_user(db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token = create_access_token(data={"sub": str(user.id)})
    refresh_token = create_refresh_token(data={"sub": str(user.id)})
    return {
        "access_token": access_token,
        "refresh_token": refresh_token,
        "token_type": "bearer",
    }


async def authenticate_user(
    db: AsyncSession, email: str, password: str
) -> User | None:
    stmt = select(User).where(User.email == email)
    result = await db.execute(stmt)
    user = result.scalar_one_or_none()
    if user and verify_password(password, user.hashed_password):
        return user
    return None
python
from fastapi import APIRouter
from fastapi.security import OAuth2PasswordRequestForm

router = APIRouter(prefix="/auth", tags=["auth"])


@router.post("/login")
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: AsyncSession = Depends(get_db),
):
    user = await authenticate_user(db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="邮箱或密码错误",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token = create_access_token(data={"sub": str(user.id)})
    refresh_token = create_refresh_token(data={"sub": str(user.id)})
    return {
        "access_token": access_token,
        "refresh_token": refresh_token,
        "token_type": "bearer",
    }


async def authenticate_user(
    db: AsyncSession, email: str, password: str
) -> User | None:
    stmt = select(User).where(User.email == email)
    result = await db.execute(stmt)
    user = result.scalar_one_or_none()
    if user and verify_password(password, user.hashed_password):
        return user
    return None

Token Refresh

令牌刷新

python
@router.post("/refresh")
async def refresh_token(
    refresh_token: str,
    db: AsyncSession = Depends(get_db),
):
    payload = verify_token(refresh_token, expected_type="refresh")
    user_id = payload.get("sub")
    user = await db.get(User, int(user_id))
    if not user or not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid refresh token",
        )
    new_access_token = create_access_token(data={"sub": str(user.id)})
    return {"access_token": new_access_token, "token_type": "bearer"}
python
@router.post("/refresh")
async def refresh_token(
    refresh_token: str,
    db: AsyncSession = Depends(get_db),
):
    payload = verify_token(refresh_token, expected_type="refresh")
    user_id = payload.get("sub")
    user = await db.get(User, int(user_id))
    if not user or not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="无效的刷新令牌",
        )
    new_access_token = create_access_token(data={"sub": str(user.id)})
    return {"access_token": new_access_token, "token_type": "bearer"}

Protecting Routes

路由保护

python
from fastapi import APIRouter, Depends

router = APIRouter(prefix="/users", tags=["users"])
python
from fastapi import APIRouter, Depends

router = APIRouter(prefix="/users", tags=["users"])

Require authentication

需要认证

@router.get("/me") async def read_current_user( current_user: User = Depends(get_current_user), ): return current_user
@router.get("/me") async def read_current_user( current_user: User = Depends(get_current_user), ): return current_user

Protect all routes in a router

保护路由中的所有端点

protected_router = APIRouter( prefix="/admin", tags=["admin"], dependencies=[Depends(get_current_user)], )
undefined
protected_router = APIRouter( prefix="/admin", tags=["admin"], dependencies=[Depends(get_current_user)], )
undefined

Role-Based Access Control (RBAC)

基于角色的访问控制(RBAC)

Role Model

角色模型

python
from enum import StrEnum


class Role(StrEnum):
    USER = "user"
    ADMIN = "admin"
    MODERATOR = "moderator"


class User(TimestampMixin, Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(String(255), unique=True)
    role: Mapped[str] = mapped_column(String(20), default=Role.USER)
    # ...
python
from enum import StrEnum


class Role(StrEnum):
    USER = "user"
    ADMIN = "admin"
    MODERATOR = "moderator"


class User(TimestampMixin, Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(String(255), unique=True)
    role: Mapped[str] = mapped_column(String(20), default=Role.USER)
    # ...

Role Checker Dependency

角色检查依赖项

python
from fastapi import Depends, HTTPException, status


class RoleChecker:
    def __init__(self, allowed_roles: list[Role]):
        self.allowed_roles = allowed_roles

    async def __call__(
        self, user: User = Depends(get_current_user)
    ) -> User:
        if user.role not in self.allowed_roles:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Insufficient permissions",
            )
        return user
python
from fastapi import Depends, HTTPException, status


class RoleChecker:
    def __init__(self, allowed_roles: list[Role]):
        self.allowed_roles = allowed_roles

    async def __call__(
        self, user: User = Depends(get_current_user)
    ) -> User:
        if user.role not in self.allowed_roles:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="权限不足",
            )
        return user

Usage as dependency

依赖项使用示例

allow_admin = RoleChecker([Role.ADMIN]) allow_moderator = RoleChecker([Role.ADMIN, Role.MODERATOR])
@router.delete("/{user_id}") async def delete_user( user_id: int, current_user: User = Depends(allow_admin), db: AsyncSession = Depends(get_db), ): # Only admins can reach this ...
undefined
allow_admin = RoleChecker([Role.ADMIN]) allow_moderator = RoleChecker([Role.ADMIN, Role.MODERATOR])
@router.delete("/{user_id}") async def delete_user( user_id: int, current_user: User = Depends(allow_admin), db: AsyncSession = Depends(get_db), ): # 仅管理员可访问此端点 ...
undefined

API Key Authentication

API密钥认证

For service-to-service or simple API key auth:
python
from fastapi import Security
from fastapi.security import APIKeyHeader

api_key_header = APIKeyHeader(name="X-API-Key")


async def verify_api_key(
    api_key: str = Security(api_key_header),
) -> str:
    if api_key != settings.api_key:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Invalid API key",
        )
    return api_key


@router.get("/external-data", dependencies=[Depends(verify_api_key)])
async def get_external_data():
    ...
适用于服务间通信或简单API密钥认证场景:
python
from fastapi import Security
from fastapi.security import APIKeyHeader

api_key_header = APIKeyHeader(name="X-API-Key")


async def verify_api_key(
    api_key: str = Security(api_key_header),
) -> str:
    if api_key != settings.api_key:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="无效的API密钥",
        )
    return api_key


@router.get("/external-data", dependencies=[Depends(verify_api_key)])
async def get_external_data():
    ...

Security Best Practices

安全最佳实践

  1. Use environment variables for secrets -- never hardcode
    secret_key
    , API keys, or database credentials.
  2. Set token expiry short for access tokens (15-30 min) and longer for refresh tokens (7-30 days).
  3. Use HTTPS in production -- tokens sent over HTTP can be intercepted.
  4. Validate token type -- prevent refresh tokens from being used as access tokens.
  5. Rate limit auth endpoints -- prevent brute-force attacks on login.
  6. Hash passwords with bcrypt -- never use MD5, SHA-256, or other fast hashes for passwords.
  7. Return generic error messages -- "Incorrect email or password" not "User not found" vs "Wrong password".
  8. Log authentication events -- track login attempts, failures, and token refreshes.
  9. Invalidate tokens on password change -- include a token version or
    iat
    claim.
  10. Use
    Annotated
    for cleaner dependency injection
    :
python
from typing import Annotated

CurrentUser = Annotated[User, Depends(get_current_user)]
AdminUser = Annotated[User, Depends(allow_admin)]


@router.get("/me")
async def read_me(user: CurrentUser):
    return user
  1. 使用环境变量存储敏感信息 -- 切勿硬编码
    secret_key
    、API密钥或数据库凭证。
  2. 设置合理的令牌过期时间 -- 访问令牌有效期设为15-30分钟,刷新令牌设为7-30天。
  3. 生产环境使用HTTPS -- HTTP传输令牌可能被拦截。
  4. 验证令牌类型 -- 禁止将刷新令牌用作访问令牌。
  5. 对认证端点限流 -- 防止登录接口遭受暴力破解攻击。
  6. 使用bcrypt哈希密码 -- 切勿使用MD5、SHA-256等快速哈希算法处理密码。
  7. 返回通用错误信息 -- 例如返回“邮箱或密码错误”,而非区分“用户不存在”或“密码错误”。
  8. 记录认证事件 -- 跟踪登录尝试、失败及令牌刷新操作。
  9. 密码变更时失效令牌 -- 可通过令牌版本或
    iat
    声明实现。
  10. 使用
    Annotated
    简化依赖注入
python
from typing import Annotated

CurrentUser = Annotated[User, Depends(get_current_user)]
AdminUser = Annotated[User, Depends(allow_admin)]


@router.get("/me")
async def read_me(user: CurrentUser):
    return user

Cross-References

交叉参考

  • For Pydantic request/response models, consult the
    pydantic
    skill.
  • For database models and sessions, consult the
    database
    skill.
  • For FastAPI routing and middleware, consult the
    app-scaffolding
    skill.
  • For CORS and security middleware, consult the
    app-scaffolding
    references.
  • For testing auth flows, consult the
    test-runner
    skill.
  • 关于Pydantic请求/响应模型,请参考
    pydantic
    技能文档。
  • 关于数据库模型与会话,请参考
    database
    技能文档。
  • 关于FastAPI路由与中间件,请参考
    app-scaffolding
    技能文档。
  • 关于CORS与安全中间件,请参考
    app-scaffolding
    相关文档。
  • 关于认证流程测试,请参考
    test-runner
    技能文档。