WebSocket 实时应用开发:从原理到实践

前段时间做了个实时协作编辑器,踩了不少 WebSocket 的坑。

为什么用 WebSocket

对比一下方案:

方案延迟复杂度适用场景
短轮询低频更新
长轮询中频更新
SSE服务端推送
WebSocket双向通信

我们的场景是多人实时编辑,双向通信、低延迟,WebSocket 是最佳选择。

基础连接

客户端

const ws = new WebSocket('wss://example.com/ws');

ws.onopen = () => {
  console.log('连接成功');
};

ws.onmessage = (event) => {
  const data = JSON.parse(event.data);
  handleMessage(data);
};

ws.onerror = (error) => {
  console.error('WebSocket 错误', error);
};

ws.onclose = () => {
  console.log('连接关闭');
  // 重连逻辑
};

服务端 (Node.js + ws)

import { WebSocketServer } from 'ws';

const wss = new WebSocketServer({ port: 8080 });

wss.on('connection', (ws, req) => {
  console.log('新连接');

  ws.on('message', (data) => {
    const message = JSON.parse(data);
    // 处理消息
  });

  ws.on('close', () => {
    console.log('连接关闭');
  });
});

心跳机制

WebSocket 连接会断,而且不会主动告诉你。心跳是必须的:

// 客户端
let heartbeatTimer;

function startHeartbeat() {
  heartbeatTimer = setInterval(() => {
    if (ws.readyState === WebSocket.OPEN) {
      ws.send(JSON.stringify({ type: 'ping' }));
    }
  }, 30000);
}

function stopHeartbeat() {
  clearInterval(heartbeatTimer);
}

ws.onmessage = (event) => {
  const data = JSON.parse(event.data);
  if (data.type === 'pong') {
    return; // 心跳响应,不需要处理
  }
  handleMessage(data);
};
// 服务端
const clients = new Map();

wss.on('connection', (ws) => {
  const clientId = generateId();
  clients.set(clientId, {
    ws,
    lastHeartbeat: Date.now(),
  });

  ws.on('message', (data) => {
    const msg = JSON.parse(data);
    if (msg.type === 'ping') {
      ws.send(JSON.stringify({ type: 'pong' }));
      clients.get(clientId).lastHeartbeat = Date.now();
      return;
    }
    // 处理业务消息
  });
});

// 定时清理断开的连接
setInterval(() => {
  const now = Date.now();
  for (const [id, client] of clients) {
    if (now - client.lastHeartbeat > 60000) {
      client.ws.terminate();
      clients.delete(id);
    }
  }
}, 30000);

WebSocket 连接流程

断线重连

class WebSocketClient {
  constructor(url) {
    this.url = url;
    this.reconnectAttempts = 0;
    this.maxReconnectAttempts = 5;
    this.reconnectDelay = 1000;
    this.connect();
  }

  connect() {
    this.ws = new WebSocket(this.url);

    this.ws.onopen = () => {
      console.log('连接成功');
      this.reconnectAttempts = 0;
      this.startHeartbeat();
    };

    this.ws.onclose = () => {
      this.stopHeartbeat();
      this.reconnect();
    };

    this.ws.onerror = (error) => {
      console.error('WebSocket 错误', error);
    };
  }

  reconnect() {
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error('重连失败,请刷新页面');
      return;
    }

    this.reconnectAttempts++;
    const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1);

    console.log(`${delay}ms 后重连,第 ${this.reconnectAttempts} 次`);

    setTimeout(() => {
      this.connect();
    }, delay);
  }
}

协作编辑的冲突处理

多人同时编辑,冲突怎么处理?

OT (Operational Transformation)

每次编辑记录操作,服务器负责变换和合并:

// 客户端
function onTextChange(change) {
  ws.send(JSON.stringify({
    type: 'edit',
    documentId: docId,
    revision: currentRevision,
    operation: {
      type: 'insert',
      position: change.position,
      text: change.text,
    },
  }));
}
// 服务端
const documents = new Map();

function handleEdit(clientId, message) {
  const doc = documents.get(message.documentId);

  // 检查版本
  if (message.revision !== doc.revision) {
    // 需要变换
    for (let i = message.revision; i < doc.revision; i++) {
      message.operation = transform(message.operation, doc.history[i]);
    }
  }

  // 应用操作
  applyOperation(doc, message.operation);

  // 广播给其他客户端
  broadcast(message.documentId, {
    type: 'edit',
    revision: doc.revision,
    operation: message.operation,
  }, clientId);
}

CRDT (Conflict-free Replicated Data Types)

更现代的方案,不需要服务器变换:

import * as Y from 'yjs';

const ydoc = new Y.Doc();
const ytext = ydoc.getText('content');

// 本地编辑
ytext.insert(0, 'Hello');

// 同步到其他客户端
const update = Y.encodeStateAsUpdate(ydoc);
ws.send(JSON.stringify({ type: 'sync', update }));

// 接收远程更新
ws.onmessage = (event) => {
  const data = JSON.parse(event.data);
  if (data.type === 'sync') {
    Y.applyUpdate(ydoc, data.update);
  }
};

CRDT 更适合离线编辑、P2P 场景。

协作编辑示意图

性能优化

优化项方法
消息压缩启用 perMessageDeflate
二进制传输用 ArrayBuffer 代替 JSON
批量发送合并短时间内的多条消息
连接复用同一用户复用连接

消息压缩

const wss = new WebSocketServer({
  port: 8080,
  perMessageDeflate: {
    zlibDeflateOptions: {
      level: 3,
    },
  },
});

批量发送

class MessageBatcher {
  constructor(sendFn, delay = 50) {
    this.sendFn = sendFn;
    this.delay = delay;
    this.queue = [];
    this.timer = null;
  }

  add(message) {
    this.queue.push(message);
    if (!this.timer) {
      this.timer = setTimeout(() => this.flush(), this.delay);
    }
  }

  flush() {
    if (this.queue.length > 0) {
      this.sendFn(this.queue);
      this.queue = [];
    }
    this.timer = null;
  }
}

安全考虑

认证

WebSocket 没有 header,认证要在 URL 或首条消息里:

// URL 参数
const ws = new WebSocket(`wss://example.com/ws?token=${token}`);

// 首条消息认证
ws.onopen = () => {
  ws.send(JSON.stringify({ type: 'auth', token }));
};
// 服务端
wss.on('connection', async (ws, req) => {
  const token = new URL(req.url, 'http://example.com').searchParams.get('token');

  try {
    const user = await verifyToken(token);
    ws.user = user;
  } catch (error) {
    ws.close(4001, '认证失败');
  }
});

限流

const rateLimiter = new Map();

function checkRateLimit(clientId) {
  const now = Date.now();
  const limit = rateLimiter.get(clientId) || { count: 0, resetTime: now + 60000 };

  if (now > limit.resetTime) {
    limit.count = 0;
    limit.resetTime = now + 60000;
  }

  if (limit.count >= 100) {
    return false;
  }

  limit.count++;
  rateLimiter.set(clientId, limit);
  return true;
}

总结

WebSocket 开发比 HTTP 复杂,主要在:

  1. 连接管理(心跳、重连)
  2. 状态同步(冲突处理)
  3. 安全认证

做完这个项目,对实时通信的理解深了很多。CRDT 确实是未来的方向,Y.js 这种库大大降低了开发难度。