this.server = new WebSocket.Server({ port: this.port });this.server.on('connection', (ws, req) => {const uniqueId = dataUtil.uuid();ws.id = uniqueId;global.serverSession.set(uniqueId, ws);logger.debug({ message: '客户端已连接', traceId: ws.id, address: req.socket.remoteAddress });let buffer = Buffer.alloc(0);ws.on('message', async (data) => {// WebSocket 的消息可能是 Buffer 或字符串buffer = Buffer.concat([buffer, Buffer.from(data)]);await this.processData(ws, buffer );});ws.on('close', () => {logger.debug({ message: '客户端已断开连接', traceId: ws.id });socketCloseEvent.emit(ws.id, ws.destroyType);global.serverSession.delete(ws.id);if(ws._idleTimer){clearInterval(ws._idleTimer);}});ws.on('error', (err) => {logger.error({ message: `WebSocket 连接发生错误 ${err}`, traceId: ws.id });});// 心跳检测(可选,客户端配合)ws.isAlive = true;ws.on('pong', () => { ws.isAlive = true; });// 设置连接超时(模拟 TCP_IDLE_THRESHOLD)const timeout = parseInt(process.env.TCP_IDLE_THRESHOLD);ws._idleTimer = setInterval(() => {if (!ws.isAlive) {logger.debug({ message: 'WebSocket 心跳超时断开连接', traceId: ws.id });ws.terminate();} else {ws.isAlive = false;ws.ping();}}, timeout);});

在上述代码

let buffer = Buffer.alloc(0);ws.on('message', async (data) => {// WebSocket 的消息可能是 Buffer 或字符串buffer = Buffer.concat([buffer, Buffer.from(data)]);await this.processData(ws, Buffer.from(data));});

通过以上这种方法解析时,会存在buffer 拼接半包、粘包等问题。

问题根因逐步解析

第一步:WebSocket 的 on(‘message’) 是异步且无排队机制

每次前端发送消息,Node.js 的 ws 库会调用:

ws.on('message', callback);
  • 这个回调不会等待上一个 message 处理完成;
  • 如果前端并发发送多个消息(如调用多次 ws.send()),后端会并发触发多个 on(‘message’) 回调;
  • 回调函数内部的 await 并不会阻塞下一个消息的到来。

第二步:共享状态 buffer 被多个异步回调同时修改

你在每个 on(‘message’) 中都这么写:

buffer = Buffer.concat([buffer, Buffer.from(data)]);
buffer = await this.processData(ws, buffer);
  • 多个 on(‘message’) 回调是并发执行的;
  • 但它们共用同一个 buffer;
  • 多个异步回调可能同时拼接数据,造成粘包错乱、重复消费、消息丢失。

第三步:粘包/解包逻辑对 buffer 有修改,也导致状态错乱

比如你在 processData 里这么做:

while (buffer.length >= HEADER_LENGTH) {const packetLength = buffer.readUInt32LE(0);if (buffer.length < packetLength) break;// 处理一包buffer = buffer.slice(packetLength);
}

你修改了 buffer 的引用,但如果多个 on(‘message’) 同时运行:

  • A 在 slice 的时候,B 正在 concat;
  • buffer 的状态被错改/错切;
  • 最终造成:
    • 包处理多次;
    • 某些包永远解不开;
    • 无法回收剩余数据(死循环或内存增长)

解决办法

在 WebSocket 中实现自动串行、自动排队的消息处理机制

目标:确保每个 WebSocket 连接收到的消息,在服务端串行处理,按顺序执行,且不丢、不乱、不重。

封装成“连接级消息调度器类”——自动串行消息消费器

// MessageProcessor.jsclass MessageProcessor {/*** @param {WebSocket} ws - 当前 WebSocket 连接实例* @param {Function} handler - 数据处理函数 (ws, buffer) => Promise<newBuffer>*/constructor(ws, handler) {this.ws = ws;this.handler = handler;this.queue = [];this.buffer = Buffer.alloc(0);this.processing = false;}/*** 将收到的数据放入处理队列* @param {Buffer} data*/enqueue(data) {this.queue.push(data);this._processQueue();}/*** 串行处理队列中的数据* @private*/async _processQueue() {if (this.processing) return;this.processing = true;try {while (this.queue.length > 0) {const chunk = this.queue.shift();this.buffer = Buffer.concat([this.buffer, chunk]);this.buffer = await this.handler(this.ws, this.buffer);}} catch (err) {console.error(`[MessageProcessor] Error while processing message: ${err.message}`);} finally {this.processing = false;}}
}module.exports = MessageProcessor;

使用方式:

const processor = new MessageProcessor(ws, this.processData.bind(this));ws.on('message', (data) => {processor.enqueue(Buffer.from(data));
});

优点:

  • 封装性好,每个连接一个实例;

  • 可测试、可复用;

  • 自动排队串行,无需使用锁;

  • 非阻塞,可多连接并发使用;

  • 和 through2 在语义上非常接近。

解析MessageProcessor 是如何一步步解决这些问题的?

Step 1:用内部队列排队所有消息 queue.push(data)

enqueue(data) {this.queue.push(data);this._processQueue();
}
  • 所有收到的消息都放进内部队列;
  • 消息不会被立即处理,而是等待处理器“空闲”后再处理;
  • 避免了并发执行逻辑

Step 2:使用 processing 标志,严格串行执行

async _processQueue() {if (this.processing) return;this.processing = true;while (this.queue.length > 0) {const chunk = this.queue.shift();this.buffer = Buffer.concat([this.buffer, chunk]);this.buffer = await this.handler(this.ws, this.buffer); // handler = processData}this.processing = false;
}

关键点:

  • 第一个消息进来时,开始处理;
  • 未处理完之前,不会进入下一轮处理;
  • 后续数据只能排队,不会同时触发多次 processData;
  • buffer 的状态始终由一个处理器独占。

Step 3:粘包 / 解包逻辑只在一个线程上下文中修改 buffer

由于 processData 只在 _processQueue 内调用,它拿到的是唯一的 buffer 实例,无并发修改。
你的典型 processData 中逻辑是这样:

while (buffer.length >= HEADER_LENGTH) {const len = buffer.readUInt32LE(0);if (buffer.length < len) break;const packet = buffer.slice(0, len);buffer = buffer.slice(len);// do something with packet
}

现在没有并发修改了,buffer.slice、concat 都是线程安全的。

四、所以总结:问题为什么发生 + MessageProcessor 如何解决

阶段原问题MessageProcessor 如何修复
并发触发 on('message')多次同时执行逻辑使用队列排队每条消息
buffer 是共享变量被多个 async 改写仅在一个逻辑处理器中使用
processData 修改 buffer 状态状态竞争 / 脏读串行执行保证唯一状态

五、你现在处于稳定状态的根本原因

  • 你用 MessageProcessor 隔离了每个连接的处理队列;
  • 每个连接独立维护 buffer 状态;
  • 消息进来后,不会打断正在处理的内容,形成了消息处理管线(stream-like);
    这本质上就是在 net.pipe(through2(…)) 中所依赖的 backpressure、数据流顺序处理机制。

注意: 只有当你用 WebSocket 进行“低层级的二进制通信 + 自定义协议格式”时,才需要 buffer 拼接 + 串行处理。

场景说明是否会出现“粘包/拆包/并发 buffer 错乱”问题?
✅ WebSocket + 发送 JSON 字符串浏览器默认场景,如:ws.send(JSON.stringify(obj))❌ 不会,每条消息独立、完整
✅ WebSocket + 发送 Blob / Uint8Array发送结构化数据、图片、ArrayBuffer❌ 不会,每个 message 是一个完整帧
❗ WebSocket + 发送自定义二进制协议(Buffer)比如你自定义包头(长度 + 指令 + 数据)✅ 会,需要你手动 buffer 拼接、解析边界
✅ TCP + 业务协议 + stream 处理典型 TCP 应用,粘包问题严重✅ 会,这就是你原来通过 through2 解决的方式
❌ HTTP 请求(REST)每个请求独立处理❌ 不会,天然不会粘包

为什么自定义二进制协议需要 buffer 拼接?

WebSocket 的“消息”单位 vs “物理帧”单位:

  • WebSocket 协议本身会保留帧边界,也就是说:

    浏览器/Node.js 的 ws 库可以保障你在 on(‘message’) 时拿到的是完整的一帧;

但是!!!

一帧不代表你的一条“业务逻辑消息”!

举个例子:你的自定义协议如下:

[4字节 length][2字节版本][4字节 cmdid][...data]

但你发送多个这样的包,比如:

// 你可能将 3 个包合成一个 buffer 发出:
ws.send(Buffer.concat([packet1, packet2, packet3]));

结果后端拿到的 data 是:

  • 不是一条消息;
  • 而是多个业务包合并的;
  • 所以你必须手动解析 buffer,按长度解开每个逻辑包;
  • 而这个 buffer 本身是共享状态,所以你才需要串行处理。

正常 JSON 传输为什么不会出问题?

假设你前端这样做:

ws.send(JSON.stringify({ cmd: 'ping', ts: Date.now() }));

后端直接:

ws.on('message', data => {const obj = JSON.parse(data.toString());handle(obj);
});

每一条 message 是浏览器封装好的独立帧,不存在“粘包/拆包”的问题;也没有共享 buffer、也就没有并发读写的风险。

什么时候需要处理拼包 + 并发?

条件是否需要处理拼包/并发
自己定义包头 + 发送多个合并 Buffer✅ 是,必须处理 buffer 解析、并发状态
每次发送只发一个完整包如果后端一定能拿到完整帧,一般不需要
你用了 ws.send(packet1); ws.send(packet2) 连续发仍然有可能在某些环境下合并为一帧
用 TCP socket + stream✅ 必须做拼包处理,TCP 是流,不保边界

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/92857.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/92857.shtml
英文地址,请注明出处:http://en.pswp.cn/bicheng/92857.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

元数据管理与数据治理平台:Apache Atlas 分类传播 Classification Propagation

文中内容仅限技术学习与代码实践参考&#xff0c;市场存在不确定性&#xff0c;技术分析需谨慎验证&#xff0c;不构成任何投资建议。Apache Atlas 框架是一套可扩展的核心基础治理服务&#xff0c;使企业能够有效、高效地满足 Hadoop 中的合规性要求&#xff0c;并支持与整个企…

TSF应用开发与运维部署

架构演进历程&#xff1a;单体架构-->SOA架构-->微服务架构-->Service Mesh腾讯微服务平台TSF (Tencent Service Framework) 是一个围绕应用和微服务的 PaaS 平台。提供服务全生命周期管理能力和数据化运营支持。提供多维度应用、服务、机器的监控数据&#xff0c;助力…

linux开发之mmap内存映射

mmap概念 mmp是 将文件或设备直接映射到进程的虚拟内存空间 的一种机制&#xff0c;可实现程序像访问内存一样访问文件&#xff0c;而不需要传统的 read()/write()系统调用 文件内容被映射到进程的地址空间&#xff0c;读写文件就像操作内存一样&#xff0c;操作系统负责自动同…

CPP继承

继承 一、继承概述 1、为什么需要继承 如下示例&#xff0c;Person 类、Student 类、Teacher 类有大量重复的代码&#xff0c;造成代码冗余&#xff0c;降低开发效率。我们可以通过继承来解决这一问题。在面向对象的编程语言中&#xff0c;继承是一个核心概念。主要作用将重复的…

模块 PCB 技术在未来通信领域的创新突破方向

未来通信领域对数据传输速率、信号稳定性及设备集成度的要求持续攀升&#xff0c;模块 PCB 作为通信设备的关键组件&#xff0c;其技术创新成为推动行业发展的核心动力。猎板 PCB 凭借深厚的技术积累与持续的研发投入&#xff0c;在模块 PCB 技术创新方面取得诸多突破&#xff…

mysql的InnoDB索引总结

MySQL InnoDB索引知识点总结 1. 索引类型 1.1 聚簇索引&#xff08;Clustered Index&#xff09; 定义与特性 定义&#xff1a;聚簇索引是InnoDB的默认存储方式&#xff0c;数据行按照主键的顺序物理存储在磁盘上特性&#xff1a; 每个InnoDB表只能有一个聚簇索引数据页中的记录…

C++模板的补充

类模板(上一篇没讲到类模板C/C内存管理&函数模板-CSDN博客&#xff09; 类模板的定义&#xff1a; template<class T1, class T2, ..., class Tn> class 类模板名 {// 类内成员定义 }; 用一个简单的栈例子讲类模板 #define _CRT_SECURE_NO_WARNINGS #include &l…

用JOIN替代子查询的查询性能优化

一、子查询的性能瓶颈分析‌重复执行成本‌关联子查询会导致外层每行数据触发一次子查询&#xff0c;时间复杂度为O(M*N)sql-- 典型低效案例 SELECT e.employee_id, (SELECT d.department_name FROM departments d WHERE d.department_id e.department_id) FROM employees e; …

【设计模式】访问者模式模式

访问者模式&#xff08;Visitor Pattern&#xff09;详解一、访问者模式简介 访问者模式&#xff08;Visitor Pattern&#xff09; 是一种 行为型设计模式&#xff08;对象行为型模式&#xff09;&#xff0c;它允许你在不修改对象结构的前提下&#xff0c;为对象结构中的元素添…

比特币现货和比特币合约的区别与联系

一、基本定义项目现货&#xff08;Spot&#xff09;合约&#xff08;Futures / Perpetual&#xff09;本质直接买卖比特币本身买卖比特币价格的衍生品合约所得资产真实的 BTC合约头寸&#xff08;没有直接持有 BTC&#xff09;结算方式交割比特币现金结算&#xff08;多数平台&…

Qt/C++开发监控GB28181系统/实时监测设备在线离线/视频预览自动重连/重新点播取流/低延迟

一、前言说明 一个好的视频监控系统&#xff0c;设备掉线后能够自动重连&#xff0c;也是一个重要的功能指标&#xff0c;如果监控系统只是个rtsp流地址&#xff0c;那非常好办&#xff0c;只需要重新打开流地址即可&#xff0c;而gb28181中就变得复杂了很多&#xff0c;需要多…

此芯p1开发板使用OpenHarmony时llama.cpp不同优化速度对比(GPU vs CPU)

硬件环境 Cix P1 SoC 瑞莎星睿 O6 开发板 rx580显卡 产品介绍&#xff1a; https://docs.radxa.com/orion/o6/getting-started/introduction OpenHarmony 5.0.0 使用vulkan后端的llama.cpp &#xff08;GPU&#xff09; # ./llama-bench -m /data/qwen1_5-0_5b-chat-q2_k.…

Android 四大布局:使用方式与性能优化原理

一、四大布局基本用法与特点1. LinearLayout&#xff08;线性布局&#xff09;使用方式&#xff1a;<LinearLayoutandroid:orientation"vertical" <!-- 排列方向&#xff1a;vertical/horizontal -->android:layout_width"match_parent"android:…

Redis的BigKey问题

Redis的BigKey问题 什么是大Key问题&#xff1f; 大key问题其实可以说是大value问题&#xff0c;就是某个key对应的value所占据的存储空间太大了&#xff0c;所以导致我们在操作这个key的时候花费的时间过长&#xff08;序列化\反序列化&#xff09;&#xff0c;从而降低了redi…

TDengine IDMP 产品基本概念

基本概念 元素 (Element) IDMP 通过树状层次结构来组织数据&#xff0c;树状结构里的每个节点被称之为元素 (Element)。元素是一个物理的或逻辑的实体。它可以是具体的物理设备&#xff08;比如一台汽车&#xff09;&#xff0c;物理设备的一个子系统&#xff08;比如一台汽车的…

专题二_滑动窗口_将x减到0的最小操作数

一&#xff1a;题目解释&#xff1a;每次只能移除数组的边界&#xff0c;移除的边界的总和为x&#xff0c;要求返回你移除边界的最小操作数&#xff01;也就是说你最少花几次移除边界&#xff0c;就能够让这些移除的边界的和为x&#xff0c;则返回这个次数&#xff01;所以这个…

CentOS 7 下通过 Anaconda3 运行llm大模型、deepseek大模型的完整指南

CentOS 7 下通过 Anaconda3 运行llm大模型、deepseek大模型的完整指南A1 CentOS 7 下通过 Anaconda3 运行大模型的完整指南一、环境准备二、创建专用环境三、模型部署与运行四、优化配置常见问题解决B1 CentOS 7 下通过 Anaconda3 使用 CPU 运行 DeepSeek 大模型的完整方案一、…

Flutter应用在Windows 8上正常运行

要让Flutter应用在Windows 8上正常运行,需满足以下前提条件,涵盖系统环境、依赖配置、编译设置等关键环节: 一、系统环境基础要求 Windows 8版本 必须是 Windows 8.1(核心支持),不支持早期Windows 8(需升级到8.1,微软已停止对原版Windows 8的支持)。 确认系统版本:右…

Redis实现消息队列三种方式

参考 Redis队列详解&#xff08;springboot实战&#xff09;_redis 队列-CSDN博客 前言 MQ消息队列有很多种&#xff0c;比如RabbitMQ,RocketMQ,Kafka等&#xff0c;但是也可以基于redis来实现&#xff0c;可以降低系统的维护成本和实现复杂度&#xff0c;本篇介绍redis中实现…

【C++动态版本号生成方案:实现类似C# 1.0.* 的自动构建号】

C动态版本号生成方案&#xff1a;实现类似C# 1.0.* 的自动构建号 在C#中&#xff0c;1.0.*版本号格式会在编译时自动生成构建号和修订号。本文将介绍如何在C项目中实现类似功能&#xff0c;通过MSBuild自动化生成基于编译时间的版本号。 实现原理 版本号构成&#xff1a;主版本…