websocket

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

WebSocket Security Skill

WebSocket 安全技能指南

File Organization

文件结构

  • SKILL.md: Core principles, patterns, essential security (this file)
  • references/security-examples.md: CSWSH examples and authentication patterns
  • references/advanced-patterns.md: Connection management, scaling patterns
  • references/threat-model.md: Attack scenarios including CSWSH
  • SKILL.md:核心原则、模式、基础安全内容(本文档)
  • references/security-examples.md:CSWSH示例与身份验证模式
  • references/advanced-patterns.md:连接管理、扩展模式
  • references/threat-model.md:包含CSWSH在内的攻击场景

Validation Gates

验证关卡

Gate 0.2: PASSED (5+ vulnerabilities documented) - CVE-2024-23898, CVE-2024-26135, CVE-2023-0957

关卡0.2:已通过(记录5个以上漏洞)- CVE-2024-23898、CVE-2024-26135、CVE-2023-0957

1. Overview

1. 概述

Risk Level: HIGH
Justification: WebSocket connections bypass Same-Origin Policy protections, making them vulnerable to Cross-Site WebSocket Hijacking (CSWSH). Persistent connections require careful authentication, session management, and input validation.
You are an expert in WebSocket security, understanding the unique vulnerabilities of persistent bidirectional connections.
风险等级:高
理由:WebSocket连接绕过同源策略保护,易受跨站WebSocket劫持(CSWSH)攻击。持久化连接需要谨慎处理身份验证、会话管理和输入验证。
您是WebSocket安全领域的专家,了解持久双向连接的独特漏洞。

Core Expertise Areas

核心专业领域

  • CSWSH (Cross-Site WebSocket Hijacking) prevention
  • Origin header validation and token-based authentication
  • Message validation and per-message authorization
  • Rate limiting and connection lifecycle security

  • CSWSH(跨站WebSocket劫持)防护
  • Origin头验证与基于令牌的身份验证
  • 消息验证与逐消息授权
  • 速率限制与连接生命周期安全

2. Core Responsibilities

2. 核心职责

Fundamental Principles

基本原则

  1. TDD First: Write tests before implementation - test security boundaries, connection lifecycle
  2. Performance Aware: Optimize for low latency (<50ms), connection pooling, backpressure
  3. Validate Origin: Always check Origin header against explicit allowlist
  4. Authenticate First: Verify identity before accepting messages
  5. Authorize Each Action: Don't assume connection equals unlimited access
  6. Validate All Messages: Treat WebSocket messages as untrusted input
  7. Limit Resources: Rate limit messages, timeout idle connections
  1. 优先TDD:在实现前编写测试 - 测试安全边界、连接生命周期
  2. 性能感知:针对低延迟(<50ms)、连接池、背压进行优化
  3. 验证Origin:始终对照明确的白名单检查Origin头
  4. 先验证身份:在接收消息前验证用户身份
  5. 为每个操作授权:不要假设连接等同于无限制访问
  6. 验证所有消息:将WebSocket消息视为不可信输入
  7. 限制资源:对消息进行速率限制,闲置连接超时

Security Decision Framework

安全决策框架

SituationApproach
New connectionValidate Origin, require authentication token
Each messageValidate format, check authorization for action
Sensitive operationsRe-verify session, log action
Idle connectionTimeout after inactivity period
Error conditionClose connection, log details

场景处理方式
新连接验证Origin,要求身份验证令牌
每条消息验证格式,检查操作的授权
敏感操作重新验证会话,记录操作
闲置连接闲置一段时间后超时
错误情况关闭连接,记录详细信息

3. Technical Foundation

3. 技术基础

Version Recommendations

版本推荐

ComponentVersionNotes
FastAPI/Starlette0.115+WebSocket support
websockets12.0+Python WebSocket library
组件版本说明
FastAPI/Starlette0.115+支持WebSocket
websockets12.0+Python WebSocket库

Security Configuration

安全配置

python
WEBSOCKET_CONFIG = {
    "max_message_size": 1024 * 1024,  # 1MB
    "max_connections_per_ip": 10,
    "idle_timeout_seconds": 300,
    "messages_per_minute": 60,
}
python
WEBSOCKET_CONFIG = {
    "max_message_size": 1024 * 1024,  # 1MB
    "max_connections_per_ip": 10,
    "idle_timeout_seconds": 300,
    "messages_per_minute": 60,
}

NEVER use "*" for origins

切勿将"*"用于源地址

4. Implementation Workflow (TDD)

4. 实现工作流(TDD)

Step 1: Write Failing Test First

步骤1:先编写失败的测试

python
import pytest
from httpx import AsyncClient, ASGITransport
from fastapi.testclient import TestClient
python
import pytest
from httpx import AsyncClient, ASGITransport
from fastapi.testclient import TestClient

Test security boundaries first

先测试安全边界

@pytest.mark.asyncio async def test_origin_validation_rejects_invalid(): """CSWSH prevention - must reject invalid origins.""" async with AsyncClient( transport=ASGITransport(app=app), base_url="http://test" ) as client: # This should fail until origin validation is implemented with pytest.raises(Exception): async with client.websocket_connect( "/ws?token=valid", headers={"Origin": "https://evil.com"} ): pass
@pytest.mark.asyncio async def test_authentication_required(): """Must reject connections without valid token.""" with TestClient(app) as client: with pytest.raises(Exception): with client.websocket_connect("/ws") as ws: pass
@pytest.mark.asyncio async def test_message_authorization(): """Each message action must be authorized.""" with TestClient(app) as client: with client.websocket_connect( "/ws?token=readonly_user", headers={"Origin": "https://app.example.com"} ) as ws: ws.send_json({"action": "delete", "id": "123"}) response = ws.receive_json() assert response.get("error") == "Permission denied"
undefined
@pytest.mark.asyncio async def test_origin_validation_rejects_invalid(): """CSWSH防护 - 必须拒绝无效源地址。""" async with AsyncClient( transport=ASGITransport(app=app), base_url="http://test" ) as client: # 在实现源地址验证前,此测试应失败 with pytest.raises(Exception): async with client.websocket_connect( "/ws?token=valid", headers={"Origin": "https://evil.com"} ): pass
@pytest.mark.asyncio async def test_authentication_required(): """必须拒绝无有效令牌的连接。""" with TestClient(app) as client: with pytest.raises(Exception): with client.websocket_connect("/ws") as ws: pass
@pytest.mark.asyncio async def test_message_authorization(): """每条消息的操作都必须经过授权。""" with TestClient(app) as client: with client.websocket_connect( "/ws?token=readonly_user", headers={"Origin": "https://app.example.com"} ) as ws: ws.send_json({"action": "delete", "id": "123"}) response = ws.receive_json() assert response.get("error") == "Permission denied"
undefined

Step 2: Implement Minimum to Pass

步骤2:实现满足测试的最小代码

python
undefined
python
undefined

Implement only what's needed to pass the test

仅实现通过测试所需的代码

async def validate_origin(websocket: WebSocket) -> bool: origin = websocket.headers.get("origin") if not origin or origin not in ALLOWED_ORIGINS: await websocket.close(code=4003, reason="Invalid origin") return False return True
undefined
async def validate_origin(websocket: WebSocket) -> bool: origin = websocket.headers.get("origin") if not origin or origin not in ALLOWED_ORIGINS: await websocket.close(code=4003, reason="Invalid origin") return False return True
undefined

Step 3: Refactor and Verify

步骤3:重构与验证

bash
undefined
bash
undefined

Run all WebSocket tests

运行所有WebSocket测试

pytest tests/websocket/ -v --asyncio-mode=auto
pytest tests/websocket/ -v --asyncio-mode=auto

Check for security issues

检查安全问题

bandit -r src/websocket/
bandit -r src/websocket/

Verify no regressions

验证无回归问题

pytest tests/ -v

---
pytest tests/ -v

---

5. Performance Patterns

5. 性能模式

Pattern 1: Connection Pooling

模式1:连接池

python
undefined
python
undefined

BAD - Create new connection for each request

错误示例 - 为每个请求创建新连接

ws = await create_connection(user_id) # Expensive!
ws = await create_connection(user_id) # 开销大!

GOOD - Reuse connections from pool

正确示例 - 从连接池复用连接

class ConnectionPool: def init(self, max_size: int = 100): self.connections: dict[str, WebSocket] = {}
async def get_or_create(self, user_id: str) -> WebSocket:
    if user_id not in self.connections:
        self.connections[user_id] = await create_connection(user_id)
    return self.connections[user_id]
undefined
class ConnectionPool: def init(self, max_size: int = 100): self.connections: dict[str, WebSocket] = {}
async def get_or_create(self, user_id: str) -> WebSocket:
    if user_id not in self.connections:
        self.connections[user_id] = await create_connection(user_id)
    return self.connections[user_id]
undefined

Pattern 2: Message Batching

模式2:消息批处理

python
undefined
python
undefined

BAD - Send messages one at a time

错误示例 - 逐个发送消息

for item in items: await websocket.send_json({"type": "item", "data": item})
for item in items: await websocket.send_json({"type": "item", "data": item})

GOOD - Batch messages to reduce overhead

正确示例 - 批处理消息以减少开销

await websocket.send_json({"type": "batch", "data": items[:50]})
undefined
await websocket.send_json({"type": "batch", "data": items[:50]})
undefined

Pattern 3: Binary Protocols

模式3:二进制协议

python
undefined
python
undefined

BAD - JSON for high-frequency data (~80 bytes)

错误示例 - 高频数据使用JSON(约80字节)

await websocket.send_json({"x": 123.456, "y": 789.012, "z": 456.789})
await websocket.send_json({"x": 123.456, "y": 789.012, "z": 456.789})

GOOD - Binary format (20 bytes)

正确示例 - 使用二进制格式(20字节)

import struct await websocket.send_bytes(struct.pack('!3f', 123.456, 789.012, 456.789))
undefined
import struct await websocket.send_bytes(struct.pack('!3f', 123.456, 789.012, 456.789))
undefined

Pattern 4: Heartbeat Optimization

模式4:心跳优化

python
undefined
python
undefined

BAD - Fixed frequent heartbeats

错误示例 - 固定频繁的心跳

HEARTBEAT_INTERVAL = 5 # Every 5 seconds
HEARTBEAT_INTERVAL = 5 # 每5秒一次

GOOD - Adaptive heartbeats based on activity

正确示例 - 根据活动情况自适应心跳

interval = 60 if (time() - last_activity) < 60 else 30
undefined
interval = 60 if (time() - last_activity) < 60 else 30
undefined

Pattern 5: Backpressure Handling

模式5:背压处理

python
undefined
python
undefined

BAD - Blocks on slow clients

错误示例 - 在客户端缓慢时阻塞

await ws.send_json(message)
await ws.send_json(message)

GOOD - Timeout and bounded queue

正确示例 - 超时与有限队列

from collections import deque queue = deque(maxlen=100) # Drop oldest when full try: await asyncio.wait_for(ws.send_json(message), timeout=1.0) except asyncio.TimeoutError: pass # Client too slow

---
from collections import deque queue = deque(maxlen=100) # 队列满时丢弃最旧的消息 try: await asyncio.wait_for(ws.send_json(message), timeout=1.0) except asyncio.TimeoutError: pass # 客户端过慢

---

6. Implementation Patterns

6. 实现模式

Pattern 1: Origin Validation (Critical for CSWSH Prevention)

模式1:Origin验证(CSWSH防护的关键)

python
from fastapi import WebSocket

async def validate_origin(websocket: WebSocket) -> bool:
    """Validate WebSocket origin against allowlist."""
    origin = websocket.headers.get("origin")
    if not origin or origin not in ALLOWED_ORIGINS:
        await websocket.close(code=4003, reason="Invalid origin")
        return False
    return True

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    if not await validate_origin(websocket):
        return
    await websocket.accept()
python
from fastapi import WebSocket

async def validate_origin(websocket: WebSocket) -> bool:
    """对照白名单验证WebSocket源地址。"""
    origin = websocket.headers.get("origin")
    if not origin or origin not in ALLOWED_ORIGINS:
        await websocket.close(code=4003, reason="Invalid origin")
        return False
    return True

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    if not await validate_origin(websocket):
        return
    await websocket.accept()

Pattern 2: Token-Based Authentication

模式2:基于令牌的身份验证

python
from jose import jwt, JWTError

async def authenticate_websocket(websocket: WebSocket) -> User | None:
    """Authenticate via token (not cookies - vulnerable to CSWSH)."""
    token = websocket.query_params.get("token")
    if not token:
        await websocket.close(code=4001, reason="Authentication required")
        return None
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
        user = await user_service.get(payload.get("sub"))
        if not user:
            await websocket.close(code=4001, reason="User not found")
            return None
        return user
    except JWTError:
        await websocket.close(code=4001, reason="Invalid token")
        return None
python
from jose import jwt, JWTError

async def authenticate_websocket(websocket: WebSocket) -> User | None:
    """通过令牌进行身份验证(不要使用Cookie - 易受CSWSH攻击)。"""
    token = websocket.query_params.get("token")
    if not token:
        await websocket.close(code=4001, reason="Authentication required")
        return None
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
        user = await user_service.get(payload.get("sub"))
        if not user:
            await websocket.close(code=4001, reason="User not found")
            return None
        return user
    except JWTError:
        await websocket.close(code=4001, reason="Invalid token")
        return None

Pattern 3: Per-Message Authorization

模式3:逐消息授权

python
from pydantic import BaseModel, field_validator

class WebSocketMessage(BaseModel):
    action: str
    data: dict

    @field_validator('action')
    @classmethod
    def validate_action(cls, v):
        if v not in {'subscribe', 'unsubscribe', 'send', 'query'}:
            raise ValueError(f'Invalid action: {v}')
        return v

async def handle_message(websocket: WebSocket, user: User, raw_data: dict):
    try:
        message = WebSocketMessage(**raw_data)
    except ValueError:
        await websocket.send_json({"error": "Invalid message format"})
        return

    if not user.has_permission(f"ws:{message.action}"):
        await websocket.send_json({"error": "Permission denied"})
        return

    result = await handlers[message.action](user, message.data)
    await websocket.send_json(result)
python
from pydantic import BaseModel, field_validator

class WebSocketMessage(BaseModel):
    action: str
    data: dict

    @field_validator('action')
    @classmethod
    def validate_action(cls, v):
        if v not in {'subscribe', 'unsubscribe', 'send', 'query'}:
            raise ValueError(f'Invalid action: {v}')
        return v

async def handle_message(websocket: WebSocket, user: User, raw_data: dict):
    try:
        message = WebSocketMessage(**raw_data)
    except ValueError:
        await websocket.send_json({"error": "Invalid message format"})
        return

    if not user.has_permission(f"ws:{message.action}"):
        await websocket.send_json({"error": "Permission denied"})
        return

    result = await handlers[message.action](user, message.data)
    await websocket.send_json(result)

Pattern 4: Connection Manager with Rate Limiting

模式4:带速率限制的连接管理器

python
from collections import defaultdict
from time import time

class SecureConnectionManager:
    def __init__(self):
        self.connections: dict[str, WebSocket] = {}
        self.message_counts: dict[str, list[float]] = defaultdict(list)
        self.connections_per_ip: dict[str, int] = defaultdict(int)

    async def connect(self, websocket: WebSocket, user_id: str, ip: str) -> bool:
        if self.connections_per_ip[ip] >= WEBSOCKET_CONFIG["max_connections_per_ip"]:
            await websocket.close(code=4029, reason="Too many connections")
            return False
        await websocket.accept()
        self.connections[user_id] = websocket
        self.connections_per_ip[ip] += 1
        return True

    def check_rate_limit(self, user_id: str) -> bool:
        now = time()
        self.message_counts[user_id] = [
            ts for ts in self.message_counts[user_id] if ts > now - 60
        ]
        if len(self.message_counts[user_id]) >= WEBSOCKET_CONFIG["messages_per_minute"]:
            return False
        self.message_counts[user_id].append(now)
        return True
python
from collections import defaultdict
from time import time

class SecureConnectionManager:
    def __init__(self):
        self.connections: dict[str, WebSocket] = {}
        self.message_counts: dict[str, list[float]] = defaultdict(list)
        self.connections_per_ip: dict[str, int] = defaultdict(int)

    async def connect(self, websocket: WebSocket, user_id: str, ip: str) -> bool:
        if self.connections_per_ip[ip] >= WEBSOCKET_CONFIG["max_connections_per_ip"]:
            await websocket.close(code=4029, reason="Too many connections")
            return False
        await websocket.accept()
        self.connections[user_id] = websocket
        self.connections_per_ip[ip] += 1
        return True

    def check_rate_limit(self, user_id: str) -> bool:
        now = time()
        self.message_counts[user_id] = [
            ts for ts in self.message_counts[user_id] if ts > now - 60
        ]
        if len(self.message_counts[user_id]) >= WEBSOCKET_CONFIG["messages_per_minute"]:
            return False
        self.message_counts[user_id].append(now)
        return True

Pattern 5: Complete Secure Handler

模式5:完整的安全处理器

python
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    if not await validate_origin(websocket):
        return
    user = await authenticate_websocket(websocket)
    if not user:
        return

    ip = websocket.client.host
    if not await manager.connect(websocket, user.id, ip):
        return

    try:
        while True:
            raw = await asyncio.wait_for(
                websocket.receive_json(),
                timeout=WEBSOCKET_CONFIG["idle_timeout_seconds"]
            )
            if not manager.check_rate_limit(user.id):
                await websocket.send_json({"error": "Rate limited"})
                continue
            await handle_message(websocket, user, raw)
    except (WebSocketDisconnect, asyncio.TimeoutError):
        pass
    finally:
        manager.disconnect(user.id, ip)

python
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    if not await validate_origin(websocket):
        return
    user = await authenticate_websocket(websocket)
    if not user:
        return

    ip = websocket.client.host
    if not await manager.connect(websocket, user.id, ip):
        return

    try:
        while True:
            raw = await asyncio.wait_for(
                websocket.receive_json(),
                timeout=WEBSOCKET_CONFIG["idle_timeout_seconds"]
            )
            if not manager.check_rate_limit(user.id):
                await websocket.send_json({"error": "Rate limited"})
                continue
            await handle_message(websocket, user, raw)
    except (WebSocketDisconnect, asyncio.TimeoutError):
        pass
    finally:
        manager.disconnect(user.id, ip)

7. Security Standards

7. 安全标准

Domain Vulnerability Landscape

领域漏洞情况

CVE IDSeverityDescriptionMitigation
CVE-2024-23898HIGHJenkins CSWSH - command executionValidate Origin
CVE-2024-26135HIGHMeshCentral CSWSH - config leakOrigin + SameSite
CVE-2023-0957CRITICALGitpod CSWSH - account takeoverOrigin + token auth
CVE编号严重程度描述缓解措施
CVE-2024-23898Jenkins CSWSH - 命令执行验证Origin
CVE-2024-26135MeshCentral CSWSH - 配置泄露Origin验证 + SameSite
CVE-2023-0957严重Gitpod CSWSH - 账户接管Origin验证 + 令牌身份验证

OWASP Top 10 Mapping

OWASP Top 10映射

CategoryMitigations
A01 Access ControlOrigin validation, per-message authz
A02 Crypto FailuresTLS/WSS only, signed tokens
A03 InjectionValidate all message content
A07 Auth FailuresToken auth, session validation
类别缓解措施
A01 访问控制Origin验证、逐消息授权
A02 加密失败仅使用TLS/WSS、签名令牌
A03 注入验证所有消息内容
A07 身份验证失败令牌身份验证、会话验证

CSWSH Prevention Summary

CSWSH防护总结

python
async def secure_websocket_handler(websocket: WebSocket):
    # 1. VALIDATE ORIGIN (Critical)
    if websocket.headers.get("origin") not in ALLOWED_ORIGINS:
        await websocket.close(code=4003)
        return
    # 2. AUTHENTICATE with token (not cookies)
    user = await validate_token(websocket.query_params.get("token"))
    if not user:
        await websocket.close(code=4001)
        return
    # 3. Accept only after validation
    await websocket.accept()
    # 4. AUTHORIZE each message, 5. RATE LIMIT, 6. TIMEOUT idle

python
async def secure_websocket_handler(websocket: WebSocket):
    # 1. 验证Origin(关键)
    if websocket.headers.get("origin") not in ALLOWED_ORIGINS:
        await websocket.close(code=4003)
        return
    # 2. 使用令牌进行身份验证(不要使用Cookie)
    user = await validate_token(websocket.query_params.get("token"))
    if not user:
        await websocket.close(code=4001)
        return
    # 3. 验证通过后再接受连接
    await websocket.accept()
    # 4. 为每条消息授权,5. 速率限制,6. 闲置连接超时

8. Common Mistakes & Anti-Patterns

8. 常见错误与反模式

No Origin Validation

无Origin验证

python
undefined
python
undefined

NEVER - vulnerable to CSWSH

切勿这样做 - 易受CSWSH攻击

@app.websocket("/ws") async def vulnerable(websocket: WebSocket): await websocket.accept() # Accepts any origin!
@app.websocket("/ws") async def vulnerable(websocket: WebSocket): await websocket.accept() # 接受任何源地址!

ALWAYS - validate origin first

务必这样做 - 先验证源地址

if websocket.headers.get("origin") not in ALLOWED_ORIGINS: await websocket.close(code=4003) return
undefined
if websocket.headers.get("origin") not in ALLOWED_ORIGINS: await websocket.close(code=4003) return
undefined

Cookie-Only Authentication

仅使用Cookie身份验证

python
undefined
python
undefined

NEVER - cookies sent automatically in CSWSH attacks

切勿这样做 - Cookie在CSWSH攻击中会自动发送

session = websocket.cookies.get("session")
session = websocket.cookies.get("session")

ALWAYS - require explicit token parameter

务必这样做 - 要求显式的令牌参数

token = websocket.query_params.get("token")
undefined
token = websocket.query_params.get("token")
undefined

No Per-Message Authorization

无逐消息授权

python
undefined
python
undefined

NEVER - assumes connection = full access

切勿这样做 - 假设连接等同于完全访问权限

if data["action"] == "delete": await delete_resource(data["id"])
if data["action"] == "delete": await delete_resource(data["id"])

ALWAYS - check permission for each action

务必这样做 - 为每个操作检查权限

if not user.has_permission("delete"): return {"error": "Permission denied"}
undefined
if not user.has_permission("delete"): return {"error": "Permission denied"}
undefined

No Input Validation

无输入验证

python
undefined
python
undefined

NEVER - trust WebSocket messages

切勿这样做 - 信任WebSocket消息

await db.execute(f"SELECT * FROM {data['table']}") # SQL injection!
await db.execute(f"SELECT * FROM {data['table']}") # SQL注入!

ALWAYS - validate with Pydantic

务必这样做 - 使用Pydantic验证

message = WebSocketMessage(**data)

---
message = WebSocketMessage(**data)

---

9. Pre-Implementation Checklist

9. 实现前检查清单

Phase 1: Before Writing Code

阶段1:编写代码前

  • Write failing tests for security boundaries (CSWSH, auth, authz)
  • Write failing tests for connection lifecycle (connect, disconnect, timeout)
  • Write failing tests for message validation and rate limiting
  • Review threat model in
    references/threat-model.md
  • Identify performance requirements (latency, throughput, connections)
  • 为安全边界编写失败测试(CSWSH、身份验证、授权)
  • 为连接生命周期编写失败测试(连接、断开、超时)
  • 为消息验证和速率限制编写失败测试
  • 审阅
    references/threat-model.md
    中的威胁模型
  • 确定性能要求(延迟、吞吐量、连接数)

Phase 2: During Implementation

阶段2:实现过程中

  • Origin validation against explicit allowlist
  • Token-based authentication (not cookie-only)
  • Per-message authorization checks
  • Rate limiting and idle timeout implemented
  • All messages validated with Pydantic
  • Connection pooling for efficiency
  • Backpressure handling for slow clients
  • 针对明确的白名单进行Origin验证
  • 基于令牌的身份验证(不只是Cookie)
  • 逐消息授权检查
  • 实现速率限制和闲置超时
  • 使用Pydantic验证所有消息
  • 连接池以提升效率
  • 为慢客户端处理背压

Phase 3: Before Committing

阶段3:提交前

  • All security tests pass:
    pytest tests/websocket/ -v
  • No security issues:
    bandit -r src/websocket/
  • WSS (TLS) enforced in production config
  • CSWSH test coverage verified
  • Performance benchmarks meet targets (<50ms latency)
  • No regressions:
    pytest tests/ -v

  • 所有安全测试通过:
    pytest tests/websocket/ -v
  • 无安全问题:
    bandit -r src/websocket/
  • 生产配置中强制使用WSS(TLS)
  • 验证CSWSH测试覆盖率
  • 性能基准满足目标(延迟<50ms)
  • 无回归问题:
    pytest tests/ -v

10. Summary

10. 总结

Security Goals:
  • CSWSH-Resistant: Origin validation, token auth
  • Properly Authorized: Per-message permission checks
  • Rate Limited: Prevent message flooding
  • Validated: All messages treated as untrusted
Critical Reminders: ALWAYS validate Origin, use token auth (not cookies), authorize EACH message, use WSS in production.
安全目标:
  • 抗CSWSH:Origin验证、令牌身份验证
  • 正确授权:逐消息权限检查
  • 速率限制:防止消息泛滥
  • 验证输入:所有消息视为不可信
关键提醒:务必验证Origin,使用令牌身份验证(不要使用Cookie),为每条消息授权,生产环境中使用WSS。