this.server = new WebSocket.Server({ port: this.port });this.server.on('connection', (ws, req) => {const uniqueId = dataUtil.uuid();ws.id = uniqueId;global.serverSession.set(uniqueId, ws);logger.debug({ message: '客户端已连接', traceId: ws.id, address: req.socket.remoteAddress });let buffer = Buffer.alloc(0);ws.on('message', async (data) => {// WebSocket 的消息可能是 Buffer 或字符串buffer = Buffer.concat([buffer, Buffer.from(data)]);await this.processData(ws, buffer );});ws.on('close', () => {logger.debug({ message: '客户端已断开连接', traceId: ws.id });socketCloseEvent.emit(ws.id, ws.destroyType);global.serverSession.delete(ws.id);if(ws._idleTimer){clearInterval(ws._idleTimer);}});ws.on('error', (err) => {logger.error({ message: `WebSocket 连接发生错误 ${err}`, traceId: ws.id });});// 心跳检测(可选,客户端配合)ws.isAlive = true;ws.on('pong', () => { ws.isAlive = true; });// 设置连接超时(模拟 TCP_IDLE_THRESHOLD)const timeout = parseInt(process.env.TCP_IDLE_THRESHOLD);ws._idleTimer = setInterval(() => {if (!ws.isAlive) {logger.debug({ message: 'WebSocket 心跳超时断开连接', traceId: ws.id });ws.terminate();} else {ws.isAlive = false;ws.ping();}}, timeout);});
在上述代码
let buffer = Buffer.alloc(0);ws.on('message', async (data) => {// WebSocket 的消息可能是 Buffer 或字符串buffer = Buffer.concat([buffer, Buffer.from(data)]);await this.processData(ws, Buffer.from(data));});
通过以上这种方法解析时,会存在buffer 拼接半包、粘包等问题。
问题根因逐步解析
第一步:WebSocket 的 on(‘message’) 是异步且无排队机制
每次前端发送消息,Node.js 的 ws 库会调用:
ws.on('message', callback);
- 这个回调不会等待上一个 message 处理完成;
- 如果前端并发发送多个消息(如调用多次 ws.send()),后端会并发触发多个 on(‘message’) 回调;
- 回调函数内部的 await 并不会阻塞下一个消息的到来。
第二步:共享状态 buffer 被多个异步回调同时修改
你在每个 on(‘message’) 中都这么写:
buffer = Buffer.concat([buffer, Buffer.from(data)]);
buffer = await this.processData(ws, buffer);
- 多个 on(‘message’) 回调是并发执行的;
- 但它们共用同一个 buffer;
- 多个异步回调可能同时拼接数据,造成粘包错乱、重复消费、消息丢失。
第三步:粘包/解包逻辑对 buffer 有修改,也导致状态错乱
比如你在 processData 里这么做:
while (buffer.length >= HEADER_LENGTH) {const packetLength = buffer.readUInt32LE(0);if (buffer.length < packetLength) break;// 处理一包buffer = buffer.slice(packetLength);
}
你修改了 buffer 的引用,但如果多个 on(‘message’) 同时运行:
- A 在 slice 的时候,B 正在 concat;
- buffer 的状态被错改/错切;
- 最终造成:
- 包处理多次;
- 某些包永远解不开;
- 无法回收剩余数据(死循环或内存增长)
解决办法
在 WebSocket 中实现自动串行、自动排队的消息处理机制
目标:确保每个 WebSocket 连接收到的消息,在服务端串行处理,按顺序执行,且不丢、不乱、不重。
封装成“连接级消息调度器类”——自动串行消息消费器
// MessageProcessor.jsclass MessageProcessor {/*** @param {WebSocket} ws - 当前 WebSocket 连接实例* @param {Function} handler - 数据处理函数 (ws, buffer) => Promise<newBuffer>*/constructor(ws, handler) {this.ws = ws;this.handler = handler;this.queue = [];this.buffer = Buffer.alloc(0);this.processing = false;}/*** 将收到的数据放入处理队列* @param {Buffer} data*/enqueue(data) {this.queue.push(data);this._processQueue();}/*** 串行处理队列中的数据* @private*/async _processQueue() {if (this.processing) return;this.processing = true;try {while (this.queue.length > 0) {const chunk = this.queue.shift();this.buffer = Buffer.concat([this.buffer, chunk]);this.buffer = await this.handler(this.ws, this.buffer);}} catch (err) {console.error(`[MessageProcessor] Error while processing message: ${err.message}`);} finally {this.processing = false;}}
}module.exports = MessageProcessor;
使用方式:
const processor = new MessageProcessor(ws, this.processData.bind(this));ws.on('message', (data) => {processor.enqueue(Buffer.from(data));
});
优点:
-
封装性好,每个连接一个实例;
-
可测试、可复用;
-
自动排队串行,无需使用锁;
-
非阻塞,可多连接并发使用;
-
和 through2 在语义上非常接近。
解析MessageProcessor 是如何一步步解决这些问题的?
Step 1:用内部队列排队所有消息 queue.push(data)
enqueue(data) {this.queue.push(data);this._processQueue();
}
- 所有收到的消息都放进内部队列;
- 消息不会被立即处理,而是等待处理器“空闲”后再处理;
- 避免了并发执行逻辑
Step 2:使用 processing 标志,严格串行执行
async _processQueue() {if (this.processing) return;this.processing = true;while (this.queue.length > 0) {const chunk = this.queue.shift();this.buffer = Buffer.concat([this.buffer, chunk]);this.buffer = await this.handler(this.ws, this.buffer); // handler = processData}this.processing = false;
}
关键点:
- 第一个消息进来时,开始处理;
- 未处理完之前,不会进入下一轮处理;
- 后续数据只能排队,不会同时触发多次 processData;
- buffer 的状态始终由一个处理器独占。
Step 3:粘包 / 解包逻辑只在一个线程上下文中修改 buffer
由于 processData 只在 _processQueue 内调用,它拿到的是唯一的 buffer 实例,无并发修改。
你的典型 processData 中逻辑是这样:
while (buffer.length >= HEADER_LENGTH) {const len = buffer.readUInt32LE(0);if (buffer.length < len) break;const packet = buffer.slice(0, len);buffer = buffer.slice(len);// do something with packet
}
现在没有并发修改了,buffer.slice、concat 都是线程安全的。
四、所以总结:问题为什么发生 + MessageProcessor 如何解决
阶段 | 原问题 | MessageProcessor 如何修复 |
---|---|---|
并发触发 on('message') | 多次同时执行逻辑 | 使用队列排队每条消息 |
buffer 是共享变量 | 被多个 async 改写 | 仅在一个逻辑处理器中使用 |
processData 修改 buffer 状态 | 状态竞争 / 脏读 | 串行执行保证唯一状态 |
五、你现在处于稳定状态的根本原因
- 你用 MessageProcessor 隔离了每个连接的处理队列;
- 每个连接独立维护 buffer 状态;
- 消息进来后,不会打断正在处理的内容,形成了消息处理管线(stream-like);
这本质上就是在 net.pipe(through2(…)) 中所依赖的 backpressure、数据流顺序处理机制。
注意: 只有当你用 WebSocket 进行“低层级的二进制通信 + 自定义协议格式”时,才需要 buffer 拼接 + 串行处理。
场景 | 说明 | 是否会出现“粘包/拆包/并发 buffer 错乱”问题? |
---|---|---|
✅ WebSocket + 发送 JSON 字符串 | 浏览器默认场景,如:ws.send(JSON.stringify(obj)) | ❌ 不会,每条消息独立、完整 |
✅ WebSocket + 发送 Blob / Uint8Array | 发送结构化数据、图片、ArrayBuffer | ❌ 不会,每个 message 是一个完整帧 |
❗ WebSocket + 发送自定义二进制协议(Buffer) | 比如你自定义包头(长度 + 指令 + 数据) | ✅ 会,需要你手动 buffer 拼接、解析边界 |
✅ TCP + 业务协议 + stream 处理 | 典型 TCP 应用,粘包问题严重 | ✅ 会,这就是你原来通过 through2 解决的方式 |
❌ HTTP 请求(REST) | 每个请求独立处理 | ❌ 不会,天然不会粘包 |
为什么自定义二进制协议需要 buffer 拼接?
WebSocket 的“消息”单位 vs “物理帧”单位:
-
WebSocket 协议本身会保留帧边界,也就是说:
浏览器/Node.js 的 ws 库可以保障你在 on(‘message’) 时拿到的是完整的一帧;
但是!!!
一帧不代表你的一条“业务逻辑消息”!
举个例子:你的自定义协议如下:
[4字节 length][2字节版本][4字节 cmdid][...data]
但你发送多个这样的包,比如:
// 你可能将 3 个包合成一个 buffer 发出:
ws.send(Buffer.concat([packet1, packet2, packet3]));
结果后端拿到的 data 是:
- 不是一条消息;
- 而是多个业务包合并的;
- 所以你必须手动解析 buffer,按长度解开每个逻辑包;
- 而这个 buffer 本身是共享状态,所以你才需要串行处理。
正常 JSON 传输为什么不会出问题?
假设你前端这样做:
ws.send(JSON.stringify({ cmd: 'ping', ts: Date.now() }));
后端直接:
ws.on('message', data => {const obj = JSON.parse(data.toString());handle(obj);
});
每一条 message 是浏览器封装好的独立帧,不存在“粘包/拆包”的问题;也没有共享 buffer、也就没有并发读写的风险。
什么时候需要处理拼包 + 并发?
条件 | 是否需要处理拼包/并发 |
---|---|
自己定义包头 + 发送多个合并 Buffer | ✅ 是,必须处理 buffer 解析、并发状态 |
每次发送只发一个完整包 | 如果后端一定能拿到完整帧,一般不需要 |
你用了 ws.send(packet1); ws.send(packet2) 连续发 | 仍然有可能在某些环境下合并为一帧 |
用 TCP socket + stream | ✅ 必须做拼包处理,TCP 是流,不保边界 |