websocket-implementation

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

WebSocket Implementation

WebSocket实现

Build scalable real-time communication systems with proper connection management.
构建具备完善连接管理能力的可扩展实时通信系统。

Server Implementation (Socket.IO)

服务端实现(Socket.IO)

javascript
const { Server } = require('socket.io');
const { createAdapter } = require('@socket.io/redis-adapter');
const { createClient } = require('redis');

const io = new Server(server, {
  cors: { origin: process.env.CLIENT_URL, credentials: true }
});

// Redis adapter for horizontal scaling
const pubClient = createClient({ url: process.env.REDIS_URL });
const subClient = pubClient.duplicate();
Promise.all([pubClient.connect(), subClient.connect()]).then(() => {
  io.adapter(createAdapter(pubClient, subClient));
});

// Connection management
const users = new Map();

io.use((socket, next) => {
  const token = socket.handshake.auth.token;
  try {
    socket.user = verifyToken(token);
    next();
  } catch (err) {
    next(new Error('Authentication failed'));
  }
});

io.on('connection', (socket) => {
  users.set(socket.user.id, socket.id);
  console.log(`User ${socket.user.id} connected`);

  socket.on('join-room', (roomId) => {
    socket.join(roomId);
    socket.to(roomId).emit('user-joined', socket.user);
  });

  socket.on('message', ({ roomId, content }) => {
    io.to(roomId).emit('message', {
      userId: socket.user.id,
      content,
      timestamp: Date.now()
    });
  });

  socket.on('disconnect', () => {
    users.delete(socket.user.id);
  });
});

// Utility methods for message distribution
function broadcastUserUpdate(userId, data) {
  io.to(`user:${userId}`).emit('user-update', data);
}

function notifyRoom(roomId, event, data) {
  io.to(`room:${roomId}`).emit(event, data);
}

function sendDirectMessage(userId, message) {
  const socketId = users.get(userId);
  if (socketId) {
    io.to(socketId).emit('direct-message', message);
  }
}
javascript
const { Server } = require('socket.io');
const { createAdapter } = require('@socket.io/redis-adapter');
const { createClient } = require('redis');

const io = new Server(server, {
  cors: { origin: process.env.CLIENT_URL, credentials: true }
});

// Redis adapter for horizontal scaling
const pubClient = createClient({ url: process.env.REDIS_URL });
const subClient = pubClient.duplicate();
Promise.all([pubClient.connect(), subClient.connect()]).then(() => {
  io.adapter(createAdapter(pubClient, subClient));
});

// Connection management
const users = new Map();

io.use((socket, next) => {
  const token = socket.handshake.auth.token;
  try {
    socket.user = verifyToken(token);
    next();
  } catch (err) {
    next(new Error('Authentication failed'));
  }
});

io.on('connection', (socket) => {
  users.set(socket.user.id, socket.id);
  console.log(`User ${socket.user.id} connected`);

  socket.on('join-room', (roomId) => {
    socket.join(roomId);
    socket.to(roomId).emit('user-joined', socket.user);
  });

  socket.on('message', ({ roomId, content }) => {
    io.to(roomId).emit('message', {
      userId: socket.user.id,
      content,
      timestamp: Date.now()
    });
  });

  socket.on('disconnect', () => {
    users.delete(socket.user.id);
  });
});

// Utility methods for message distribution
function broadcastUserUpdate(userId, data) {
  io.to(`user:${userId}`).emit('user-update', data);
}

function notifyRoom(roomId, event, data) {
  io.to(`room:${roomId}`).emit(event, data);
}

function sendDirectMessage(userId, message) {
  const socketId = users.get(userId);
  if (socketId) {
    io.to(socketId).emit('direct-message', message);
  }
}

Client Implementation

客户端实现

javascript
import { io } from 'socket.io-client';

class WebSocketClient {
  constructor(url, token) {
    this.socket = io(url, {
      auth: { token },
      reconnection: true,
      reconnectionDelay: 1000,
      reconnectionAttempts: 5
    });

    this.messageQueue = [];
    this.setupListeners();
  }

  setupListeners() {
    this.socket.on('connect', () => {
      console.log('Connected');
      this.flushQueue();
    });

    this.socket.on('disconnect', (reason) => {
      console.log('Disconnected:', reason);
    });

    this.socket.on('message', (msg) => {
      this.onMessage?.(msg);
    });
  }

  joinRoom(roomId) {
    this.socket.emit('join-room', roomId);
  }

  send(roomId, content) {
    if (this.socket.connected) {
      this.socket.emit('message', { roomId, content });
    } else {
      this.messageQueue.push({ roomId, content });
    }
  }

  flushQueue() {
    while (this.messageQueue.length > 0) {
      const msg = this.messageQueue.shift();
      this.socket.emit('message', msg);
    }
  }
}
javascript
import { io } from 'socket.io-client';

class WebSocketClient {
  constructor(url, token) {
    this.socket = io(url, {
      auth: { token },
      reconnection: true,
      reconnectionDelay: 1000,
      reconnectionAttempts: 5
    });

    this.messageQueue = [];
    this.setupListeners();
  }

  setupListeners() {
    this.socket.on('connect', () => {
      console.log('Connected');
      this.flushQueue();
    });

    this.socket.on('disconnect', (reason) => {
      console.log('Disconnected:', reason);
    });

    this.socket.on('message', (msg) => {
      this.onMessage?.(msg);
    });
  }

  joinRoom(roomId) {
    this.socket.emit('join-room', roomId);
  }

  send(roomId, content) {
    if (this.socket.connected) {
      this.socket.emit('message', { roomId, content });
    } else {
      this.messageQueue.push({ roomId, content });
    }
  }

  flushQueue() {
    while (this.messageQueue.length > 0) {
      const msg = this.messageQueue.shift();
      this.socket.emit('message', msg);
    }
  }
}

React Hook

React Hook

javascript
function useWebSocket(url) {
  const [socket, setSocket] = useState(null);
  const [connected, setConnected] = useState(false);
  const [messages, setMessages] = useState([]);

  useEffect(() => {
    // getToken() is a user-supplied helper that returns the current auth token
    // Example implementations:
    // - From localStorage: () => localStorage.getItem('authToken')
    // - From context: () => authContext.token
    // - From cookie: () => document.cookie.split('token=')[1]
    const ws = io(url, { auth: { token: getToken() } });

    ws.on('connect', () => setConnected(true));
    ws.on('disconnect', () => setConnected(false));
    ws.on('message', (msg) => {
      setMessages(prev => [...prev, msg]);
    });

    setSocket(ws);
    return () => ws.disconnect();
  }, [url]);

  const send = useCallback((roomId, content) => {
    socket?.emit('message', { roomId, content });
  }, [socket]);

  return { connected, messages, send };
}
javascript
function useWebSocket(url) {
  const [socket, setSocket] = useState(null);
  const [connected, setConnected] = useState(false);
  const [messages, setMessages] = useState([]);

  useEffect(() => {
    // getToken() is a user-supplied helper that returns the current auth token
    // Example implementations:
    // - From localStorage: () => localStorage.getItem('authToken')
    // - From context: () => authContext.token
    // - From cookie: () => document.cookie.split('token=')[1]
    const ws = io(url, { auth: { token: getToken() } });

    ws.on('connect', () => setConnected(true));
    ws.on('disconnect', () => setConnected(false));
    ws.on('message', (msg) => {
      setMessages(prev => [...prev, msg]);
    });

    setSocket(ws);
    return () => ws.disconnect();
  }, [url]);

  const send = useCallback((roomId, content) => {
    socket?.emit('message', { roomId, content });
  }, [socket]);

  return { connected, messages, send };
}

Message Protocol

消息协议

typescript
interface Message {
  type: 'message' | 'typing' | 'presence';
  roomId: string;
  userId: string;
  content?: string;
  timestamp: number;
}

// Acknowledge delivery
socket.emit('message', data, (ack) => {
  if (ack.success) console.log('Delivered');
});
typescript
interface Message {
  type: 'message' | 'typing' | 'presence';
  roomId: string;
  userId: string;
  content?: string;
  timestamp: number;
}

// Acknowledge delivery
socket.emit('message', data, (ack) => {
  if (ack.success) console.log('Delivered');
});

Additional Implementations

其他实现

See references/python-websocket.md for:
  • Python aiohttp WebSocket server
  • FastAPI WebSocket endpoints
  • Async connection handling
查看 references/python-websocket.md 了解以下内容:
  • Python aiohttp WebSocket 服务端
  • FastAPI WebSocket 端点
  • 异步连接处理

Scaling Considerations

扩展考量

ConnectionsArchitecture
<10KSingle server
10K-100KRedis pub/sub
>100KSharded Redis + load balancer
连接数架构
<10K单服务器
10K-100KRedis 发布/订阅
>100K分片Redis + 负载均衡

Monitoring Endpoints

监控端点

javascript
// Express endpoints for operational visibility
app.get('/api/ws/stats', (req, res) => {
  res.json({
    activeConnections: io.sockets.sockets.size,
    rooms: [...io.sockets.adapter.rooms.keys()],
    users: users.size
  });
});

app.get('/api/ws/health', (req, res) => {
  res.json({
    status: 'healthy',
    uptime: process.uptime(),
    memoryUsage: process.memoryUsage()
  });
});
javascript
// Express endpoints for operational visibility
app.get('/api/ws/stats', (req, res) => {
  res.json({
    activeConnections: io.sockets.sockets.size,
    rooms: [...io.sockets.adapter.rooms.keys()],
    users: users.size
  });
});

app.get('/api/ws/health', (req, res) => {
  res.json({
    status: 'healthy',
    uptime: process.uptime(),
    memoryUsage: process.memoryUsage()
  });
});

Best Practices

最佳实践

  • Authenticate before allowing operations
  • Implement reconnection with exponential backoff
  • Use rooms and channels for targeted broadcasting
  • Add heartbeat/ping for connection health
  • Persist important messages to database
  • Monitor active connection counts
  • Display user presence/availability status
  • Implement rate limiting on incoming messages
  • Use acknowledgments to confirm message delivery
  • Leverage Redis for distributed deployments
  • Implement comprehensive error handling
  • 允许操作前先完成身份认证
  • 实现带指数退避的重连机制
  • 使用房间和频道进行定向广播
  • 添加心跳/ ping 机制检测连接健康状态
  • 将重要消息持久化到数据库
  • 监控活跃连接数量
  • 展示用户在线/可用状态
  • 对传入消息实现速率限制
  • 使用确认机制验证消息送达状态
  • 分布式部署场景下使用Redis
  • 实现完善的错误处理

Never Do

禁止事项

  • Send sensitive data unencrypted
  • Store unlimited messages in memory
  • Skip authorization on room joins
  • Ignore connection error handling
  • Allow unbounded room subscriptions
  • Neglect cleanup of disconnected user data
  • Send frequent oversized message payloads
  • Include authentication credentials in message bodies
  • Deploy without security validation
  • Allow uncontrolled connection accumulation
  • Build without scalability consideration
  • 未加密传输敏感数据
  • 在内存中存储无上限的消息
  • 加入房间时跳过权限校验
  • 忽略连接错误处理
  • 允许无限制的房间订阅
  • 未清理已断开连接的用户数据
  • 频繁发送超大消息负载
  • 在消息体中包含身份认证凭证
  • 未经过安全验证就部署
  • 允许不受控的连接累积
  • 开发时不考虑扩展性