authentication

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Authentication Skill

身份验证Skill

Provides comprehensive authentication and authorization capabilities for the Golden Armada AI Agent Fleet Platform.
为Golden Armada AI Agent Fleet Platform提供全面的身份验证与授权能力。

When to Use This Skill

何时使用此Skill

Activate this skill when working with:
  • User authentication flows
  • JWT token management
  • OAuth2 integration
  • Session management
  • Role-based access control (RBAC)
在处理以下场景时启用此Skill:
  • 用户身份验证流程
  • JWT令牌管理
  • OAuth2集成
  • 会话管理
  • 基于角色的访问控制(RBAC)

JWT Authentication

JWT身份验证

Token Generation

令牌生成

```python from jose import jwt from datetime import datetime, timedelta from passlib.context import CryptContext
SECRET_KEY = os.environ["JWT_SECRET"] ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 30 REFRESH_TOKEN_EXPIRE_DAYS = 7
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def create_access_token(user_id: str, roles: list[str]) -> str: expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) payload = { "sub": user_id, "roles": roles, "exp": expire, "iat": datetime.utcnow(), "type": "access" } return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
def create_refresh_token(user_id: str) -> str: expire = datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS) payload = { "sub": user_id, "exp": expire, "type": "refresh" } return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
def verify_token(token: str) -> dict: try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) return payload except jwt.ExpiredSignatureError: raise HTTPException(status_code=401, detail="Token expired") except jwt.JWTError: raise HTTPException(status_code=401, detail="Invalid token") ```
python
from jose import jwt
from datetime import datetime, timedelta
from passlib.context import CryptContext

SECRET_KEY = os.environ["JWT_SECRET"]
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_DAYS = 7

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def create_access_token(user_id: str, roles: list[str]) -> str:
    expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    payload = {
        "sub": user_id,
        "roles": roles,
        "exp": expire,
        "iat": datetime.utcnow(),
        "type": "access"
    }
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)

def create_refresh_token(user_id: str) -> str:
    expire = datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
    payload = {
        "sub": user_id,
        "exp": expire,
        "type": "refresh"
    }
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)

def verify_token(token: str) -> dict:
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        return payload
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="Token expired")
    except jwt.JWTError:
        raise HTTPException(status_code=401, detail="Invalid token")

Password Hashing

密码哈希

```python def hash_password(password: str) -> str: return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool: return pwd_context.verify(plain_password, hashed_password)
python
def hash_password(password: str) -> str:
    return pwd_context.hash(password)

def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)

Usage

Usage

async def authenticate_user(email: str, password: str) -> User | None: user = await get_user_by_email(email) if not user: return None if not verify_password(password, user.hashed_password): return None return user ```
async def authenticate_user(email: str, password: str) -> User | None: user = await get_user_by_email(email) if not user: return None if not verify_password(password, user.hashed_password): return None return user
undefined

FastAPI Auth Dependencies

FastAPI认证依赖

```python from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")
async def get_current_user(token: str = Depends(oauth2_scheme)) -> User: payload = verify_token(token) user = await get_user(payload["sub"]) if not user: raise HTTPException(status_code=401, detail="User not found") return user
async def get_current_active_user(user: User = Depends(get_current_user)) -> User: if not user.is_active: raise HTTPException(status_code=400, detail="Inactive user") return user
python
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")

async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
    payload = verify_token(token)
    user = await get_user(payload["sub"])
    if not user:
        raise HTTPException(status_code=401, detail="User not found")
    return user

async def get_current_active_user(user: User = Depends(get_current_user)) -> User:
    if not user.is_active:
        raise HTTPException(status_code=400, detail="Inactive user")
    return user

Role-based access

Role-based access

def require_roles(*roles: str): async def role_checker(user: User = Depends(get_current_user)): if not any(role in user.roles for role in roles): raise HTTPException(status_code=403, detail="Insufficient permissions") return user return role_checker
def require_roles(*roles: str): async def role_checker(user: User = Depends(get_current_user)): if not any(role in user.roles for role in roles): raise HTTPException(status_code=403, detail="Insufficient permissions") return user return role_checker

Usage

Usage

@app.get("/admin") async def admin_route(user: User = Depends(require_roles("admin"))): return {"message": "Admin access granted"} ```
@app.get("/admin") async def admin_route(user: User = Depends(require_roles("admin"))): return {"message": "Admin access granted"}
undefined

OAuth2 Integration

OAuth2集成

Google OAuth2

Google OAuth2

```python from authlib.integrations.starlette_client import OAuth
oauth = OAuth() oauth.register( name='google', client_id=os.environ['GOOGLE_CLIENT_ID'], client_secret=os.environ['GOOGLE_CLIENT_SECRET'], server_metadata_url='https://accounts.google.com/.well-known/openid-configuration', client_kwargs={'scope': 'openid email profile'} )
@app.get('/auth/google') async def google_login(request: Request): redirect_uri = request.url_for('google_callback') return await oauth.google.authorize_redirect(request, redirect_uri)
@app.get('/auth/google/callback') async def google_callback(request: Request): token = await oauth.google.authorize_access_token(request) user_info = token.get('userinfo')
# Find or create user
user = await get_or_create_user(
    email=user_info['email'],
    name=user_info['name'],
    provider='google'
)

# Generate JWT
access_token = create_access_token(user.id, user.roles)
return {"access_token": access_token, "token_type": "bearer"}
```
python
from authlib.integrations.starlette_client import OAuth

oauth = OAuth()
oauth.register(
    name='google',
    client_id=os.environ['GOOGLE_CLIENT_ID'],
    client_secret=os.environ['GOOGLE_CLIENT_SECRET'],
    server_metadata_url='https://accounts.google.com/.well-known/openid-configuration',
    client_kwargs={'scope': 'openid email profile'}
)

@app.get('/auth/google')
async def google_login(request: Request):
    redirect_uri = request.url_for('google_callback')
    return await oauth.google.authorize_redirect(request, redirect_uri)

@app.get('/auth/google/callback')
async def google_callback(request: Request):
    token = await oauth.google.authorize_access_token(request)
    user_info = token.get('userinfo')

    # Find or create user
    user = await get_or_create_user(
        email=user_info['email'],
        name=user_info['name'],
        provider='google'
    )

    # Generate JWT
    access_token = create_access_token(user.id, user.roles)
    return {"access_token": access_token, "token_type": "bearer"}

GitHub OAuth2

GitHub OAuth2

```python oauth.register( name='github', client_id=os.environ['GITHUB_CLIENT_ID'], client_secret=os.environ['GITHUB_CLIENT_SECRET'], authorize_url='https://github.com/login/oauth/authorize', access_token_url='https://github.com/login/oauth/access_token', api_base_url='https://api.github.com/', client_kwargs={'scope': 'user:email'} ) ```
python
oauth.register(
    name='github',
    client_id=os.environ['GITHUB_CLIENT_ID'],
    client_secret=os.environ['GITHUB_CLIENT_SECRET'],
    authorize_url='https://github.com/login/oauth/authorize',
    access_token_url='https://github.com/login/oauth/access_token',
    api_base_url='https://api.github.com/',
    client_kwargs={'scope': 'user:email'}
)

Session Management

会话管理

```python from fastapi import Request, Response import secrets
async def create_session(user_id: str, response: Response) -> str: session_id = secrets.token_urlsafe(32)
# Store in Redis
await redis.hset(f"session:{session_id}", mapping={
    "user_id": user_id,
    "created_at": datetime.utcnow().isoformat()
})
await redis.expire(f"session:{session_id}", 86400)  # 24 hours

# Set cookie
response.set_cookie(
    key="session_id",
    value=session_id,
    httponly=True,
    secure=True,
    samesite="lax",
    max_age=86400
)

return session_id
async def get_session(request: Request) -> dict | None: session_id = request.cookies.get("session_id") if not session_id: return None
session = await redis.hgetall(f"session:{session_id}")
if not session:
    return None

# Refresh TTL
await redis.expire(f"session:{session_id}", 86400)
return session
async def destroy_session(request: Request, response: Response): session_id = request.cookies.get("session_id") if session_id: await redis.delete(f"session:{session_id}") response.delete_cookie("session_id") ```
python
from fastapi import Request, Response
import secrets

async def create_session(user_id: str, response: Response) -> str:
    session_id = secrets.token_urlsafe(32)

    # Store in Redis
    await redis.hset(f"session:{session_id}", mapping={
        "user_id": user_id,
        "created_at": datetime.utcnow().isoformat()
    })
    await redis.expire(f"session:{session_id}", 86400)  # 24 hours

    # Set cookie
    response.set_cookie(
        key="session_id",
        value=session_id,
        httponly=True,
        secure=True,
        samesite="lax",
        max_age=86400
    )

    return session_id

async def get_session(request: Request) -> dict | None:
    session_id = request.cookies.get("session_id")
    if not session_id:
        return None

    session = await redis.hgetall(f"session:{session_id}")
    if not session:
        return None

    # Refresh TTL
    await redis.expire(f"session:{session_id}", 86400)
    return session

async def destroy_session(request: Request, response: Response):
    session_id = request.cookies.get("session_id")
    if session_id:
        await redis.delete(f"session:{session_id}")
    response.delete_cookie("session_id")

RBAC Implementation

RBAC实现

```python from enum import Enum from typing import Set
class Permission(str, Enum): READ_AGENTS = "read:agents" WRITE_AGENTS = "write:agents" DELETE_AGENTS = "delete:agents" ADMIN = "admin"
ROLE_PERMISSIONS: dict[str, Set[Permission]] = { "viewer": {Permission.READ_AGENTS}, "operator": {Permission.READ_AGENTS, Permission.WRITE_AGENTS}, "admin": {Permission.READ_AGENTS, Permission.WRITE_AGENTS, Permission.DELETE_AGENTS, Permission.ADMIN}, }
def has_permission(user_roles: list[str], required: Permission) -> bool: for role in user_roles: if role in ROLE_PERMISSIONS and required in ROLE_PERMISSIONS[role]: return True return False
def require_permission(permission: Permission): async def permission_checker(user: User = Depends(get_current_user)): if not has_permission(user.roles, permission): raise HTTPException(status_code=403, detail="Permission denied") return user return permission_checker
python
from enum import Enum
from typing import Set

class Permission(str, Enum):
    READ_AGENTS = "read:agents"
    WRITE_AGENTS = "write:agents"
    DELETE_AGENTS = "delete:agents"
    ADMIN = "admin"

ROLE_PERMISSIONS: dict[str, Set[Permission]] = {
    "viewer": {Permission.READ_AGENTS},
    "operator": {Permission.READ_AGENTS, Permission.WRITE_AGENTS},
    "admin": {Permission.READ_AGENTS, Permission.WRITE_AGENTS, Permission.DELETE_AGENTS, Permission.ADMIN},
}

def has_permission(user_roles: list[str], required: Permission) -> bool:
    for role in user_roles:
        if role in ROLE_PERMISSIONS and required in ROLE_PERMISSIONS[role]:
            return True
    return False

def require_permission(permission: Permission):
    async def permission_checker(user: User = Depends(get_current_user)):
        if not has_permission(user.roles, permission):
            raise HTTPException(status_code=403, detail="Permission denied")
        return user
    return permission_checker

Usage

Usage

@app.delete("/agents/{id}") async def delete_agent( id: str, user: User = Depends(require_permission(Permission.DELETE_AGENTS)) ): await agent_service.delete(id) return {"status": "deleted"} ```
@app.delete("/agents/{id}") async def delete_agent( id: str, user: User = Depends(require_permission(Permission.DELETE_AGENTS)) ): await agent_service.delete(id) return {"status": "deleted"}
undefined

Security Best Practices

安全最佳实践

  1. Use HTTPS always in production
  2. Hash passwords with bcrypt or argon2
  3. Short-lived access tokens (15-30 minutes)
  4. Refresh token rotation on each use
  5. HttpOnly, Secure cookies for tokens
  6. Rate limit authentication endpoints
  7. Log authentication events for auditing
  8. Implement account lockout after failed attempts
  1. 始终在生产环境使用HTTPS
  2. 使用bcrypt或argon2哈希密码
  3. 短生命周期访问令牌(15-30分钟)
  4. 每次使用时轮换刷新令牌
  5. 为令牌使用HttpOnly、Secure Cookie
  6. 对认证端点进行速率限制
  7. 记录认证事件用于审计
  8. 失败尝试后实现账户锁定

OAuth 2.0 & OIDC Best Practices

OAuth 2.0 & OIDC最佳实践

OAuth 2.0 Grant Types

OAuth 2.0授权类型

Authorization Code Flow (Recommended for Web Apps)

授权码流程(Web应用推荐)

python
from authlib.integrations.starlette_client import OAuth

oauth = OAuth()
oauth.register(
    name='custom_provider',
    client_id=os.environ['OAUTH_CLIENT_ID'],
    client_secret=os.environ['OAUTH_CLIENT_SECRET'],
    authorize_url='https://provider.com/oauth/authorize',
    authorize_params=None,
    access_token_url='https://provider.com/oauth/token',
    access_token_params=None,
    refresh_token_url=None,
    client_kwargs={'scope': 'openid profile email'}
)

@app.get('/auth/login')
async def login(request: Request):
    # Generate state for CSRF protection
    state = secrets.token_urlsafe(32)
    await redis.set(f"oauth_state:{state}", "1", ex=600)

    redirect_uri = request.url_for('auth_callback')
    return await oauth.custom_provider.authorize_redirect(
        request,
        redirect_uri,
        state=state
    )

@app.get('/auth/callback')
async def auth_callback(request: Request):
    # Verify state (CSRF protection)
    state = request.query_params.get('state')
    if not await redis.get(f"oauth_state:{state}"):
        raise HTTPException(status_code=400, detail="Invalid state")

    await redis.delete(f"oauth_state:{state}")

    # Exchange authorization code for tokens
    token = await oauth.custom_provider.authorize_access_token(request)
    user_info = token.get('userinfo')

    # Create or update user
    user = await upsert_user(user_info)

    # Issue application tokens
    access_token = create_access_token(user.id, user.roles)
    refresh_token = create_refresh_token(user.id)

    return {
        "access_token": access_token,
        "refresh_token": refresh_token,
        "token_type": "bearer"
    }
python
from authlib.integrations.starlette_client import OAuth

oauth = OAuth()
oauth.register(
    name='custom_provider',
    client_id=os.environ['OAUTH_CLIENT_ID'],
    client_secret=os.environ['OAUTH_CLIENT_SECRET'],
    authorize_url='https://provider.com/oauth/authorize',
    authorize_params=None,
    access_token_url='https://provider.com/oauth/token',
    access_token_params=None,
    refresh_token_url=None,
    client_kwargs={'scope': 'openid profile email'}
)

@app.get('/auth/login')
async def login(request: Request):
    # Generate state for CSRF protection
    state = secrets.token_urlsafe(32)
    await redis.set(f"oauth_state:{state}", "1", ex=600)

    redirect_uri = request.url_for('auth_callback')
    return await oauth.custom_provider.authorize_redirect(
        request,
        redirect_uri,
        state=state
    )

@app.get('/auth/callback')
async def auth_callback(request: Request):
    # Verify state (CSRF protection)
    state = request.query_params.get('state')
    if not await redis.get(f"oauth_state:{state}"):
        raise HTTPException(status_code=400, detail="Invalid state")

    await redis.delete(f"oauth_state:{state}")

    # Exchange authorization code for tokens
    token = await oauth.custom_provider.authorize_access_token(request)
    user_info = token.get('userinfo')

    # Create or update user
    user = await upsert_user(user_info)

    # Issue application tokens
    access_token = create_access_token(user.id, user.roles)
    refresh_token = create_refresh_token(user.id)

    return {
        "access_token": access_token,
        "refresh_token": refresh_token,
        "token_type": "bearer"
    }

PKCE (Proof Key for Code Exchange) for SPAs

PKCE(授权码流程扩展,适用于SPA)

python
undefined
python
undefined

Frontend (JavaScript/TypeScript)

Frontend (JavaScript/TypeScript)

import { generateCodeVerifier, generateCodeChallenge } from 'oauth-pkce'
// Generate PKCE parameters const codeVerifier = generateCodeVerifier() const codeChallenge = await generateCodeChallenge(codeVerifier)
// Store verifier for later use sessionStorage.setItem('code_verifier', codeVerifier)
// Redirect to authorization endpoint const authUrl = new URL('https://provider.com/oauth/authorize') authUrl.searchParams.set('client_id', CLIENT_ID) authUrl.searchParams.set('redirect_uri', REDIRECT_URI) authUrl.searchParams.set('response_type', 'code') authUrl.searchParams.set('scope', 'openid profile email') authUrl.searchParams.set('code_challenge', codeChallenge) authUrl.searchParams.set('code_challenge_method', 'S256') authUrl.searchParams.set('state', generateState())
window.location.href = authUrl.toString()
// In callback handler const code = new URLSearchParams(window.location.search).get('code') const codeVerifier = sessionStorage.getItem('code_verifier')
const tokenResponse = await fetch('https://provider.com/oauth/token', { method: 'POST', headers: { 'Content-Type': 'application/x-www-form-urlencoded' }, body: new URLSearchParams({ grant_type: 'authorization_code', code: code, redirect_uri: REDIRECT_URI, client_id: CLIENT_ID, code_verifier: codeVerifier }) })
undefined
import { generateCodeVerifier, generateCodeChallenge } from 'oauth-pkce'
// Generate PKCE parameters const codeVerifier = generateCodeVerifier() const codeChallenge = await generateCodeChallenge(codeVerifier)
// Store verifier for later use sessionStorage.setItem('code_verifier', codeVerifier)
// Redirect to authorization endpoint const authUrl = new URL('https://provider.com/oauth/authorize') authUrl.searchParams.set('client_id', CLIENT_ID) authUrl.searchParams.set('redirect_uri', REDIRECT_URI) authUrl.searchParams.set('response_type', 'code') authUrl.searchParams.set('scope', 'openid profile email') authUrl.searchParams.set('code_challenge', codeChallenge) authUrl.searchParams.set('code_challenge_method', 'S256') authUrl.searchParams.set('state', generateState())
window.location.href = authUrl.toString()
// In callback handler const code = new URLSearchParams(window.location.search).get('code') const codeVerifier = sessionStorage.getItem('code_verifier')
const tokenResponse = await fetch('https://provider.com/oauth/token', { method: 'POST', headers: { 'Content-Type': 'application/x-www-form-urlencoded' }, body: new URLSearchParams({ grant_type: 'authorization_code', code: code, redirect_uri: REDIRECT_URI, client_id: CLIENT_ID, code_verifier: codeVerifier }) })
undefined

Client Credentials Flow (Service-to-Service)

客户端凭证流程(服务间通信)

python
import httpx
from datetime import datetime, timedelta

class ServiceAuthClient:
    def __init__(self):
        self.token = None
        self.expires_at = None

    async def get_token(self) -> str:
        # Return cached token if still valid
        if self.token and self.expires_at > datetime.utcnow():
            return self.token

        # Request new token
        async with httpx.AsyncClient() as client:
            response = await client.post(
                'https://provider.com/oauth/token',
                data={
                    'grant_type': 'client_credentials',
                    'client_id': os.environ['SERVICE_CLIENT_ID'],
                    'client_secret': os.environ['SERVICE_CLIENT_SECRET'],
                    'scope': 'api.read api.write'
                }
            )
            response.raise_for_status()
            data = response.json()

            self.token = data['access_token']
            self.expires_at = datetime.utcnow() + timedelta(seconds=data['expires_in'] - 60)

            return self.token
python
import httpx
from datetime import datetime, timedelta

class ServiceAuthClient:
    def __init__(self):
        self.token = None
        self.expires_at = None

    async def get_token(self) -> str:
        # Return cached token if still valid
        if self.token and self.expires_at > datetime.utcnow():
            return self.token

        # Request new token
        async with httpx.AsyncClient() as client:
            response = await client.post(
                'https://provider.com/oauth/token',
                data={
                    'grant_type': 'client_credentials',
                    'client_id': os.environ['SERVICE_CLIENT_ID'],
                    'client_secret': os.environ['SERVICE_CLIENT_SECRET'],
                    'scope': 'api.read api.write'
                }
            )
            response.raise_for_status()
            data = response.json()

            self.token = data['access_token']
            self.expires_at = datetime.utcnow() + timedelta(seconds=data['expires_in'] - 60)

            return self.token

Usage

Usage

auth_client = ServiceAuthClient()
async def call_protected_api(): token = await auth_client.get_token() async with httpx.AsyncClient() as client: response = await client.get( 'https://api.service.com/resource', headers={'Authorization': f'Bearer {token}'} ) return response.json()
undefined
auth_client = ServiceAuthClient()
async def call_protected_api(): token = await auth_client.get_token() async with httpx.AsyncClient() as client: response = await client.get( 'https://api.service.com/resource', headers={'Authorization': f'Bearer {token}'} ) return response.json()
undefined

OpenID Connect (OIDC)

OpenID Connect (OIDC)

ID Token Validation

ID令牌验证

python
from jose import jwt, jwk
from jose.utils import base64url_decode
import httpx

class OIDCValidator:
    def __init__(self, issuer: str, client_id: str):
        self.issuer = issuer
        self.client_id = client_id
        self.jwks = None
        self.jwks_updated_at = None

    async def get_jwks(self) -> dict:
        # Refresh JWKS if stale (cache for 24 hours)
        if not self.jwks or (datetime.utcnow() - self.jwks_updated_at).seconds > 86400:
            async with httpx.AsyncClient() as client:
                response = await client.get(f"{self.issuer}/.well-known/jwks.json")
                response.raise_for_status()
                self.jwks = response.json()
                self.jwks_updated_at = datetime.utcnow()

        return self.jwks

    async def validate_id_token(self, id_token: str) -> dict:
        # Decode header to get key ID
        header = jwt.get_unverified_header(id_token)
        kid = header['kid']

        # Get JWKS and find matching key
        jwks = await self.get_jwks()
        key = next((k for k in jwks['keys'] if k['kid'] == kid), None)

        if not key:
            raise ValueError("Public key not found in JWKS")

        # Convert JWK to PEM
        public_key = jwk.construct(key)

        # Validate and decode ID token
        try:
            claims = jwt.decode(
                id_token,
                public_key.to_pem().decode('utf-8'),
                algorithms=['RS256'],
                audience=self.client_id,
                issuer=self.issuer,
                options={
                    'verify_exp': True,
                    'verify_iat': True,
                    'verify_aud': True,
                    'verify_iss': True
                }
            )

            # Additional validations
            if claims.get('nonce'):
                # Verify nonce matches what was sent in auth request
                pass

            return claims

        except jwt.ExpiredSignatureError:
            raise HTTPException(status_code=401, detail="ID token expired")
        except jwt.JWTClaimsError as e:
            raise HTTPException(status_code=401, detail=f"Invalid ID token claims: {e}")
        except Exception as e:
            raise HTTPException(status_code=401, detail=f"ID token validation failed: {e}")
python
from jose import jwt, jwk
from jose.utils import base64url_decode
import httpx

class OIDCValidator:
    def __init__(self, issuer: str, client_id: str):
        self.issuer = issuer
        self.client_id = client_id
        self.jwks = None
        self.jwks_updated_at = None

    async def get_jwks(self) -> dict:
        # Refresh JWKS if stale (cache for 24 hours)
        if not self.jwks or (datetime.utcnow() - self.jwks_updated_at).seconds > 86400:
            async with httpx.AsyncClient() as client:
                response = await client.get(f"{self.issuer}/.well-known/jwks.json")
                response.raise_for_status()
                self.jwks = response.json()
                self.jwks_updated_at = datetime.utcnow()

        return self.jwks

    async def validate_id_token(self, id_token: str) -> dict:
        # Decode header to get key ID
        header = jwt.get_unverified_header(id_token)
        kid = header['kid']

        # Get JWKS and find matching key
        jwks = await self.get_jwks()
        key = next((k for k in jwks['keys'] if k['kid'] == kid), None)

        if not key:
            raise ValueError("Public key not found in JWKS")

        # Convert JWK to PEM
        public_key = jwk.construct(key)

        # Validate and decode ID token
        try:
            claims = jwt.decode(
                id_token,
                public_key.to_pem().decode('utf-8'),
                algorithms=['RS256'],
                audience=self.client_id,
                issuer=self.issuer,
                options={
                    'verify_exp': True,
                    'verify_iat': True,
                    'verify_aud': True,
                    'verify_iss': True
                }
            )

            # Additional validations
            if claims.get('nonce'):
                # Verify nonce matches what was sent in auth request
                pass

            return claims

        except jwt.ExpiredSignatureError:
            raise HTTPException(status_code=401, detail="ID token expired")
        except jwt.JWTClaimsError as e:
            raise HTTPException(status_code=401, detail=f"Invalid ID token claims: {e}")
        except Exception as e:
            raise HTTPException(status_code=401, detail=f"ID token validation failed: {e}")

Usage

Usage

validator = OIDCValidator( issuer="https://provider.com", client_id=os.environ['OIDC_CLIENT_ID'] )
@app.post("/auth/oidc/callback") async def oidc_callback(id_token: str): claims = await validator.validate_id_token(id_token)
# Extract user information
user = await get_or_create_user(
    email=claims['email'],
    name=claims['name'],
    sub=claims['sub']
)

return {"user": user, "claims": claims}
undefined
validator = OIDCValidator( issuer="https://provider.com", client_id=os.environ['OIDC_CLIENT_ID'] )
@app.post("/auth/oidc/callback") async def oidc_callback(id_token: str): claims = await validator.validate_id_token(id_token)
# Extract user information
user = await get_or_create_user(
    email=claims['email'],
    name=claims['name'],
    sub=claims['sub']
)

return {"user": user, "claims": claims}
undefined

JWT Security Best Practices

JWT安全最佳实践

Secure Token Generation

安全令牌生成

python
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
import os
python
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
import os

Use RS256 (asymmetric) instead of HS256 for public verification

Use RS256 (asymmetric) instead of HS256 for public verification

PRIVATE_KEY_PATH = os.environ.get('JWT_PRIVATE_KEY_PATH') PUBLIC_KEY_PATH = os.environ.get('JWT_PUBLIC_KEY_PATH')
def load_keys(): with open(PRIVATE_KEY_PATH, 'rb') as f: private_key = serialization.load_pem_private_key( f.read(), password=None, backend=default_backend() )
with open(PUBLIC_KEY_PATH, 'rb') as f:
    public_key = serialization.load_pem_public_key(
        f.read(),
        backend=default_backend()
    )

return private_key, public_key
PRIVATE_KEY, PUBLIC_KEY = load_keys()
def create_secure_access_token( user_id: str, roles: list[str], tenant_id: str = None, custom_claims: dict = None ) -> str: now = datetime.utcnow()
payload = {
    # Standard claims (RFC 7519)
    "iss": "https://api.yourdomain.com",  # Issuer
    "sub": user_id,                        # Subject (user ID)
    "aud": ["https://api.yourdomain.com"], # Audience
    "exp": now + timedelta(minutes=15),    # Expiration (short-lived)
    "nbf": now,                             # Not before
    "iat": now,                             # Issued at
    "jti": secrets.token_urlsafe(16),      # JWT ID (unique token identifier)

    # Custom claims
    "roles": roles,
    "type": "access",
}

# Add tenant context for multi-tenant applications
if tenant_id:
    payload["tenant_id"] = tenant_id

# Add any custom claims
if custom_claims:
    payload.update(custom_claims)

return jwt.encode(payload, PRIVATE_KEY, algorithm="RS256")
def verify_secure_token(token: str) -> dict: try: payload = jwt.decode( token, PUBLIC_KEY, algorithms=["RS256"], audience=["https://api.yourdomain.com"], issuer="https://api.yourdomain.com", options={ 'verify_exp': True, 'verify_nbf': True, 'verify_iat': True, 'verify_aud': True, 'verify_iss': True, 'require_exp': True, 'require_iat': True, 'require_nbf': True } )
    # Validate token type
    if payload.get('type') != 'access':
        raise HTTPException(status_code=401, detail="Invalid token type")

    return payload

except jwt.ExpiredSignatureError:
    raise HTTPException(status_code=401, detail="Token expired")
except jwt.InvalidTokenError as e:
    raise HTTPException(status_code=401, detail=f"Invalid token: {e}")
undefined
PRIVATE_KEY_PATH = os.environ.get('JWT_PRIVATE_KEY_PATH') PUBLIC_KEY_PATH = os.environ.get('JWT_PUBLIC_KEY_PATH')
def load_keys(): with open(PRIVATE_KEY_PATH, 'rb') as f: private_key = serialization.load_pem_private_key( f.read(), password=None, backend=default_backend() )
with open(PUBLIC_KEY_PATH, 'rb') as f:
    public_key = serialization.load_pem_public_key(
        f.read(),
        backend=default_backend()
    )

return private_key, public_key
PRIVATE_KEY, PUBLIC_KEY = load_keys()
def create_secure_access_token( user_id: str, roles: list[str], tenant_id: str = None, custom_claims: dict = None ) -> str: now = datetime.utcnow()
payload = {
    # Standard claims (RFC 7519)
    "iss": "https://api.yourdomain.com",  # Issuer
    "sub": user_id,                        # Subject (user ID)
    "aud": ["https://api.yourdomain.com"], # Audience
    "exp": now + timedelta(minutes=15),    # Expiration (short-lived)
    "nbf": now,                             # Not before
    "iat": now,                             # Issued at
    "jti": secrets.token_urlsafe(16),      # JWT ID (unique token identifier)

    # Custom claims
    "roles": roles,
    "type": "access",
}

# Add tenant context for multi-tenant applications
if tenant_id:
    payload["tenant_id"] = tenant_id

# Add any custom claims
if custom_claims:
    payload.update(custom_claims)

return jwt.encode(payload, PRIVATE_KEY, algorithm="RS256")
def verify_secure_token(token: str) -> dict: try: payload = jwt.decode( token, PUBLIC_KEY, algorithms=["RS256"], audience=["https://api.yourdomain.com"], issuer="https://api.yourdomain.com", options={ 'verify_exp': True, 'verify_nbf': True, 'verify_iat': True, 'verify_aud': True, 'verify_iss': True, 'require_exp': True, 'require_iat': True, 'require_nbf': True } )
    # Validate token type
    if payload.get('type') != 'access':
        raise HTTPException(status_code=401, detail="Invalid token type")

    return payload

except jwt.ExpiredSignatureError:
    raise HTTPException(status_code=401, detail="Token expired")
except jwt.InvalidTokenError as e:
    raise HTTPException(status_code=401, detail=f"Invalid token: {e}")
undefined

Token Revocation with Blacklist

基于黑名单的令牌吊销

python
class TokenBlacklist:
    """Redis-based token blacklist for revoked tokens"""

    def __init__(self, redis_client):
        self.redis = redis_client

    async def revoke_token(self, jti: str, exp: int):
        """Add token to blacklist until expiration"""
        ttl = exp - int(datetime.utcnow().timestamp())
        if ttl > 0:
            await self.redis.set(f"blacklist:{jti}", "1", ex=ttl)

    async def is_revoked(self, jti: str) -> bool:
        """Check if token is blacklisted"""
        return await self.redis.exists(f"blacklist:{jti}")
python
class TokenBlacklist:
    """Redis-based token blacklist for revoked tokens"""

    def __init__(self, redis_client):
        self.redis = redis_client

    async def revoke_token(self, jti: str, exp: int):
        """Add token to blacklist until expiration"""
        ttl = exp - int(datetime.utcnow().timestamp())
        if ttl > 0:
            await self.redis.set(f"blacklist:{jti}", "1", ex=ttl)

    async def is_revoked(self, jti: str) -> bool:
        """Check if token is blacklisted"""
        return await self.redis.exists(f"blacklist:{jti}")

Global blacklist instance

Global blacklist instance

token_blacklist = TokenBlacklist(redis)
async def get_current_user_with_revocation_check( token: str = Depends(oauth2_scheme) ) -> User: payload = verify_secure_token(token)
# Check if token has been revoked
if await token_blacklist.is_revoked(payload['jti']):
    raise HTTPException(status_code=401, detail="Token has been revoked")

user = await get_user(payload["sub"])
if not user:
    raise HTTPException(status_code=401, detail="User not found")

return user
@app.post("/auth/logout") async def logout(token: str = Depends(oauth2_scheme)): payload = verify_secure_token(token) await token_blacklist.revoke_token(payload['jti'], payload['exp']) return {"message": "Logged out successfully"}
undefined
token_blacklist = TokenBlacklist(redis)
async def get_current_user_with_revocation_check( token: str = Depends(oauth2_scheme) ) -> User: payload = verify_secure_token(token)
# Check if token has been revoked
if await token_blacklist.is_revoked(payload['jti']):
    raise HTTPException(status_code=401, detail="Token has been revoked")

user = await get_user(payload["sub"])
if not user:
    raise HTTPException(status_code=401, detail="User not found")

return user
@app.post("/auth/logout") async def logout(token: str = Depends(oauth2_scheme)): payload = verify_secure_token(token) await token_blacklist.revoke_token(payload['jti'], payload['exp']) return {"message": "Logged out successfully"}
undefined

Security Analysis with Extended Thinking

安全分析与深度思考

When reviewing authentication flows, use extended thinking for comprehensive security analysis.
在审查认证流程时,使用深度思考进行全面的安全分析。

Authentication Flow Security Review Template

认证流程安全审查模板

markdown
undefined
markdown
undefined

Authentication Flow Security Review

认证流程安全审查

Flow: [Login/OAuth/SSO/API Authentication] Date: [YYYY-MM-DD] Reviewer: [Name/Agent]
流程: [登录/OAuth/SSO/API认证] 日期: [YYYY-MM-DD] 审查者: [名称/Agent]

Flow Diagram

流程图

[Document the authentication flow step-by-step]
[逐步记录认证流程]

Security Analysis Checklist

安全分析检查表

Confidentiality

保密性

  • Credentials transmitted over HTTPS only
  • Passwords hashed with strong algorithm (bcrypt/argon2)
  • Tokens encrypted in transit
  • Sensitive data not logged
  • Secrets stored securely (env vars, secrets manager)
  • 凭证仅通过HTTPS传输
  • 使用强算法(bcrypt/argon2)哈希密码
  • 令牌在传输中加密
  • 敏感数据不被记录
  • 密钥安全存储(环境变量、密钥管理器)

Integrity

完整性

  • CSRF protection implemented
  • Request tampering prevented
  • Token signature validation
  • State parameter validated (OAuth)
  • Nonce validated (OIDC)
  • 实现CSRF防护
  • 防止请求篡改
  • 令牌签名验证
  • 验证OAuth的state参数
  • 验证OIDC的nonce

Availability

可用性

  • Rate limiting on auth endpoints
  • Account lockout after failed attempts
  • DDoS protection in place
  • Graceful degradation strategy
  • Session timeout configured
  • 认证端点实施速率限制
  • 失败尝试后账户锁定
  • 具备DDoS防护
  • 优雅降级策略
  • 配置会话超时

Authentication

身份验证

  • MFA available for sensitive accounts
  • Password complexity requirements
  • Credential stuffing protection
  • Brute force mitigation
  • Session fixation prevention
  • 敏感账户支持MFA
  • 密码复杂度要求
  • 凭证填充防护
  • 暴力破解缓解
  • 防止会话固定

Authorization

授权

  • Principle of least privilege
  • Role-based access control
  • Permission checks on every request
  • Token scope validation
  • Tenant isolation (multi-tenant apps)
  • 最小权限原则
  • 基于角色的访问控制
  • 每个请求都进行权限检查
  • 令牌作用域验证
  • 租户隔离(多租户应用)

Session Management

会话管理

  • Secure session tokens
  • HttpOnly, Secure, SameSite cookies
  • Session timeout implemented
  • Logout functionality
  • Concurrent session handling
  • 安全会话令牌
  • HttpOnly、Secure、SameSite Cookie
  • 实现会话超时
  • 登出功能
  • 并发会话处理

Extended Thinking Analysis

深度思考分析

Use Claude with extended thinking for deep security review:
python
import anthropic

client = anthropic.Anthropic()

security_review_prompt = """
Perform a comprehensive security analysis of this authentication flow:

[Paste authentication code/flow description]

Analyze for:
1. OWASP Top 10 vulnerabilities
2. Authentication bypass possibilities
3. Token security weaknesses
4. Session management issues
5. Input validation gaps
6. Race conditions
7. Logic flaws

Provide specific findings with:
- Severity (Critical/High/Medium/Low)
- Location (file:line)
- Attack vector
- Remediation steps
"""

response = client.messages.create(
    model="claude-opus-4-5-20250514",
    max_tokens=32000,
    thinking={
        "type": "enabled",
        "budget_tokens": 20000  # High budget for security analysis
    },
    messages=[{
        "role": "user",
        "content": security_review_prompt
    }]
)
使用Claude的深度思考功能进行深度安全审查:
python
import anthropic

client = anthropic.Anthropic()

security_review_prompt = """
Perform a comprehensive security analysis of this authentication flow:

[Paste authentication code/flow description]

Analyze for:
1. OWASP Top 10 vulnerabilities
2. Authentication bypass possibilities
3. Token security weaknesses
4. Session management issues
5. Input validation gaps
6. Race conditions
7. Logic flaws

Provide specific findings with:
- Severity (Critical/High/Medium/Low)
- Location (file:line)
- Attack vector
- Remediation steps
"""

response = client.messages.create(
    model="claude-opus-4-5-20250514",
    max_tokens=32000,
    thinking={
        "type": "enabled",
        "budget_tokens": 20000  # High budget for security analysis
    },
    messages=[{
        "role": "user",
        "content": security_review_prompt
    }]
)

Extract thinking and analysis

Extract thinking and analysis

for block in response.content: if block.type == "thinking": print(f"Deep Analysis:\n{block.thinking}\n") elif block.type == "text": print(f"Findings:\n{block.text}")
undefined
for block in response.content: if block.type == "thinking": print(f"Deep Analysis:\n{block.thinking}\n") elif block.type == "text": print(f"Findings:\n{block.text}")
undefined

Threat Modeling for Authentication

认证系统威胁建模

STRIDE Threat Model for Auth Systems

认证系统的STRIDE威胁模型

Adapted from [[deep-analysis]] skill threat modeling template:
markdown
undefined
改编自[[deep-analysis]] Skill的威胁建模模板:
markdown
undefined

Authentication Threat Model

认证威胁模型

System: [Auth System Name]

系统: [认证系统名称]

Version: 1.0 Last Updated: [Date]
版本: 1.0 最后更新: [日期]

Trust Boundaries

信任边界

┌─────────────────────────────────────────┐
│         External (Untrusted)            │
│    [End Users] [Credential Stuffers]    │
│         [MITM Attackers]                │
└──────────────────┬──────────────────────┘
                   │ TLS/HTTPS
┌──────────────────┴──────────────────────┐
│         Public API (Semi-trusted)       │
│   [API Gateway] [Auth Endpoints]        │
│   [OAuth Providers] [OIDC IdP]          │
└──────────────────┬──────────────────────┘
                   │ Internal Auth
┌──────────────────┴──────────────────────┐
│        Application Layer (Trusted)      │
│   [Business Logic] [User Management]    │
└──────────────────┬──────────────────────┘
                   │ DB Protocol
┌──────────────────┴──────────────────────┐
│        Data Layer (Highly Trusted)      │
│   [User DB] [Session Store] [Secrets]   │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│         External (Untrusted)            │
│    [End Users] [Credential Stuffers]    │
│         [MITM Attackers]                │
└──────────────────┬──────────────────────┘
                   │ TLS/HTTPS
┌──────────────────┴──────────────────────┐
│         Public API (Semi-trusted)       │
│   [API Gateway] [Auth Endpoints]        │
│   [OAuth Providers] [OIDC IdP]          │
└──────────────────┬──────────────────────┘
                   │ Internal Auth
┌──────────────────┴──────────────────────┐
│        Application Layer (Trusted)      │
│   [Business Logic] [User Management]    │
└──────────────────┬──────────────────────┘
                   │ DB Protocol
┌──────────────────┴──────────────────────┐
│        Data Layer (Highly Trusted)      │
│   [User DB] [Session Store] [Secrets]   │
└─────────────────────────────────────────┘

STRIDE Analysis

STRIDE分析

Spoofing Identity

身份冒充

ThreatLikelihoodImpactMitigationStatus
Credential theft via phishingHighCriticalMFA, user education
Session hijackingMediumHighSecure cookies, HTTPS
Token replay attacksMediumHighShort token lifetime, JTI tracking
OAuth state manipulationLowMediumCryptographic state validation
Impersonation via stolen refresh tokenMediumCriticalRefresh token rotation, device binding⚠️
威胁可能性影响缓解措施状态
钓鱼导致凭证被盗严重MFA、用户教育
会话劫持安全Cookie、HTTPS
令牌重放攻击短令牌生命周期、JTI跟踪
OAuth state参数篡改加密state验证
窃取刷新令牌进行冒充严重刷新令牌轮换、设备绑定⚠️

Tampering with Data

数据篡改

ThreatLikelihoodImpactMitigationStatus
JWT payload manipulationLowCriticalSignature verification, RS256
Cookie tamperingLowHighSigned cookies, HMAC validation
OAuth callback manipulationMediumHighRedirect URI validation
Password reset token tamperingLowHighCryptographic tokens, time limits
威胁可能性影响缓解措施状态
JWT负载篡改严重签名验证、RS256
Cookie篡改签名Cookie、HMAC验证
OAuth回调篡改重定向URI验证
密码重置令牌篡改加密令牌、时间限制

Repudiation

抵赖

ThreatLikelihoodImpactMitigationStatus
Deny unauthorized accessMediumMediumComprehensive audit logging
Dispute authentication eventsLowLowImmutable audit trail, timestamps
威胁可能性影响缓解措施状态
否认未授权访问全面审计日志
争议认证事件不可变审计轨迹、时间戳

Information Disclosure

信息泄露

ThreatLikelihoodImpactMitigationStatus
Credentials in logsMediumCriticalSanitize logs, secret detection
User enumeration via loginHighMediumGeneric error messages⚠️
Token leakage in URLsLowHighTokens in headers only
Timing attacks on password checkMediumMediumConstant-time comparison
JWKS endpoint information leakLowLowRate limiting, monitoring
威胁可能性影响缓解措施状态
凭证出现在日志中严重日志脱敏、密钥检测
登录接口用户枚举通用错误信息⚠️
令牌在URL中泄露仅在Header中传递令牌
密码检查时序攻击恒定时间比较
JWKS端点信息泄露速率限制、监控

Denial of Service

拒绝服务

ThreatLikelihoodImpactMitigationStatus
Brute force attacksHighHighRate limiting, CAPTCHA, account lockout
Resource exhaustion (bcrypt)MediumMediumRequest throttling, async processing
Session store exhaustionLowHighSession limits per user, TTL
OAuth callback floodingMediumMediumState validation, rate limiting⚠️
威胁可能性影响缓解措施状态
暴力破解攻击速率限制、CAPTCHA账户锁定
资源耗尽(bcrypt)请求限流、异步处理
会话存储耗尽每用户会话限制、TTL
OAuth回调洪水攻击State验证、速率限制⚠️

Elevation of Privilege

权限提升

ThreatLikelihoodImpactMitigationStatus
Role manipulation in JWTLowCriticalServer-side role verification
Privilege escalation via APIMediumCriticalPermission checks on every request
Admin impersonationLowCriticalAdditional auth for admin actions⚠️
OAuth scope escalationLowHighStrict scope validation
威胁可能性影响缓解措施状态
JWT中角色篡改严重服务器端角色验证
接口权限提升严重每个请求都检查权限
管理员冒充严重管理员操作额外认证⚠️
OAuth作用域提升严格作用域验证

Risk Matrix

风险矩阵

Threat IDThreatLikelihoodImpactRisk ScorePriority
T1Credential stuffing attackHighCritical9P0
T2Refresh token theftMediumCritical8P1
T3User enumerationHighMedium6P2
T4OAuth callback floodingMediumMedium4P2
T5Admin impersonationLowCritical7P1
威胁ID威胁可能性影响风险评分优先级
T1凭证填充攻击严重9P0
T2刷新令牌窃取严重8P1
T3用户枚举6P2
T4OAuth回调洪水4P2
T5管理员冒充严重7P1

Attack Scenarios

攻击场景

Scenario 1: Credential Stuffing Attack

场景1:凭证填充攻击

Attacker Goal: Gain unauthorized access using leaked credentials
Attack Steps:
  1. Obtain credential database from breach
  2. Automate login attempts across accounts
  3. Bypass rate limiting with distributed IPs
  4. Identify valid credentials
  5. Access user accounts
Defenses:
  • Rate limiting per IP and per account
  • CAPTCHA after N failed attempts
  • Anomaly detection (impossible travel, new device)
  • Breach database monitoring (HaveIBeenPwned)
  • Mandatory MFA for sensitive accounts
攻击者目标: 使用泄露的凭证获取未授权访问
攻击步骤:
  1. 从数据泄露中获取凭证数据库
  2. 自动化尝试登录多个账户
  3. 使用分布式IP绕过速率限制
  4. 识别有效凭证
  5. 访问用户账户
防御措施:
  • 基于IP和账户的速率限制
  • N次失败尝试后启用CAPTCHA
  • 异常检测(不可能的旅行、新设备)
  • 泄露数据库监控(HaveIBeenPwned)
  • 敏感账户强制启用MFA

Scenario 2: Token Theft via XSS

场景2:XSS令牌窃取

Attacker Goal: Steal access token to impersonate user
Attack Steps:
  1. Inject malicious script via vulnerable input
  2. Script reads token from localStorage
  3. Exfiltrate token to attacker server
  4. Use token to access API as victim
Defenses:
  • Store tokens in HttpOnly cookies (not accessible to JS)
  • Content Security Policy (CSP)
  • Input sanitization and validation
  • Regular security audits
  • Short token lifetimes
攻击者目标: 窃取访问令牌冒充用户
攻击步骤:
  1. 通过漏洞输入注入恶意脚本
  2. 脚本从localStorage读取令牌
  3. 将令牌泄露到攻击者服务器
  4. 使用令牌以受害者身份访问API
防御措施:
  • 将令牌存储在HttpOnly Cookie中(JS无法访问)
  • 内容安全策略(CSP)
  • 输入 sanitization 与验证
  • 定期安全审计
  • 短令牌生命周期

Recommendations by Priority

按优先级排序的建议

P0 (Critical - Immediate Action)

P0(严重 - 立即处理)

  1. Implement credential stuffing protection
  2. Add device fingerprinting for anomaly detection
  3. Enable MFA for all admin accounts
  1. 实现凭证填充防护
  2. 添加设备指纹用于异常检测
  3. 所有管理员账户启用MFA

P1 (High - Within Sprint)

P1(高 - 迭代内处理)

  1. Implement refresh token rotation
  2. Add additional auth step for admin impersonation
  3. Strengthen OAuth callback validation
  1. 实现刷新令牌轮换
  2. 管理员操作添加额外认证步骤
  3. 强化OAuth回调验证

P2 (Medium - Next Quarter)

P2(中 - 下一季度处理)

  1. Improve user enumeration protection
  2. Implement risk-based authentication
  3. Add behavioral biometrics
undefined
  1. 提升用户枚举防护
  2. 实现基于风险的认证
  3. 添加行为生物识别
undefined

Common Vulnerabilities & Mitigations

常见漏洞与缓解措施

OWASP Top 10 for Authentication

认证相关的OWASP Top 10

A01: Broken Access Control

A01:访问控制失效

python
undefined
python
undefined

VULNERABLE: Client-side role check only

存在漏洞:仅客户端侧角色检查

@app.get("/admin/users") async def get_users(user: User = Depends(get_current_user)): # No server-side permission check! return await db.users.find_all()
@app.get("/admin/users") async def get_users(user: User = Depends(get_current_user)): # 无服务器端权限检查! return await db.users.find_all()

SECURE: Server-side permission enforcement

安全:服务器端权限强制执行

@app.get("/admin/users") async def get_users(user: User = Depends(require_permission(Permission.ADMIN))): # Permission verified on server return await db.users.find_all()
undefined
@app.get("/admin/users") async def get_users(user: User = Depends(require_permission(Permission.ADMIN))): # 服务器端验证权限 return await db.users.find_all()
undefined

A02: Cryptographic Failures

A02:加密失败

python
undefined
python
undefined

VULNERABLE: Weak hashing

存在漏洞:弱哈希

hashed = hashlib.md5(password.encode()).hexdigest()
hashed = hashlib.md5(password.encode()).hexdigest()

SECURE: Strong adaptive hashing

安全:强自适应哈希

from passlib.context import CryptContext pwd_context = CryptContext( schemes=["bcrypt"], deprecated="auto", bcrypt__rounds=12 # Adjust based on security/performance needs ) hashed = pwd_context.hash(password)
undefined
from passlib.context import CryptContext pwd_context = CryptContext( schemes=["bcrypt"], deprecated="auto", bcrypt__rounds=12 # 根据安全/性能需求调整 ) hashed = pwd_context.hash(password)
undefined

A03: Injection

A03:注入

python
undefined
python
undefined

VULNERABLE: SQL injection in auth query

存在漏洞:认证查询SQL注入

query = f"SELECT * FROM users WHERE email = '{email}' AND password = '{password}'"
query = f"SELECT * FROM users WHERE email = '{email}' AND password = '{password}'"

SECURE: Parameterized queries

安全:参数化查询

query = "SELECT * FROM users WHERE email = $1" user = await db.fetch_one(query, email) if user and pwd_context.verify(password, user.hashed_password): return user
undefined
query = "SELECT * FROM users WHERE email = $1" user = await db.fetch_one(query, email) if user and pwd_context.verify(password, user.hashed_password): return user
undefined

A07: Identification and Authentication Failures

A07:身份验证与识别失败

python
undefined
python
undefined

VULNERABLE: Weak session management

存在漏洞:弱会话管理

session_id = hashlib.md5(str(time.time()).encode()).hexdigest()
session_id = hashlib.md5(str(time.time()).encode()).hexdigest()

SECURE: Cryptographically secure session tokens

安全:加密安全的会话令牌

import secrets session_id = secrets.token_urlsafe(32)
import secrets session_id = secrets.token_urlsafe(32)

VULNERABLE: No rate limiting

存在漏洞:无速率限制

@app.post("/auth/login") async def login(credentials: LoginRequest): return await authenticate(credentials)
@app.post("/auth/login") async def login(credentials: LoginRequest): return await authenticate(credentials)

SECURE: Rate limiting with slowdown

安全:带减速的速率限制

from slowapi import Limiter, _rate_limit_exceeded_handler from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)
@app.post("/auth/login") @limiter.limit("5/minute") # 5 attempts per minute async def login(request: Request, credentials: LoginRequest): return await authenticate(credentials)
undefined
from slowapi import Limiter, _rate_limit_exceeded_handler from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)
@app.post("/auth/login") @limiter.limit("5/minute") # 每分钟5次尝试 async def login(request: Request, credentials: LoginRequest): return await authenticate(credentials)
undefined

Integration with Keycloak

与Keycloak集成

For enterprise-grade auth, integrate with Keycloak (see [[keycloak]] skill):
python
from keycloak import KeycloakOpenID
对于企业级认证,可与Keycloak集成(参见[[keycloak]] Skill):
python
from keycloak import KeycloakOpenID

Configure Keycloak client

配置Keycloak客户端

keycloak_openid = KeycloakOpenID( server_url="http://localhost:8080/", client_id="your-client", realm_name="your-realm", client_secret_key="your-secret" )
keycloak_openid = KeycloakOpenID( server_url="http://localhost:8080/", client_id="your-client", realm_name="your-realm", client_secret_key="your-secret" )

Get token

获取令牌

token = keycloak_openid.token(username, password)
token = keycloak_openid.token(username, password)

Validate token

验证令牌

token_info = keycloak_openid.introspect(token['access_token'])
token_info = keycloak_openid.introspect(token['access_token'])

Get user info

获取用户信息

user_info = keycloak_openid.userinfo(token['access_token'])
user_info = keycloak_openid.userinfo(token['access_token'])

Decode and verify token locally

本地解码并验证令牌

KEYCLOAK_PUBLIC_KEY = "-----BEGIN PUBLIC KEY-----\n" +
keycloak_openid.public_key() +
"\n-----END PUBLIC KEY-----"
options = {"verify_signature": True, "verify_aud": True, "verify_exp": True} token_info = keycloak_openid.decode_token( token['access_token'], key=KEYCLOAK_PUBLIC_KEY, options=options )
undefined
KEYCLOAK_PUBLIC_KEY = "-----BEGIN PUBLIC KEY-----\n" +
keycloak_openid.public_key() +
"\n-----END PUBLIC KEY-----"
options = {"verify_signature": True, "verify_aud": True, "verify_exp": True} token_info = keycloak_openid.decode_token( token['access_token'], key=KEYCLOAK_PUBLIC_KEY, options=options )
undefined

Related Skills & Resources

相关Skill与资源

Skills

Skills

  • [[keycloak]] - Enterprise identity and access management
  • [[deep-analysis]] - Security audit templates and threat modeling
  • [[extended-thinking]] - Enable deep reasoning for security analysis
  • [[complex-reasoning]] - Hypothesis-driven debugging for auth issues
  • [[keycloak]] - 企业级身份与访问管理
  • [[deep-analysis]] - 安全审计模板与威胁建模
  • [[extended-thinking]] - 启用深度推理进行安全分析
  • [[complex-reasoning]] - 基于假设的认证问题调试

Keycloak Agents

Keycloak Agents

  • keycloak-realm-admin - Realm and client management
  • keycloak-security-auditor - Security review and compliance
  • keycloak-auth-flow-designer - Custom authentication flows
  • keycloak-identity-specialist - Federation and SSO setup
  • keycloak-realm-admin - 领域与客户端管理
  • keycloak-security-auditor - 安全审查与合规
  • keycloak-auth-flow-designer - 自定义认证流程
  • keycloak-identity-specialist - 联邦与SSO设置

External Resources

外部资源

Troubleshooting

故障排除

Common Issues

常见问题

Token Verification Failures

令牌验证失败

bash
undefined
bash
undefined

Debug JWT token

调试JWT令牌

python -c "import jwt; print(jwt.decode('YOUR_TOKEN', options={'verify_signature': False}))"
python -c "import jwt; print(jwt.decode('YOUR_TOKEN', options={'verify_signature': False}))"

Verify token signature

验证令牌签名

openssl dgst -sha256 -verify public_key.pem -signature signature.bin token_payload.txt
openssl dgst -sha256 -verify public_key.pem -signature signature.bin token_payload.txt

Check token expiration

检查令牌过期时间

date -d @$(python -c "import jwt; print(jwt.decode('YOUR_TOKEN', options={'verify_signature': False})['exp'])")
undefined
date -d @$(python -c "import jwt; print(jwt.decode('YOUR_TOKEN', options={'verify_signature': False})['exp'])")
undefined

OAuth Flow Issues

OAuth流程问题

python
undefined
python
undefined

Enable debug logging

启用调试日志

import logging logging.basicConfig(level=logging.DEBUG)
import logging logging.basicConfig(level=logging.DEBUG)

Log OAuth flow steps

记录OAuth流程步骤

logger.debug(f"Redirect URI: {redirect_uri}") logger.debug(f"State: {state}") logger.debug(f"Code: {code}") logger.debug(f"Token response: {token_response}")
undefined
logger.debug(f"Redirect URI: {redirect_uri}") logger.debug(f"State: {state}") logger.debug(f"Code: {code}") logger.debug(f"Token response: {token_response}")
undefined

Session Issues

会话问题

bash
undefined
bash
undefined

Check Redis session data

检查Redis会话数据

redis-cli
KEYS session:* HGETALL session:abc123 TTL session:abc123

---

_Last Updated: 2025-12-12_
_Version: 2.0.0_
_Enhanced with security analysis, threat modeling, and OIDC/OAuth 2.0 best practices_
redis-cli
KEYS session:* HGETALL session:abc123 TTL session:abc123

---

_最后更新: 2025-12-12_
_版本: 2.0.0_
_新增安全分析、威胁建模及OIDC/OAuth 2.0最佳实践_