multi-system-sso-authentication

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Multi-System SSO Authentication Skill

多系统SSO认证技能

Overview

概述

This skill provides comprehensive patterns for implementing enterprise SSO authentication that supports multiple identity providers. It covers JWT RS256 token validation, backwards verification with authoritative systems, Laravel session decryption, permission mapping, and Redis session management.
本技能提供了实现支持多身份提供商的企业级SSO认证的完整模式,涵盖JWT RS256令牌校验、与权威系统的反向验证、Laravel会话解密、权限映射以及Redis会话管理等内容。

When to Use This Skill

适用场景

  • Integrating with multiple enterprise SSO systems
  • Implementing secure JWT token validation with backwards verification
  • Supporting legacy session-based authentication alongside JWT
  • Building unified authentication adapters for microservices
  • Mapping permissions across different systems
  • Implementing token introspection and revocation
  • Handling OAuth2 flows with multiple providers
  • 集成多个企业SSO系统
  • 实现带反向验证的安全JWT令牌校验
  • 同时支持基于会话的传统认证与JWT认证
  • 为微服务构建统一认证适配器
  • 实现不同系统间的权限映射
  • 实现令牌 introspection 与吊销功能
  • 处理多提供商的OAuth2流程

Core Concepts

核心概念

Authentication Architecture

认证架构

┌─────────────────────────────────────────────────────────┐
│                     Your Application                     │
│  ┌────────────────────────────────────────────────────┐ │
│  │          UnifiedAuthAdapter (Router)               │ │
│  │  ┌──────────────────────────────────────────────┐ │ │
│  │  │  Check token issuer (iss claim)              │ │ │
│  │  │  Route to appropriate adapter                │ │ │
│  │  └──────────────────────────────────────────────┘ │ │
│  │         ▼          ▼          ▼          ▼        │ │
│  │  ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│  │  │  CORP   │ │   SGF   │ │   GED   │ │ CARRINHO│ │ │
│  │  │ Adapter │ │ Adapter │ │ Adapter │ │ Adapter │ │ │
│  │  └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ │
│  └────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
         │             │             │             │
         ▼             ▼             ▼             ▼
  ┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐
  │ Corporativo│ │    SGF    │ │    GED    │ │ Carrinho  │
  │    SSO    │ │    API    │ │    API    │ │    API    │
  └───────────┘ └───────────┘ └───────────┘ └───────────┘
┌─────────────────────────────────────────────────────────┐
│                     Your Application                     │
│  ┌────────────────────────────────────────────────────┐ │
│  │          UnifiedAuthAdapter (Router)               │ │
│  │  ┌──────────────────────────────────────────────┐ │ │
│  │  │  Check token issuer (iss claim)              │ │ │
│  │  │  Route to appropriate adapter                │ │ │
│  │  └──────────────────────────────────────────────┘ │ │
│  │         ▼          ▼          ▼          ▼        │ │
│  │  ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│  │  │  CORP   │ │   SGF   │ │   GED   │ │ CARRINHO│ │ │
│  │  │ Adapter │ │ Adapter │ │ Adapter │ │ Adapter │ │ │
│  │  └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ │
│  └────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
         │             │             │             │
         ▼             ▼             ▼             ▼
  ┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐
  │ Corporativo│ │    SGF    │ │    GED    │ │ Carrinho  │
  │    SSO    │ │    API    │ │    API    │ │    API    │
  └───────────┘ └───────────┘ └───────────┘ └───────────┘

Token Flow

令牌流程

  1. User authenticates with external SSO system
  2. SSO system issues JWT with issuer (iss) and audience (aud) claims
  3. Your app receives token from request headers
  4. UnifiedAuthAdapter routes to appropriate adapter based on issuer
  5. Adapter validates JWT signature with public key
  6. Backwards verification checks token validity with issuing system
  7. Permissions mapped from SSO format to your app's format
  8. User session created in Redis for future requests
  1. 用户认证:用户通过外部SSO系统完成认证
  2. SSO系统签发JWT:包含issuer(iss)和audience(aud)声明
  3. 应用接收令牌:从请求头中获取令牌
  4. UnifiedAuthAdapter路由:根据issuer将令牌路由至对应的适配器
  5. 适配器校验令牌:使用公钥验证JWT签名
  6. 反向验证:向签发系统校验令牌有效性
  7. 权限映射:将SSO格式的权限转换为应用内部格式
  8. 创建用户会话:在Redis中创建用户会话,用于后续请求

Project Structure

项目结构

src/
├── api/
│   ├── middlewares/
│   │   └── auth.py              # AuthMiddleware
│   └── path/
│       └── auth.py              # Authentication endpoints
├── domain/
│   └── modules/
│       └── auth/
│           ├── entity.py        # User entity
│           ├── session.py       # Session management
│           └── permissions.py   # Permission definitions
└── infra/
    ├── adapters/
    │   └── auth/
    │       ├── unified_adapter.py      # Router for all adapters
    │       ├── corporativo_adapter.py  # Corporativo SSO
    │       ├── sgf_adapter.py          # SGF integration
    │       ├── ged_adapter.py          # GED integration
    │       └── carrinho_adapter.py     # Carrinho integration
    ├── cache/
    │   └── redis_session.py     # Redis session storage
    └── services/
        └── permission_mapper.py # Permission mapping
src/
├── api/
│   ├── middlewares/
│   │   └── auth.py              # AuthMiddleware
│   └── path/
│       └── auth.py              # Authentication endpoints
├── domain/
│   └── modules/
│       └── auth/
│           ├── entity.py        # User entity
│           ├── session.py       # Session management
│           └── permissions.py   # Permission definitions
└── infra/
    ├── adapters/
    │   └── auth/
    │       ├── unified_adapter.py      # Router for all adapters
    │       ├── corporativo_adapter.py  # Corporativo SSO
    │       ├── sgf_adapter.py          # SGF integration
    │       ├── ged_adapter.py          # GED integration
    │       └── carrinho_adapter.py     # Carrinho integration
    ├── cache/
    │   └── redis_session.py     # Redis session storage
    └── services/
        └── permission_mapper.py # Permission mapping

Implementation Patterns

实现模式

1. Unified Authentication Adapter (Router)

1. 统一认证适配器(路由型)

python
undefined
python
undefined

src/infra/adapters/auth/unified_adapter.py

src/infra/adapters/auth/unified_adapter.py

from typing import Dict, Any from jose import jwt, JWTError
from src.infra.adapters.auth.corporativo_adapter import CorporativoAuthAdapter from src.infra.adapters.auth.sgf_adapter import SGFAuthAdapter from src.infra.adapters.auth.ged_adapter import GEDAuthAdapter from src.infra.adapters.auth.carrinho_adapter import CarrinhoAuthAdapter from src.config.settings import app_settings
class UnifiedAuthAdapter: """Unified authentication adapter that routes tokens to appropriate SSO adapter.
Routes based on JWT issuer claim (iss).
"""

def __init__(
    self,
    corporativo_adapter: CorporativoAuthAdapter,
    sgf_adapter: SGFAuthAdapter,
    ged_adapter: GEDAuthAdapter,
    carrinho_adapter: CarrinhoAuthAdapter,
):
    self.adapters = {
        "corporativo": corporativo_adapter,
        "sgf": sgf_adapter,
        "ged": ged_adapter,
        "carrinho": carrinho_adapter,
    }

    # Map issuer URLs to adapter names
    self.issuer_map = {
        app_settings.CORPORATIVO_API_URL: "corporativo",
        app_settings.SGF_API_URL: "sgf",
        app_settings.GED_API_URL: "ged",
        app_settings.CARRINHO_API_URL: "carrinho",
        "gefin-backend": "corporativo",  # Self-issued tokens
    }

async def validate_token(self, token: str) -> Dict[str, Any]:
    """Validate token and route to appropriate adapter.

    Args:
        token: JWT token string

    Returns:
        User data dictionary with permissions

    Raises:
        JWTError: If token is invalid or from unknown issuer
    """
    # Decode without verification to check issuer
    try:
        unverified = jwt.get_unverified_claims(token)
        issuer = unverified.get("iss")
    except JWTError as e:
        raise JWTError(f"Invalid JWT format: {e}")

    # Map issuer to adapter
    adapter_name = self.issuer_map.get(issuer)
    if not adapter_name:
        raise JWTError(f"Unknown token issuer: {issuer}")

    # Check if adapter is enabled
    enabled_systems = app_settings.ENABLED_AUTH_SYSTEMS
    if adapter_name not in enabled_systems:
        raise JWTError(f"Authentication system '{adapter_name}' is disabled")

    # Route to appropriate adapter
    adapter = self.adapters[adapter_name]
    return await adapter.validate_token(token)

async def validate_session(self, session_id: str) -> Dict[str, Any]:
    """Validate session cookie (for legacy systems).

    Routes to Corporativo adapter (primary session provider).
    """
    return await self.adapters["corporativo"].validate_session(session_id)
undefined
from typing import Dict, Any from jose import jwt, JWTError
from src.infra.adapters.auth.corporativo_adapter import CorporativoAuthAdapter from src.infra.adapters.auth.sgf_adapter import SGFAuthAdapter from src.infra.adapters.auth.ged_adapter import GEDAuthAdapter from src.infra.adapters.auth.carrinho_adapter import CarrinhoAuthAdapter from src.config.settings import app_settings
class UnifiedAuthAdapter: """Unified authentication adapter that routes tokens to appropriate SSO adapter.
Routes based on JWT issuer claim (iss).
"""

def __init__(
    self,
    corporativo_adapter: CorporativoAuthAdapter,
    sgf_adapter: SGFAuthAdapter,
    ged_adapter: GEDAuthAdapter,
    carrinho_adapter: CarrinhoAuthAdapter,
):
    self.adapters = {
        "corporativo": corporativo_adapter,
        "sgf": sgf_adapter,
        "ged": ged_adapter,
        "carrinho": carrinho_adapter,
    }

    # Map issuer URLs to adapter names
    self.issuer_map = {
        app_settings.CORPORATIVO_API_URL: "corporativo",
        app_settings.SGF_API_URL: "sgf",
        app_settings.GED_API_URL: "ged",
        app_settings.CARRINHO_API_URL: "carrinho",
        "gefin-backend": "corporativo",  # Self-issued tokens
    }

async def validate_token(self, token: str) -> Dict[str, Any]:
    """Validate token and route to appropriate adapter.

    Args:
        token: JWT token string

    Returns:
        User data dictionary with permissions

    Raises:
        JWTError: If token is invalid or from unknown issuer
    """
    # Decode without verification to check issuer
    try:
        unverified = jwt.get_unverified_claims(token)
        issuer = unverified.get("iss")
    except JWTError as e:
        raise JWTError(f"Invalid JWT format: {e}")

    # Map issuer to adapter
    adapter_name = self.issuer_map.get(issuer)
    if not adapter_name:
        raise JWTError(f"Unknown token issuer: {issuer}")

    # Check if adapter is enabled
    enabled_systems = app_settings.ENABLED_AUTH_SYSTEMS
    if adapter_name not in enabled_systems:
        raise JWTError(f"Authentication system '{adapter_name}' is disabled")

    # Route to appropriate adapter
    adapter = self.adapters[adapter_name]
    return await adapter.validate_token(token)

async def validate_session(self, session_id: str) -> Dict[str, Any]:
    """Validate session cookie (for legacy systems).

    Routes to Corporativo adapter (primary session provider).
    """
    return await self.adapters["corporativo"].validate_session(session_id)
undefined

2. Base Auth Adapter Pattern

2. 基础认证适配器模式

python
undefined
python
undefined

src/infra/adapters/auth/base_adapter.py

src/infra/adapters/auth/base_adapter.py

from abc import ABC, abstractmethod from typing import Dict, Any
class IAuthAdapter(ABC): """Abstract base class for authentication adapters.
All SSO adapters must implement this interface.
"""

@abstractmethod
async def validate_token(self, token: str) -> Dict[str, Any]:
    """Validate JWT token and return user data.

    Args:
        token: JWT token string

    Returns:
        User data with permissions

    Raises:
        JWTError: If token is invalid
    """
    pass

@abstractmethod
async def validate_session(self, session_id: str) -> Dict[str, Any]:
    """Validate session ID and return user data.

    Args:
        session_id: Session identifier

    Returns:
        User data with permissions

    Raises:
        SessionError: If session is invalid
    """
    pass

@abstractmethod
def get_permissions(self, user_data: Dict[str, Any]) -> list[str]:
    """Extract and map permissions from user data.

    Args:
        user_data: User data from SSO system

    Returns:
        List of permission strings in app format
    """
    pass
undefined
from abc import ABC, abstractmethod from typing import Dict, Any
class IAuthAdapter(ABC): """Abstract base class for authentication adapters.
All SSO adapters must implement this interface.
"""

@abstractmethod
async def validate_token(self, token: str) -> Dict[str, Any]:
    """Validate JWT token and return user data.

    Args:
        token: JWT token string

    Returns:
        User data with permissions

    Raises:
        JWTError: If token is invalid
    """
    pass

@abstractmethod
async def validate_session(self, session_id: str) -> Dict[str, Any]:
    """Validate session ID and return user data.

    Args:
        session_id: Session identifier

    Returns:
        User data with permissions

    Raises:
        SessionError: If session is invalid
    """
    pass

@abstractmethod
def get_permissions(self, user_data: Dict[str, Any]) -> list[str]:
    """Extract and map permissions from user data.

    Args:
        user_data: User data from SSO system

    Returns:
        List of permission strings in app format
    """
    pass
undefined

3. JWT RS256 Token Validation with Backwards Verification

3. 带反向验证的JWT RS256令牌校验

python
undefined
python
undefined

src/infra/adapters/auth/corporativo_adapter.py

src/infra/adapters/auth/corporativo_adapter.py

import httpx from datetime import datetime, timedelta from jose import jwt, JWTError
from src.infra.adapters.auth.base_adapter import IAuthAdapter from src.infra.cache.redis_session import RedisSessionManager
class CorporativoAuthAdapter(IAuthAdapter): """Corporativo SSO authentication adapter.
Implements JWT RS256 validation with backwards verification.
"""

def __init__(
    self,
    public_key: str,
    private_key: str,
    api_url: str,
    session_manager: RedisSessionManager,
):
    self.public_key = public_key
    self.private_key = private_key
    self.api_url = api_url
    self.session_manager = session_manager
    self._validation_cache: Dict[str, tuple[Dict, datetime]] = {}
    self._cache_ttl = 30  # 30 seconds

async def validate_token(self, token: str) -> Dict[str, Any]:
    """Validate JWT token with backwards verification.

    Steps:
    1. Verify JWT signature with RSA public key
    2. Check issuer and audience claims
    3. Perform backwards verification with SSO system
    4. Map permissions to app format
    """
    try:
        # Verify signature and decode token
        payload = jwt.decode(
            token,
            self.public_key,
            algorithms=["RS256"],
            options={"verify_iss": False, "verify_aud": False},  # Manual validation
        )

        # Manual issuer validation
        accepted_issuers = ["gefin-backend", self.api_url]
        if payload.get("iss") not in accepted_issuers:
            raise JWTError(f"Invalid issuer: {payload.get('iss')}")

        # Manual audience validation
        accepted_audiences = ["gefin-api", "gefin"]
        aud = payload.get("aud")
        if isinstance(aud, list):
            if not any(a in accepted_audiences for a in aud):
                raise JWTError(f"Invalid audience: {aud}")
        elif aud not in accepted_audiences:
            raise JWTError(f"Invalid audience: {aud}")

        # Check expiration
        exp = payload.get("exp")
        if exp and datetime.fromtimestamp(exp) < datetime.now():
            raise JWTError("Token has expired")

        # Backwards verification (if not self-issued)
        if payload.get("iss") != "gefin-backend":
            await self._verify_with_corporativo(token, payload)

        return payload

    except JWTError as e:
        raise JWTError(f"Token validation failed: {e}")

async def _verify_with_corporativo(
    self,
    token: str,
    payload: Dict[str, Any]
) -> None:
    """Verify token validity with Corporativo SSO system.

    Implements backwards verification with caching.
    """
    # Check cache first
    cache_key = payload.get("sub")
    if cache_key in self._validation_cache:
        cached_data, cached_at = self._validation_cache[cache_key]
        if datetime.now() - cached_at < timedelta(seconds=self._cache_ttl):
            return  # Valid in cache

    # Call Corporativo /api/me endpoint
    headers = {"Authorization": f"Bearer {token}"}

    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            response = await client.get(
                f"{self.api_url}/api/me",
                headers=headers,
            )
            response.raise_for_status()

            # Cache validation result
            self._validation_cache[cache_key] = (payload, datetime.now())

    except httpx.HTTPStatusError as e:
        if e.response.status_code == 401:
            raise JWTError("Token is not valid in Corporativo system")
        # Network error - extend cache if exists
        if cache_key in self._validation_cache:
            cached_data, cached_at = self._validation_cache[cache_key]
            # Extend cache to 5 minutes on network failure
            if datetime.now() - cached_at < timedelta(minutes=5):
                return
        raise JWTError("Unable to verify token with Corporativo")

    except httpx.RequestError:
        # Network error - graceful degradation
        if cache_key in self._validation_cache:
            return
        raise JWTError("Network error verifying token")

def get_permissions(self, user_data: Dict[str, Any]) -> list[str]:
    """Map Corporativo permissions to app format.

    Example mapping:
        "Ver anuidade" -> "gefin.boleto.read"
        "Editar anuidade" -> "gefin.boleto.write"
    """
    corporativo_permissions = user_data.get("permissions", [])
    permission_map = {
        "Ver anuidade": "gefin.boleto.read",
        "Editar anuidade": "gefin.boleto.write",
        "Ver parcelamento": "gefin.parcela.read",
        "Editar parcelamento": "gefin.parcela.write",
        "Ver publicações": "gefin.publicacao.read",
        "Editar publicações": "gefin.publicacao.write",
        # ... more mappings
    }

    mapped_permissions = []
    for corp_perm in corporativo_permissions:
        if corp_perm == "*":  # Admin wildcard
            return ["*"]
        app_perm = permission_map.get(corp_perm)
        if app_perm:
            mapped_permissions.append(app_perm)

    # Ensure at least read permission
    if not any(p.endswith(".read") for p in mapped_permissions):
        mapped_permissions.append("gefin.user.read")

    return mapped_permissions

async def validate_session(self, session_id: str) -> Dict[str, Any]:
    """Validate session from Redis.

    Falls back to Laravel session decryption if Redis unavailable.
    """
    # Try Redis first
    session_data = await self.session_manager.get_session(session_id)
    if session_data:
        return session_data

    # Fall back to Laravel session decryption
    return await self._decrypt_laravel_session(session_id)

async def _decrypt_laravel_session(self, session_cookie: str) -> Dict[str, Any]:
    """Decrypt Laravel AES-256-CBC session cookie.

    Laravel session format:
    - base64(iv:encrypted_payload:mac)
    - Encrypted with APP_KEY from .env
    """
    # Implementation omitted for brevity
    # See Laravel session decryption pattern below
    pass
undefined
import httpx from datetime import datetime, timedelta from jose import jwt, JWTError
from src.infra.adapters.auth.base_adapter import IAuthAdapter from src.infra.cache.redis_session import RedisSessionManager
class CorporativoAuthAdapter(IAuthAdapter): """Corporativo SSO authentication adapter.
Implements JWT RS256 validation with backwards verification.
"""

def __init__(
    self,
    public_key: str,
    private_key: str,
    api_url: str,
    session_manager: RedisSessionManager,
):
    self.public_key = public_key
    self.private_key = private_key
    self.api_url = api_url
    self.session_manager = session_manager
    self._validation_cache: Dict[str, tuple[Dict, datetime]] = {}
    self._cache_ttl = 30  # 30 seconds

async def validate_token(self, token: str) -> Dict[str, Any]:
    """Validate JWT token with backwards verification.

    Steps:
    1. Verify JWT signature with RSA public key
    2. Check issuer and audience claims
    3. Perform backwards verification with SSO system
    4. Map permissions to app format
    """
    try:
        # Verify signature and decode token
        payload = jwt.decode(
            token,
            self.public_key,
            algorithms=["RS256"],
            options={"verify_iss": False, "verify_aud": False},  # Manual validation
        )

        # Manual issuer validation
        accepted_issuers = ["gefin-backend", self.api_url]
        if payload.get("iss") not in accepted_issuers:
            raise JWTError(f"Invalid issuer: {payload.get('iss')}")

        # Manual audience validation
        accepted_audiences = ["gefin-api", "gefin"]
        aud = payload.get("aud")
        if isinstance(aud, list):
            if not any(a in accepted_audiences for a in aud):
                raise JWTError(f"Invalid audience: {aud}")
        elif aud not in accepted_audiences:
            raise JWTError(f"Invalid audience: {aud}")

        # Check expiration
        exp = payload.get("exp")
        if exp and datetime.fromtimestamp(exp) < datetime.now():
            raise JWTError("Token has expired")

        # Backwards verification (if not self-issued)
        if payload.get("iss") != "gefin-backend":
            await self._verify_with_corporativo(token, payload)

        return payload

    except JWTError as e:
        raise JWTError(f"Token validation failed: {e}")

async def _verify_with_corporativo(
    self,
    token: str,
    payload: Dict[str, Any]
) -> None:
    """Verify token validity with Corporativo SSO system.

    Implements backwards verification with caching.
    """
    # Check cache first
    cache_key = payload.get("sub")
    if cache_key in self._validation_cache:
        cached_data, cached_at = self._validation_cache[cache_key]
        if datetime.now() - cached_at < timedelta(seconds=self._cache_ttl):
            return  # Valid in cache

    # Call Corporativo /api/me endpoint
    headers = {"Authorization": f"Bearer {token}"}

    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            response = await client.get(
                f"{self.api_url}/api/me",
                headers=headers,
            )
            response.raise_for_status()

            # Cache validation result
            self._validation_cache[cache_key] = (payload, datetime.now())

    except httpx.HTTPStatusError as e:
        if e.response.status_code == 401:
            raise JWTError("Token is not valid in Corporativo system")
        # Network error - extend cache if exists
        if cache_key in self._validation_cache:
            cached_data, cached_at = self._validation_cache[cache_key]
            # Extend cache to 5 minutes on network failure
            if datetime.now() - cached_at < timedelta(minutes=5):
                return
        raise JWTError("Unable to verify token with Corporativo")

    except httpx.RequestError:
        # Network error - graceful degradation
        if cache_key in self._validation_cache:
            return
        raise JWTError("Network error verifying token")

def get_permissions(self, user_data: Dict[str, Any]) -> list[str]:
    """Map Corporativo permissions to app format.

    Example mapping:
        "Ver anuidade" -> "gefin.boleto.read"
        "Editar anuidade" -> "gefin.boleto.write"
    """
    corporativo_permissions = user_data.get("permissions", [])
    permission_map = {
        "Ver anuidade": "gefin.boleto.read",
        "Editar anuidade": "gefin.boleto.write",
        "Ver parcelamento": "gefin.parcela.read",
        "Editar parcelamento": "gefin.parcela.write",
        "Ver publicações": "gefin.publicacao.read",
        "Editar publicações": "gefin.publicacao.write",
        # ... more mappings
    }

    mapped_permissions = []
    for corp_perm in corporativo_permissions:
        if corp_perm == "*":  # Admin wildcard
            return ["*"]
        app_perm = permission_map.get(corp_perm)
        if app_perm:
            mapped_permissions.append(app_perm)

    # Ensure at least read permission
    if not any(p.endswith(".read") for p in mapped_permissions):
        mapped_permissions.append("gefin.user.read")

    return mapped_permissions

async def validate_session(self, session_id: str) -> Dict[str, Any]:
    """Validate session from Redis.

    Falls back to Laravel session decryption if Redis unavailable.
    """
    # Try Redis first
    session_data = await self.session_manager.get_session(session_id)
    if session_data:
        return session_data

    # Fall back to Laravel session decryption
    return await self._decrypt_laravel_session(session_id)

async def _decrypt_laravel_session(self, session_cookie: str) -> Dict[str, Any]:
    """Decrypt Laravel AES-256-CBC session cookie.

    Laravel session format:
    - base64(iv:encrypted_payload:mac)
    - Encrypted with APP_KEY from .env
    """
    # Implementation omitted for brevity
    # See Laravel session decryption pattern below
    pass
undefined

4. Laravel Session Decryption

4. Laravel会话解密

python
undefined
python
undefined

src/infra/adapters/auth/laravel_session.py

src/infra/adapters/auth/laravel_session.py

import base64 import json import hashlib import hmac from Cryptodome.Cipher import AES from Cryptodome.Util.Padding import unpad import phpserialize
class LaravelSessionDecryptor: """Decrypt Laravel AES-256-CBC encrypted sessions.
Handles Laravel's session encryption format.
"""

def __init__(self, app_key: str):
    """Initialize with Laravel APP_KEY.

    Args:
        app_key: Laravel APP_KEY from .env (base64: prefix)
    """
    # Remove 'base64:' prefix if present
    if app_key.startswith("base64:"):
        app_key = app_key[7:]

    self.key = base64.b64decode(app_key)

def decrypt(self, encrypted_value: str) -> str:
    """Decrypt Laravel encrypted value.

    Format: base64(json({"iv": "...", "value": "...", "mac": "..."}))
    """
    # Decode base64
    decoded = base64.b64decode(encrypted_value)
    payload = json.loads(decoded)

    # Verify MAC signature
    if not self._valid_mac(payload):
        raise ValueError("Invalid MAC signature")

    # Decrypt
    iv = base64.b64decode(payload["iv"])
    encrypted = base64.b64decode(payload["value"])

    cipher = AES.new(self.key, AES.MODE_CBC, iv)
    decrypted = unpad(cipher.decrypt(encrypted), AES.block_size)

    return decrypted.decode("utf-8")

def _valid_mac(self, payload: dict) -> bool:
    """Verify MAC signature."""
    mac = payload.get("mac")
    if not mac:
        return False

    # Calculate expected MAC
    message = base64.b64encode(
        json.dumps({"iv": payload["iv"], "value": payload["value"]}).encode()
    )
    expected_mac = hmac.new(
        self.key,
        message,
        hashlib.sha256,
    ).hexdigest()

    return hmac.compare_digest(mac, expected_mac)

def decrypt_session(self, session_cookie: str) -> dict:
    """Decrypt Laravel session cookie and extract user data.

    Args:
        session_cookie: Laravel session cookie value

    Returns:
        Dictionary with user_id and other session data
    """
    # Decrypt session
    decrypted = self.decrypt(session_cookie)

    # Unserialize PHP session data
    session_data = phpserialize.loads(decrypted.encode())

    # Extract user ID from various Laravel guard patterns
    user_id = None

    # Pattern 1: login_web_{guard}_*
    for key in session_data:
        if isinstance(key, bytes):
            key_str = key.decode()
            if key_str.startswith("login_web_"):
                user_id = session_data[key]
                break

    # Pattern 2: Direct user_id key
    if not user_id and b"user_id" in session_data:
        user_id = session_data[b"user_id"]

    if not user_id:
        raise ValueError("No user_id found in session")

    return {
        "user_id": user_id.decode() if isinstance(user_id, bytes) else user_id,
        "session_data": session_data,
    }
undefined
import base64 import json import hashlib import hmac from Cryptodome.Cipher import AES from Cryptodome.Util.Padding import unpad import phpserialize
class LaravelSessionDecryptor: """Decrypt Laravel AES-256-CBC encrypted sessions.
Handles Laravel's session encryption format.
"""

def __init__(self, app_key: str):
    """Initialize with Laravel APP_KEY.

    Args:
        app_key: Laravel APP_KEY from .env (base64: prefix)
    """
    # Remove 'base64:' prefix if present
    if app_key.startswith("base64:"):
        app_key = app_key[7:]

    self.key = base64.b64decode(app_key)

def decrypt(self, encrypted_value: str) -> str:
    """Decrypt Laravel encrypted value.

    Format: base64(json({"iv": "...", "value": "...", "mac": "..."}))
    """
    # Decode base64
    decoded = base64.b64decode(encrypted_value)
    payload = json.loads(decoded)

    # Verify MAC signature
    if not self._valid_mac(payload):
        raise ValueError("Invalid MAC signature")

    # Decrypt
    iv = base64.b64decode(payload["iv"])
    encrypted = base64.b64decode(payload["value"])

    cipher = AES.new(self.key, AES.MODE_CBC, iv)
    decrypted = unpad(cipher.decrypt(encrypted), AES.block_size)

    return decrypted.decode("utf-8")

def _valid_mac(self, payload: dict) -> bool:
    """Verify MAC signature."""
    mac = payload.get("mac")
    if not mac:
        return False

    # Calculate expected MAC
    message = base64.b64encode(
        json.dumps({"iv": payload["iv"], "value": payload["value"]}).encode()
    )
    expected_mac = hmac.new(
        self.key,
        message,
        hashlib.sha256,
    ).hexdigest()

    return hmac.compare_digest(mac, expected_mac)

def decrypt_session(self, session_cookie: str) -> dict:
    """Decrypt Laravel session cookie and extract user data.

    Args:
        session_cookie: Laravel session cookie value

    Returns:
        Dictionary with user_id and other session data
    """
    # Decrypt session
    decrypted = self.decrypt(session_cookie)

    # Unserialize PHP session data
    session_data = phpserialize.loads(decrypted.encode())

    # Extract user ID from various Laravel guard patterns
    user_id = None

    # Pattern 1: login_web_{guard}_*
    for key in session_data:
        if isinstance(key, bytes):
            key_str = key.decode()
            if key_str.startswith("login_web_"):
                user_id = session_data[key]
                break

    # Pattern 2: Direct user_id key
    if not user_id and b"user_id" in session_data:
        user_id = session_data[b"user_id"]

    if not user_id:
        raise ValueError("No user_id found in session")

    return {
        "user_id": user_id.decode() if isinstance(user_id, bytes) else user_id,
        "session_data": session_data,
    }
undefined

5. Redis Session Management

5. Redis会话管理

python
undefined
python
undefined

src/infra/cache/redis_session.py

src/infra/cache/redis_session.py

import json from datetime import timedelta from redis.asyncio import Redis
class RedisSessionManager: """Manage user sessions in Redis.
Stores session data with TTL for automatic expiration.
"""

def __init__(self, redis_client: Redis, ttl_seconds: int = 28800):
    """Initialize session manager.

    Args:
        redis_client: Async Redis client
        ttl_seconds: Session TTL (default 8 hours)
    """
    self.redis = redis_client
    self.ttl = ttl_seconds

async def create_session(self, user_data: dict) -> str:
    """Create new session and return session ID.

    Args:
        user_data: User data to store

    Returns:
        Session ID (UUID)
    """
    import uuid
    session_id = str(uuid.uuid4())

    # Store in Redis
    session_key = f"session:{session_id}"
    await self.redis.setex(
        session_key,
        self.ttl,
        json.dumps(user_data),
    )

    return session_id

async def get_session(self, session_id: str) -> dict | None:
    """Retrieve session data.

    Args:
        session_id: Session identifier

    Returns:
        User data dictionary or None if not found
    """
    session_key = f"session:{session_id}"
    data = await self.redis.get(session_key)

    if not data:
        return None

    # Refresh TTL on access
    await self.redis.expire(session_key, self.ttl)

    return json.loads(data)

async def delete_session(self, session_id: str) -> bool:
    """Delete session.

    Args:
        session_id: Session identifier

    Returns:
        True if deleted, False if not found
    """
    session_key = f"session:{session_id}"
    result = await self.redis.delete(session_key)
    return result > 0

async def update_session(self, session_id: str, user_data: dict) -> bool:
    """Update existing session data.

    Args:
        session_id: Session identifier
        user_data: Updated user data

    Returns:
        True if updated, False if session not found
    """
    session_key = f"session:{session_id}"
    exists = await self.redis.exists(session_key)

    if not exists:
        return False

    await self.redis.setex(
        session_key,
        self.ttl,
        json.dumps(user_data),
    )
    return True
undefined
import json from datetime import timedelta from redis.asyncio import Redis
class RedisSessionManager: """Manage user sessions in Redis.
Stores session data with TTL for automatic expiration.
"""

def __init__(self, redis_client: Redis, ttl_seconds: int = 28800):
    """Initialize session manager.

    Args:
        redis_client: Async Redis client
        ttl_seconds: Session TTL (default 8 hours)
    """
    self.redis = redis_client
    self.ttl = ttl_seconds

async def create_session(self, user_data: dict) -> str:
    """Create new session and return session ID.

    Args:
        user_data: User data to store

    Returns:
        Session ID (UUID)
    """
    import uuid
    session_id = str(uuid.uuid4())

    # Store in Redis
    session_key = f"session:{session_id}"
    await self.redis.setex(
        session_key,
        self.ttl,
        json.dumps(user_data),
    )

    return session_id

async def get_session(self, session_id: str) -> dict | None:
    """Retrieve session data.

    Args:
        session_id: Session identifier

    Returns:
        User data dictionary or None if not found
    """
    session_key = f"session:{session_id}"
    data = await self.redis.get(session_key)

    if not data:
        return None

    # Refresh TTL on access
    await self.redis.expire(session_key, self.ttl)

    return json.loads(data)

async def delete_session(self, session_id: str) -> bool:
    """Delete session.

    Args:
        session_id: Session identifier

    Returns:
        True if deleted, False if not found
    """
    session_key = f"session:{session_id}"
    result = await self.redis.delete(session_key)
    return result > 0

async def update_session(self, session_id: str, user_data: dict) -> bool:
    """Update existing session data.

    Args:
        session_id: Session identifier
        user_data: Updated user data

    Returns:
        True if updated, False if session not found
    """
    session_key = f"session:{session_id}"
    exists = await self.redis.exists(session_key)

    if not exists:
        return False

    await self.redis.setex(
        session_key,
        self.ttl,
        json.dumps(user_data),
    )
    return True
undefined

6. Permission Checking Middleware

6. 权限校验中间件

python
undefined
python
undefined

src/api/middlewares/auth.py

src/api/middlewares/auth.py

from fastapi import Request, HTTPException, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from src.infra.adapters.auth.unified_adapter import UnifiedAuthAdapter
security = HTTPBearer()
class ProtectedResource: """FastAPI dependency for protected endpoints.
Usage:
    @app.get("/protected", dependencies=[Depends(ProtectedResource.check)])
"""

def __init__(self, unified_adapter: UnifiedAuthAdapter):
    self.unified_adapter = unified_adapter

async def check(
    self,
    credentials: HTTPAuthorizationCredentials = Depends(security),
) -> dict:
    """Validate token and return user data.

    Raises:
        HTTPException: 401 if token invalid, 403 if insufficient permissions
    """
    token = credentials.credentials

    try:
        user_data = await self.unified_adapter.validate_token(token)
        return user_data
    except JWTError as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=str(e),
        )

async def check_permissions(
    self,
    credentials: HTTPAuthorizationCredentials,
    required_permissions: list[str],
) -> dict:
    """Validate token and check permissions.

    Args:
        credentials: Bearer token
        required_permissions: List of required permissions

    Returns:
        User data if authorized

    Raises:
        HTTPException: 401 unauthorized, 403 forbidden
    """
    user_data = await self.check(credentials)
    user_permissions = user_data.get("permissions", [])

    # Check for admin wildcard
    if "*" in user_permissions:
        return user_data

    # Check required permissions
    has_permission = any(
        perm in user_permissions for perm in required_permissions
    )

    if not has_permission:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Missing required permissions: {required_permissions}",
        )

    return user_data
undefined
from fastapi import Request, HTTPException, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from src.infra.adapters.auth.unified_adapter import UnifiedAuthAdapter
security = HTTPBearer()
class ProtectedResource: """FastAPI dependency for protected endpoints.
Usage:
    @app.get("/protected", dependencies=[Depends(ProtectedResource.check)])
"""

def __init__(self, unified_adapter: UnifiedAuthAdapter):
    self.unified_adapter = unified_adapter

async def check(
    self,
    credentials: HTTPAuthorizationCredentials = Depends(security),
) -> dict:
    """Validate token and return user data.

    Raises:
        HTTPException: 401 if token invalid, 403 if insufficient permissions
    """
    token = credentials.credentials

    try:
        user_data = await self.unified_adapter.validate_token(token)
        return user_data
    except JWTError as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=str(e),
        )

async def check_permissions(
    self,
    credentials: HTTPAuthorizationCredentials,
    required_permissions: list[str],
) -> dict:
    """Validate token and check permissions.

    Args:
        credentials: Bearer token
        required_permissions: List of required permissions

    Returns:
        User data if authorized

    Raises:
        HTTPException: 401 unauthorized, 403 forbidden
    """
    user_data = await self.check(credentials)
    user_permissions = user_data.get("permissions", [])

    # Check for admin wildcard
    if "*" in user_permissions:
        return user_data

    # Check required permissions
    has_permission = any(
        perm in user_permissions for perm in required_permissions
    )

    if not has_permission:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Missing required permissions: {required_permissions}",
        )

    return user_data
undefined

7. Multi-System Authentication Endpoints

7. 多系统认证端点

python
undefined
python
undefined

src/api/path/auth.py

src/api/path/auth.py

from fastapi import APIRouter, Depends, HTTPException, status from pydantic import BaseModel
from src.infra.adapters.auth.unified_adapter import UnifiedAuthAdapter from src.infra.cache.redis_session import RedisSessionManager
router = APIRouter(prefix="/v1/auth", tags=["auth"])
class LoginRequest(BaseModel): username: str password: str
class SSOLoginRequest(BaseModel): corporativo_session: str # Cookie from Corporativo
class TokenResponse(BaseModel): access_token: str token_type: str = "bearer" expires_in: int
@router.post("/login", response_model=TokenResponse) async def login( request: LoginRequest, adapter: UnifiedAuthAdapter = Depends(), ): """Login with username/password (Corporativo).
Returns JWT access token.
"""
# Delegate to Corporativo adapter
result = await adapter.adapters["corporativo"].authenticate_credentials(
    username=request.username,
    password=request.password,
)

return TokenResponse(
    access_token=result["access_token"],
    expires_in=3600,
)
@router.post("/sso-login", response_model=TokenResponse) async def sso_login( request: SSOLoginRequest, adapter: UnifiedAuthAdapter = Depends(), session_manager: RedisSessionManager = Depends(), ): """SSO login using Corporativo session cookie.
Validates session, creates local session, returns JWT.
"""
# Validate Corporativo session
user_data = await adapter.validate_session(request.corporativo_session)

# Create local session
session_id = await session_manager.create_session(user_data)

# Generate JWT
token = adapter.adapters["corporativo"].generate_token(user_data)

return TokenResponse(
    access_token=token,
    expires_in=3600,
)
@router.get("/me") async def get_current_user( user_data: dict = Depends(ProtectedResource.check), ): """Get current authenticated user info.""" return { "cpf": user_data.get("sub"), "name": user_data.get("name"), "email": user_data.get("email"), "permissions": user_data.get("permissions"), "systems": user_data.get("systems", []), }
@router.post("/logout") async def logout( session_id: str, session_manager: RedisSessionManager = Depends(), ): """Logout and invalidate session.""" await session_manager.delete_session(session_id) return {"message": "Logged out successfully"}
undefined
from fastapi import APIRouter, Depends, HTTPException, status from pydantic import BaseModel
from src.infra.adapters.auth.unified_adapter import UnifiedAuthAdapter from src.infra.cache.redis_session import RedisSessionManager
router = APIRouter(prefix="/v1/auth", tags=["auth"])
class LoginRequest(BaseModel): username: str password: str
class SSOLoginRequest(BaseModel): corporativo_session: str # Cookie from Corporativo
class TokenResponse(BaseModel): access_token: str token_type: str = "bearer" expires_in: int
@router.post("/login", response_model=TokenResponse) async def login( request: LoginRequest, adapter: UnifiedAuthAdapter = Depends(), ): """Login with username/password (Corporativo).
Returns JWT access token.
"""
# Delegate to Corporativo adapter
result = await adapter.adapters["corporativo"].authenticate_credentials(
    username=request.username,
    password=request.password,
)

return TokenResponse(
    access_token=result["access_token"],
    expires_in=3600,
)
@router.post("/sso-login", response_model=TokenResponse) async def sso_login( request: SSOLoginRequest, adapter: UnifiedAuthAdapter = Depends(), session_manager: RedisSessionManager = Depends(), ): """SSO login using Corporativo session cookie.
Validates session, creates local session, returns JWT.
"""
# Validate Corporativo session
user_data = await adapter.validate_session(request.corporativo_session)

# Create local session
session_id = await session_manager.create_session(user_data)

# Generate JWT
token = adapter.adapters["corporativo"].generate_token(user_data)

return TokenResponse(
    access_token=token,
    expires_in=3600,
)
@router.get("/me") async def get_current_user( user_data: dict = Depends(ProtectedResource.check), ): """Get current authenticated user info.""" return { "cpf": user_data.get("sub"), "name": user_data.get("name"), "email": user_data.get("email"), "permissions": user_data.get("permissions"), "systems": user_data.get("systems", []), }
@router.post("/logout") async def logout( session_id: str, session_manager: RedisSessionManager = Depends(), ): """Logout and invalidate session.""" await session_manager.delete_session(session_id) return {"message": "Logged out successfully"}
undefined

Configuration

配置

Environment Variables

环境变量

python
undefined
python
undefined

src/config/settings.py

src/config/settings.py

from pydantic_settings import BaseSettings
class AppSettings(BaseSettings): """Multi-system authentication settings."""
# Feature flags
ENABLE_MULTI_SYSTEM_AUTH: bool = True
ENABLED_AUTH_SYSTEMS: list[str] = ["corporativo", "sgf", "ged", "carrinho"]

# JWT configuration
JWT_ALGORITHM: str = "RS256"
JWT_PUBLIC_KEY_PATH: str = "./keys/jwt_public.pem"
JWT_PRIVATE_KEY_PATH: str = "./keys/jwt_private.pem"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 60
REFRESH_TOKEN_EXPIRE_HOURS: int = 8

# SSO systems
CORPORATIVO_API_URL: str
CORPORATIVO_APP_KEY: str  # Laravel APP_KEY for session decryption

SGF_API_URL: str
SGF_API_KEY: str

GED_API_URL: str
GED_API_KEY: str

CARRINHO_API_URL: str
CARRINHO_API_KEY: str

# Redis
REDIS_URL: str = "redis://localhost:6379/0"
SESSION_TTL_SECONDS: int = 28800  # 8 hours

# Backwards verification
ENABLE_BACKWARDS_VERIFICATION: bool = True
VERIFICATION_CACHE_TTL: int = 30  # seconds
VERIFICATION_TIMEOUT: int = 5  # seconds
app_settings = AppSettings()
undefined
from pydantic_settings import BaseSettings
class AppSettings(BaseSettings): """Multi-system authentication settings."""
# Feature flags
ENABLE_MULTI_SYSTEM_AUTH: bool = True
ENABLED_AUTH_SYSTEMS: list[str] = ["corporativo", "sgf", "ged", "carrinho"]

# JWT configuration
JWT_ALGORITHM: str = "RS256"
JWT_PUBLIC_KEY_PATH: str = "./keys/jwt_public.pem"
JWT_PRIVATE_KEY_PATH: str = "./keys/jwt_private.pem"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 60
REFRESH_TOKEN_EXPIRE_HOURS: int = 8

# SSO systems
CORPORATIVO_API_URL: str
CORPORATIVO_APP_KEY: str  # Laravel APP_KEY for session decryption

SGF_API_URL: str
SGF_API_KEY: str

GED_API_URL: str
GED_API_KEY: str

CARRINHO_API_URL: str
CARRINHO_API_KEY: str

# Redis
REDIS_URL: str = "redis://localhost:6379/0"
SESSION_TTL_SECONDS: int = 28800  # 8 hours

# Backwards verification
ENABLE_BACKWARDS_VERIFICATION: bool = True
VERIFICATION_CACHE_TTL: int = 30  # seconds
VERIFICATION_TIMEOUT: int = 5  # seconds
app_settings = AppSettings()
undefined

RSA Key Pair Management

RSA密钥对管理

bash
undefined
bash
undefined

Generate RSA key pair for JWT signing

Generate RSA key pair for JWT signing

openssl genrsa -out keys/jwt_private.pem 4096 openssl rsa -in keys/jwt_private.pem -pubout -out keys/jwt_public.pem
openssl genrsa -out keys/jwt_private.pem 4096 openssl rsa -in keys/jwt_private.pem -pubout -out keys/jwt_public.pem

Set proper permissions

Set proper permissions

chmod 600 keys/jwt_private.pem chmod 644 keys/jwt_public.pem
chmod 600 keys/jwt_private.pem chmod 644 keys/jwt_public.pem

Add to .gitignore

Add to .gitignore

echo "keys/jwt_private.pem" >> .gitignore
undefined
echo "keys/jwt_private.pem" >> .gitignore
undefined

Testing Strategy

测试策略

Unit Tests (Token Validation)

单元测试(令牌校验)

python
undefined
python
undefined

tests/infra/adapters/auth/test_corporativo_adapter.py

tests/infra/adapters/auth/test_corporativo_adapter.py

import pytest from jose import jwt from datetime import datetime, timedelta
@pytest.fixture def valid_token(private_key): """Generate valid JWT token.""" payload = { "sub": "12345678901", "name": "Test User", "email": "test@example.com", "permissions": ["gefin.boleto.read"], "iss": "gefin-backend", "aud": "gefin-api", "exp": datetime.utcnow() + timedelta(hours=1), } return jwt.encode(payload, private_key, algorithm="RS256")
@pytest.mark.asyncio async def test_validate_token_success(corporativo_adapter, valid_token): """Test successful token validation.""" user_data = await corporativo_adapter.validate_token(valid_token)
assert user_data["sub"] == "12345678901"
assert "gefin.boleto.read" in user_data["permissions"]
@pytest.mark.asyncio async def test_validate_token_invalid_signature(corporativo_adapter): """Test token with invalid signature.""" invalid_token = "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.invalid.signature"
with pytest.raises(JWTError):
    await corporativo_adapter.validate_token(invalid_token)
@pytest.mark.asyncio async def test_validate_token_expired(corporativo_adapter, private_key): """Test expired token.""" payload = { "sub": "12345678901", "exp": datetime.utcnow() - timedelta(hours=1), # Expired "iss": "gefin-backend", "aud": "gefin-api", } expired_token = jwt.encode(payload, private_key, algorithm="RS256")
with pytest.raises(JWTError, match="expired"):
    await corporativo_adapter.validate_token(expired_token)
undefined
import pytest from jose import jwt from datetime import datetime, timedelta
@pytest.fixture def valid_token(private_key): """Generate valid JWT token.""" payload = { "sub": "12345678901", "name": "Test User", "email": "test@example.com", "permissions": ["gefin.boleto.read"], "iss": "gefin-backend", "aud": "gefin-api", "exp": datetime.utcnow() + timedelta(hours=1), } return jwt.encode(payload, private_key, algorithm="RS256")
@pytest.mark.asyncio async def test_validate_token_success(corporativo_adapter, valid_token): """Test successful token validation.""" user_data = await corporativo_adapter.validate_token(valid_token)
assert user_data["sub"] == "12345678901"
assert "gefin.boleto.read" in user_data["permissions"]
@pytest.mark.asyncio async def test_validate_token_invalid_signature(corporativo_adapter): """Test token with invalid signature.""" invalid_token = "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.invalid.signature"
with pytest.raises(JWTError):
    await corporativo_adapter.validate_token(invalid_token)
@pytest.mark.asyncio async def test_validate_token_expired(corporativo_adapter, private_key): """Test expired token.""" payload = { "sub": "12345678901", "exp": datetime.utcnow() - timedelta(hours=1), # Expired "iss": "gefin-backend", "aud": "gefin-api", } expired_token = jwt.encode(payload, private_key, algorithm="RS256")
with pytest.raises(JWTError, match="expired"):
    await corporativo_adapter.validate_token(expired_token)
undefined

Integration Tests (Backwards Verification)

集成测试(反向验证)

python
undefined
python
undefined

tests/integration/test_backwards_verification.py

tests/integration/test_backwards_verification.py

@pytest.mark.asyncio async def test_backwards_verification_valid_token( corporativo_adapter, mock_corporativo_api, ): """Test backwards verification with valid token.""" # Mock Corporativo /api/me endpoint mock_corporativo_api.get("/api/me").returns( status=200, json={"cpf": "12345678901", "name": "Test User"}, )
token = generate_corporativo_token()
user_data = await corporativo_adapter.validate_token(token)

assert user_data["sub"] == "12345678901"
@pytest.mark.asyncio async def test_backwards_verification_invalid_token( corporativo_adapter, mock_corporativo_api, ): """Test backwards verification with invalid token.""" mock_corporativo_api.get("/api/me").returns(status=401)
token = generate_corporativo_token()

with pytest.raises(JWTError, match="not valid in Corporativo"):
    await corporativo_adapter.validate_token(token)
undefined
@pytest.mark.asyncio async def test_backwards_verification_valid_token( corporativo_adapter, mock_corporativo_api, ): """Test backwards verification with valid token.""" # Mock Corporativo /api/me endpoint mock_corporativo_api.get("/api/me").returns( status=200, json={"cpf": "12345678901", "name": "Test User"}, )
token = generate_corporativo_token()
user_data = await corporativo_adapter.validate_token(token)

assert user_data["sub"] == "12345678901"
@pytest.mark.asyncio async def test_backwards_verification_invalid_token( corporativo_adapter, mock_corporativo_api, ): """Test backwards verification with invalid token.""" mock_corporativo_api.get("/api/me").returns(status=401)
token = generate_corporativo_token()

with pytest.raises(JWTError, match="not valid in Corporativo"):
    await corporativo_adapter.validate_token(token)
undefined

Best Practices

最佳实践

Security

安全

  • ✅ Always verify JWT signatures before trusting payload
  • ✅ Implement backwards verification for external tokens
  • ✅ Use RS256 (asymmetric) instead of HS256 for multi-service environments
  • ✅ Rotate keys periodically
  • ✅ Cache validation results with short TTL (30s)
  • ✅ Implement graceful degradation on network failures
  • ✅ Never log tokens or secrets
  • ✅ 在信任令牌 payload 前始终验证JWT签名
  • ✅ 对外部令牌实现反向验证
  • ✅ 在多服务环境中使用RS256(非对称加密)而非HS256
  • ✅ 定期轮换密钥
  • ✅ 为校验结果设置短TTL缓存(30秒)
  • ✅ 在网络故障时实现优雅降级
  • ✅ 切勿记录令牌或密钥

Performance

性能

  • ✅ Cache token validation results
  • ✅ Use Redis for session storage
  • ✅ Set reasonable timeouts for backwards verification
  • ✅ Skip backwards verification for self-issued tokens
  • ✅ Use connection pooling for HTTP clients
  • ✅ Implement circuit breakers for external APIs
  • ✅ 缓存令牌校验结果
  • ✅ 使用Redis存储会话
  • ✅ 为反向验证设置合理的超时时间
  • ✅ 对自签发令牌跳过反向验证
  • ✅ 为HTTP客户端使用连接池
  • ✅ 为外部API实现断路器

Permission Mapping

权限映射

  • ✅ Define clear permission mapping tables
  • ✅ Support wildcard permissions for admins
  • ✅ Provide default read permissions for authenticated users
  • ✅ Map Portuguese permissions to English format
  • ✅ Log permission mapping failures
  • ✅ 定义清晰的权限映射表
  • ✅ 为管理员支持通配符权限
  • ✅ 为已认证用户提供默认读取权限
  • ✅ 将葡萄牙语权限映射为英文格式
  • ✅ 记录权限映射失败情况

Session Management

会话管理

  • ✅ Use UUIDs for session IDs
  • ✅ Set appropriate TTLs (8 hours default)
  • ✅ Refresh TTL on session access
  • ✅ Implement session cleanup on logout
  • ✅ Support both token and session authentication
  • ✅ 使用UUID作为会话ID
  • ✅ 设置合适的TTL(默认8小时)
  • ✅ 在会话访问时刷新TTL
  • ✅ 在登出时实现会话清理
  • ✅ 同时支持令牌与会话认证

Common Pitfalls

常见陷阱

  1. Not Verifying Issuer/Audience
    • ❌ Accepting any JWT without checking claims
    • ✅ Manually verify iss and aud claims
  2. Using HS256 in Multi-Service Environments
    • ❌ Symmetric keys shared across services
    • ✅ Use RS256 with public/private key pairs
  3. No Backwards Verification
    • ❌ Trusting JWT without checking with issuer
    • ✅ Implement backwards verification for security
  4. Hardcoded Permission Mappings
    • ❌ Magic strings in code
    • ✅ Use configuration/database for mappings
  5. Not Handling Network Failures
    • ❌ Failing all requests when SSO is down
    • ✅ Implement graceful degradation with cache
  6. Token Leakage in Logs
    • ❌ Logging full tokens in error messages
    • ✅ Log only token metadata (sub, iss)
  1. 未验证Issuer/Audience
    • ❌ 不检查声明就接受任意JWT
    • ✅ 手动验证iss和aud声明
  2. 在多服务环境中使用HS256
    • ❌ 对称密钥在多服务间共享
    • ✅ 使用带公钥/私钥对的RS256
  3. 未实现反向验证
    • ❌ 信任JWT而不向签发系统校验
    • ✅ 为安全性实现反向验证
  4. 硬编码权限映射
    • ❌ 代码中使用魔法字符串
    • ✅ 使用配置/数据库存储映射关系
  5. 未处理网络故障
    • ❌ 当SSO不可用时所有请求失败
    • ✅ 结合缓存实现优雅降级
  6. 日志中泄露令牌
    • ❌ 在错误信息中记录完整令牌
    • ✅ 仅记录令牌元数据(sub、iss)

Architecture Decisions

架构决策

Why Multi-Adapter Pattern?

为什么使用多适配器模式?

  • Separation of Concerns: Each SSO system has its own adapter
  • Extensibility: Easy to add new SSO providers
  • Testability: Mock individual adapters independently
  • Maintainability: Changes to one SSO don't affect others
  • 关注点分离:每个SSO系统有独立的适配器
  • 可扩展性:轻松添加新的SSO提供商
  • 可测试性:可独立模拟各个适配器
  • 可维护性:对一个SSO的修改不会影响其他系统

Why Backwards Verification?

为什么需要反向验证?

  • Security: Prevent token replay attacks
  • Session Validation: Check if user is still active
  • Revocation Support: Detect revoked tokens
  • Trust Verification: Confirm token with authoritative system
  • 安全性:防止令牌重放攻击
  • 会话校验:检查用户是否仍处于活跃状态
  • 吊销支持:检测已吊销的令牌
  • 信任校验:向权威系统确认令牌有效性

Why RS256 Over HS256?

为什么选择RS256而非HS256?

  • Key Distribution: Public key can be shared safely
  • Trust Boundary: Services verify without shared secret
  • Rotation: Easier key rotation strategy
  • Industry Standard: OAuth2/OIDC best practice
  • 密钥分发:公钥可安全共享
  • 信任边界:服务无需共享密钥即可完成校验
  • 轮换:更简单的密钥轮换策略
  • 行业标准:OAuth2/OIDC最佳实践

Production Deployment

生产部署

Key Management

密钥管理

bash
undefined
bash
undefined

Production key generation

Production key generation

openssl genrsa -out jwt_private.pem 4096 openssl rsa -in jwt_private.pem -pubout -out jwt_public.pem
openssl genrsa -out jwt_private.pem 4096 openssl rsa -in jwt_private.pem -pubout -out jwt_public.pem

Secure storage (AWS Secrets Manager, HashiCorp Vault, etc.)

Secure storage (AWS Secrets Manager, HashiCorp Vault, etc.)

aws secretsmanager create-secret
--name gefin/jwt-private-key
--secret-string file://jwt_private.pem
undefined
aws secretsmanager create-secret
--name gefin/jwt-private-key
--secret-string file://jwt_private.pem
undefined

Monitoring

监控

python
undefined
python
undefined

Log authentication events

Log authentication events

import structlog
logger = structlog.get_logger()
async def validate_token(self, token: str): logger.info( "token_validation_started", issuer=self._get_issuer(token), )
try:
    user_data = await self._validate(token)
    logger.info(
        "token_validation_success",
        user_id=user_data["sub"],
        issuer=user_data["iss"],
    )
    return user_data
except JWTError as e:
    logger.warning(
        "token_validation_failed",
        error=str(e),
    )
    raise
undefined
import structlog
logger = structlog.get_logger()
async def validate_token(self, token: str): logger.info( "token_validation_started", issuer=self._get_issuer(token), )
try:
    user_data = await self._validate(token)
    logger.info(
        "token_validation_success",
        user_id=user_data["sub"],
        issuer=user_data["iss"],
    )
    return user_data
except JWTError as e:
    logger.warning(
        "token_validation_failed",
        error=str(e),
    )
    raise
undefined

References

参考资料

Production Examples

生产示例

Based on patterns from:
  • GEFIN Backend: Multi-system SSO with Corporativo, SGF, GED, CARRINHO
  • Enterprise SSO: JWT RS256 with backwards verification
  • Laravel Integration: Session decryption for legacy systems
基于以下项目的模式:
  • GEFIN Backend:集成Corporativo、SGF、GED、CARRINHO的多系统SSO
  • Enterprise SSO:带反向验证的JWT RS256认证
  • Laravel Integration:面向传统系统的会话解密功能