authentication
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseAuthentication Skill
身份验证Skill
Provides comprehensive authentication and authorization capabilities for the Golden Armada AI Agent Fleet Platform.
为Golden Armada AI Agent Fleet Platform提供全面的身份验证与授权能力。
When to Use This Skill
何时使用此Skill
Activate this skill when working with:
- User authentication flows
- JWT token management
- OAuth2 integration
- Session management
- Role-based access control (RBAC)
在处理以下场景时启用此Skill:
- 用户身份验证流程
- JWT令牌管理
- OAuth2集成
- 会话管理
- 基于角色的访问控制(RBAC)
JWT Authentication
JWT身份验证
Token Generation
令牌生成
```python
from jose import jwt
from datetime import datetime, timedelta
from passlib.context import CryptContext
SECRET_KEY = os.environ["JWT_SECRET"]
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_DAYS = 7
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def create_access_token(user_id: str, roles: list[str]) -> str:
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
payload = {
"sub": user_id,
"roles": roles,
"exp": expire,
"iat": datetime.utcnow(),
"type": "access"
}
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
def create_refresh_token(user_id: str) -> str:
expire = datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
payload = {
"sub": user_id,
"exp": expire,
"type": "refresh"
}
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
def verify_token(token: str) -> dict:
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return payload
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token expired")
except jwt.JWTError:
raise HTTPException(status_code=401, detail="Invalid token")
```
python
from jose import jwt
from datetime import datetime, timedelta
from passlib.context import CryptContext
SECRET_KEY = os.environ["JWT_SECRET"]
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_DAYS = 7
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def create_access_token(user_id: str, roles: list[str]) -> str:
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
payload = {
"sub": user_id,
"roles": roles,
"exp": expire,
"iat": datetime.utcnow(),
"type": "access"
}
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
def create_refresh_token(user_id: str) -> str:
expire = datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
payload = {
"sub": user_id,
"exp": expire,
"type": "refresh"
}
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
def verify_token(token: str) -> dict:
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return payload
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token expired")
except jwt.JWTError:
raise HTTPException(status_code=401, detail="Invalid token")Password Hashing
密码哈希
```python
def hash_password(password: str) -> str:
return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
python
def hash_password(password: str) -> str:
return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)Usage
Usage
async def authenticate_user(email: str, password: str) -> User | None:
user = await get_user_by_email(email)
if not user:
return None
if not verify_password(password, user.hashed_password):
return None
return user
```
async def authenticate_user(email: str, password: str) -> User | None:
user = await get_user_by_email(email)
if not user:
return None
if not verify_password(password, user.hashed_password):
return None
return user
undefinedFastAPI Auth Dependencies
FastAPI认证依赖
```python
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")
async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
payload = verify_token(token)
user = await get_user(payload["sub"])
if not user:
raise HTTPException(status_code=401, detail="User not found")
return user
async def get_current_active_user(user: User = Depends(get_current_user)) -> User:
if not user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
return user
python
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")
async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
payload = verify_token(token)
user = await get_user(payload["sub"])
if not user:
raise HTTPException(status_code=401, detail="User not found")
return user
async def get_current_active_user(user: User = Depends(get_current_user)) -> User:
if not user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
return userRole-based access
Role-based access
def require_roles(*roles: str):
async def role_checker(user: User = Depends(get_current_user)):
if not any(role in user.roles for role in roles):
raise HTTPException(status_code=403, detail="Insufficient permissions")
return user
return role_checker
def require_roles(*roles: str):
async def role_checker(user: User = Depends(get_current_user)):
if not any(role in user.roles for role in roles):
raise HTTPException(status_code=403, detail="Insufficient permissions")
return user
return role_checker
Usage
Usage
@app.get("/admin")
async def admin_route(user: User = Depends(require_roles("admin"))):
return {"message": "Admin access granted"}
```
@app.get("/admin")
async def admin_route(user: User = Depends(require_roles("admin"))):
return {"message": "Admin access granted"}
undefinedOAuth2 Integration
OAuth2集成
Google OAuth2
Google OAuth2
```python
from authlib.integrations.starlette_client import OAuth
oauth = OAuth()
oauth.register(
name='google',
client_id=os.environ['GOOGLE_CLIENT_ID'],
client_secret=os.environ['GOOGLE_CLIENT_SECRET'],
server_metadata_url='https://accounts.google.com/.well-known/openid-configuration',
client_kwargs={'scope': 'openid email profile'}
)
@app.get('/auth/google')
async def google_login(request: Request):
redirect_uri = request.url_for('google_callback')
return await oauth.google.authorize_redirect(request, redirect_uri)
@app.get('/auth/google/callback')
async def google_callback(request: Request):
token = await oauth.google.authorize_access_token(request)
user_info = token.get('userinfo')
# Find or create user
user = await get_or_create_user(
email=user_info['email'],
name=user_info['name'],
provider='google'
)
# Generate JWT
access_token = create_access_token(user.id, user.roles)
return {"access_token": access_token, "token_type": "bearer"}```
python
from authlib.integrations.starlette_client import OAuth
oauth = OAuth()
oauth.register(
name='google',
client_id=os.environ['GOOGLE_CLIENT_ID'],
client_secret=os.environ['GOOGLE_CLIENT_SECRET'],
server_metadata_url='https://accounts.google.com/.well-known/openid-configuration',
client_kwargs={'scope': 'openid email profile'}
)
@app.get('/auth/google')
async def google_login(request: Request):
redirect_uri = request.url_for('google_callback')
return await oauth.google.authorize_redirect(request, redirect_uri)
@app.get('/auth/google/callback')
async def google_callback(request: Request):
token = await oauth.google.authorize_access_token(request)
user_info = token.get('userinfo')
# Find or create user
user = await get_or_create_user(
email=user_info['email'],
name=user_info['name'],
provider='google'
)
# Generate JWT
access_token = create_access_token(user.id, user.roles)
return {"access_token": access_token, "token_type": "bearer"}GitHub OAuth2
GitHub OAuth2
```python
oauth.register(
name='github',
client_id=os.environ['GITHUB_CLIENT_ID'],
client_secret=os.environ['GITHUB_CLIENT_SECRET'],
authorize_url='https://github.com/login/oauth/authorize',
access_token_url='https://github.com/login/oauth/access_token',
api_base_url='https://api.github.com/',
client_kwargs={'scope': 'user:email'}
)
```
python
oauth.register(
name='github',
client_id=os.environ['GITHUB_CLIENT_ID'],
client_secret=os.environ['GITHUB_CLIENT_SECRET'],
authorize_url='https://github.com/login/oauth/authorize',
access_token_url='https://github.com/login/oauth/access_token',
api_base_url='https://api.github.com/',
client_kwargs={'scope': 'user:email'}
)Session Management
会话管理
```python
from fastapi import Request, Response
import secrets
async def create_session(user_id: str, response: Response) -> str:
session_id = secrets.token_urlsafe(32)
# Store in Redis
await redis.hset(f"session:{session_id}", mapping={
"user_id": user_id,
"created_at": datetime.utcnow().isoformat()
})
await redis.expire(f"session:{session_id}", 86400) # 24 hours
# Set cookie
response.set_cookie(
key="session_id",
value=session_id,
httponly=True,
secure=True,
samesite="lax",
max_age=86400
)
return session_idasync def get_session(request: Request) -> dict | None:
session_id = request.cookies.get("session_id")
if not session_id:
return None
session = await redis.hgetall(f"session:{session_id}")
if not session:
return None
# Refresh TTL
await redis.expire(f"session:{session_id}", 86400)
return sessionasync def destroy_session(request: Request, response: Response):
session_id = request.cookies.get("session_id")
if session_id:
await redis.delete(f"session:{session_id}")
response.delete_cookie("session_id")
```
python
from fastapi import Request, Response
import secrets
async def create_session(user_id: str, response: Response) -> str:
session_id = secrets.token_urlsafe(32)
# Store in Redis
await redis.hset(f"session:{session_id}", mapping={
"user_id": user_id,
"created_at": datetime.utcnow().isoformat()
})
await redis.expire(f"session:{session_id}", 86400) # 24 hours
# Set cookie
response.set_cookie(
key="session_id",
value=session_id,
httponly=True,
secure=True,
samesite="lax",
max_age=86400
)
return session_id
async def get_session(request: Request) -> dict | None:
session_id = request.cookies.get("session_id")
if not session_id:
return None
session = await redis.hgetall(f"session:{session_id}")
if not session:
return None
# Refresh TTL
await redis.expire(f"session:{session_id}", 86400)
return session
async def destroy_session(request: Request, response: Response):
session_id = request.cookies.get("session_id")
if session_id:
await redis.delete(f"session:{session_id}")
response.delete_cookie("session_id")RBAC Implementation
RBAC实现
```python
from enum import Enum
from typing import Set
class Permission(str, Enum):
READ_AGENTS = "read:agents"
WRITE_AGENTS = "write:agents"
DELETE_AGENTS = "delete:agents"
ADMIN = "admin"
ROLE_PERMISSIONS: dict[str, Set[Permission]] = {
"viewer": {Permission.READ_AGENTS},
"operator": {Permission.READ_AGENTS, Permission.WRITE_AGENTS},
"admin": {Permission.READ_AGENTS, Permission.WRITE_AGENTS, Permission.DELETE_AGENTS, Permission.ADMIN},
}
def has_permission(user_roles: list[str], required: Permission) -> bool:
for role in user_roles:
if role in ROLE_PERMISSIONS and required in ROLE_PERMISSIONS[role]:
return True
return False
def require_permission(permission: Permission):
async def permission_checker(user: User = Depends(get_current_user)):
if not has_permission(user.roles, permission):
raise HTTPException(status_code=403, detail="Permission denied")
return user
return permission_checker
python
from enum import Enum
from typing import Set
class Permission(str, Enum):
READ_AGENTS = "read:agents"
WRITE_AGENTS = "write:agents"
DELETE_AGENTS = "delete:agents"
ADMIN = "admin"
ROLE_PERMISSIONS: dict[str, Set[Permission]] = {
"viewer": {Permission.READ_AGENTS},
"operator": {Permission.READ_AGENTS, Permission.WRITE_AGENTS},
"admin": {Permission.READ_AGENTS, Permission.WRITE_AGENTS, Permission.DELETE_AGENTS, Permission.ADMIN},
}
def has_permission(user_roles: list[str], required: Permission) -> bool:
for role in user_roles:
if role in ROLE_PERMISSIONS and required in ROLE_PERMISSIONS[role]:
return True
return False
def require_permission(permission: Permission):
async def permission_checker(user: User = Depends(get_current_user)):
if not has_permission(user.roles, permission):
raise HTTPException(status_code=403, detail="Permission denied")
return user
return permission_checkerUsage
Usage
@app.delete("/agents/{id}")
async def delete_agent(
id: str,
user: User = Depends(require_permission(Permission.DELETE_AGENTS))
):
await agent_service.delete(id)
return {"status": "deleted"}
```
@app.delete("/agents/{id}")
async def delete_agent(
id: str,
user: User = Depends(require_permission(Permission.DELETE_AGENTS))
):
await agent_service.delete(id)
return {"status": "deleted"}
undefinedSecurity Best Practices
安全最佳实践
- Use HTTPS always in production
- Hash passwords with bcrypt or argon2
- Short-lived access tokens (15-30 minutes)
- Refresh token rotation on each use
- HttpOnly, Secure cookies for tokens
- Rate limit authentication endpoints
- Log authentication events for auditing
- Implement account lockout after failed attempts
- 始终在生产环境使用HTTPS
- 使用bcrypt或argon2哈希密码
- 短生命周期访问令牌(15-30分钟)
- 每次使用时轮换刷新令牌
- 为令牌使用HttpOnly、Secure Cookie
- 对认证端点进行速率限制
- 记录认证事件用于审计
- 失败尝试后实现账户锁定
OAuth 2.0 & OIDC Best Practices
OAuth 2.0 & OIDC最佳实践
OAuth 2.0 Grant Types
OAuth 2.0授权类型
Authorization Code Flow (Recommended for Web Apps)
授权码流程(Web应用推荐)
python
from authlib.integrations.starlette_client import OAuth
oauth = OAuth()
oauth.register(
name='custom_provider',
client_id=os.environ['OAUTH_CLIENT_ID'],
client_secret=os.environ['OAUTH_CLIENT_SECRET'],
authorize_url='https://provider.com/oauth/authorize',
authorize_params=None,
access_token_url='https://provider.com/oauth/token',
access_token_params=None,
refresh_token_url=None,
client_kwargs={'scope': 'openid profile email'}
)
@app.get('/auth/login')
async def login(request: Request):
# Generate state for CSRF protection
state = secrets.token_urlsafe(32)
await redis.set(f"oauth_state:{state}", "1", ex=600)
redirect_uri = request.url_for('auth_callback')
return await oauth.custom_provider.authorize_redirect(
request,
redirect_uri,
state=state
)
@app.get('/auth/callback')
async def auth_callback(request: Request):
# Verify state (CSRF protection)
state = request.query_params.get('state')
if not await redis.get(f"oauth_state:{state}"):
raise HTTPException(status_code=400, detail="Invalid state")
await redis.delete(f"oauth_state:{state}")
# Exchange authorization code for tokens
token = await oauth.custom_provider.authorize_access_token(request)
user_info = token.get('userinfo')
# Create or update user
user = await upsert_user(user_info)
# Issue application tokens
access_token = create_access_token(user.id, user.roles)
refresh_token = create_refresh_token(user.id)
return {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "bearer"
}python
from authlib.integrations.starlette_client import OAuth
oauth = OAuth()
oauth.register(
name='custom_provider',
client_id=os.environ['OAUTH_CLIENT_ID'],
client_secret=os.environ['OAUTH_CLIENT_SECRET'],
authorize_url='https://provider.com/oauth/authorize',
authorize_params=None,
access_token_url='https://provider.com/oauth/token',
access_token_params=None,
refresh_token_url=None,
client_kwargs={'scope': 'openid profile email'}
)
@app.get('/auth/login')
async def login(request: Request):
# Generate state for CSRF protection
state = secrets.token_urlsafe(32)
await redis.set(f"oauth_state:{state}", "1", ex=600)
redirect_uri = request.url_for('auth_callback')
return await oauth.custom_provider.authorize_redirect(
request,
redirect_uri,
state=state
)
@app.get('/auth/callback')
async def auth_callback(request: Request):
# Verify state (CSRF protection)
state = request.query_params.get('state')
if not await redis.get(f"oauth_state:{state}"):
raise HTTPException(status_code=400, detail="Invalid state")
await redis.delete(f"oauth_state:{state}")
# Exchange authorization code for tokens
token = await oauth.custom_provider.authorize_access_token(request)
user_info = token.get('userinfo')
# Create or update user
user = await upsert_user(user_info)
# Issue application tokens
access_token = create_access_token(user.id, user.roles)
refresh_token = create_refresh_token(user.id)
return {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "bearer"
}PKCE (Proof Key for Code Exchange) for SPAs
PKCE(授权码流程扩展,适用于SPA)
python
undefinedpython
undefinedFrontend (JavaScript/TypeScript)
Frontend (JavaScript/TypeScript)
import { generateCodeVerifier, generateCodeChallenge } from 'oauth-pkce'
// Generate PKCE parameters
const codeVerifier = generateCodeVerifier()
const codeChallenge = await generateCodeChallenge(codeVerifier)
// Store verifier for later use
sessionStorage.setItem('code_verifier', codeVerifier)
// Redirect to authorization endpoint
const authUrl = new URL('https://provider.com/oauth/authorize')
authUrl.searchParams.set('client_id', CLIENT_ID)
authUrl.searchParams.set('redirect_uri', REDIRECT_URI)
authUrl.searchParams.set('response_type', 'code')
authUrl.searchParams.set('scope', 'openid profile email')
authUrl.searchParams.set('code_challenge', codeChallenge)
authUrl.searchParams.set('code_challenge_method', 'S256')
authUrl.searchParams.set('state', generateState())
window.location.href = authUrl.toString()
// In callback handler
const code = new URLSearchParams(window.location.search).get('code')
const codeVerifier = sessionStorage.getItem('code_verifier')
const tokenResponse = await fetch('https://provider.com/oauth/token', {
method: 'POST',
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
body: new URLSearchParams({
grant_type: 'authorization_code',
code: code,
redirect_uri: REDIRECT_URI,
client_id: CLIENT_ID,
code_verifier: codeVerifier
})
})
undefinedimport { generateCodeVerifier, generateCodeChallenge } from 'oauth-pkce'
// Generate PKCE parameters
const codeVerifier = generateCodeVerifier()
const codeChallenge = await generateCodeChallenge(codeVerifier)
// Store verifier for later use
sessionStorage.setItem('code_verifier', codeVerifier)
// Redirect to authorization endpoint
const authUrl = new URL('https://provider.com/oauth/authorize')
authUrl.searchParams.set('client_id', CLIENT_ID)
authUrl.searchParams.set('redirect_uri', REDIRECT_URI)
authUrl.searchParams.set('response_type', 'code')
authUrl.searchParams.set('scope', 'openid profile email')
authUrl.searchParams.set('code_challenge', codeChallenge)
authUrl.searchParams.set('code_challenge_method', 'S256')
authUrl.searchParams.set('state', generateState())
window.location.href = authUrl.toString()
// In callback handler
const code = new URLSearchParams(window.location.search).get('code')
const codeVerifier = sessionStorage.getItem('code_verifier')
const tokenResponse = await fetch('https://provider.com/oauth/token', {
method: 'POST',
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
body: new URLSearchParams({
grant_type: 'authorization_code',
code: code,
redirect_uri: REDIRECT_URI,
client_id: CLIENT_ID,
code_verifier: codeVerifier
})
})
undefinedClient Credentials Flow (Service-to-Service)
客户端凭证流程(服务间通信)
python
import httpx
from datetime import datetime, timedelta
class ServiceAuthClient:
def __init__(self):
self.token = None
self.expires_at = None
async def get_token(self) -> str:
# Return cached token if still valid
if self.token and self.expires_at > datetime.utcnow():
return self.token
# Request new token
async with httpx.AsyncClient() as client:
response = await client.post(
'https://provider.com/oauth/token',
data={
'grant_type': 'client_credentials',
'client_id': os.environ['SERVICE_CLIENT_ID'],
'client_secret': os.environ['SERVICE_CLIENT_SECRET'],
'scope': 'api.read api.write'
}
)
response.raise_for_status()
data = response.json()
self.token = data['access_token']
self.expires_at = datetime.utcnow() + timedelta(seconds=data['expires_in'] - 60)
return self.tokenpython
import httpx
from datetime import datetime, timedelta
class ServiceAuthClient:
def __init__(self):
self.token = None
self.expires_at = None
async def get_token(self) -> str:
# Return cached token if still valid
if self.token and self.expires_at > datetime.utcnow():
return self.token
# Request new token
async with httpx.AsyncClient() as client:
response = await client.post(
'https://provider.com/oauth/token',
data={
'grant_type': 'client_credentials',
'client_id': os.environ['SERVICE_CLIENT_ID'],
'client_secret': os.environ['SERVICE_CLIENT_SECRET'],
'scope': 'api.read api.write'
}
)
response.raise_for_status()
data = response.json()
self.token = data['access_token']
self.expires_at = datetime.utcnow() + timedelta(seconds=data['expires_in'] - 60)
return self.tokenUsage
Usage
auth_client = ServiceAuthClient()
async def call_protected_api():
token = await auth_client.get_token()
async with httpx.AsyncClient() as client:
response = await client.get(
'https://api.service.com/resource',
headers={'Authorization': f'Bearer {token}'}
)
return response.json()
undefinedauth_client = ServiceAuthClient()
async def call_protected_api():
token = await auth_client.get_token()
async with httpx.AsyncClient() as client:
response = await client.get(
'https://api.service.com/resource',
headers={'Authorization': f'Bearer {token}'}
)
return response.json()
undefinedOpenID Connect (OIDC)
OpenID Connect (OIDC)
ID Token Validation
ID令牌验证
python
from jose import jwt, jwk
from jose.utils import base64url_decode
import httpx
class OIDCValidator:
def __init__(self, issuer: str, client_id: str):
self.issuer = issuer
self.client_id = client_id
self.jwks = None
self.jwks_updated_at = None
async def get_jwks(self) -> dict:
# Refresh JWKS if stale (cache for 24 hours)
if not self.jwks or (datetime.utcnow() - self.jwks_updated_at).seconds > 86400:
async with httpx.AsyncClient() as client:
response = await client.get(f"{self.issuer}/.well-known/jwks.json")
response.raise_for_status()
self.jwks = response.json()
self.jwks_updated_at = datetime.utcnow()
return self.jwks
async def validate_id_token(self, id_token: str) -> dict:
# Decode header to get key ID
header = jwt.get_unverified_header(id_token)
kid = header['kid']
# Get JWKS and find matching key
jwks = await self.get_jwks()
key = next((k for k in jwks['keys'] if k['kid'] == kid), None)
if not key:
raise ValueError("Public key not found in JWKS")
# Convert JWK to PEM
public_key = jwk.construct(key)
# Validate and decode ID token
try:
claims = jwt.decode(
id_token,
public_key.to_pem().decode('utf-8'),
algorithms=['RS256'],
audience=self.client_id,
issuer=self.issuer,
options={
'verify_exp': True,
'verify_iat': True,
'verify_aud': True,
'verify_iss': True
}
)
# Additional validations
if claims.get('nonce'):
# Verify nonce matches what was sent in auth request
pass
return claims
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="ID token expired")
except jwt.JWTClaimsError as e:
raise HTTPException(status_code=401, detail=f"Invalid ID token claims: {e}")
except Exception as e:
raise HTTPException(status_code=401, detail=f"ID token validation failed: {e}")python
from jose import jwt, jwk
from jose.utils import base64url_decode
import httpx
class OIDCValidator:
def __init__(self, issuer: str, client_id: str):
self.issuer = issuer
self.client_id = client_id
self.jwks = None
self.jwks_updated_at = None
async def get_jwks(self) -> dict:
# Refresh JWKS if stale (cache for 24 hours)
if not self.jwks or (datetime.utcnow() - self.jwks_updated_at).seconds > 86400:
async with httpx.AsyncClient() as client:
response = await client.get(f"{self.issuer}/.well-known/jwks.json")
response.raise_for_status()
self.jwks = response.json()
self.jwks_updated_at = datetime.utcnow()
return self.jwks
async def validate_id_token(self, id_token: str) -> dict:
# Decode header to get key ID
header = jwt.get_unverified_header(id_token)
kid = header['kid']
# Get JWKS and find matching key
jwks = await self.get_jwks()
key = next((k for k in jwks['keys'] if k['kid'] == kid), None)
if not key:
raise ValueError("Public key not found in JWKS")
# Convert JWK to PEM
public_key = jwk.construct(key)
# Validate and decode ID token
try:
claims = jwt.decode(
id_token,
public_key.to_pem().decode('utf-8'),
algorithms=['RS256'],
audience=self.client_id,
issuer=self.issuer,
options={
'verify_exp': True,
'verify_iat': True,
'verify_aud': True,
'verify_iss': True
}
)
# Additional validations
if claims.get('nonce'):
# Verify nonce matches what was sent in auth request
pass
return claims
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="ID token expired")
except jwt.JWTClaimsError as e:
raise HTTPException(status_code=401, detail=f"Invalid ID token claims: {e}")
except Exception as e:
raise HTTPException(status_code=401, detail=f"ID token validation failed: {e}")Usage
Usage
validator = OIDCValidator(
issuer="https://provider.com",
client_id=os.environ['OIDC_CLIENT_ID']
)
@app.post("/auth/oidc/callback")
async def oidc_callback(id_token: str):
claims = await validator.validate_id_token(id_token)
# Extract user information
user = await get_or_create_user(
email=claims['email'],
name=claims['name'],
sub=claims['sub']
)
return {"user": user, "claims": claims}undefinedvalidator = OIDCValidator(
issuer="https://provider.com",
client_id=os.environ['OIDC_CLIENT_ID']
)
@app.post("/auth/oidc/callback")
async def oidc_callback(id_token: str):
claims = await validator.validate_id_token(id_token)
# Extract user information
user = await get_or_create_user(
email=claims['email'],
name=claims['name'],
sub=claims['sub']
)
return {"user": user, "claims": claims}undefinedJWT Security Best Practices
JWT安全最佳实践
Secure Token Generation
安全令牌生成
python
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
import ospython
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
import osUse RS256 (asymmetric) instead of HS256 for public verification
Use RS256 (asymmetric) instead of HS256 for public verification
PRIVATE_KEY_PATH = os.environ.get('JWT_PRIVATE_KEY_PATH')
PUBLIC_KEY_PATH = os.environ.get('JWT_PUBLIC_KEY_PATH')
def load_keys():
with open(PRIVATE_KEY_PATH, 'rb') as f:
private_key = serialization.load_pem_private_key(
f.read(),
password=None,
backend=default_backend()
)
with open(PUBLIC_KEY_PATH, 'rb') as f:
public_key = serialization.load_pem_public_key(
f.read(),
backend=default_backend()
)
return private_key, public_keyPRIVATE_KEY, PUBLIC_KEY = load_keys()
def create_secure_access_token(
user_id: str,
roles: list[str],
tenant_id: str = None,
custom_claims: dict = None
) -> str:
now = datetime.utcnow()
payload = {
# Standard claims (RFC 7519)
"iss": "https://api.yourdomain.com", # Issuer
"sub": user_id, # Subject (user ID)
"aud": ["https://api.yourdomain.com"], # Audience
"exp": now + timedelta(minutes=15), # Expiration (short-lived)
"nbf": now, # Not before
"iat": now, # Issued at
"jti": secrets.token_urlsafe(16), # JWT ID (unique token identifier)
# Custom claims
"roles": roles,
"type": "access",
}
# Add tenant context for multi-tenant applications
if tenant_id:
payload["tenant_id"] = tenant_id
# Add any custom claims
if custom_claims:
payload.update(custom_claims)
return jwt.encode(payload, PRIVATE_KEY, algorithm="RS256")def verify_secure_token(token: str) -> dict:
try:
payload = jwt.decode(
token,
PUBLIC_KEY,
algorithms=["RS256"],
audience=["https://api.yourdomain.com"],
issuer="https://api.yourdomain.com",
options={
'verify_exp': True,
'verify_nbf': True,
'verify_iat': True,
'verify_aud': True,
'verify_iss': True,
'require_exp': True,
'require_iat': True,
'require_nbf': True
}
)
# Validate token type
if payload.get('type') != 'access':
raise HTTPException(status_code=401, detail="Invalid token type")
return payload
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token expired")
except jwt.InvalidTokenError as e:
raise HTTPException(status_code=401, detail=f"Invalid token: {e}")undefinedPRIVATE_KEY_PATH = os.environ.get('JWT_PRIVATE_KEY_PATH')
PUBLIC_KEY_PATH = os.environ.get('JWT_PUBLIC_KEY_PATH')
def load_keys():
with open(PRIVATE_KEY_PATH, 'rb') as f:
private_key = serialization.load_pem_private_key(
f.read(),
password=None,
backend=default_backend()
)
with open(PUBLIC_KEY_PATH, 'rb') as f:
public_key = serialization.load_pem_public_key(
f.read(),
backend=default_backend()
)
return private_key, public_keyPRIVATE_KEY, PUBLIC_KEY = load_keys()
def create_secure_access_token(
user_id: str,
roles: list[str],
tenant_id: str = None,
custom_claims: dict = None
) -> str:
now = datetime.utcnow()
payload = {
# Standard claims (RFC 7519)
"iss": "https://api.yourdomain.com", # Issuer
"sub": user_id, # Subject (user ID)
"aud": ["https://api.yourdomain.com"], # Audience
"exp": now + timedelta(minutes=15), # Expiration (short-lived)
"nbf": now, # Not before
"iat": now, # Issued at
"jti": secrets.token_urlsafe(16), # JWT ID (unique token identifier)
# Custom claims
"roles": roles,
"type": "access",
}
# Add tenant context for multi-tenant applications
if tenant_id:
payload["tenant_id"] = tenant_id
# Add any custom claims
if custom_claims:
payload.update(custom_claims)
return jwt.encode(payload, PRIVATE_KEY, algorithm="RS256")def verify_secure_token(token: str) -> dict:
try:
payload = jwt.decode(
token,
PUBLIC_KEY,
algorithms=["RS256"],
audience=["https://api.yourdomain.com"],
issuer="https://api.yourdomain.com",
options={
'verify_exp': True,
'verify_nbf': True,
'verify_iat': True,
'verify_aud': True,
'verify_iss': True,
'require_exp': True,
'require_iat': True,
'require_nbf': True
}
)
# Validate token type
if payload.get('type') != 'access':
raise HTTPException(status_code=401, detail="Invalid token type")
return payload
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token expired")
except jwt.InvalidTokenError as e:
raise HTTPException(status_code=401, detail=f"Invalid token: {e}")undefinedToken Revocation with Blacklist
基于黑名单的令牌吊销
python
class TokenBlacklist:
"""Redis-based token blacklist for revoked tokens"""
def __init__(self, redis_client):
self.redis = redis_client
async def revoke_token(self, jti: str, exp: int):
"""Add token to blacklist until expiration"""
ttl = exp - int(datetime.utcnow().timestamp())
if ttl > 0:
await self.redis.set(f"blacklist:{jti}", "1", ex=ttl)
async def is_revoked(self, jti: str) -> bool:
"""Check if token is blacklisted"""
return await self.redis.exists(f"blacklist:{jti}")python
class TokenBlacklist:
"""Redis-based token blacklist for revoked tokens"""
def __init__(self, redis_client):
self.redis = redis_client
async def revoke_token(self, jti: str, exp: int):
"""Add token to blacklist until expiration"""
ttl = exp - int(datetime.utcnow().timestamp())
if ttl > 0:
await self.redis.set(f"blacklist:{jti}", "1", ex=ttl)
async def is_revoked(self, jti: str) -> bool:
"""Check if token is blacklisted"""
return await self.redis.exists(f"blacklist:{jti}")Global blacklist instance
Global blacklist instance
token_blacklist = TokenBlacklist(redis)
async def get_current_user_with_revocation_check(
token: str = Depends(oauth2_scheme)
) -> User:
payload = verify_secure_token(token)
# Check if token has been revoked
if await token_blacklist.is_revoked(payload['jti']):
raise HTTPException(status_code=401, detail="Token has been revoked")
user = await get_user(payload["sub"])
if not user:
raise HTTPException(status_code=401, detail="User not found")
return user@app.post("/auth/logout")
async def logout(token: str = Depends(oauth2_scheme)):
payload = verify_secure_token(token)
await token_blacklist.revoke_token(payload['jti'], payload['exp'])
return {"message": "Logged out successfully"}
undefinedtoken_blacklist = TokenBlacklist(redis)
async def get_current_user_with_revocation_check(
token: str = Depends(oauth2_scheme)
) -> User:
payload = verify_secure_token(token)
# Check if token has been revoked
if await token_blacklist.is_revoked(payload['jti']):
raise HTTPException(status_code=401, detail="Token has been revoked")
user = await get_user(payload["sub"])
if not user:
raise HTTPException(status_code=401, detail="User not found")
return user@app.post("/auth/logout")
async def logout(token: str = Depends(oauth2_scheme)):
payload = verify_secure_token(token)
await token_blacklist.revoke_token(payload['jti'], payload['exp'])
return {"message": "Logged out successfully"}
undefinedSecurity Analysis with Extended Thinking
安全分析与深度思考
When reviewing authentication flows, use extended thinking for comprehensive security analysis.
在审查认证流程时,使用深度思考进行全面的安全分析。
Authentication Flow Security Review Template
认证流程安全审查模板
markdown
undefinedmarkdown
undefinedAuthentication Flow Security Review
认证流程安全审查
Flow: [Login/OAuth/SSO/API Authentication]
Date: [YYYY-MM-DD]
Reviewer: [Name/Agent]
流程: [登录/OAuth/SSO/API认证]
日期: [YYYY-MM-DD]
审查者: [名称/Agent]
Flow Diagram
流程图
[Document the authentication flow step-by-step]
[逐步记录认证流程]
Security Analysis Checklist
安全分析检查表
Confidentiality
保密性
- Credentials transmitted over HTTPS only
- Passwords hashed with strong algorithm (bcrypt/argon2)
- Tokens encrypted in transit
- Sensitive data not logged
- Secrets stored securely (env vars, secrets manager)
- 凭证仅通过HTTPS传输
- 使用强算法(bcrypt/argon2)哈希密码
- 令牌在传输中加密
- 敏感数据不被记录
- 密钥安全存储(环境变量、密钥管理器)
Integrity
完整性
- CSRF protection implemented
- Request tampering prevented
- Token signature validation
- State parameter validated (OAuth)
- Nonce validated (OIDC)
- 实现CSRF防护
- 防止请求篡改
- 令牌签名验证
- 验证OAuth的state参数
- 验证OIDC的nonce
Availability
可用性
- Rate limiting on auth endpoints
- Account lockout after failed attempts
- DDoS protection in place
- Graceful degradation strategy
- Session timeout configured
- 认证端点实施速率限制
- 失败尝试后账户锁定
- 具备DDoS防护
- 优雅降级策略
- 配置会话超时
Authentication
身份验证
- MFA available for sensitive accounts
- Password complexity requirements
- Credential stuffing protection
- Brute force mitigation
- Session fixation prevention
- 敏感账户支持MFA
- 密码复杂度要求
- 凭证填充防护
- 暴力破解缓解
- 防止会话固定
Authorization
授权
- Principle of least privilege
- Role-based access control
- Permission checks on every request
- Token scope validation
- Tenant isolation (multi-tenant apps)
- 最小权限原则
- 基于角色的访问控制
- 每个请求都进行权限检查
- 令牌作用域验证
- 租户隔离(多租户应用)
Session Management
会话管理
- Secure session tokens
- HttpOnly, Secure, SameSite cookies
- Session timeout implemented
- Logout functionality
- Concurrent session handling
- 安全会话令牌
- HttpOnly、Secure、SameSite Cookie
- 实现会话超时
- 登出功能
- 并发会话处理
Extended Thinking Analysis
深度思考分析
Use Claude with extended thinking for deep security review:
python
import anthropic
client = anthropic.Anthropic()
security_review_prompt = """
Perform a comprehensive security analysis of this authentication flow:
[Paste authentication code/flow description]
Analyze for:
1. OWASP Top 10 vulnerabilities
2. Authentication bypass possibilities
3. Token security weaknesses
4. Session management issues
5. Input validation gaps
6. Race conditions
7. Logic flaws
Provide specific findings with:
- Severity (Critical/High/Medium/Low)
- Location (file:line)
- Attack vector
- Remediation steps
"""
response = client.messages.create(
model="claude-opus-4-5-20250514",
max_tokens=32000,
thinking={
"type": "enabled",
"budget_tokens": 20000 # High budget for security analysis
},
messages=[{
"role": "user",
"content": security_review_prompt
}]
)使用Claude的深度思考功能进行深度安全审查:
python
import anthropic
client = anthropic.Anthropic()
security_review_prompt = """
Perform a comprehensive security analysis of this authentication flow:
[Paste authentication code/flow description]
Analyze for:
1. OWASP Top 10 vulnerabilities
2. Authentication bypass possibilities
3. Token security weaknesses
4. Session management issues
5. Input validation gaps
6. Race conditions
7. Logic flaws
Provide specific findings with:
- Severity (Critical/High/Medium/Low)
- Location (file:line)
- Attack vector
- Remediation steps
"""
response = client.messages.create(
model="claude-opus-4-5-20250514",
max_tokens=32000,
thinking={
"type": "enabled",
"budget_tokens": 20000 # High budget for security analysis
},
messages=[{
"role": "user",
"content": security_review_prompt
}]
)Extract thinking and analysis
Extract thinking and analysis
for block in response.content:
if block.type == "thinking":
print(f"Deep Analysis:\n{block.thinking}\n")
elif block.type == "text":
print(f"Findings:\n{block.text}")
undefinedfor block in response.content:
if block.type == "thinking":
print(f"Deep Analysis:\n{block.thinking}\n")
elif block.type == "text":
print(f"Findings:\n{block.text}")
undefinedThreat Modeling for Authentication
认证系统威胁建模
STRIDE Threat Model for Auth Systems
认证系统的STRIDE威胁模型
Adapted from [[deep-analysis]] skill threat modeling template:
markdown
undefined改编自[[deep-analysis]] Skill的威胁建模模板:
markdown
undefinedAuthentication Threat Model
认证威胁模型
System: [Auth System Name]
系统: [认证系统名称]
Version: 1.0
Last Updated: [Date]
版本: 1.0
最后更新: [日期]
Trust Boundaries
信任边界
┌─────────────────────────────────────────┐
│ External (Untrusted) │
│ [End Users] [Credential Stuffers] │
│ [MITM Attackers] │
└──────────────────┬──────────────────────┘
│ TLS/HTTPS
┌──────────────────┴──────────────────────┐
│ Public API (Semi-trusted) │
│ [API Gateway] [Auth Endpoints] │
│ [OAuth Providers] [OIDC IdP] │
└──────────────────┬──────────────────────┘
│ Internal Auth
┌──────────────────┴──────────────────────┐
│ Application Layer (Trusted) │
│ [Business Logic] [User Management] │
└──────────────────┬──────────────────────┘
│ DB Protocol
┌──────────────────┴──────────────────────┐
│ Data Layer (Highly Trusted) │
│ [User DB] [Session Store] [Secrets] │
└─────────────────────────────────────────┘┌─────────────────────────────────────────┐
│ External (Untrusted) │
│ [End Users] [Credential Stuffers] │
│ [MITM Attackers] │
└──────────────────┬──────────────────────┘
│ TLS/HTTPS
┌──────────────────┴──────────────────────┐
│ Public API (Semi-trusted) │
│ [API Gateway] [Auth Endpoints] │
│ [OAuth Providers] [OIDC IdP] │
└──────────────────┬──────────────────────┘
│ Internal Auth
┌──────────────────┴──────────────────────┐
│ Application Layer (Trusted) │
│ [Business Logic] [User Management] │
└──────────────────┬──────────────────────┘
│ DB Protocol
┌──────────────────┴──────────────────────┐
│ Data Layer (Highly Trusted) │
│ [User DB] [Session Store] [Secrets] │
└─────────────────────────────────────────┘STRIDE Analysis
STRIDE分析
Spoofing Identity
身份冒充
| Threat | Likelihood | Impact | Mitigation | Status |
|---|---|---|---|---|
| Credential theft via phishing | High | Critical | MFA, user education | ✅ |
| Session hijacking | Medium | High | Secure cookies, HTTPS | ✅ |
| Token replay attacks | Medium | High | Short token lifetime, JTI tracking | ✅ |
| OAuth state manipulation | Low | Medium | Cryptographic state validation | ✅ |
| Impersonation via stolen refresh token | Medium | Critical | Refresh token rotation, device binding | ⚠️ |
| 威胁 | 可能性 | 影响 | 缓解措施 | 状态 |
|---|---|---|---|---|
| 钓鱼导致凭证被盗 | 高 | 严重 | MFA、用户教育 | ✅ |
| 会话劫持 | 中 | 高 | 安全Cookie、HTTPS | ✅ |
| 令牌重放攻击 | 中 | 高 | 短令牌生命周期、JTI跟踪 | ✅ |
| OAuth state参数篡改 | 低 | 中 | 加密state验证 | ✅ |
| 窃取刷新令牌进行冒充 | 中 | 严重 | 刷新令牌轮换、设备绑定 | ⚠️ |
Tampering with Data
数据篡改
| Threat | Likelihood | Impact | Mitigation | Status |
|---|---|---|---|---|
| JWT payload manipulation | Low | Critical | Signature verification, RS256 | ✅ |
| Cookie tampering | Low | High | Signed cookies, HMAC validation | ✅ |
| OAuth callback manipulation | Medium | High | Redirect URI validation | ✅ |
| Password reset token tampering | Low | High | Cryptographic tokens, time limits | ✅ |
| 威胁 | 可能性 | 影响 | 缓解措施 | 状态 |
|---|---|---|---|---|
| JWT负载篡改 | 低 | 严重 | 签名验证、RS256 | ✅ |
| Cookie篡改 | 低 | 高 | 签名Cookie、HMAC验证 | ✅ |
| OAuth回调篡改 | 中 | 高 | 重定向URI验证 | ✅ |
| 密码重置令牌篡改 | 低 | 高 | 加密令牌、时间限制 | ✅ |
Repudiation
抵赖
| Threat | Likelihood | Impact | Mitigation | Status |
|---|---|---|---|---|
| Deny unauthorized access | Medium | Medium | Comprehensive audit logging | ✅ |
| Dispute authentication events | Low | Low | Immutable audit trail, timestamps | ✅ |
| 威胁 | 可能性 | 影响 | 缓解措施 | 状态 |
|---|---|---|---|---|
| 否认未授权访问 | 中 | 中 | 全面审计日志 | ✅ |
| 争议认证事件 | 低 | 低 | 不可变审计轨迹、时间戳 | ✅ |
Information Disclosure
信息泄露
| Threat | Likelihood | Impact | Mitigation | Status |
|---|---|---|---|---|
| Credentials in logs | Medium | Critical | Sanitize logs, secret detection | ✅ |
| User enumeration via login | High | Medium | Generic error messages | ⚠️ |
| Token leakage in URLs | Low | High | Tokens in headers only | ✅ |
| Timing attacks on password check | Medium | Medium | Constant-time comparison | ✅ |
| JWKS endpoint information leak | Low | Low | Rate limiting, monitoring | ✅ |
| 威胁 | 可能性 | 影响 | 缓解措施 | 状态 |
|---|---|---|---|---|
| 凭证出现在日志中 | 中 | 严重 | 日志脱敏、密钥检测 | ✅ |
| 登录接口用户枚举 | 高 | 中 | 通用错误信息 | ⚠️ |
| 令牌在URL中泄露 | 低 | 高 | 仅在Header中传递令牌 | ✅ |
| 密码检查时序攻击 | 中 | 中 | 恒定时间比较 | ✅ |
| JWKS端点信息泄露 | 低 | 低 | 速率限制、监控 | ✅ |
Denial of Service
拒绝服务
| Threat | Likelihood | Impact | Mitigation | Status |
|---|---|---|---|---|
| Brute force attacks | High | High | Rate limiting, CAPTCHA, account lockout | ✅ |
| Resource exhaustion (bcrypt) | Medium | Medium | Request throttling, async processing | ✅ |
| Session store exhaustion | Low | High | Session limits per user, TTL | ✅ |
| OAuth callback flooding | Medium | Medium | State validation, rate limiting | ⚠️ |
| 威胁 | 可能性 | 影响 | 缓解措施 | 状态 |
|---|---|---|---|---|
| 暴力破解攻击 | 高 | 高 | 速率限制、CAPTCHA账户锁定 | ✅ |
| 资源耗尽(bcrypt) | 中 | 中 | 请求限流、异步处理 | ✅ |
| 会话存储耗尽 | 低 | 高 | 每用户会话限制、TTL | ✅ |
| OAuth回调洪水攻击 | 中 | 中 | State验证、速率限制 | ⚠️ |
Elevation of Privilege
权限提升
| Threat | Likelihood | Impact | Mitigation | Status |
|---|---|---|---|---|
| Role manipulation in JWT | Low | Critical | Server-side role verification | ✅ |
| Privilege escalation via API | Medium | Critical | Permission checks on every request | ✅ |
| Admin impersonation | Low | Critical | Additional auth for admin actions | ⚠️ |
| OAuth scope escalation | Low | High | Strict scope validation | ✅ |
| 威胁 | 可能性 | 影响 | 缓解措施 | 状态 |
|---|---|---|---|---|
| JWT中角色篡改 | 低 | 严重 | 服务器端角色验证 | ✅ |
| 接口权限提升 | 中 | 严重 | 每个请求都检查权限 | ✅ |
| 管理员冒充 | 低 | 严重 | 管理员操作额外认证 | ⚠️ |
| OAuth作用域提升 | 低 | 高 | 严格作用域验证 | ✅ |
Risk Matrix
风险矩阵
| Threat ID | Threat | Likelihood | Impact | Risk Score | Priority |
|---|---|---|---|---|---|
| T1 | Credential stuffing attack | High | Critical | 9 | P0 |
| T2 | Refresh token theft | Medium | Critical | 8 | P1 |
| T3 | User enumeration | High | Medium | 6 | P2 |
| T4 | OAuth callback flooding | Medium | Medium | 4 | P2 |
| T5 | Admin impersonation | Low | Critical | 7 | P1 |
| 威胁ID | 威胁 | 可能性 | 影响 | 风险评分 | 优先级 |
|---|---|---|---|---|---|
| T1 | 凭证填充攻击 | 高 | 严重 | 9 | P0 |
| T2 | 刷新令牌窃取 | 中 | 严重 | 8 | P1 |
| T3 | 用户枚举 | 高 | 中 | 6 | P2 |
| T4 | OAuth回调洪水 | 中 | 中 | 4 | P2 |
| T5 | 管理员冒充 | 低 | 严重 | 7 | P1 |
Attack Scenarios
攻击场景
Scenario 1: Credential Stuffing Attack
场景1:凭证填充攻击
Attacker Goal: Gain unauthorized access using leaked credentials
Attack Steps:
- Obtain credential database from breach
- Automate login attempts across accounts
- Bypass rate limiting with distributed IPs
- Identify valid credentials
- Access user accounts
Defenses:
- Rate limiting per IP and per account
- CAPTCHA after N failed attempts
- Anomaly detection (impossible travel, new device)
- Breach database monitoring (HaveIBeenPwned)
- Mandatory MFA for sensitive accounts
攻击者目标: 使用泄露的凭证获取未授权访问
攻击步骤:
- 从数据泄露中获取凭证数据库
- 自动化尝试登录多个账户
- 使用分布式IP绕过速率限制
- 识别有效凭证
- 访问用户账户
防御措施:
- 基于IP和账户的速率限制
- N次失败尝试后启用CAPTCHA
- 异常检测(不可能的旅行、新设备)
- 泄露数据库监控(HaveIBeenPwned)
- 敏感账户强制启用MFA
Scenario 2: Token Theft via XSS
场景2:XSS令牌窃取
Attacker Goal: Steal access token to impersonate user
Attack Steps:
- Inject malicious script via vulnerable input
- Script reads token from localStorage
- Exfiltrate token to attacker server
- Use token to access API as victim
Defenses:
- Store tokens in HttpOnly cookies (not accessible to JS)
- Content Security Policy (CSP)
- Input sanitization and validation
- Regular security audits
- Short token lifetimes
攻击者目标: 窃取访问令牌冒充用户
攻击步骤:
- 通过漏洞输入注入恶意脚本
- 脚本从localStorage读取令牌
- 将令牌泄露到攻击者服务器
- 使用令牌以受害者身份访问API
防御措施:
- 将令牌存储在HttpOnly Cookie中(JS无法访问)
- 内容安全策略(CSP)
- 输入 sanitization 与验证
- 定期安全审计
- 短令牌生命周期
Recommendations by Priority
按优先级排序的建议
P0 (Critical - Immediate Action)
P0(严重 - 立即处理)
- Implement credential stuffing protection
- Add device fingerprinting for anomaly detection
- Enable MFA for all admin accounts
- 实现凭证填充防护
- 添加设备指纹用于异常检测
- 所有管理员账户启用MFA
P1 (High - Within Sprint)
P1(高 - 迭代内处理)
- Implement refresh token rotation
- Add additional auth step for admin impersonation
- Strengthen OAuth callback validation
- 实现刷新令牌轮换
- 管理员操作添加额外认证步骤
- 强化OAuth回调验证
P2 (Medium - Next Quarter)
P2(中 - 下一季度处理)
- Improve user enumeration protection
- Implement risk-based authentication
- Add behavioral biometrics
undefined- 提升用户枚举防护
- 实现基于风险的认证
- 添加行为生物识别
undefinedCommon Vulnerabilities & Mitigations
常见漏洞与缓解措施
OWASP Top 10 for Authentication
认证相关的OWASP Top 10
A01: Broken Access Control
A01:访问控制失效
python
undefinedpython
undefinedVULNERABLE: Client-side role check only
存在漏洞:仅客户端侧角色检查
@app.get("/admin/users")
async def get_users(user: User = Depends(get_current_user)):
# No server-side permission check!
return await db.users.find_all()
@app.get("/admin/users")
async def get_users(user: User = Depends(get_current_user)):
# 无服务器端权限检查!
return await db.users.find_all()
SECURE: Server-side permission enforcement
安全:服务器端权限强制执行
@app.get("/admin/users")
async def get_users(user: User = Depends(require_permission(Permission.ADMIN))):
# Permission verified on server
return await db.users.find_all()
undefined@app.get("/admin/users")
async def get_users(user: User = Depends(require_permission(Permission.ADMIN))):
# 服务器端验证权限
return await db.users.find_all()
undefinedA02: Cryptographic Failures
A02:加密失败
python
undefinedpython
undefinedVULNERABLE: Weak hashing
存在漏洞:弱哈希
hashed = hashlib.md5(password.encode()).hexdigest()
hashed = hashlib.md5(password.encode()).hexdigest()
SECURE: Strong adaptive hashing
安全:强自适应哈希
from passlib.context import CryptContext
pwd_context = CryptContext(
schemes=["bcrypt"],
deprecated="auto",
bcrypt__rounds=12 # Adjust based on security/performance needs
)
hashed = pwd_context.hash(password)
undefinedfrom passlib.context import CryptContext
pwd_context = CryptContext(
schemes=["bcrypt"],
deprecated="auto",
bcrypt__rounds=12 # 根据安全/性能需求调整
)
hashed = pwd_context.hash(password)
undefinedA03: Injection
A03:注入
python
undefinedpython
undefinedVULNERABLE: SQL injection in auth query
存在漏洞:认证查询SQL注入
query = f"SELECT * FROM users WHERE email = '{email}' AND password = '{password}'"
query = f"SELECT * FROM users WHERE email = '{email}' AND password = '{password}'"
SECURE: Parameterized queries
安全:参数化查询
query = "SELECT * FROM users WHERE email = $1"
user = await db.fetch_one(query, email)
if user and pwd_context.verify(password, user.hashed_password):
return user
undefinedquery = "SELECT * FROM users WHERE email = $1"
user = await db.fetch_one(query, email)
if user and pwd_context.verify(password, user.hashed_password):
return user
undefinedA07: Identification and Authentication Failures
A07:身份验证与识别失败
python
undefinedpython
undefinedVULNERABLE: Weak session management
存在漏洞:弱会话管理
session_id = hashlib.md5(str(time.time()).encode()).hexdigest()
session_id = hashlib.md5(str(time.time()).encode()).hexdigest()
SECURE: Cryptographically secure session tokens
安全:加密安全的会话令牌
import secrets
session_id = secrets.token_urlsafe(32)
import secrets
session_id = secrets.token_urlsafe(32)
VULNERABLE: No rate limiting
存在漏洞:无速率限制
@app.post("/auth/login")
async def login(credentials: LoginRequest):
return await authenticate(credentials)
@app.post("/auth/login")
async def login(credentials: LoginRequest):
return await authenticate(credentials)
SECURE: Rate limiting with slowdown
安全:带减速的速率限制
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)
@app.post("/auth/login")
@limiter.limit("5/minute") # 5 attempts per minute
async def login(request: Request, credentials: LoginRequest):
return await authenticate(credentials)
undefinedfrom slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)
@app.post("/auth/login")
@limiter.limit("5/minute") # 每分钟5次尝试
async def login(request: Request, credentials: LoginRequest):
return await authenticate(credentials)
undefinedIntegration with Keycloak
与Keycloak集成
For enterprise-grade auth, integrate with Keycloak (see [[keycloak]] skill):
python
from keycloak import KeycloakOpenID对于企业级认证,可与Keycloak集成(参见[[keycloak]] Skill):
python
from keycloak import KeycloakOpenIDConfigure Keycloak client
配置Keycloak客户端
keycloak_openid = KeycloakOpenID(
server_url="http://localhost:8080/",
client_id="your-client",
realm_name="your-realm",
client_secret_key="your-secret"
)
keycloak_openid = KeycloakOpenID(
server_url="http://localhost:8080/",
client_id="your-client",
realm_name="your-realm",
client_secret_key="your-secret"
)
Get token
获取令牌
token = keycloak_openid.token(username, password)
token = keycloak_openid.token(username, password)
Validate token
验证令牌
token_info = keycloak_openid.introspect(token['access_token'])
token_info = keycloak_openid.introspect(token['access_token'])
Get user info
获取用户信息
user_info = keycloak_openid.userinfo(token['access_token'])
user_info = keycloak_openid.userinfo(token['access_token'])
Decode and verify token locally
本地解码并验证令牌
KEYCLOAK_PUBLIC_KEY = "-----BEGIN PUBLIC KEY-----\n" +
keycloak_openid.public_key() +
"\n-----END PUBLIC KEY-----"
keycloak_openid.public_key() +
"\n-----END PUBLIC KEY-----"
options = {"verify_signature": True, "verify_aud": True, "verify_exp": True}
token_info = keycloak_openid.decode_token(
token['access_token'],
key=KEYCLOAK_PUBLIC_KEY,
options=options
)
undefinedKEYCLOAK_PUBLIC_KEY = "-----BEGIN PUBLIC KEY-----\n" +
keycloak_openid.public_key() +
"\n-----END PUBLIC KEY-----"
keycloak_openid.public_key() +
"\n-----END PUBLIC KEY-----"
options = {"verify_signature": True, "verify_aud": True, "verify_exp": True}
token_info = keycloak_openid.decode_token(
token['access_token'],
key=KEYCLOAK_PUBLIC_KEY,
options=options
)
undefinedRelated Skills & Resources
相关Skill与资源
Skills
Skills
- [[keycloak]] - Enterprise identity and access management
- [[deep-analysis]] - Security audit templates and threat modeling
- [[extended-thinking]] - Enable deep reasoning for security analysis
- [[complex-reasoning]] - Hypothesis-driven debugging for auth issues
- [[keycloak]] - 企业级身份与访问管理
- [[deep-analysis]] - 安全审计模板与威胁建模
- [[extended-thinking]] - 启用深度推理进行安全分析
- [[complex-reasoning]] - 基于假设的认证问题调试
Keycloak Agents
Keycloak Agents
- keycloak-realm-admin - Realm and client management
- keycloak-security-auditor - Security review and compliance
- keycloak-auth-flow-designer - Custom authentication flows
- keycloak-identity-specialist - Federation and SSO setup
- keycloak-realm-admin - 领域与客户端管理
- keycloak-security-auditor - 安全审查与合规
- keycloak-auth-flow-designer - 自定义认证流程
- keycloak-identity-specialist - 联邦与SSO设置
External Resources
外部资源
Troubleshooting
故障排除
Common Issues
常见问题
Token Verification Failures
令牌验证失败
bash
undefinedbash
undefinedDebug JWT token
调试JWT令牌
python -c "import jwt; print(jwt.decode('YOUR_TOKEN', options={'verify_signature': False}))"
python -c "import jwt; print(jwt.decode('YOUR_TOKEN', options={'verify_signature': False}))"
Verify token signature
验证令牌签名
openssl dgst -sha256 -verify public_key.pem -signature signature.bin token_payload.txt
openssl dgst -sha256 -verify public_key.pem -signature signature.bin token_payload.txt
Check token expiration
检查令牌过期时间
date -d @$(python -c "import jwt; print(jwt.decode('YOUR_TOKEN', options={'verify_signature': False})['exp'])")
undefineddate -d @$(python -c "import jwt; print(jwt.decode('YOUR_TOKEN', options={'verify_signature': False})['exp'])")
undefinedOAuth Flow Issues
OAuth流程问题
python
undefinedpython
undefinedEnable debug logging
启用调试日志
import logging
logging.basicConfig(level=logging.DEBUG)
import logging
logging.basicConfig(level=logging.DEBUG)
Log OAuth flow steps
记录OAuth流程步骤
logger.debug(f"Redirect URI: {redirect_uri}")
logger.debug(f"State: {state}")
logger.debug(f"Code: {code}")
logger.debug(f"Token response: {token_response}")
undefinedlogger.debug(f"Redirect URI: {redirect_uri}")
logger.debug(f"State: {state}")
logger.debug(f"Code: {code}")
logger.debug(f"Token response: {token_response}")
undefinedSession Issues
会话问题
bash
undefinedbash
undefinedCheck Redis session data
检查Redis会话数据
redis-cli
KEYS session:* HGETALL session:abc123 TTL session:abc123
---
_Last Updated: 2025-12-12_
_Version: 2.0.0_
_Enhanced with security analysis, threat modeling, and OIDC/OAuth 2.0 best practices_redis-cli
KEYS session:* HGETALL session:abc123 TTL session:abc123
---
_最后更新: 2025-12-12_
_版本: 2.0.0_
_新增安全分析、威胁建模及OIDC/OAuth 2.0最佳实践_