multi-system-sso-authentication
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseMulti-System SSO Authentication Skill
多系统SSO认证技能
Overview
概述
This skill provides comprehensive patterns for implementing enterprise SSO authentication that supports multiple identity providers. It covers JWT RS256 token validation, backwards verification with authoritative systems, Laravel session decryption, permission mapping, and Redis session management.
本技能提供了实现支持多身份提供商的企业级SSO认证的完整模式,涵盖JWT RS256令牌校验、与权威系统的反向验证、Laravel会话解密、权限映射以及Redis会话管理等内容。
When to Use This Skill
适用场景
- Integrating with multiple enterprise SSO systems
- Implementing secure JWT token validation with backwards verification
- Supporting legacy session-based authentication alongside JWT
- Building unified authentication adapters for microservices
- Mapping permissions across different systems
- Implementing token introspection and revocation
- Handling OAuth2 flows with multiple providers
- 集成多个企业SSO系统
- 实现带反向验证的安全JWT令牌校验
- 同时支持基于会话的传统认证与JWT认证
- 为微服务构建统一认证适配器
- 实现不同系统间的权限映射
- 实现令牌 introspection 与吊销功能
- 处理多提供商的OAuth2流程
Core Concepts
核心概念
Authentication Architecture
认证架构
┌─────────────────────────────────────────────────────────┐
│ Your Application │
│ ┌────────────────────────────────────────────────────┐ │
│ │ UnifiedAuthAdapter (Router) │ │
│ │ ┌──────────────────────────────────────────────┐ │ │
│ │ │ Check token issuer (iss claim) │ │ │
│ │ │ Route to appropriate adapter │ │ │
│ │ └──────────────────────────────────────────────┘ │ │
│ │ ▼ ▼ ▼ ▼ │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ CORP │ │ SGF │ │ GED │ │ CARRINHO│ │ │
│ │ │ Adapter │ │ Adapter │ │ Adapter │ │ Adapter │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ │
│ └────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐
│ Corporativo│ │ SGF │ │ GED │ │ Carrinho │
│ SSO │ │ API │ │ API │ │ API │
└───────────┘ └───────────┘ └───────────┘ └───────────┘┌─────────────────────────────────────────────────────────┐
│ Your Application │
│ ┌────────────────────────────────────────────────────┐ │
│ │ UnifiedAuthAdapter (Router) │ │
│ │ ┌──────────────────────────────────────────────┐ │ │
│ │ │ Check token issuer (iss claim) │ │ │
│ │ │ Route to appropriate adapter │ │ │
│ │ └──────────────────────────────────────────────┘ │ │
│ │ ▼ ▼ ▼ ▼ │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ CORP │ │ SGF │ │ GED │ │ CARRINHO│ │ │
│ │ │ Adapter │ │ Adapter │ │ Adapter │ │ Adapter │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ │
│ └────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐
│ Corporativo│ │ SGF │ │ GED │ │ Carrinho │
│ SSO │ │ API │ │ API │ │ API │
└───────────┘ └───────────┘ └───────────┘ └───────────┘Token Flow
令牌流程
- User authenticates with external SSO system
- SSO system issues JWT with issuer (iss) and audience (aud) claims
- Your app receives token from request headers
- UnifiedAuthAdapter routes to appropriate adapter based on issuer
- Adapter validates JWT signature with public key
- Backwards verification checks token validity with issuing system
- Permissions mapped from SSO format to your app's format
- User session created in Redis for future requests
- 用户认证:用户通过外部SSO系统完成认证
- SSO系统签发JWT:包含issuer(iss)和audience(aud)声明
- 应用接收令牌:从请求头中获取令牌
- UnifiedAuthAdapter路由:根据issuer将令牌路由至对应的适配器
- 适配器校验令牌:使用公钥验证JWT签名
- 反向验证:向签发系统校验令牌有效性
- 权限映射:将SSO格式的权限转换为应用内部格式
- 创建用户会话:在Redis中创建用户会话,用于后续请求
Project Structure
项目结构
src/
├── api/
│ ├── middlewares/
│ │ └── auth.py # AuthMiddleware
│ └── path/
│ └── auth.py # Authentication endpoints
├── domain/
│ └── modules/
│ └── auth/
│ ├── entity.py # User entity
│ ├── session.py # Session management
│ └── permissions.py # Permission definitions
└── infra/
├── adapters/
│ └── auth/
│ ├── unified_adapter.py # Router for all adapters
│ ├── corporativo_adapter.py # Corporativo SSO
│ ├── sgf_adapter.py # SGF integration
│ ├── ged_adapter.py # GED integration
│ └── carrinho_adapter.py # Carrinho integration
├── cache/
│ └── redis_session.py # Redis session storage
└── services/
└── permission_mapper.py # Permission mappingsrc/
├── api/
│ ├── middlewares/
│ │ └── auth.py # AuthMiddleware
│ └── path/
│ └── auth.py # Authentication endpoints
├── domain/
│ └── modules/
│ └── auth/
│ ├── entity.py # User entity
│ ├── session.py # Session management
│ └── permissions.py # Permission definitions
└── infra/
├── adapters/
│ └── auth/
│ ├── unified_adapter.py # Router for all adapters
│ ├── corporativo_adapter.py # Corporativo SSO
│ ├── sgf_adapter.py # SGF integration
│ ├── ged_adapter.py # GED integration
│ └── carrinho_adapter.py # Carrinho integration
├── cache/
│ └── redis_session.py # Redis session storage
└── services/
└── permission_mapper.py # Permission mappingImplementation Patterns
实现模式
1. Unified Authentication Adapter (Router)
1. 统一认证适配器(路由型)
python
undefinedpython
undefinedsrc/infra/adapters/auth/unified_adapter.py
src/infra/adapters/auth/unified_adapter.py
from typing import Dict, Any
from jose import jwt, JWTError
from src.infra.adapters.auth.corporativo_adapter import CorporativoAuthAdapter
from src.infra.adapters.auth.sgf_adapter import SGFAuthAdapter
from src.infra.adapters.auth.ged_adapter import GEDAuthAdapter
from src.infra.adapters.auth.carrinho_adapter import CarrinhoAuthAdapter
from src.config.settings import app_settings
class UnifiedAuthAdapter:
"""Unified authentication adapter that routes tokens to appropriate SSO adapter.
Routes based on JWT issuer claim (iss).
"""
def __init__(
self,
corporativo_adapter: CorporativoAuthAdapter,
sgf_adapter: SGFAuthAdapter,
ged_adapter: GEDAuthAdapter,
carrinho_adapter: CarrinhoAuthAdapter,
):
self.adapters = {
"corporativo": corporativo_adapter,
"sgf": sgf_adapter,
"ged": ged_adapter,
"carrinho": carrinho_adapter,
}
# Map issuer URLs to adapter names
self.issuer_map = {
app_settings.CORPORATIVO_API_URL: "corporativo",
app_settings.SGF_API_URL: "sgf",
app_settings.GED_API_URL: "ged",
app_settings.CARRINHO_API_URL: "carrinho",
"gefin-backend": "corporativo", # Self-issued tokens
}
async def validate_token(self, token: str) -> Dict[str, Any]:
"""Validate token and route to appropriate adapter.
Args:
token: JWT token string
Returns:
User data dictionary with permissions
Raises:
JWTError: If token is invalid or from unknown issuer
"""
# Decode without verification to check issuer
try:
unverified = jwt.get_unverified_claims(token)
issuer = unverified.get("iss")
except JWTError as e:
raise JWTError(f"Invalid JWT format: {e}")
# Map issuer to adapter
adapter_name = self.issuer_map.get(issuer)
if not adapter_name:
raise JWTError(f"Unknown token issuer: {issuer}")
# Check if adapter is enabled
enabled_systems = app_settings.ENABLED_AUTH_SYSTEMS
if adapter_name not in enabled_systems:
raise JWTError(f"Authentication system '{adapter_name}' is disabled")
# Route to appropriate adapter
adapter = self.adapters[adapter_name]
return await adapter.validate_token(token)
async def validate_session(self, session_id: str) -> Dict[str, Any]:
"""Validate session cookie (for legacy systems).
Routes to Corporativo adapter (primary session provider).
"""
return await self.adapters["corporativo"].validate_session(session_id)undefinedfrom typing import Dict, Any
from jose import jwt, JWTError
from src.infra.adapters.auth.corporativo_adapter import CorporativoAuthAdapter
from src.infra.adapters.auth.sgf_adapter import SGFAuthAdapter
from src.infra.adapters.auth.ged_adapter import GEDAuthAdapter
from src.infra.adapters.auth.carrinho_adapter import CarrinhoAuthAdapter
from src.config.settings import app_settings
class UnifiedAuthAdapter:
"""Unified authentication adapter that routes tokens to appropriate SSO adapter.
Routes based on JWT issuer claim (iss).
"""
def __init__(
self,
corporativo_adapter: CorporativoAuthAdapter,
sgf_adapter: SGFAuthAdapter,
ged_adapter: GEDAuthAdapter,
carrinho_adapter: CarrinhoAuthAdapter,
):
self.adapters = {
"corporativo": corporativo_adapter,
"sgf": sgf_adapter,
"ged": ged_adapter,
"carrinho": carrinho_adapter,
}
# Map issuer URLs to adapter names
self.issuer_map = {
app_settings.CORPORATIVO_API_URL: "corporativo",
app_settings.SGF_API_URL: "sgf",
app_settings.GED_API_URL: "ged",
app_settings.CARRINHO_API_URL: "carrinho",
"gefin-backend": "corporativo", # Self-issued tokens
}
async def validate_token(self, token: str) -> Dict[str, Any]:
"""Validate token and route to appropriate adapter.
Args:
token: JWT token string
Returns:
User data dictionary with permissions
Raises:
JWTError: If token is invalid or from unknown issuer
"""
# Decode without verification to check issuer
try:
unverified = jwt.get_unverified_claims(token)
issuer = unverified.get("iss")
except JWTError as e:
raise JWTError(f"Invalid JWT format: {e}")
# Map issuer to adapter
adapter_name = self.issuer_map.get(issuer)
if not adapter_name:
raise JWTError(f"Unknown token issuer: {issuer}")
# Check if adapter is enabled
enabled_systems = app_settings.ENABLED_AUTH_SYSTEMS
if adapter_name not in enabled_systems:
raise JWTError(f"Authentication system '{adapter_name}' is disabled")
# Route to appropriate adapter
adapter = self.adapters[adapter_name]
return await adapter.validate_token(token)
async def validate_session(self, session_id: str) -> Dict[str, Any]:
"""Validate session cookie (for legacy systems).
Routes to Corporativo adapter (primary session provider).
"""
return await self.adapters["corporativo"].validate_session(session_id)undefined2. Base Auth Adapter Pattern
2. 基础认证适配器模式
python
undefinedpython
undefinedsrc/infra/adapters/auth/base_adapter.py
src/infra/adapters/auth/base_adapter.py
from abc import ABC, abstractmethod
from typing import Dict, Any
class IAuthAdapter(ABC):
"""Abstract base class for authentication adapters.
All SSO adapters must implement this interface.
"""
@abstractmethod
async def validate_token(self, token: str) -> Dict[str, Any]:
"""Validate JWT token and return user data.
Args:
token: JWT token string
Returns:
User data with permissions
Raises:
JWTError: If token is invalid
"""
pass
@abstractmethod
async def validate_session(self, session_id: str) -> Dict[str, Any]:
"""Validate session ID and return user data.
Args:
session_id: Session identifier
Returns:
User data with permissions
Raises:
SessionError: If session is invalid
"""
pass
@abstractmethod
def get_permissions(self, user_data: Dict[str, Any]) -> list[str]:
"""Extract and map permissions from user data.
Args:
user_data: User data from SSO system
Returns:
List of permission strings in app format
"""
passundefinedfrom abc import ABC, abstractmethod
from typing import Dict, Any
class IAuthAdapter(ABC):
"""Abstract base class for authentication adapters.
All SSO adapters must implement this interface.
"""
@abstractmethod
async def validate_token(self, token: str) -> Dict[str, Any]:
"""Validate JWT token and return user data.
Args:
token: JWT token string
Returns:
User data with permissions
Raises:
JWTError: If token is invalid
"""
pass
@abstractmethod
async def validate_session(self, session_id: str) -> Dict[str, Any]:
"""Validate session ID and return user data.
Args:
session_id: Session identifier
Returns:
User data with permissions
Raises:
SessionError: If session is invalid
"""
pass
@abstractmethod
def get_permissions(self, user_data: Dict[str, Any]) -> list[str]:
"""Extract and map permissions from user data.
Args:
user_data: User data from SSO system
Returns:
List of permission strings in app format
"""
passundefined3. JWT RS256 Token Validation with Backwards Verification
3. 带反向验证的JWT RS256令牌校验
python
undefinedpython
undefinedsrc/infra/adapters/auth/corporativo_adapter.py
src/infra/adapters/auth/corporativo_adapter.py
import httpx
from datetime import datetime, timedelta
from jose import jwt, JWTError
from src.infra.adapters.auth.base_adapter import IAuthAdapter
from src.infra.cache.redis_session import RedisSessionManager
class CorporativoAuthAdapter(IAuthAdapter):
"""Corporativo SSO authentication adapter.
Implements JWT RS256 validation with backwards verification.
"""
def __init__(
self,
public_key: str,
private_key: str,
api_url: str,
session_manager: RedisSessionManager,
):
self.public_key = public_key
self.private_key = private_key
self.api_url = api_url
self.session_manager = session_manager
self._validation_cache: Dict[str, tuple[Dict, datetime]] = {}
self._cache_ttl = 30 # 30 seconds
async def validate_token(self, token: str) -> Dict[str, Any]:
"""Validate JWT token with backwards verification.
Steps:
1. Verify JWT signature with RSA public key
2. Check issuer and audience claims
3. Perform backwards verification with SSO system
4. Map permissions to app format
"""
try:
# Verify signature and decode token
payload = jwt.decode(
token,
self.public_key,
algorithms=["RS256"],
options={"verify_iss": False, "verify_aud": False}, # Manual validation
)
# Manual issuer validation
accepted_issuers = ["gefin-backend", self.api_url]
if payload.get("iss") not in accepted_issuers:
raise JWTError(f"Invalid issuer: {payload.get('iss')}")
# Manual audience validation
accepted_audiences = ["gefin-api", "gefin"]
aud = payload.get("aud")
if isinstance(aud, list):
if not any(a in accepted_audiences for a in aud):
raise JWTError(f"Invalid audience: {aud}")
elif aud not in accepted_audiences:
raise JWTError(f"Invalid audience: {aud}")
# Check expiration
exp = payload.get("exp")
if exp and datetime.fromtimestamp(exp) < datetime.now():
raise JWTError("Token has expired")
# Backwards verification (if not self-issued)
if payload.get("iss") != "gefin-backend":
await self._verify_with_corporativo(token, payload)
return payload
except JWTError as e:
raise JWTError(f"Token validation failed: {e}")
async def _verify_with_corporativo(
self,
token: str,
payload: Dict[str, Any]
) -> None:
"""Verify token validity with Corporativo SSO system.
Implements backwards verification with caching.
"""
# Check cache first
cache_key = payload.get("sub")
if cache_key in self._validation_cache:
cached_data, cached_at = self._validation_cache[cache_key]
if datetime.now() - cached_at < timedelta(seconds=self._cache_ttl):
return # Valid in cache
# Call Corporativo /api/me endpoint
headers = {"Authorization": f"Bearer {token}"}
try:
async with httpx.AsyncClient(timeout=5.0) as client:
response = await client.get(
f"{self.api_url}/api/me",
headers=headers,
)
response.raise_for_status()
# Cache validation result
self._validation_cache[cache_key] = (payload, datetime.now())
except httpx.HTTPStatusError as e:
if e.response.status_code == 401:
raise JWTError("Token is not valid in Corporativo system")
# Network error - extend cache if exists
if cache_key in self._validation_cache:
cached_data, cached_at = self._validation_cache[cache_key]
# Extend cache to 5 minutes on network failure
if datetime.now() - cached_at < timedelta(minutes=5):
return
raise JWTError("Unable to verify token with Corporativo")
except httpx.RequestError:
# Network error - graceful degradation
if cache_key in self._validation_cache:
return
raise JWTError("Network error verifying token")
def get_permissions(self, user_data: Dict[str, Any]) -> list[str]:
"""Map Corporativo permissions to app format.
Example mapping:
"Ver anuidade" -> "gefin.boleto.read"
"Editar anuidade" -> "gefin.boleto.write"
"""
corporativo_permissions = user_data.get("permissions", [])
permission_map = {
"Ver anuidade": "gefin.boleto.read",
"Editar anuidade": "gefin.boleto.write",
"Ver parcelamento": "gefin.parcela.read",
"Editar parcelamento": "gefin.parcela.write",
"Ver publicações": "gefin.publicacao.read",
"Editar publicações": "gefin.publicacao.write",
# ... more mappings
}
mapped_permissions = []
for corp_perm in corporativo_permissions:
if corp_perm == "*": # Admin wildcard
return ["*"]
app_perm = permission_map.get(corp_perm)
if app_perm:
mapped_permissions.append(app_perm)
# Ensure at least read permission
if not any(p.endswith(".read") for p in mapped_permissions):
mapped_permissions.append("gefin.user.read")
return mapped_permissions
async def validate_session(self, session_id: str) -> Dict[str, Any]:
"""Validate session from Redis.
Falls back to Laravel session decryption if Redis unavailable.
"""
# Try Redis first
session_data = await self.session_manager.get_session(session_id)
if session_data:
return session_data
# Fall back to Laravel session decryption
return await self._decrypt_laravel_session(session_id)
async def _decrypt_laravel_session(self, session_cookie: str) -> Dict[str, Any]:
"""Decrypt Laravel AES-256-CBC session cookie.
Laravel session format:
- base64(iv:encrypted_payload:mac)
- Encrypted with APP_KEY from .env
"""
# Implementation omitted for brevity
# See Laravel session decryption pattern below
passundefinedimport httpx
from datetime import datetime, timedelta
from jose import jwt, JWTError
from src.infra.adapters.auth.base_adapter import IAuthAdapter
from src.infra.cache.redis_session import RedisSessionManager
class CorporativoAuthAdapter(IAuthAdapter):
"""Corporativo SSO authentication adapter.
Implements JWT RS256 validation with backwards verification.
"""
def __init__(
self,
public_key: str,
private_key: str,
api_url: str,
session_manager: RedisSessionManager,
):
self.public_key = public_key
self.private_key = private_key
self.api_url = api_url
self.session_manager = session_manager
self._validation_cache: Dict[str, tuple[Dict, datetime]] = {}
self._cache_ttl = 30 # 30 seconds
async def validate_token(self, token: str) -> Dict[str, Any]:
"""Validate JWT token with backwards verification.
Steps:
1. Verify JWT signature with RSA public key
2. Check issuer and audience claims
3. Perform backwards verification with SSO system
4. Map permissions to app format
"""
try:
# Verify signature and decode token
payload = jwt.decode(
token,
self.public_key,
algorithms=["RS256"],
options={"verify_iss": False, "verify_aud": False}, # Manual validation
)
# Manual issuer validation
accepted_issuers = ["gefin-backend", self.api_url]
if payload.get("iss") not in accepted_issuers:
raise JWTError(f"Invalid issuer: {payload.get('iss')}")
# Manual audience validation
accepted_audiences = ["gefin-api", "gefin"]
aud = payload.get("aud")
if isinstance(aud, list):
if not any(a in accepted_audiences for a in aud):
raise JWTError(f"Invalid audience: {aud}")
elif aud not in accepted_audiences:
raise JWTError(f"Invalid audience: {aud}")
# Check expiration
exp = payload.get("exp")
if exp and datetime.fromtimestamp(exp) < datetime.now():
raise JWTError("Token has expired")
# Backwards verification (if not self-issued)
if payload.get("iss") != "gefin-backend":
await self._verify_with_corporativo(token, payload)
return payload
except JWTError as e:
raise JWTError(f"Token validation failed: {e}")
async def _verify_with_corporativo(
self,
token: str,
payload: Dict[str, Any]
) -> None:
"""Verify token validity with Corporativo SSO system.
Implements backwards verification with caching.
"""
# Check cache first
cache_key = payload.get("sub")
if cache_key in self._validation_cache:
cached_data, cached_at = self._validation_cache[cache_key]
if datetime.now() - cached_at < timedelta(seconds=self._cache_ttl):
return # Valid in cache
# Call Corporativo /api/me endpoint
headers = {"Authorization": f"Bearer {token}"}
try:
async with httpx.AsyncClient(timeout=5.0) as client:
response = await client.get(
f"{self.api_url}/api/me",
headers=headers,
)
response.raise_for_status()
# Cache validation result
self._validation_cache[cache_key] = (payload, datetime.now())
except httpx.HTTPStatusError as e:
if e.response.status_code == 401:
raise JWTError("Token is not valid in Corporativo system")
# Network error - extend cache if exists
if cache_key in self._validation_cache:
cached_data, cached_at = self._validation_cache[cache_key]
# Extend cache to 5 minutes on network failure
if datetime.now() - cached_at < timedelta(minutes=5):
return
raise JWTError("Unable to verify token with Corporativo")
except httpx.RequestError:
# Network error - graceful degradation
if cache_key in self._validation_cache:
return
raise JWTError("Network error verifying token")
def get_permissions(self, user_data: Dict[str, Any]) -> list[str]:
"""Map Corporativo permissions to app format.
Example mapping:
"Ver anuidade" -> "gefin.boleto.read"
"Editar anuidade" -> "gefin.boleto.write"
"""
corporativo_permissions = user_data.get("permissions", [])
permission_map = {
"Ver anuidade": "gefin.boleto.read",
"Editar anuidade": "gefin.boleto.write",
"Ver parcelamento": "gefin.parcela.read",
"Editar parcelamento": "gefin.parcela.write",
"Ver publicações": "gefin.publicacao.read",
"Editar publicações": "gefin.publicacao.write",
# ... more mappings
}
mapped_permissions = []
for corp_perm in corporativo_permissions:
if corp_perm == "*": # Admin wildcard
return ["*"]
app_perm = permission_map.get(corp_perm)
if app_perm:
mapped_permissions.append(app_perm)
# Ensure at least read permission
if not any(p.endswith(".read") for p in mapped_permissions):
mapped_permissions.append("gefin.user.read")
return mapped_permissions
async def validate_session(self, session_id: str) -> Dict[str, Any]:
"""Validate session from Redis.
Falls back to Laravel session decryption if Redis unavailable.
"""
# Try Redis first
session_data = await self.session_manager.get_session(session_id)
if session_data:
return session_data
# Fall back to Laravel session decryption
return await self._decrypt_laravel_session(session_id)
async def _decrypt_laravel_session(self, session_cookie: str) -> Dict[str, Any]:
"""Decrypt Laravel AES-256-CBC session cookie.
Laravel session format:
- base64(iv:encrypted_payload:mac)
- Encrypted with APP_KEY from .env
"""
# Implementation omitted for brevity
# See Laravel session decryption pattern below
passundefined4. Laravel Session Decryption
4. Laravel会话解密
python
undefinedpython
undefinedsrc/infra/adapters/auth/laravel_session.py
src/infra/adapters/auth/laravel_session.py
import base64
import json
import hashlib
import hmac
from Cryptodome.Cipher import AES
from Cryptodome.Util.Padding import unpad
import phpserialize
class LaravelSessionDecryptor:
"""Decrypt Laravel AES-256-CBC encrypted sessions.
Handles Laravel's session encryption format.
"""
def __init__(self, app_key: str):
"""Initialize with Laravel APP_KEY.
Args:
app_key: Laravel APP_KEY from .env (base64: prefix)
"""
# Remove 'base64:' prefix if present
if app_key.startswith("base64:"):
app_key = app_key[7:]
self.key = base64.b64decode(app_key)
def decrypt(self, encrypted_value: str) -> str:
"""Decrypt Laravel encrypted value.
Format: base64(json({"iv": "...", "value": "...", "mac": "..."}))
"""
# Decode base64
decoded = base64.b64decode(encrypted_value)
payload = json.loads(decoded)
# Verify MAC signature
if not self._valid_mac(payload):
raise ValueError("Invalid MAC signature")
# Decrypt
iv = base64.b64decode(payload["iv"])
encrypted = base64.b64decode(payload["value"])
cipher = AES.new(self.key, AES.MODE_CBC, iv)
decrypted = unpad(cipher.decrypt(encrypted), AES.block_size)
return decrypted.decode("utf-8")
def _valid_mac(self, payload: dict) -> bool:
"""Verify MAC signature."""
mac = payload.get("mac")
if not mac:
return False
# Calculate expected MAC
message = base64.b64encode(
json.dumps({"iv": payload["iv"], "value": payload["value"]}).encode()
)
expected_mac = hmac.new(
self.key,
message,
hashlib.sha256,
).hexdigest()
return hmac.compare_digest(mac, expected_mac)
def decrypt_session(self, session_cookie: str) -> dict:
"""Decrypt Laravel session cookie and extract user data.
Args:
session_cookie: Laravel session cookie value
Returns:
Dictionary with user_id and other session data
"""
# Decrypt session
decrypted = self.decrypt(session_cookie)
# Unserialize PHP session data
session_data = phpserialize.loads(decrypted.encode())
# Extract user ID from various Laravel guard patterns
user_id = None
# Pattern 1: login_web_{guard}_*
for key in session_data:
if isinstance(key, bytes):
key_str = key.decode()
if key_str.startswith("login_web_"):
user_id = session_data[key]
break
# Pattern 2: Direct user_id key
if not user_id and b"user_id" in session_data:
user_id = session_data[b"user_id"]
if not user_id:
raise ValueError("No user_id found in session")
return {
"user_id": user_id.decode() if isinstance(user_id, bytes) else user_id,
"session_data": session_data,
}undefinedimport base64
import json
import hashlib
import hmac
from Cryptodome.Cipher import AES
from Cryptodome.Util.Padding import unpad
import phpserialize
class LaravelSessionDecryptor:
"""Decrypt Laravel AES-256-CBC encrypted sessions.
Handles Laravel's session encryption format.
"""
def __init__(self, app_key: str):
"""Initialize with Laravel APP_KEY.
Args:
app_key: Laravel APP_KEY from .env (base64: prefix)
"""
# Remove 'base64:' prefix if present
if app_key.startswith("base64:"):
app_key = app_key[7:]
self.key = base64.b64decode(app_key)
def decrypt(self, encrypted_value: str) -> str:
"""Decrypt Laravel encrypted value.
Format: base64(json({"iv": "...", "value": "...", "mac": "..."}))
"""
# Decode base64
decoded = base64.b64decode(encrypted_value)
payload = json.loads(decoded)
# Verify MAC signature
if not self._valid_mac(payload):
raise ValueError("Invalid MAC signature")
# Decrypt
iv = base64.b64decode(payload["iv"])
encrypted = base64.b64decode(payload["value"])
cipher = AES.new(self.key, AES.MODE_CBC, iv)
decrypted = unpad(cipher.decrypt(encrypted), AES.block_size)
return decrypted.decode("utf-8")
def _valid_mac(self, payload: dict) -> bool:
"""Verify MAC signature."""
mac = payload.get("mac")
if not mac:
return False
# Calculate expected MAC
message = base64.b64encode(
json.dumps({"iv": payload["iv"], "value": payload["value"]}).encode()
)
expected_mac = hmac.new(
self.key,
message,
hashlib.sha256,
).hexdigest()
return hmac.compare_digest(mac, expected_mac)
def decrypt_session(self, session_cookie: str) -> dict:
"""Decrypt Laravel session cookie and extract user data.
Args:
session_cookie: Laravel session cookie value
Returns:
Dictionary with user_id and other session data
"""
# Decrypt session
decrypted = self.decrypt(session_cookie)
# Unserialize PHP session data
session_data = phpserialize.loads(decrypted.encode())
# Extract user ID from various Laravel guard patterns
user_id = None
# Pattern 1: login_web_{guard}_*
for key in session_data:
if isinstance(key, bytes):
key_str = key.decode()
if key_str.startswith("login_web_"):
user_id = session_data[key]
break
# Pattern 2: Direct user_id key
if not user_id and b"user_id" in session_data:
user_id = session_data[b"user_id"]
if not user_id:
raise ValueError("No user_id found in session")
return {
"user_id": user_id.decode() if isinstance(user_id, bytes) else user_id,
"session_data": session_data,
}undefined5. Redis Session Management
5. Redis会话管理
python
undefinedpython
undefinedsrc/infra/cache/redis_session.py
src/infra/cache/redis_session.py
import json
from datetime import timedelta
from redis.asyncio import Redis
class RedisSessionManager:
"""Manage user sessions in Redis.
Stores session data with TTL for automatic expiration.
"""
def __init__(self, redis_client: Redis, ttl_seconds: int = 28800):
"""Initialize session manager.
Args:
redis_client: Async Redis client
ttl_seconds: Session TTL (default 8 hours)
"""
self.redis = redis_client
self.ttl = ttl_seconds
async def create_session(self, user_data: dict) -> str:
"""Create new session and return session ID.
Args:
user_data: User data to store
Returns:
Session ID (UUID)
"""
import uuid
session_id = str(uuid.uuid4())
# Store in Redis
session_key = f"session:{session_id}"
await self.redis.setex(
session_key,
self.ttl,
json.dumps(user_data),
)
return session_id
async def get_session(self, session_id: str) -> dict | None:
"""Retrieve session data.
Args:
session_id: Session identifier
Returns:
User data dictionary or None if not found
"""
session_key = f"session:{session_id}"
data = await self.redis.get(session_key)
if not data:
return None
# Refresh TTL on access
await self.redis.expire(session_key, self.ttl)
return json.loads(data)
async def delete_session(self, session_id: str) -> bool:
"""Delete session.
Args:
session_id: Session identifier
Returns:
True if deleted, False if not found
"""
session_key = f"session:{session_id}"
result = await self.redis.delete(session_key)
return result > 0
async def update_session(self, session_id: str, user_data: dict) -> bool:
"""Update existing session data.
Args:
session_id: Session identifier
user_data: Updated user data
Returns:
True if updated, False if session not found
"""
session_key = f"session:{session_id}"
exists = await self.redis.exists(session_key)
if not exists:
return False
await self.redis.setex(
session_key,
self.ttl,
json.dumps(user_data),
)
return Trueundefinedimport json
from datetime import timedelta
from redis.asyncio import Redis
class RedisSessionManager:
"""Manage user sessions in Redis.
Stores session data with TTL for automatic expiration.
"""
def __init__(self, redis_client: Redis, ttl_seconds: int = 28800):
"""Initialize session manager.
Args:
redis_client: Async Redis client
ttl_seconds: Session TTL (default 8 hours)
"""
self.redis = redis_client
self.ttl = ttl_seconds
async def create_session(self, user_data: dict) -> str:
"""Create new session and return session ID.
Args:
user_data: User data to store
Returns:
Session ID (UUID)
"""
import uuid
session_id = str(uuid.uuid4())
# Store in Redis
session_key = f"session:{session_id}"
await self.redis.setex(
session_key,
self.ttl,
json.dumps(user_data),
)
return session_id
async def get_session(self, session_id: str) -> dict | None:
"""Retrieve session data.
Args:
session_id: Session identifier
Returns:
User data dictionary or None if not found
"""
session_key = f"session:{session_id}"
data = await self.redis.get(session_key)
if not data:
return None
# Refresh TTL on access
await self.redis.expire(session_key, self.ttl)
return json.loads(data)
async def delete_session(self, session_id: str) -> bool:
"""Delete session.
Args:
session_id: Session identifier
Returns:
True if deleted, False if not found
"""
session_key = f"session:{session_id}"
result = await self.redis.delete(session_key)
return result > 0
async def update_session(self, session_id: str, user_data: dict) -> bool:
"""Update existing session data.
Args:
session_id: Session identifier
user_data: Updated user data
Returns:
True if updated, False if session not found
"""
session_key = f"session:{session_id}"
exists = await self.redis.exists(session_key)
if not exists:
return False
await self.redis.setex(
session_key,
self.ttl,
json.dumps(user_data),
)
return Trueundefined6. Permission Checking Middleware
6. 权限校验中间件
python
undefinedpython
undefinedsrc/api/middlewares/auth.py
src/api/middlewares/auth.py
from fastapi import Request, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from src.infra.adapters.auth.unified_adapter import UnifiedAuthAdapter
security = HTTPBearer()
class ProtectedResource:
"""FastAPI dependency for protected endpoints.
Usage:
@app.get("/protected", dependencies=[Depends(ProtectedResource.check)])
"""
def __init__(self, unified_adapter: UnifiedAuthAdapter):
self.unified_adapter = unified_adapter
async def check(
self,
credentials: HTTPAuthorizationCredentials = Depends(security),
) -> dict:
"""Validate token and return user data.
Raises:
HTTPException: 401 if token invalid, 403 if insufficient permissions
"""
token = credentials.credentials
try:
user_data = await self.unified_adapter.validate_token(token)
return user_data
except JWTError as e:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=str(e),
)
async def check_permissions(
self,
credentials: HTTPAuthorizationCredentials,
required_permissions: list[str],
) -> dict:
"""Validate token and check permissions.
Args:
credentials: Bearer token
required_permissions: List of required permissions
Returns:
User data if authorized
Raises:
HTTPException: 401 unauthorized, 403 forbidden
"""
user_data = await self.check(credentials)
user_permissions = user_data.get("permissions", [])
# Check for admin wildcard
if "*" in user_permissions:
return user_data
# Check required permissions
has_permission = any(
perm in user_permissions for perm in required_permissions
)
if not has_permission:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Missing required permissions: {required_permissions}",
)
return user_dataundefinedfrom fastapi import Request, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from src.infra.adapters.auth.unified_adapter import UnifiedAuthAdapter
security = HTTPBearer()
class ProtectedResource:
"""FastAPI dependency for protected endpoints.
Usage:
@app.get("/protected", dependencies=[Depends(ProtectedResource.check)])
"""
def __init__(self, unified_adapter: UnifiedAuthAdapter):
self.unified_adapter = unified_adapter
async def check(
self,
credentials: HTTPAuthorizationCredentials = Depends(security),
) -> dict:
"""Validate token and return user data.
Raises:
HTTPException: 401 if token invalid, 403 if insufficient permissions
"""
token = credentials.credentials
try:
user_data = await self.unified_adapter.validate_token(token)
return user_data
except JWTError as e:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=str(e),
)
async def check_permissions(
self,
credentials: HTTPAuthorizationCredentials,
required_permissions: list[str],
) -> dict:
"""Validate token and check permissions.
Args:
credentials: Bearer token
required_permissions: List of required permissions
Returns:
User data if authorized
Raises:
HTTPException: 401 unauthorized, 403 forbidden
"""
user_data = await self.check(credentials)
user_permissions = user_data.get("permissions", [])
# Check for admin wildcard
if "*" in user_permissions:
return user_data
# Check required permissions
has_permission = any(
perm in user_permissions for perm in required_permissions
)
if not has_permission:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Missing required permissions: {required_permissions}",
)
return user_dataundefined7. Multi-System Authentication Endpoints
7. 多系统认证端点
python
undefinedpython
undefinedsrc/api/path/auth.py
src/api/path/auth.py
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel
from src.infra.adapters.auth.unified_adapter import UnifiedAuthAdapter
from src.infra.cache.redis_session import RedisSessionManager
router = APIRouter(prefix="/v1/auth", tags=["auth"])
class LoginRequest(BaseModel):
username: str
password: str
class SSOLoginRequest(BaseModel):
corporativo_session: str # Cookie from Corporativo
class TokenResponse(BaseModel):
access_token: str
token_type: str = "bearer"
expires_in: int
@router.post("/login", response_model=TokenResponse)
async def login(
request: LoginRequest,
adapter: UnifiedAuthAdapter = Depends(),
):
"""Login with username/password (Corporativo).
Returns JWT access token.
"""
# Delegate to Corporativo adapter
result = await adapter.adapters["corporativo"].authenticate_credentials(
username=request.username,
password=request.password,
)
return TokenResponse(
access_token=result["access_token"],
expires_in=3600,
)@router.post("/sso-login", response_model=TokenResponse)
async def sso_login(
request: SSOLoginRequest,
adapter: UnifiedAuthAdapter = Depends(),
session_manager: RedisSessionManager = Depends(),
):
"""SSO login using Corporativo session cookie.
Validates session, creates local session, returns JWT.
"""
# Validate Corporativo session
user_data = await adapter.validate_session(request.corporativo_session)
# Create local session
session_id = await session_manager.create_session(user_data)
# Generate JWT
token = adapter.adapters["corporativo"].generate_token(user_data)
return TokenResponse(
access_token=token,
expires_in=3600,
)@router.get("/me")
async def get_current_user(
user_data: dict = Depends(ProtectedResource.check),
):
"""Get current authenticated user info."""
return {
"cpf": user_data.get("sub"),
"name": user_data.get("name"),
"email": user_data.get("email"),
"permissions": user_data.get("permissions"),
"systems": user_data.get("systems", []),
}
@router.post("/logout")
async def logout(
session_id: str,
session_manager: RedisSessionManager = Depends(),
):
"""Logout and invalidate session."""
await session_manager.delete_session(session_id)
return {"message": "Logged out successfully"}
undefinedfrom fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel
from src.infra.adapters.auth.unified_adapter import UnifiedAuthAdapter
from src.infra.cache.redis_session import RedisSessionManager
router = APIRouter(prefix="/v1/auth", tags=["auth"])
class LoginRequest(BaseModel):
username: str
password: str
class SSOLoginRequest(BaseModel):
corporativo_session: str # Cookie from Corporativo
class TokenResponse(BaseModel):
access_token: str
token_type: str = "bearer"
expires_in: int
@router.post("/login", response_model=TokenResponse)
async def login(
request: LoginRequest,
adapter: UnifiedAuthAdapter = Depends(),
):
"""Login with username/password (Corporativo).
Returns JWT access token.
"""
# Delegate to Corporativo adapter
result = await adapter.adapters["corporativo"].authenticate_credentials(
username=request.username,
password=request.password,
)
return TokenResponse(
access_token=result["access_token"],
expires_in=3600,
)@router.post("/sso-login", response_model=TokenResponse)
async def sso_login(
request: SSOLoginRequest,
adapter: UnifiedAuthAdapter = Depends(),
session_manager: RedisSessionManager = Depends(),
):
"""SSO login using Corporativo session cookie.
Validates session, creates local session, returns JWT.
"""
# Validate Corporativo session
user_data = await adapter.validate_session(request.corporativo_session)
# Create local session
session_id = await session_manager.create_session(user_data)
# Generate JWT
token = adapter.adapters["corporativo"].generate_token(user_data)
return TokenResponse(
access_token=token,
expires_in=3600,
)@router.get("/me")
async def get_current_user(
user_data: dict = Depends(ProtectedResource.check),
):
"""Get current authenticated user info."""
return {
"cpf": user_data.get("sub"),
"name": user_data.get("name"),
"email": user_data.get("email"),
"permissions": user_data.get("permissions"),
"systems": user_data.get("systems", []),
}
@router.post("/logout")
async def logout(
session_id: str,
session_manager: RedisSessionManager = Depends(),
):
"""Logout and invalidate session."""
await session_manager.delete_session(session_id)
return {"message": "Logged out successfully"}
undefinedConfiguration
配置
Environment Variables
环境变量
python
undefinedpython
undefinedsrc/config/settings.py
src/config/settings.py
from pydantic_settings import BaseSettings
class AppSettings(BaseSettings):
"""Multi-system authentication settings."""
# Feature flags
ENABLE_MULTI_SYSTEM_AUTH: bool = True
ENABLED_AUTH_SYSTEMS: list[str] = ["corporativo", "sgf", "ged", "carrinho"]
# JWT configuration
JWT_ALGORITHM: str = "RS256"
JWT_PUBLIC_KEY_PATH: str = "./keys/jwt_public.pem"
JWT_PRIVATE_KEY_PATH: str = "./keys/jwt_private.pem"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 60
REFRESH_TOKEN_EXPIRE_HOURS: int = 8
# SSO systems
CORPORATIVO_API_URL: str
CORPORATIVO_APP_KEY: str # Laravel APP_KEY for session decryption
SGF_API_URL: str
SGF_API_KEY: str
GED_API_URL: str
GED_API_KEY: str
CARRINHO_API_URL: str
CARRINHO_API_KEY: str
# Redis
REDIS_URL: str = "redis://localhost:6379/0"
SESSION_TTL_SECONDS: int = 28800 # 8 hours
# Backwards verification
ENABLE_BACKWARDS_VERIFICATION: bool = True
VERIFICATION_CACHE_TTL: int = 30 # seconds
VERIFICATION_TIMEOUT: int = 5 # secondsapp_settings = AppSettings()
undefinedfrom pydantic_settings import BaseSettings
class AppSettings(BaseSettings):
"""Multi-system authentication settings."""
# Feature flags
ENABLE_MULTI_SYSTEM_AUTH: bool = True
ENABLED_AUTH_SYSTEMS: list[str] = ["corporativo", "sgf", "ged", "carrinho"]
# JWT configuration
JWT_ALGORITHM: str = "RS256"
JWT_PUBLIC_KEY_PATH: str = "./keys/jwt_public.pem"
JWT_PRIVATE_KEY_PATH: str = "./keys/jwt_private.pem"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 60
REFRESH_TOKEN_EXPIRE_HOURS: int = 8
# SSO systems
CORPORATIVO_API_URL: str
CORPORATIVO_APP_KEY: str # Laravel APP_KEY for session decryption
SGF_API_URL: str
SGF_API_KEY: str
GED_API_URL: str
GED_API_KEY: str
CARRINHO_API_URL: str
CARRINHO_API_KEY: str
# Redis
REDIS_URL: str = "redis://localhost:6379/0"
SESSION_TTL_SECONDS: int = 28800 # 8 hours
# Backwards verification
ENABLE_BACKWARDS_VERIFICATION: bool = True
VERIFICATION_CACHE_TTL: int = 30 # seconds
VERIFICATION_TIMEOUT: int = 5 # secondsapp_settings = AppSettings()
undefinedRSA Key Pair Management
RSA密钥对管理
bash
undefinedbash
undefinedGenerate RSA key pair for JWT signing
Generate RSA key pair for JWT signing
openssl genrsa -out keys/jwt_private.pem 4096
openssl rsa -in keys/jwt_private.pem -pubout -out keys/jwt_public.pem
openssl genrsa -out keys/jwt_private.pem 4096
openssl rsa -in keys/jwt_private.pem -pubout -out keys/jwt_public.pem
Set proper permissions
Set proper permissions
chmod 600 keys/jwt_private.pem
chmod 644 keys/jwt_public.pem
chmod 600 keys/jwt_private.pem
chmod 644 keys/jwt_public.pem
Add to .gitignore
Add to .gitignore
echo "keys/jwt_private.pem" >> .gitignore
undefinedecho "keys/jwt_private.pem" >> .gitignore
undefinedTesting Strategy
测试策略
Unit Tests (Token Validation)
单元测试(令牌校验)
python
undefinedpython
undefinedtests/infra/adapters/auth/test_corporativo_adapter.py
tests/infra/adapters/auth/test_corporativo_adapter.py
import pytest
from jose import jwt
from datetime import datetime, timedelta
@pytest.fixture
def valid_token(private_key):
"""Generate valid JWT token."""
payload = {
"sub": "12345678901",
"name": "Test User",
"email": "test@example.com",
"permissions": ["gefin.boleto.read"],
"iss": "gefin-backend",
"aud": "gefin-api",
"exp": datetime.utcnow() + timedelta(hours=1),
}
return jwt.encode(payload, private_key, algorithm="RS256")
@pytest.mark.asyncio
async def test_validate_token_success(corporativo_adapter, valid_token):
"""Test successful token validation."""
user_data = await corporativo_adapter.validate_token(valid_token)
assert user_data["sub"] == "12345678901"
assert "gefin.boleto.read" in user_data["permissions"]@pytest.mark.asyncio
async def test_validate_token_invalid_signature(corporativo_adapter):
"""Test token with invalid signature."""
invalid_token = "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.invalid.signature"
with pytest.raises(JWTError):
await corporativo_adapter.validate_token(invalid_token)@pytest.mark.asyncio
async def test_validate_token_expired(corporativo_adapter, private_key):
"""Test expired token."""
payload = {
"sub": "12345678901",
"exp": datetime.utcnow() - timedelta(hours=1), # Expired
"iss": "gefin-backend",
"aud": "gefin-api",
}
expired_token = jwt.encode(payload, private_key, algorithm="RS256")
with pytest.raises(JWTError, match="expired"):
await corporativo_adapter.validate_token(expired_token)undefinedimport pytest
from jose import jwt
from datetime import datetime, timedelta
@pytest.fixture
def valid_token(private_key):
"""Generate valid JWT token."""
payload = {
"sub": "12345678901",
"name": "Test User",
"email": "test@example.com",
"permissions": ["gefin.boleto.read"],
"iss": "gefin-backend",
"aud": "gefin-api",
"exp": datetime.utcnow() + timedelta(hours=1),
}
return jwt.encode(payload, private_key, algorithm="RS256")
@pytest.mark.asyncio
async def test_validate_token_success(corporativo_adapter, valid_token):
"""Test successful token validation."""
user_data = await corporativo_adapter.validate_token(valid_token)
assert user_data["sub"] == "12345678901"
assert "gefin.boleto.read" in user_data["permissions"]@pytest.mark.asyncio
async def test_validate_token_invalid_signature(corporativo_adapter):
"""Test token with invalid signature."""
invalid_token = "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.invalid.signature"
with pytest.raises(JWTError):
await corporativo_adapter.validate_token(invalid_token)@pytest.mark.asyncio
async def test_validate_token_expired(corporativo_adapter, private_key):
"""Test expired token."""
payload = {
"sub": "12345678901",
"exp": datetime.utcnow() - timedelta(hours=1), # Expired
"iss": "gefin-backend",
"aud": "gefin-api",
}
expired_token = jwt.encode(payload, private_key, algorithm="RS256")
with pytest.raises(JWTError, match="expired"):
await corporativo_adapter.validate_token(expired_token)undefinedIntegration Tests (Backwards Verification)
集成测试(反向验证)
python
undefinedpython
undefinedtests/integration/test_backwards_verification.py
tests/integration/test_backwards_verification.py
@pytest.mark.asyncio
async def test_backwards_verification_valid_token(
corporativo_adapter,
mock_corporativo_api,
):
"""Test backwards verification with valid token."""
# Mock Corporativo /api/me endpoint
mock_corporativo_api.get("/api/me").returns(
status=200,
json={"cpf": "12345678901", "name": "Test User"},
)
token = generate_corporativo_token()
user_data = await corporativo_adapter.validate_token(token)
assert user_data["sub"] == "12345678901"@pytest.mark.asyncio
async def test_backwards_verification_invalid_token(
corporativo_adapter,
mock_corporativo_api,
):
"""Test backwards verification with invalid token."""
mock_corporativo_api.get("/api/me").returns(status=401)
token = generate_corporativo_token()
with pytest.raises(JWTError, match="not valid in Corporativo"):
await corporativo_adapter.validate_token(token)undefined@pytest.mark.asyncio
async def test_backwards_verification_valid_token(
corporativo_adapter,
mock_corporativo_api,
):
"""Test backwards verification with valid token."""
# Mock Corporativo /api/me endpoint
mock_corporativo_api.get("/api/me").returns(
status=200,
json={"cpf": "12345678901", "name": "Test User"},
)
token = generate_corporativo_token()
user_data = await corporativo_adapter.validate_token(token)
assert user_data["sub"] == "12345678901"@pytest.mark.asyncio
async def test_backwards_verification_invalid_token(
corporativo_adapter,
mock_corporativo_api,
):
"""Test backwards verification with invalid token."""
mock_corporativo_api.get("/api/me").returns(status=401)
token = generate_corporativo_token()
with pytest.raises(JWTError, match="not valid in Corporativo"):
await corporativo_adapter.validate_token(token)undefinedBest Practices
最佳实践
Security
安全
- ✅ Always verify JWT signatures before trusting payload
- ✅ Implement backwards verification for external tokens
- ✅ Use RS256 (asymmetric) instead of HS256 for multi-service environments
- ✅ Rotate keys periodically
- ✅ Cache validation results with short TTL (30s)
- ✅ Implement graceful degradation on network failures
- ✅ Never log tokens or secrets
- ✅ 在信任令牌 payload 前始终验证JWT签名
- ✅ 对外部令牌实现反向验证
- ✅ 在多服务环境中使用RS256(非对称加密)而非HS256
- ✅ 定期轮换密钥
- ✅ 为校验结果设置短TTL缓存(30秒)
- ✅ 在网络故障时实现优雅降级
- ✅ 切勿记录令牌或密钥
Performance
性能
- ✅ Cache token validation results
- ✅ Use Redis for session storage
- ✅ Set reasonable timeouts for backwards verification
- ✅ Skip backwards verification for self-issued tokens
- ✅ Use connection pooling for HTTP clients
- ✅ Implement circuit breakers for external APIs
- ✅ 缓存令牌校验结果
- ✅ 使用Redis存储会话
- ✅ 为反向验证设置合理的超时时间
- ✅ 对自签发令牌跳过反向验证
- ✅ 为HTTP客户端使用连接池
- ✅ 为外部API实现断路器
Permission Mapping
权限映射
- ✅ Define clear permission mapping tables
- ✅ Support wildcard permissions for admins
- ✅ Provide default read permissions for authenticated users
- ✅ Map Portuguese permissions to English format
- ✅ Log permission mapping failures
- ✅ 定义清晰的权限映射表
- ✅ 为管理员支持通配符权限
- ✅ 为已认证用户提供默认读取权限
- ✅ 将葡萄牙语权限映射为英文格式
- ✅ 记录权限映射失败情况
Session Management
会话管理
- ✅ Use UUIDs for session IDs
- ✅ Set appropriate TTLs (8 hours default)
- ✅ Refresh TTL on session access
- ✅ Implement session cleanup on logout
- ✅ Support both token and session authentication
- ✅ 使用UUID作为会话ID
- ✅ 设置合适的TTL(默认8小时)
- ✅ 在会话访问时刷新TTL
- ✅ 在登出时实现会话清理
- ✅ 同时支持令牌与会话认证
Common Pitfalls
常见陷阱
-
Not Verifying Issuer/Audience
- ❌ Accepting any JWT without checking claims
- ✅ Manually verify iss and aud claims
-
Using HS256 in Multi-Service Environments
- ❌ Symmetric keys shared across services
- ✅ Use RS256 with public/private key pairs
-
No Backwards Verification
- ❌ Trusting JWT without checking with issuer
- ✅ Implement backwards verification for security
-
Hardcoded Permission Mappings
- ❌ Magic strings in code
- ✅ Use configuration/database for mappings
-
Not Handling Network Failures
- ❌ Failing all requests when SSO is down
- ✅ Implement graceful degradation with cache
-
Token Leakage in Logs
- ❌ Logging full tokens in error messages
- ✅ Log only token metadata (sub, iss)
-
未验证Issuer/Audience
- ❌ 不检查声明就接受任意JWT
- ✅ 手动验证iss和aud声明
-
在多服务环境中使用HS256
- ❌ 对称密钥在多服务间共享
- ✅ 使用带公钥/私钥对的RS256
-
未实现反向验证
- ❌ 信任JWT而不向签发系统校验
- ✅ 为安全性实现反向验证
-
硬编码权限映射
- ❌ 代码中使用魔法字符串
- ✅ 使用配置/数据库存储映射关系
-
未处理网络故障
- ❌ 当SSO不可用时所有请求失败
- ✅ 结合缓存实现优雅降级
-
日志中泄露令牌
- ❌ 在错误信息中记录完整令牌
- ✅ 仅记录令牌元数据(sub、iss)
Architecture Decisions
架构决策
Why Multi-Adapter Pattern?
为什么使用多适配器模式?
- Separation of Concerns: Each SSO system has its own adapter
- Extensibility: Easy to add new SSO providers
- Testability: Mock individual adapters independently
- Maintainability: Changes to one SSO don't affect others
- 关注点分离:每个SSO系统有独立的适配器
- 可扩展性:轻松添加新的SSO提供商
- 可测试性:可独立模拟各个适配器
- 可维护性:对一个SSO的修改不会影响其他系统
Why Backwards Verification?
为什么需要反向验证?
- Security: Prevent token replay attacks
- Session Validation: Check if user is still active
- Revocation Support: Detect revoked tokens
- Trust Verification: Confirm token with authoritative system
- 安全性:防止令牌重放攻击
- 会话校验:检查用户是否仍处于活跃状态
- 吊销支持:检测已吊销的令牌
- 信任校验:向权威系统确认令牌有效性
Why RS256 Over HS256?
为什么选择RS256而非HS256?
- Key Distribution: Public key can be shared safely
- Trust Boundary: Services verify without shared secret
- Rotation: Easier key rotation strategy
- Industry Standard: OAuth2/OIDC best practice
- 密钥分发:公钥可安全共享
- 信任边界:服务无需共享密钥即可完成校验
- 轮换:更简单的密钥轮换策略
- 行业标准:OAuth2/OIDC最佳实践
Production Deployment
生产部署
Key Management
密钥管理
bash
undefinedbash
undefinedProduction key generation
Production key generation
openssl genrsa -out jwt_private.pem 4096
openssl rsa -in jwt_private.pem -pubout -out jwt_public.pem
openssl genrsa -out jwt_private.pem 4096
openssl rsa -in jwt_private.pem -pubout -out jwt_public.pem
Secure storage (AWS Secrets Manager, HashiCorp Vault, etc.)
Secure storage (AWS Secrets Manager, HashiCorp Vault, etc.)
aws secretsmanager create-secret
--name gefin/jwt-private-key
--secret-string file://jwt_private.pem
--name gefin/jwt-private-key
--secret-string file://jwt_private.pem
undefinedaws secretsmanager create-secret
--name gefin/jwt-private-key
--secret-string file://jwt_private.pem
--name gefin/jwt-private-key
--secret-string file://jwt_private.pem
undefinedMonitoring
监控
python
undefinedpython
undefinedLog authentication events
Log authentication events
import structlog
logger = structlog.get_logger()
async def validate_token(self, token: str):
logger.info(
"token_validation_started",
issuer=self._get_issuer(token),
)
try:
user_data = await self._validate(token)
logger.info(
"token_validation_success",
user_id=user_data["sub"],
issuer=user_data["iss"],
)
return user_data
except JWTError as e:
logger.warning(
"token_validation_failed",
error=str(e),
)
raiseundefinedimport structlog
logger = structlog.get_logger()
async def validate_token(self, token: str):
logger.info(
"token_validation_started",
issuer=self._get_issuer(token),
)
try:
user_data = await self._validate(token)
logger.info(
"token_validation_success",
user_id=user_data["sub"],
issuer=user_data["iss"],
)
return user_data
except JWTError as e:
logger.warning(
"token_validation_failed",
error=str(e),
)
raiseundefinedReferences
参考资料
Production Examples
生产示例
Based on patterns from:
- GEFIN Backend: Multi-system SSO with Corporativo, SGF, GED, CARRINHO
- Enterprise SSO: JWT RS256 with backwards verification
- Laravel Integration: Session decryption for legacy systems
基于以下项目的模式:
- GEFIN Backend:集成Corporativo、SGF、GED、CARRINHO的多系统SSO
- Enterprise SSO:带反向验证的JWT RS256认证
- Laravel Integration:面向传统系统的会话解密功能