WebSocket 实时应用开发:从原理到实践
前段时间做了个实时协作编辑器,踩了不少 WebSocket 的坑。
为什么用 WebSocket
对比一下方案:
| 方案 | 延迟 | 复杂度 | 适用场景 |
|---|---|---|---|
| 短轮询 | 高 | 低 | 低频更新 |
| 长轮询 | 中 | 中 | 中频更新 |
| SSE | 低 | 低 | 服务端推送 |
| WebSocket | 低 | 高 | 双向通信 |
我们的场景是多人实时编辑,双向通信、低延迟,WebSocket 是最佳选择。
基础连接
客户端
const ws = new WebSocket('wss://example.com/ws');
ws.onopen = () => {
console.log('连接成功');
};
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
handleMessage(data);
};
ws.onerror = (error) => {
console.error('WebSocket 错误', error);
};
ws.onclose = () => {
console.log('连接关闭');
// 重连逻辑
};
服务端 (Node.js + ws)
import { WebSocketServer } from 'ws';
const wss = new WebSocketServer({ port: 8080 });
wss.on('connection', (ws, req) => {
console.log('新连接');
ws.on('message', (data) => {
const message = JSON.parse(data);
// 处理消息
});
ws.on('close', () => {
console.log('连接关闭');
});
});
心跳机制
WebSocket 连接会断,而且不会主动告诉你。心跳是必须的:
// 客户端
let heartbeatTimer;
function startHeartbeat() {
heartbeatTimer = setInterval(() => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({ type: 'ping' }));
}
}, 30000);
}
function stopHeartbeat() {
clearInterval(heartbeatTimer);
}
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.type === 'pong') {
return; // 心跳响应,不需要处理
}
handleMessage(data);
};
// 服务端
const clients = new Map();
wss.on('connection', (ws) => {
const clientId = generateId();
clients.set(clientId, {
ws,
lastHeartbeat: Date.now(),
});
ws.on('message', (data) => {
const msg = JSON.parse(data);
if (msg.type === 'ping') {
ws.send(JSON.stringify({ type: 'pong' }));
clients.get(clientId).lastHeartbeat = Date.now();
return;
}
// 处理业务消息
});
});
// 定时清理断开的连接
setInterval(() => {
const now = Date.now();
for (const [id, client] of clients) {
if (now - client.lastHeartbeat > 60000) {
client.ws.terminate();
clients.delete(id);
}
}
}, 30000);
断线重连
class WebSocketClient {
constructor(url) {
this.url = url;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 5;
this.reconnectDelay = 1000;
this.connect();
}
connect() {
this.ws = new WebSocket(this.url);
this.ws.onopen = () => {
console.log('连接成功');
this.reconnectAttempts = 0;
this.startHeartbeat();
};
this.ws.onclose = () => {
this.stopHeartbeat();
this.reconnect();
};
this.ws.onerror = (error) => {
console.error('WebSocket 错误', error);
};
}
reconnect() {
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
console.error('重连失败,请刷新页面');
return;
}
this.reconnectAttempts++;
const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1);
console.log(`${delay}ms 后重连,第 ${this.reconnectAttempts} 次`);
setTimeout(() => {
this.connect();
}, delay);
}
}
协作编辑的冲突处理
多人同时编辑,冲突怎么处理?
OT (Operational Transformation)
每次编辑记录操作,服务器负责变换和合并:
// 客户端
function onTextChange(change) {
ws.send(JSON.stringify({
type: 'edit',
documentId: docId,
revision: currentRevision,
operation: {
type: 'insert',
position: change.position,
text: change.text,
},
}));
}
// 服务端
const documents = new Map();
function handleEdit(clientId, message) {
const doc = documents.get(message.documentId);
// 检查版本
if (message.revision !== doc.revision) {
// 需要变换
for (let i = message.revision; i < doc.revision; i++) {
message.operation = transform(message.operation, doc.history[i]);
}
}
// 应用操作
applyOperation(doc, message.operation);
// 广播给其他客户端
broadcast(message.documentId, {
type: 'edit',
revision: doc.revision,
operation: message.operation,
}, clientId);
}
CRDT (Conflict-free Replicated Data Types)
更现代的方案,不需要服务器变换:
import * as Y from 'yjs';
const ydoc = new Y.Doc();
const ytext = ydoc.getText('content');
// 本地编辑
ytext.insert(0, 'Hello');
// 同步到其他客户端
const update = Y.encodeStateAsUpdate(ydoc);
ws.send(JSON.stringify({ type: 'sync', update }));
// 接收远程更新
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.type === 'sync') {
Y.applyUpdate(ydoc, data.update);
}
};
CRDT 更适合离线编辑、P2P 场景。
性能优化
| 优化项 | 方法 |
|---|---|
| 消息压缩 | 启用 perMessageDeflate |
| 二进制传输 | 用 ArrayBuffer 代替 JSON |
| 批量发送 | 合并短时间内的多条消息 |
| 连接复用 | 同一用户复用连接 |
消息压缩
const wss = new WebSocketServer({
port: 8080,
perMessageDeflate: {
zlibDeflateOptions: {
level: 3,
},
},
});
批量发送
class MessageBatcher {
constructor(sendFn, delay = 50) {
this.sendFn = sendFn;
this.delay = delay;
this.queue = [];
this.timer = null;
}
add(message) {
this.queue.push(message);
if (!this.timer) {
this.timer = setTimeout(() => this.flush(), this.delay);
}
}
flush() {
if (this.queue.length > 0) {
this.sendFn(this.queue);
this.queue = [];
}
this.timer = null;
}
}
安全考虑
认证
WebSocket 没有 header,认证要在 URL 或首条消息里:
// URL 参数
const ws = new WebSocket(`wss://example.com/ws?token=${token}`);
// 首条消息认证
ws.onopen = () => {
ws.send(JSON.stringify({ type: 'auth', token }));
};
// 服务端
wss.on('connection', async (ws, req) => {
const token = new URL(req.url, 'http://example.com').searchParams.get('token');
try {
const user = await verifyToken(token);
ws.user = user;
} catch (error) {
ws.close(4001, '认证失败');
}
});
限流
const rateLimiter = new Map();
function checkRateLimit(clientId) {
const now = Date.now();
const limit = rateLimiter.get(clientId) || { count: 0, resetTime: now + 60000 };
if (now > limit.resetTime) {
limit.count = 0;
limit.resetTime = now + 60000;
}
if (limit.count >= 100) {
return false;
}
limit.count++;
rateLimiter.set(clientId, limit);
return true;
}
总结
WebSocket 开发比 HTTP 复杂,主要在:
- 连接管理(心跳、重连)
- 状态同步(冲突处理)
- 安全认证
做完这个项目,对实时通信的理解深了很多。CRDT 确实是未来的方向,Y.js 这种库大大降低了开发难度。