Loading...
Loading...
Real-time bidirectional communication with security focus on CSWSH prevention, authentication, and message validation
npx skill4agent add martinholovsky/claude-skills-generator websocket| Situation | Approach |
|---|---|
| New connection | Validate Origin, require authentication token |
| Each message | Validate format, check authorization for action |
| Sensitive operations | Re-verify session, log action |
| Idle connection | Timeout after inactivity period |
| Error condition | Close connection, log details |
| Component | Version | Notes |
|---|---|---|
| FastAPI/Starlette | 0.115+ | WebSocket support |
| websockets | 12.0+ | Python WebSocket library |
WEBSOCKET_CONFIG = {
"max_message_size": 1024 * 1024, # 1MB
"max_connections_per_ip": 10,
"idle_timeout_seconds": 300,
"messages_per_minute": 60,
}
# NEVER use "*" for origins
ALLOWED_ORIGINS = ["https://app.example.com", "https://admin.example.com"]import pytest
from httpx import AsyncClient, ASGITransport
from fastapi.testclient import TestClient
# Test security boundaries first
@pytest.mark.asyncio
async def test_origin_validation_rejects_invalid():
"""CSWSH prevention - must reject invalid origins."""
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test"
) as client:
# This should fail until origin validation is implemented
with pytest.raises(Exception):
async with client.websocket_connect(
"/ws?token=valid",
headers={"Origin": "https://evil.com"}
):
pass
@pytest.mark.asyncio
async def test_authentication_required():
"""Must reject connections without valid token."""
with TestClient(app) as client:
with pytest.raises(Exception):
with client.websocket_connect("/ws") as ws:
pass
@pytest.mark.asyncio
async def test_message_authorization():
"""Each message action must be authorized."""
with TestClient(app) as client:
with client.websocket_connect(
"/ws?token=readonly_user",
headers={"Origin": "https://app.example.com"}
) as ws:
ws.send_json({"action": "delete", "id": "123"})
response = ws.receive_json()
assert response.get("error") == "Permission denied"# Implement only what's needed to pass the test
async def validate_origin(websocket: WebSocket) -> bool:
origin = websocket.headers.get("origin")
if not origin or origin not in ALLOWED_ORIGINS:
await websocket.close(code=4003, reason="Invalid origin")
return False
return True# Run all WebSocket tests
pytest tests/websocket/ -v --asyncio-mode=auto
# Check for security issues
bandit -r src/websocket/
# Verify no regressions
pytest tests/ -v# BAD - Create new connection for each request
ws = await create_connection(user_id) # Expensive!
# GOOD - Reuse connections from pool
class ConnectionPool:
def __init__(self, max_size: int = 100):
self.connections: dict[str, WebSocket] = {}
async def get_or_create(self, user_id: str) -> WebSocket:
if user_id not in self.connections:
self.connections[user_id] = await create_connection(user_id)
return self.connections[user_id]# BAD - Send messages one at a time
for item in items:
await websocket.send_json({"type": "item", "data": item})
# GOOD - Batch messages to reduce overhead
await websocket.send_json({"type": "batch", "data": items[:50]})# BAD - JSON for high-frequency data (~80 bytes)
await websocket.send_json({"x": 123.456, "y": 789.012, "z": 456.789})
# GOOD - Binary format (20 bytes)
import struct
await websocket.send_bytes(struct.pack('!3f', 123.456, 789.012, 456.789))# BAD - Fixed frequent heartbeats
HEARTBEAT_INTERVAL = 5 # Every 5 seconds
# GOOD - Adaptive heartbeats based on activity
interval = 60 if (time() - last_activity) < 60 else 30# BAD - Blocks on slow clients
await ws.send_json(message)
# GOOD - Timeout and bounded queue
from collections import deque
queue = deque(maxlen=100) # Drop oldest when full
try:
await asyncio.wait_for(ws.send_json(message), timeout=1.0)
except asyncio.TimeoutError:
pass # Client too slowfrom fastapi import WebSocket
async def validate_origin(websocket: WebSocket) -> bool:
"""Validate WebSocket origin against allowlist."""
origin = websocket.headers.get("origin")
if not origin or origin not in ALLOWED_ORIGINS:
await websocket.close(code=4003, reason="Invalid origin")
return False
return True
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
if not await validate_origin(websocket):
return
await websocket.accept()from jose import jwt, JWTError
async def authenticate_websocket(websocket: WebSocket) -> User | None:
"""Authenticate via token (not cookies - vulnerable to CSWSH)."""
token = websocket.query_params.get("token")
if not token:
await websocket.close(code=4001, reason="Authentication required")
return None
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
user = await user_service.get(payload.get("sub"))
if not user:
await websocket.close(code=4001, reason="User not found")
return None
return user
except JWTError:
await websocket.close(code=4001, reason="Invalid token")
return Nonefrom pydantic import BaseModel, field_validator
class WebSocketMessage(BaseModel):
action: str
data: dict
@field_validator('action')
@classmethod
def validate_action(cls, v):
if v not in {'subscribe', 'unsubscribe', 'send', 'query'}:
raise ValueError(f'Invalid action: {v}')
return v
async def handle_message(websocket: WebSocket, user: User, raw_data: dict):
try:
message = WebSocketMessage(**raw_data)
except ValueError:
await websocket.send_json({"error": "Invalid message format"})
return
if not user.has_permission(f"ws:{message.action}"):
await websocket.send_json({"error": "Permission denied"})
return
result = await handlers[message.action](user, message.data)
await websocket.send_json(result)from collections import defaultdict
from time import time
class SecureConnectionManager:
def __init__(self):
self.connections: dict[str, WebSocket] = {}
self.message_counts: dict[str, list[float]] = defaultdict(list)
self.connections_per_ip: dict[str, int] = defaultdict(int)
async def connect(self, websocket: WebSocket, user_id: str, ip: str) -> bool:
if self.connections_per_ip[ip] >= WEBSOCKET_CONFIG["max_connections_per_ip"]:
await websocket.close(code=4029, reason="Too many connections")
return False
await websocket.accept()
self.connections[user_id] = websocket
self.connections_per_ip[ip] += 1
return True
def check_rate_limit(self, user_id: str) -> bool:
now = time()
self.message_counts[user_id] = [
ts for ts in self.message_counts[user_id] if ts > now - 60
]
if len(self.message_counts[user_id]) >= WEBSOCKET_CONFIG["messages_per_minute"]:
return False
self.message_counts[user_id].append(now)
return True@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
if not await validate_origin(websocket):
return
user = await authenticate_websocket(websocket)
if not user:
return
ip = websocket.client.host
if not await manager.connect(websocket, user.id, ip):
return
try:
while True:
raw = await asyncio.wait_for(
websocket.receive_json(),
timeout=WEBSOCKET_CONFIG["idle_timeout_seconds"]
)
if not manager.check_rate_limit(user.id):
await websocket.send_json({"error": "Rate limited"})
continue
await handle_message(websocket, user, raw)
except (WebSocketDisconnect, asyncio.TimeoutError):
pass
finally:
manager.disconnect(user.id, ip)| CVE ID | Severity | Description | Mitigation |
|---|---|---|---|
| CVE-2024-23898 | HIGH | Jenkins CSWSH - command execution | Validate Origin |
| CVE-2024-26135 | HIGH | MeshCentral CSWSH - config leak | Origin + SameSite |
| CVE-2023-0957 | CRITICAL | Gitpod CSWSH - account takeover | Origin + token auth |
| Category | Mitigations |
|---|---|
| A01 Access Control | Origin validation, per-message authz |
| A02 Crypto Failures | TLS/WSS only, signed tokens |
| A03 Injection | Validate all message content |
| A07 Auth Failures | Token auth, session validation |
async def secure_websocket_handler(websocket: WebSocket):
# 1. VALIDATE ORIGIN (Critical)
if websocket.headers.get("origin") not in ALLOWED_ORIGINS:
await websocket.close(code=4003)
return
# 2. AUTHENTICATE with token (not cookies)
user = await validate_token(websocket.query_params.get("token"))
if not user:
await websocket.close(code=4001)
return
# 3. Accept only after validation
await websocket.accept()
# 4. AUTHORIZE each message, 5. RATE LIMIT, 6. TIMEOUT idle# NEVER - vulnerable to CSWSH
@app.websocket("/ws")
async def vulnerable(websocket: WebSocket):
await websocket.accept() # Accepts any origin!
# ALWAYS - validate origin first
if websocket.headers.get("origin") not in ALLOWED_ORIGINS:
await websocket.close(code=4003)
return# NEVER - cookies sent automatically in CSWSH attacks
session = websocket.cookies.get("session")
# ALWAYS - require explicit token parameter
token = websocket.query_params.get("token")# NEVER - assumes connection = full access
if data["action"] == "delete":
await delete_resource(data["id"])
# ALWAYS - check permission for each action
if not user.has_permission("delete"):
return {"error": "Permission denied"}# NEVER - trust WebSocket messages
await db.execute(f"SELECT * FROM {data['table']}") # SQL injection!
# ALWAYS - validate with Pydantic
message = WebSocketMessage(**data)references/threat-model.mdpytest tests/websocket/ -vbandit -r src/websocket/pytest tests/ -v