websocket
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseWebSocket Security Skill
WebSocket 安全技能指南
File Organization
文件结构
- SKILL.md: Core principles, patterns, essential security (this file)
- references/security-examples.md: CSWSH examples and authentication patterns
- references/advanced-patterns.md: Connection management, scaling patterns
- references/threat-model.md: Attack scenarios including CSWSH
- SKILL.md:核心原则、模式、基础安全内容(本文档)
- references/security-examples.md:CSWSH示例与身份验证模式
- references/advanced-patterns.md:连接管理、扩展模式
- references/threat-model.md:包含CSWSH在内的攻击场景
Validation Gates
验证关卡
Gate 0.2: PASSED (5+ vulnerabilities documented) - CVE-2024-23898, CVE-2024-26135, CVE-2023-0957
关卡0.2:已通过(记录5个以上漏洞)- CVE-2024-23898、CVE-2024-26135、CVE-2023-0957
1. Overview
1. 概述
Risk Level: HIGH
Justification: WebSocket connections bypass Same-Origin Policy protections, making them vulnerable to Cross-Site WebSocket Hijacking (CSWSH). Persistent connections require careful authentication, session management, and input validation.
You are an expert in WebSocket security, understanding the unique vulnerabilities of persistent bidirectional connections.
风险等级:高
理由:WebSocket连接绕过同源策略保护,易受跨站WebSocket劫持(CSWSH)攻击。持久化连接需要谨慎处理身份验证、会话管理和输入验证。
您是WebSocket安全领域的专家,了解持久双向连接的独特漏洞。
Core Expertise Areas
核心专业领域
- CSWSH (Cross-Site WebSocket Hijacking) prevention
- Origin header validation and token-based authentication
- Message validation and per-message authorization
- Rate limiting and connection lifecycle security
- CSWSH(跨站WebSocket劫持)防护
- Origin头验证与基于令牌的身份验证
- 消息验证与逐消息授权
- 速率限制与连接生命周期安全
2. Core Responsibilities
2. 核心职责
Fundamental Principles
基本原则
- TDD First: Write tests before implementation - test security boundaries, connection lifecycle
- Performance Aware: Optimize for low latency (<50ms), connection pooling, backpressure
- Validate Origin: Always check Origin header against explicit allowlist
- Authenticate First: Verify identity before accepting messages
- Authorize Each Action: Don't assume connection equals unlimited access
- Validate All Messages: Treat WebSocket messages as untrusted input
- Limit Resources: Rate limit messages, timeout idle connections
- 优先TDD:在实现前编写测试 - 测试安全边界、连接生命周期
- 性能感知:针对低延迟(<50ms)、连接池、背压进行优化
- 验证Origin:始终对照明确的白名单检查Origin头
- 先验证身份:在接收消息前验证用户身份
- 为每个操作授权:不要假设连接等同于无限制访问
- 验证所有消息:将WebSocket消息视为不可信输入
- 限制资源:对消息进行速率限制,闲置连接超时
Security Decision Framework
安全决策框架
| Situation | Approach |
|---|---|
| New connection | Validate Origin, require authentication token |
| Each message | Validate format, check authorization for action |
| Sensitive operations | Re-verify session, log action |
| Idle connection | Timeout after inactivity period |
| Error condition | Close connection, log details |
| 场景 | 处理方式 |
|---|---|
| 新连接 | 验证Origin,要求身份验证令牌 |
| 每条消息 | 验证格式,检查操作的授权 |
| 敏感操作 | 重新验证会话,记录操作 |
| 闲置连接 | 闲置一段时间后超时 |
| 错误情况 | 关闭连接,记录详细信息 |
3. Technical Foundation
3. 技术基础
Version Recommendations
版本推荐
| Component | Version | Notes |
|---|---|---|
| FastAPI/Starlette | 0.115+ | WebSocket support |
| websockets | 12.0+ | Python WebSocket library |
| 组件 | 版本 | 说明 |
|---|---|---|
| FastAPI/Starlette | 0.115+ | 支持WebSocket |
| websockets | 12.0+ | Python WebSocket库 |
Security Configuration
安全配置
python
WEBSOCKET_CONFIG = {
"max_message_size": 1024 * 1024, # 1MB
"max_connections_per_ip": 10,
"idle_timeout_seconds": 300,
"messages_per_minute": 60,
}python
WEBSOCKET_CONFIG = {
"max_message_size": 1024 * 1024, # 1MB
"max_connections_per_ip": 10,
"idle_timeout_seconds": 300,
"messages_per_minute": 60,
}NEVER use "*" for origins
切勿将"*"用于源地址
ALLOWED_ORIGINS = ["https://app.example.com", "https://admin.example.com"]
---ALLOWED_ORIGINS = ["https://app.example.com", "https://admin.example.com"]
---4. Implementation Workflow (TDD)
4. 实现工作流(TDD)
Step 1: Write Failing Test First
步骤1:先编写失败的测试
python
import pytest
from httpx import AsyncClient, ASGITransport
from fastapi.testclient import TestClientpython
import pytest
from httpx import AsyncClient, ASGITransport
from fastapi.testclient import TestClientTest security boundaries first
先测试安全边界
@pytest.mark.asyncio
async def test_origin_validation_rejects_invalid():
"""CSWSH prevention - must reject invalid origins."""
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test"
) as client:
# This should fail until origin validation is implemented
with pytest.raises(Exception):
async with client.websocket_connect(
"/ws?token=valid",
headers={"Origin": "https://evil.com"}
):
pass
@pytest.mark.asyncio
async def test_authentication_required():
"""Must reject connections without valid token."""
with TestClient(app) as client:
with pytest.raises(Exception):
with client.websocket_connect("/ws") as ws:
pass
@pytest.mark.asyncio
async def test_message_authorization():
"""Each message action must be authorized."""
with TestClient(app) as client:
with client.websocket_connect(
"/ws?token=readonly_user",
headers={"Origin": "https://app.example.com"}
) as ws:
ws.send_json({"action": "delete", "id": "123"})
response = ws.receive_json()
assert response.get("error") == "Permission denied"
undefined@pytest.mark.asyncio
async def test_origin_validation_rejects_invalid():
"""CSWSH防护 - 必须拒绝无效源地址。"""
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test"
) as client:
# 在实现源地址验证前,此测试应失败
with pytest.raises(Exception):
async with client.websocket_connect(
"/ws?token=valid",
headers={"Origin": "https://evil.com"}
):
pass
@pytest.mark.asyncio
async def test_authentication_required():
"""必须拒绝无有效令牌的连接。"""
with TestClient(app) as client:
with pytest.raises(Exception):
with client.websocket_connect("/ws") as ws:
pass
@pytest.mark.asyncio
async def test_message_authorization():
"""每条消息的操作都必须经过授权。"""
with TestClient(app) as client:
with client.websocket_connect(
"/ws?token=readonly_user",
headers={"Origin": "https://app.example.com"}
) as ws:
ws.send_json({"action": "delete", "id": "123"})
response = ws.receive_json()
assert response.get("error") == "Permission denied"
undefinedStep 2: Implement Minimum to Pass
步骤2:实现满足测试的最小代码
python
undefinedpython
undefinedImplement only what's needed to pass the test
仅实现通过测试所需的代码
async def validate_origin(websocket: WebSocket) -> bool:
origin = websocket.headers.get("origin")
if not origin or origin not in ALLOWED_ORIGINS:
await websocket.close(code=4003, reason="Invalid origin")
return False
return True
undefinedasync def validate_origin(websocket: WebSocket) -> bool:
origin = websocket.headers.get("origin")
if not origin or origin not in ALLOWED_ORIGINS:
await websocket.close(code=4003, reason="Invalid origin")
return False
return True
undefinedStep 3: Refactor and Verify
步骤3:重构与验证
bash
undefinedbash
undefinedRun all WebSocket tests
运行所有WebSocket测试
pytest tests/websocket/ -v --asyncio-mode=auto
pytest tests/websocket/ -v --asyncio-mode=auto
Check for security issues
检查安全问题
bandit -r src/websocket/
bandit -r src/websocket/
Verify no regressions
验证无回归问题
pytest tests/ -v
---pytest tests/ -v
---5. Performance Patterns
5. 性能模式
Pattern 1: Connection Pooling
模式1:连接池
python
undefinedpython
undefinedBAD - Create new connection for each request
错误示例 - 为每个请求创建新连接
ws = await create_connection(user_id) # Expensive!
ws = await create_connection(user_id) # 开销大!
GOOD - Reuse connections from pool
正确示例 - 从连接池复用连接
class ConnectionPool:
def init(self, max_size: int = 100):
self.connections: dict[str, WebSocket] = {}
async def get_or_create(self, user_id: str) -> WebSocket:
if user_id not in self.connections:
self.connections[user_id] = await create_connection(user_id)
return self.connections[user_id]undefinedclass ConnectionPool:
def init(self, max_size: int = 100):
self.connections: dict[str, WebSocket] = {}
async def get_or_create(self, user_id: str) -> WebSocket:
if user_id not in self.connections:
self.connections[user_id] = await create_connection(user_id)
return self.connections[user_id]undefinedPattern 2: Message Batching
模式2:消息批处理
python
undefinedpython
undefinedBAD - Send messages one at a time
错误示例 - 逐个发送消息
for item in items:
await websocket.send_json({"type": "item", "data": item})
for item in items:
await websocket.send_json({"type": "item", "data": item})
GOOD - Batch messages to reduce overhead
正确示例 - 批处理消息以减少开销
await websocket.send_json({"type": "batch", "data": items[:50]})
undefinedawait websocket.send_json({"type": "batch", "data": items[:50]})
undefinedPattern 3: Binary Protocols
模式3:二进制协议
python
undefinedpython
undefinedBAD - JSON for high-frequency data (~80 bytes)
错误示例 - 高频数据使用JSON(约80字节)
await websocket.send_json({"x": 123.456, "y": 789.012, "z": 456.789})
await websocket.send_json({"x": 123.456, "y": 789.012, "z": 456.789})
GOOD - Binary format (20 bytes)
正确示例 - 使用二进制格式(20字节)
import struct
await websocket.send_bytes(struct.pack('!3f', 123.456, 789.012, 456.789))
undefinedimport struct
await websocket.send_bytes(struct.pack('!3f', 123.456, 789.012, 456.789))
undefinedPattern 4: Heartbeat Optimization
模式4:心跳优化
python
undefinedpython
undefinedBAD - Fixed frequent heartbeats
错误示例 - 固定频繁的心跳
HEARTBEAT_INTERVAL = 5 # Every 5 seconds
HEARTBEAT_INTERVAL = 5 # 每5秒一次
GOOD - Adaptive heartbeats based on activity
正确示例 - 根据活动情况自适应心跳
interval = 60 if (time() - last_activity) < 60 else 30
undefinedinterval = 60 if (time() - last_activity) < 60 else 30
undefinedPattern 5: Backpressure Handling
模式5:背压处理
python
undefinedpython
undefinedBAD - Blocks on slow clients
错误示例 - 在客户端缓慢时阻塞
await ws.send_json(message)
await ws.send_json(message)
GOOD - Timeout and bounded queue
正确示例 - 超时与有限队列
from collections import deque
queue = deque(maxlen=100) # Drop oldest when full
try:
await asyncio.wait_for(ws.send_json(message), timeout=1.0)
except asyncio.TimeoutError:
pass # Client too slow
---from collections import deque
queue = deque(maxlen=100) # 队列满时丢弃最旧的消息
try:
await asyncio.wait_for(ws.send_json(message), timeout=1.0)
except asyncio.TimeoutError:
pass # 客户端过慢
---6. Implementation Patterns
6. 实现模式
Pattern 1: Origin Validation (Critical for CSWSH Prevention)
模式1:Origin验证(CSWSH防护的关键)
python
from fastapi import WebSocket
async def validate_origin(websocket: WebSocket) -> bool:
"""Validate WebSocket origin against allowlist."""
origin = websocket.headers.get("origin")
if not origin or origin not in ALLOWED_ORIGINS:
await websocket.close(code=4003, reason="Invalid origin")
return False
return True
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
if not await validate_origin(websocket):
return
await websocket.accept()python
from fastapi import WebSocket
async def validate_origin(websocket: WebSocket) -> bool:
"""对照白名单验证WebSocket源地址。"""
origin = websocket.headers.get("origin")
if not origin or origin not in ALLOWED_ORIGINS:
await websocket.close(code=4003, reason="Invalid origin")
return False
return True
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
if not await validate_origin(websocket):
return
await websocket.accept()Pattern 2: Token-Based Authentication
模式2:基于令牌的身份验证
python
from jose import jwt, JWTError
async def authenticate_websocket(websocket: WebSocket) -> User | None:
"""Authenticate via token (not cookies - vulnerable to CSWSH)."""
token = websocket.query_params.get("token")
if not token:
await websocket.close(code=4001, reason="Authentication required")
return None
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
user = await user_service.get(payload.get("sub"))
if not user:
await websocket.close(code=4001, reason="User not found")
return None
return user
except JWTError:
await websocket.close(code=4001, reason="Invalid token")
return Nonepython
from jose import jwt, JWTError
async def authenticate_websocket(websocket: WebSocket) -> User | None:
"""通过令牌进行身份验证(不要使用Cookie - 易受CSWSH攻击)。"""
token = websocket.query_params.get("token")
if not token:
await websocket.close(code=4001, reason="Authentication required")
return None
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
user = await user_service.get(payload.get("sub"))
if not user:
await websocket.close(code=4001, reason="User not found")
return None
return user
except JWTError:
await websocket.close(code=4001, reason="Invalid token")
return NonePattern 3: Per-Message Authorization
模式3:逐消息授权
python
from pydantic import BaseModel, field_validator
class WebSocketMessage(BaseModel):
action: str
data: dict
@field_validator('action')
@classmethod
def validate_action(cls, v):
if v not in {'subscribe', 'unsubscribe', 'send', 'query'}:
raise ValueError(f'Invalid action: {v}')
return v
async def handle_message(websocket: WebSocket, user: User, raw_data: dict):
try:
message = WebSocketMessage(**raw_data)
except ValueError:
await websocket.send_json({"error": "Invalid message format"})
return
if not user.has_permission(f"ws:{message.action}"):
await websocket.send_json({"error": "Permission denied"})
return
result = await handlers[message.action](user, message.data)
await websocket.send_json(result)python
from pydantic import BaseModel, field_validator
class WebSocketMessage(BaseModel):
action: str
data: dict
@field_validator('action')
@classmethod
def validate_action(cls, v):
if v not in {'subscribe', 'unsubscribe', 'send', 'query'}:
raise ValueError(f'Invalid action: {v}')
return v
async def handle_message(websocket: WebSocket, user: User, raw_data: dict):
try:
message = WebSocketMessage(**raw_data)
except ValueError:
await websocket.send_json({"error": "Invalid message format"})
return
if not user.has_permission(f"ws:{message.action}"):
await websocket.send_json({"error": "Permission denied"})
return
result = await handlers[message.action](user, message.data)
await websocket.send_json(result)Pattern 4: Connection Manager with Rate Limiting
模式4:带速率限制的连接管理器
python
from collections import defaultdict
from time import time
class SecureConnectionManager:
def __init__(self):
self.connections: dict[str, WebSocket] = {}
self.message_counts: dict[str, list[float]] = defaultdict(list)
self.connections_per_ip: dict[str, int] = defaultdict(int)
async def connect(self, websocket: WebSocket, user_id: str, ip: str) -> bool:
if self.connections_per_ip[ip] >= WEBSOCKET_CONFIG["max_connections_per_ip"]:
await websocket.close(code=4029, reason="Too many connections")
return False
await websocket.accept()
self.connections[user_id] = websocket
self.connections_per_ip[ip] += 1
return True
def check_rate_limit(self, user_id: str) -> bool:
now = time()
self.message_counts[user_id] = [
ts for ts in self.message_counts[user_id] if ts > now - 60
]
if len(self.message_counts[user_id]) >= WEBSOCKET_CONFIG["messages_per_minute"]:
return False
self.message_counts[user_id].append(now)
return Truepython
from collections import defaultdict
from time import time
class SecureConnectionManager:
def __init__(self):
self.connections: dict[str, WebSocket] = {}
self.message_counts: dict[str, list[float]] = defaultdict(list)
self.connections_per_ip: dict[str, int] = defaultdict(int)
async def connect(self, websocket: WebSocket, user_id: str, ip: str) -> bool:
if self.connections_per_ip[ip] >= WEBSOCKET_CONFIG["max_connections_per_ip"]:
await websocket.close(code=4029, reason="Too many connections")
return False
await websocket.accept()
self.connections[user_id] = websocket
self.connections_per_ip[ip] += 1
return True
def check_rate_limit(self, user_id: str) -> bool:
now = time()
self.message_counts[user_id] = [
ts for ts in self.message_counts[user_id] if ts > now - 60
]
if len(self.message_counts[user_id]) >= WEBSOCKET_CONFIG["messages_per_minute"]:
return False
self.message_counts[user_id].append(now)
return TruePattern 5: Complete Secure Handler
模式5:完整的安全处理器
python
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
if not await validate_origin(websocket):
return
user = await authenticate_websocket(websocket)
if not user:
return
ip = websocket.client.host
if not await manager.connect(websocket, user.id, ip):
return
try:
while True:
raw = await asyncio.wait_for(
websocket.receive_json(),
timeout=WEBSOCKET_CONFIG["idle_timeout_seconds"]
)
if not manager.check_rate_limit(user.id):
await websocket.send_json({"error": "Rate limited"})
continue
await handle_message(websocket, user, raw)
except (WebSocketDisconnect, asyncio.TimeoutError):
pass
finally:
manager.disconnect(user.id, ip)python
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
if not await validate_origin(websocket):
return
user = await authenticate_websocket(websocket)
if not user:
return
ip = websocket.client.host
if not await manager.connect(websocket, user.id, ip):
return
try:
while True:
raw = await asyncio.wait_for(
websocket.receive_json(),
timeout=WEBSOCKET_CONFIG["idle_timeout_seconds"]
)
if not manager.check_rate_limit(user.id):
await websocket.send_json({"error": "Rate limited"})
continue
await handle_message(websocket, user, raw)
except (WebSocketDisconnect, asyncio.TimeoutError):
pass
finally:
manager.disconnect(user.id, ip)7. Security Standards
7. 安全标准
Domain Vulnerability Landscape
领域漏洞情况
| CVE ID | Severity | Description | Mitigation |
|---|---|---|---|
| CVE-2024-23898 | HIGH | Jenkins CSWSH - command execution | Validate Origin |
| CVE-2024-26135 | HIGH | MeshCentral CSWSH - config leak | Origin + SameSite |
| CVE-2023-0957 | CRITICAL | Gitpod CSWSH - account takeover | Origin + token auth |
| CVE编号 | 严重程度 | 描述 | 缓解措施 |
|---|---|---|---|
| CVE-2024-23898 | 高 | Jenkins CSWSH - 命令执行 | 验证Origin |
| CVE-2024-26135 | 高 | MeshCentral CSWSH - 配置泄露 | Origin验证 + SameSite |
| CVE-2023-0957 | 严重 | Gitpod CSWSH - 账户接管 | Origin验证 + 令牌身份验证 |
OWASP Top 10 Mapping
OWASP Top 10映射
| Category | Mitigations |
|---|---|
| A01 Access Control | Origin validation, per-message authz |
| A02 Crypto Failures | TLS/WSS only, signed tokens |
| A03 Injection | Validate all message content |
| A07 Auth Failures | Token auth, session validation |
| 类别 | 缓解措施 |
|---|---|
| A01 访问控制 | Origin验证、逐消息授权 |
| A02 加密失败 | 仅使用TLS/WSS、签名令牌 |
| A03 注入 | 验证所有消息内容 |
| A07 身份验证失败 | 令牌身份验证、会话验证 |
CSWSH Prevention Summary
CSWSH防护总结
python
async def secure_websocket_handler(websocket: WebSocket):
# 1. VALIDATE ORIGIN (Critical)
if websocket.headers.get("origin") not in ALLOWED_ORIGINS:
await websocket.close(code=4003)
return
# 2. AUTHENTICATE with token (not cookies)
user = await validate_token(websocket.query_params.get("token"))
if not user:
await websocket.close(code=4001)
return
# 3. Accept only after validation
await websocket.accept()
# 4. AUTHORIZE each message, 5. RATE LIMIT, 6. TIMEOUT idlepython
async def secure_websocket_handler(websocket: WebSocket):
# 1. 验证Origin(关键)
if websocket.headers.get("origin") not in ALLOWED_ORIGINS:
await websocket.close(code=4003)
return
# 2. 使用令牌进行身份验证(不要使用Cookie)
user = await validate_token(websocket.query_params.get("token"))
if not user:
await websocket.close(code=4001)
return
# 3. 验证通过后再接受连接
await websocket.accept()
# 4. 为每条消息授权,5. 速率限制,6. 闲置连接超时8. Common Mistakes & Anti-Patterns
8. 常见错误与反模式
No Origin Validation
无Origin验证
python
undefinedpython
undefinedNEVER - vulnerable to CSWSH
切勿这样做 - 易受CSWSH攻击
@app.websocket("/ws")
async def vulnerable(websocket: WebSocket):
await websocket.accept() # Accepts any origin!
@app.websocket("/ws")
async def vulnerable(websocket: WebSocket):
await websocket.accept() # 接受任何源地址!
ALWAYS - validate origin first
务必这样做 - 先验证源地址
if websocket.headers.get("origin") not in ALLOWED_ORIGINS:
await websocket.close(code=4003)
return
undefinedif websocket.headers.get("origin") not in ALLOWED_ORIGINS:
await websocket.close(code=4003)
return
undefinedCookie-Only Authentication
仅使用Cookie身份验证
python
undefinedpython
undefinedNEVER - cookies sent automatically in CSWSH attacks
切勿这样做 - Cookie在CSWSH攻击中会自动发送
session = websocket.cookies.get("session")
session = websocket.cookies.get("session")
ALWAYS - require explicit token parameter
务必这样做 - 要求显式的令牌参数
token = websocket.query_params.get("token")
undefinedtoken = websocket.query_params.get("token")
undefinedNo Per-Message Authorization
无逐消息授权
python
undefinedpython
undefinedNEVER - assumes connection = full access
切勿这样做 - 假设连接等同于完全访问权限
if data["action"] == "delete":
await delete_resource(data["id"])
if data["action"] == "delete":
await delete_resource(data["id"])
ALWAYS - check permission for each action
务必这样做 - 为每个操作检查权限
if not user.has_permission("delete"):
return {"error": "Permission denied"}
undefinedif not user.has_permission("delete"):
return {"error": "Permission denied"}
undefinedNo Input Validation
无输入验证
python
undefinedpython
undefinedNEVER - trust WebSocket messages
切勿这样做 - 信任WebSocket消息
await db.execute(f"SELECT * FROM {data['table']}") # SQL injection!
await db.execute(f"SELECT * FROM {data['table']}") # SQL注入!
ALWAYS - validate with Pydantic
务必这样做 - 使用Pydantic验证
message = WebSocketMessage(**data)
---message = WebSocketMessage(**data)
---9. Pre-Implementation Checklist
9. 实现前检查清单
Phase 1: Before Writing Code
阶段1:编写代码前
- Write failing tests for security boundaries (CSWSH, auth, authz)
- Write failing tests for connection lifecycle (connect, disconnect, timeout)
- Write failing tests for message validation and rate limiting
- Review threat model in
references/threat-model.md - Identify performance requirements (latency, throughput, connections)
- 为安全边界编写失败测试(CSWSH、身份验证、授权)
- 为连接生命周期编写失败测试(连接、断开、超时)
- 为消息验证和速率限制编写失败测试
- 审阅中的威胁模型
references/threat-model.md - 确定性能要求(延迟、吞吐量、连接数)
Phase 2: During Implementation
阶段2:实现过程中
- Origin validation against explicit allowlist
- Token-based authentication (not cookie-only)
- Per-message authorization checks
- Rate limiting and idle timeout implemented
- All messages validated with Pydantic
- Connection pooling for efficiency
- Backpressure handling for slow clients
- 针对明确的白名单进行Origin验证
- 基于令牌的身份验证(不只是Cookie)
- 逐消息授权检查
- 实现速率限制和闲置超时
- 使用Pydantic验证所有消息
- 连接池以提升效率
- 为慢客户端处理背压
Phase 3: Before Committing
阶段3:提交前
- All security tests pass:
pytest tests/websocket/ -v - No security issues:
bandit -r src/websocket/ - WSS (TLS) enforced in production config
- CSWSH test coverage verified
- Performance benchmarks meet targets (<50ms latency)
- No regressions:
pytest tests/ -v
- 所有安全测试通过:
pytest tests/websocket/ -v - 无安全问题:
bandit -r src/websocket/ - 生产配置中强制使用WSS(TLS)
- 验证CSWSH测试覆盖率
- 性能基准满足目标(延迟<50ms)
- 无回归问题:
pytest tests/ -v
10. Summary
10. 总结
Security Goals:
- CSWSH-Resistant: Origin validation, token auth
- Properly Authorized: Per-message permission checks
- Rate Limited: Prevent message flooding
- Validated: All messages treated as untrusted
Critical Reminders: ALWAYS validate Origin, use token auth (not cookies), authorize EACH message, use WSS in production.
安全目标:
- 抗CSWSH:Origin验证、令牌身份验证
- 正确授权:逐消息权限检查
- 速率限制:防止消息泛滥
- 验证输入:所有消息视为不可信
关键提醒:务必验证Origin,使用令牌身份验证(不要使用Cookie),为每条消息授权,生产环境中使用WSS。