一、WebSocket 协议原理
WebSocket 是 HTML5 定义的全双工通信协议(RFC 6455),它在单个 TCP 连接上实现客户端与服务器之间的持久双向通信,避免了 HTTP 短轮询和长轮询的性能开销。
二、Node.js 服务端实现
// server.js — Node.js WebSocket 服务器
const WebSocket = require('ws')
const http = require('http')
const server = http.createServer()
const wss = new WebSocket.Server({ server })
// 连接池管理
const clients = new Map()
// 心跳配置
const HEARTBEAT_INTERVAL = 30000 // 30秒心跳
const HEARTBEAT_TIMEOUT = 10000 // 10秒无响应断开
wss.on('connection', (ws, req) => {
const clientId = Date.now().toString(36) + Math.random().toString(36).slice(2)
clients.set(clientId, { ws, ip: req.socket.remoteAddress, connectedAt: Date.now() })
ws.isAlive = true
ws.clientId = clientId
console.log(`[连接] ${clientId} 来自 ${req.socket.remoteAddress}`)
// 消息处理
ws.on('message', (data) => {
try {
const msg = JSON.parse(data.toString())
switch (msg.type) {
case 'ping':
ws.send(JSON.stringify({ type: 'pong', ts: Date.now() }))
break
case 'chat':
// 广播消息给所有客户端
broadcast({ type: 'chat', from: clientId, content: msg.content, ts: Date.now() })
break
case 'subscribe':
// 订阅特定频道
ws.channels = ws.channels || new Set()
ws.channels.add(msg.channel)
break
default:
console.log(`[消息] ${clientId}:`, msg)
}
} catch (e) {
console.error('消息解析失败:', e.message)
}
})
// 关闭处理
ws.on('close', () => {
clients.delete(clientId)
console.log(`[断开] ${clientId}`)
})
// 错误处理
ws.on('error', (err) => {
console.error(`[错误] ${clientId}:`, err.message)
clients.delete(clientId)
})
})
// 广播消息
function broadcast(data) {
const payload = JSON.stringify(data)
wss.clients.forEach(client => {
if (client.readyState === WebSocket.OPEN) {
client.send(payload)
}
})
}
// 心跳检测
const heartbeat = setInterval(() => {
wss.clients.forEach(ws => {
if (!ws.isAlive) {
clients.delete(ws.clientId)
return ws.terminate()
}
ws.isAlive = false
ws.ping() // WebSocket 内置 ping/pong
})
}, HEARTBEAT_INTERVAL)
wss.on('close', () => clearInterval(heartbeat))
server.listen(3005, () => {
console.log('WebSocket Server running on :3005')
})
三、前端客户端实现
// client.js — 浏览器 WebSocket 客户端
class WSClient {
constructor(url) {
this.url = url
this.reconnectInterval = 1000
this.maxReconnectInterval = 30000
this.reconnectAttempts = 0
this.maxReconnectAttempts = 50
this.connect()
}
connect() {
this.ws = new WebSocket(this.url)
this.ws.onopen = () => {
console.log('[WS] 已连接')
this.reconnectAttempts = 0
this.reconnectInterval = 1000
}
this.ws.onmessage = (event) => {
const msg = JSON.parse(event.data)
this.handleMessage(msg)
}
this.ws.onclose = (event) => {
console.log(`[WS] 断开 (code=${event.code})`)
this.scheduleReconnect()
}
this.ws.onerror = (error) => {
console.error('[WS] 错误:', error)
}
}
handleMessage(msg) {
switch (msg.type) {
case 'pong':
this.latency = Date.now() - msg.ts
break
case 'chat':
console.log(`[${msg.from}]: ${msg.content}`)
break
}
}
scheduleReconnect() {
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
console.error('[WS] 重连次数超限,停止重连')
return
}
const delay = Math.min(
this.reconnectInterval * Math.pow(1.5, this.reconnectAttempts),
this.maxReconnectInterval
)
console.log(`[WS] ${delay}ms 后第 ${this.reconnectAttempts + 1} 次重连`)
setTimeout(() => {
this.reconnectAttempts++
this.connect()
}, delay)
}
send(data) {
if (this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(data))
}
}
close() {
this.maxReconnectAttempts = 0 // 禁止重连
this.ws.close()
}
}
// 使用
const ws = new WSClient('wss://api.example.com/ws')
ws.send({ type: 'ping', ts: Date.now() })
四、Nginx 反向代理 WebSocket
# Nginx 配置 — WebSocket 代理
map $http_upgrade $connection_upgrade {
default upgrade;
'' close;
}
server {
listen 443 ssl;
server_name api.example.com;
location /ws {
proxy_pass http://ws_backend;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection $connection_upgrade;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
# WebSocket 专用超时(比普通 HTTP 长)
proxy_read_timeout 3600s;
proxy_send_timeout 3600s;
}
}
upstream ws_backend {
# 使用 ip_hash 保证同一客户端始终连接同一后端
ip_hash;
server 127.0.0.1:3005;
server 127.0.0.1:3006;
}
五、水平扩展方案
WebSocket 是有状态连接,水平扩展比 HTTP 复杂。推荐方案:
- IP Hash 负载均衡:Nginx
ip_hash确保同一客户端始终路由到同一后端 - Redis Pub/Sub:多节点通过 Redis 交换消息,实现跨节点广播
- 会话关联 Cookie:Nginx
sticky cookie替代 IP Hash
// Redis 跨节点广播实现
const Redis = require('ioredis')
const redis = new Redis()
// 本节点收到消息后发布到 Redis
ws.on('message', (data) => {
const msg = JSON.parse(data)
if (msg.type === 'chat') {
redis.publish('ws:broadcast', JSON.stringify({
type: 'chat',
nodeId: process.env.NODE_ID,
content: msg.content
}))
}
})
// 订阅 Redis 频道,接收其他节点的消息
const subscriber = new Redis()
subscriber.subscribe('ws:broadcast')
subscriber.on('message', (channel, message) => {
const msg = JSON.parse(message)
// 广播给本节点的所有客户端
wss.clients.forEach(client => {
if (client.readyState === WebSocket.OPEN && client.nodeId !== msg.nodeId) {
client.send(message)
}
})
})
💡 WebSocket 生产要点
务必配置心跳(服务端 ping/pong + 客户端超时检测);实现指数退避重连策略;Nginx 代理超时调整为 3600s;多节点扩展使用 Redis Pub/Sub 或 MQTT Broker 实现跨节点消息。单机 WebSocket 连接数上限约为 65,536(受端口数限制),超过此值必须水平扩展。