消息存储系统
聊天室设计,消息存储系统非常关键,因为一开始设计时使用MongoDB,所以后续使用schemma方式存储。
后端架构:express + MongoDB
消息插入策略
在 MongoDB 中设计聊天消息存储时,插入策略的选择会影响性能、扩展性和查询效率。一般消息插入会按照单条消息插入和群组插入,这里我选择使用单条消息插入,理由在下方的比较当中。
对插入方式进行比较:
1. 单条消息独立插入
每条消息作为独立文档插入 messages 集合
优点:
- 查询灵活:可以轻松实现各种查询需求(按群组、按用户、按时间范围等)
- 扩展性强:天然支持分片(按 group_id 分片效果最佳)
- 写入性能高:MongoDB 对单文档插入做了高度优化
- 维护简单:消息更新/删除只需操作单条文档
- 存储效率:未读消息标记等状态管理更方便
缺点:
- 大量消息时集合文档数增长快(但MongoDB 单集合支持千万级文档无压力)
- 获取群组完整聊天记录需要查询多个文档(可通过索引优化)
2. 按群组聚合插入
每个聊天室(群组)作为一个文档,消息以子文档数组形式存储
优点:
- 获取某个群组所有消息只需读取一个文档
- 理论上减少文档总数
缺点:
- 文档大小限制:MongoDB 单文档最大16MB,高频聊天群容易超出限制
- 写入冲突:高并发时对同一群组文档的更新会产生锁竞争
- 扩展困难:无法有效分片,性能随群组活跃度下降明显
- 查询局限:难以实现跨群组查询、全文搜索等复杂需求
- 更新低效:修改/删除单条消息需要定位到数组元素
这两种插入方式优缺点都很明显,但是对于绝大多数生产级聊天应用,推荐使用单条消息独立插入的方式,只有在非常特殊的小规模、低频率场景下,才考虑使用按群组聚合的插入方式。虽然表面上看起来会产生更多文档,但 MongoDB 对这种模式有最好的支持。
并且对于单条消息独立插入也可以进行优化,使用索引和分片策略、数据分页等方式优化性能和效率。
单条消息插入的优化策略
1. 批量插入代替单条插入
使用insertMany代替insertOne的方式,批量插入的消息效率比单条插入更高
const messages = [...];
db.messages.insertMany(messages, { ordered: false }); // ordered:false 忽略错误继续插入
2. 索引优化技巧
使用覆盖索引优化常见查询,覆盖索引查询的关键在于使用投影条件,只返回查询结果所需字段,从而避免对实际文档的访问。
- 所有的查询字段是索引的一部分
- 所有的查询返回字段在同一个索引中
MongoDB 有一个字段索引的特定应用程序,称为覆盖索引查询(Covered Queries),其中查询的所有列都被进行索引。
通过创建适当的索引,使查询可以直接从索引中获取所需的数据,而无需访问实际的文档数据,减少磁盘 I/O 和内存消耗,提高查询性能
使用部分索引减少索引大小,仅索引集合中符合指定过滤器表达式的文档
3. 分片策略
可以进行按聊天室(群组)分片,可以确保同一群组的消息物理上存储在相同分片上,同时保持集群负载均衡。
消息接口
消息模型
接下来设计node + express的路由接口配置,先准备消息的模型messageSchema
/*** 消息模型* @typedef {Object} Message* @property {string} messageId - 消息ID 必填字符串类型字段,必须唯一* @property {string} roomId - 聊天室ID 必填字符串类型字段,必须唯一* @property {string} senderId - 消息发送者ID 必填字符串类型字段,必须唯一* @property {string} username - 用户名 必填字符串类型字段,必须唯一* @property {Date} time - 消息发送时间* @property {string} content - 消息内容* @property {Array} mentions - 消息中被@的用户信息数组* @property {boolean} deleted - 是否删除* @property {string} deletedBy - 删除者ID* @property {Date} deletedAt - 删除时间* @property {string} messageType - 消息类型,默认为 'text'* @property { Array } attachments - 附件信息数组,如图片、文件等* @property { string } attachments.type - 附件类型,如 'image', 'file'* @property { string } attachments.url - 附件URL* @property { string } attachments.name - 附件名称*/
const messageSchema = new Schema({messageId: { type: String, required: true, unique: true },roomId: { type: String, required: true },senderId: { type: String, required: true },username: { type: String, required: true },time: { type: Date, default: Date.now },content: { type: String, required: true },mentions: [{username: String, // 被@的用户名position: Number // 在内容中的位置}],deleted: { type: Boolean, default: false }, // 是否删除deletedBy: { type: String, default: '' }, // 删除者IDdeletedAt: { type: Date, default: '' }, // 删除时间messageType: { type: String, enum: ['text', 'image', 'file'], default: 'text' }, // 消息类型attachments: [{ // 附件信息数组,如图片、文件等type: String, // 附件类型,如 'image', 'file'url: String, // 附件URLname: String // 附件名称}],
});
路由接口
提前定义一些将需要用到的路由接口,创建在routes
文件夹中,引入controller
控制器和token检测中间件,在所有路由处理前添加 authMiddleware 进行身份验证,确保只有携带有效 token 的用户才能访问消息相关接口。
关于token检测中间件相关可以看:JWTの求生记录 🌟 JWT这玩意儿就像你对象的聊天记录——看不懂但必须得会验证,不然API分分钟给你返回401,来! - 掘金
然后在主路由文件中挂载消息路由处理
// 挂载chatRoom文件夹中的消息路由
router.use('/chatRoom', messageRouter);
创建对应的消息处理控制器messageController.js
,将之前所需要的方法都进行创建,下面我将以发送消息sendMessage为例进行讲解。
messageController控制器
controller
创建sendMessage方法,从请求体获取参数: senderId , username , content , roomId
调用 handleMessageContent 处理消息内容,返回 mentions , messageType , attachments,这里是为了将消息内容中的@用户信息提取出来,存入mentions数组中,因为不额外处理图片逻辑了,所以messageType , attachments都是默认值,使用正则表达式提取@用户信息。
最后,用 messageService.createMessage 保存消息到数据库,返回相应的处理结果给前端
sendMessage代码:
/*** 发送消息* @param {Object} req - 请求对象* @param {Object} res - 响应对象* @returns {Promise<void>} - 返回一个Promise对象,解析为void*/
const sendMessage = async (req, res) => {try {// 获取请求参数const { senderId, username, content, roomId } = req.body;// 处理content中的@用户信息,提取出用户名,存入mentions数组中let { mentions, messageType, attachments } = handleMessageContent(content);// 调用服务层方法,创建消息并保存到数据库中,返回保存后的消息对象,包括result, message, data, code等属性let getResult = await messageService.createMessage(senderId, username, content, mentions, messageType, attachments, roomId);return res.status(getResult.code).json(getResult);} catch (error) {return res.status(500).json({ status: 'failure', code: 500, message: '发送消息失败' });}
}
handleMessageContent 代码:
const handleMessageContent = (content) => {let { html } = content;let mentions = [], attachments = [], messageType = 'text'; // 用于存储@用户信息的数组// "<span class="insert-mention" contenteditable="false" style="color: rgb(0, 123, 255);">@李四</span> 2324"// 正则表达式匹配@用户信息的格式const mentionRegex = /<span class="insert-mention" contenteditable="false"[^>]*>@([^<]+)<\/span>/g;// 遍历消息内容中的所有@用户信息,提取出用户名,存入mentions数组中const mentionMatches = html.match(mentionRegex);if (mentionMatches) {mentionMatches.forEach((mention) => {// 提取出用户名,存入mentions数组中let username = mention.match(/@([^<]+)/)[1]; // 使用正则表达式提取出用户名mentions.push({ username }); // 存入mentions数组中,同时记录位置信息});}return { mentions, messageType, attachments };
}
在一开始的时候还是写了图片处理,但是为了加快速度省去了这部分内容
- 检测base64图片数据
- 解码并保存图片到本地
- 生成图片信息对象存入attachments数组
- 设置messageType为’image’
if (images && images.length > 0) { // 如果有图片,将图片信息存入attachments数组中attachments = images.map((image) => { // 遍历图片数组,将图片信息存入attachments数组中// 对base64图片数据进行特殊处理try { if (image.data.startsWith('data:image/')) { // 如果是base64图片数据,将其存入attachments数组中,同时记录图片类型和urllet base64Data = image.data.split(',')[1]; // 提取出base64图片数据let imageType = image.data.split(';')[0].split('/')[1]; // 提取出图片类型,如jpg、png等let imageName = `${Date.now()}.${imageType}`; // 生成图片名称,使用当前时间戳作为文件名,加上图片类型作为后缀名let imagePath = `E:/工作文件04/New_Work/images/${imageName}`; // 生成图片路径,使用images文件夹作为存放图片的目录// 将图片数据写入到指定路径中,使用fs模块的writeFileSync方法,将base64图片数据解码后写入到指定路径中,使用base64解码方法decodeBase64fs.writeFileSync(imagePath, decodeBase64(base64Data), 'base64'); // 将base64图片数据解码后写入到指定路径中,使用base64解码方法decodeBase64return { type: 'image', url: imagePath, name: imageName }; // 存入图片信息,包括类型、url和名称}} catch (error) { // 如果处理过程中出现错误,打印错误信息,并返回nullconsole.error('处理图片数据失败:', error); // 打印错误信息return null; // 返回null,表示处理失败}})messageType = 'image'; // 将消息类型设置为image
}
messageService服务层
创建一个messageService.js文件进行业务逻辑处理,创建消息方法createMessage,此方法作用是创建并保存消息到数据库。
逻辑流程:
- 接收参数: senderId , username , content , mentions , messageType , attachments , roomId
- 生成消息ID和当前时间戳
- 创建新的Message实例
- 调用DBService.insertMany保存到数据库
- 处理保存结果:
- 成功:返回200状态码和消息数据
- 失败:返回400状态码和错误信息
const createMessage = async (senderId, username, content, mentions, messageType, attachments, roomId) => {try {let messageId = generateId(8);let time = new Date(); // 获取当前时间作为消息创建时间const newMessage = new Message({ messageId, roomId, senderId, username, time, content, mentions, messageType, attachments }); // 保存到数据库let result = await DBService.insertMany('Message', newMessage); // 保存到数据库if (result.success) { // 如果保存成功,返回保存后的消息对象return { result: 'success', message: '保存消息成功', data: result.data, code: 200 }; // 返回保存后的消息对象} else { // 如果保存失败,返回错误信息return { result:'failure', message: '保存消息失败', data: null, code: 400 }; }} catch (error) {console.error('保存消息失败:', error); // 打印错误信息}
}
这样基本的数据逻辑操作就完成了,在前端页面中发送聊天消息进行验证吧
这里再按之前的方式,编写一个根据roomId列表获取每个聊天室最新消息的get方法,从query中获取roomIdList
,检查 roomIdList
是否为空,如果为空则返回400错误。设置查询参数:
- limit: 1 只获取最后一条消息
- sort: { time: -1 } 按时间降序排序
调用服务层的 getMessagesByRoomIdList 方法执行实际查询,这里本来是将id数组一个一个去进行查询,使用Promise.all
获取所有结果,但是此种方法需要访问数据库的次数会过多,对性能不太好。
代码如下:
const latestMessagesPromises = roomIdList.map(roomId => model.find({ roomId }).sort({ time: -1 }).limit(1)
);
const latestMessages = await Promise.all(latestMessagesPromises);
或许我发现了聚合查询,在mongose
中可以使用aggregate执行复杂的数据转换和分析,最终修改完只需要对数据库执行一次查询访问即可。
const getMessagesByRoomIdList = async (roomIdList, limit, sort) => {try {if (typeof roomIdList == "string") { // 如果房间Id列表是字符串,说明只有一个房间Id,需要转换为数组roomIdList = JSON.parse(roomIdList.replace(/'/g, '"'));; // 将字符串转换为数组}// 为每个房间ID查询最新一条消息,使用聚合查询const aggregateQuery = [{$match: { roomId: { $in: roomIdList } } // 匹配房间ID在列表中的消息}, {$sort: sort // 按时间降序排序}, {$group: { // 按房间ID分组_id: '$roomId', // 分组字段为房间IDmessage: { $first: '$$ROOT' } // 获取每个分组中的第一条消息}}, {$project: { // 投影字段,只返回消息内容和时间content: '$message.content', // 返回消息内容time: '$message.time', // 返回消息时间username: '$message.username', // 消息发送者用户名messageType: '$message.messageType', // 消息类型}}]; let result = await DBService.aggregate('Message', aggregateQuery); // 查询消息列表if (result.success) { // 如果查询成功,返回消息列表数组return { result:'success', message: '获取消息列表成功', data: result.data, code: 200 }; // 返回消息列表数组} else { // 如果查询失败,返回错误信息return { result:'failure', message: '获取消息列表失败', data: null, code: 400 }; // 返回错误信息} } catch (error) {console.error('获取消息列表失败:', error); // 打印错误信息}
}
这里其他的查询方法也是类似按自己的需求写完了,现在在前端登录后界面上就能够获取到之前发送到数据库中的消息列表了
WebSocket消息通信
在构建实时通信功能时,我选择了Socket.IO库作为解决方案。这并非首次尝试,此前我曾基于Socket.IO开发过一个微信风格的聊天界面:SocketIO の 聊天练习基于socketIO的双向通信,准备制作一个聊天界面。聊天界面的大体样式参考于微信界面,后 - 掘金
再次选择Socket.IO的主要原因很实际:它提供了高层抽象,使开发者无需处理底层细节,能够专注于业务逻辑实现 😋
安装过程非常简单:
- 后端安装命令:
yarn add socket.io
- 前端安装命令:
yarn add socket.io-client
实现效果:
1. 基础配置
后端采用Express框架,通过以下代码初始化Socket.IO服务:
const http = require('http');
const { Server } = require('socket.io');
// 引入Socket.IO处理器
const socketHandler = require('./socketHandler');const server = express();
const httpServer = http.createServer(server);
const IO = new Server(httpServer, { cors: { origin: '*' } }); // 允许所有来源的跨域请求
// 初始化Socket.IO处理
socketHandler(IO);
这里同样也可以写发送消息事件,并且比之前通过HTTP路由的方式更优,获取消息列表推荐使用HTTP路由
2. 通信协议对比
HTTP路由和socketIO对比:
特性 | Socket.io | HTTP路由 |
---|---|---|
延迟 | 毫秒级(长连接即时传输) | 高(每次新建TCP连接) |
效率 | 无HTTP头开销 | 每个请求携带完整HTTP头 |
方向性 | 双向通信 | 单向请求 |
实时反馈 | 可立即收到已送达/已读回执 | 需额外轮询或长连接 |
连接状态 | 持久连接感知在线状态 | 无状态 |
适用场景 | 高频、实时、小数据包交互 | 低频、非实时、大数据传输 |
后端核心模块设计
node后端创建一个socketHandler.js文件,是后端Socket.IO的核心控制器,负责处理WebSocket连接、消息收发、房间管理和用户认证。
- 连接管理
- 使用 socketAuthMiddleware 进行用户认证,验证 JWT token
- 维护 onlineUsers Map 存储用户ID与socket ID的映射关系
- 处理连接断开事件,清理相关资源
- 房间管理
- 通过join-room 事件处理用户加入房间逻辑
- 使用 updateRoomMembers 函数维护房间成员列表
- 存储在 chatRooms Map 中
- 消息处理
- send-message 事件处理消息收发
- 调用 saveMessage 函数将消息存入数据库
- 使用 IO.to(roomId).emit 向房间内所有用户广播消息
- 心跳检测
- 30秒检测一次心跳 ( heartBeat_interval )
- 通过 resetHeartbeat 函数重置心跳计时器
- 超时未收到心跳则断开连接
- 错误处理
- 对关键操作进行 try-catch 错误捕获
- 向客户端发送错误信息
前端实现方案
前端通信流程如下图所示:
Socket工具类
前端创建一个socketUtils.js,这是一个封装了Socket.IO客户端功能的工具类
- 连接管理
- 使用 io() 建立WebSocket连接
- 支持携带token认证
- 配置了重连策略和超时设置
- 心跳机制
- 每20秒发送一次心跳包( heartbeat 事件)
- 监听服务端响应( pong 事件)
- 断开连接时清除定时器
- 事件处理
- 提供 emitEvent 方法发送事件
- 提供 onEvent / offEvent 方法监听/取消事件
- 暴露 **disconnect **方法主动断开连接
- 错误处理
- 服务器主动断开时清除本地token
- 显示断开提示并跳转登录页
该实现采用单例模式导出,确保全局只有一个Socket实例。
此时,在前端文件当中,可以在main.js引入创建的socket实例
import socket from '@/utils/socketUtils';// 初始化socket
app.provide('socket', socket);
通过inject(‘socket’) 获取Socket.IO实例,在业务场景当中主要使用聊天消息收发和房间管理
// 发送消息
socket.emitEvent('send-message', body);
// 接收消息
socket.onEvent('room update message', updateMessage);
// 加入房间
socket.emitEvent('join-room', {roomId, userId});
关键实现细节
- 房间管理优化:采用动态房间加入机制,用户只加入当前活跃的聊天房间,减少不必要的消息广播。
- 消息持久化:在广播消息前先存入数据库,确保消息可靠性,即使连接中断也不会丢失。
- 断线重连:前端配置了5次重试机制,配合后端的在线状态检测,实现无缝重新连接。
- 资源清理:组件卸载时自动取消事件监听,防止内存泄漏。
对于需要实时功能的现代Web应用,Socket.IO仍然是值得推荐的解决方案。后续可以考虑加入消息已读状态、输入指示器等增强功能,进一步提升用户体验。
socketHandler.js实现代码:
const { verifyToken } = require('../services/tokenService');
const messageService = require('../services/messageService'); // 引入服务层
const { handleMessageContent } = require('../utils/dataDeal.js'); // 引入工具函数const onlineUsers = new Map(); // 存储用户ID与socket ID的映射
// 存储聊天群信息
const chatRooms = new Map();const heartBeat_interval = 30000; // 30秒检测一次
let heartbeatTimer;
/*** socket连接*/
const socketHandler = (IO) => {// 连接处理IO.on('connection', (socket) => {// 判断连接的token是否合法,不合法断开连接socketAuthMiddleware(socket);// 监听用户加入聊天室事件socket.on('join-room', async (msg) => {try {console.log('用户加入房间:', msg); // 打印用户加入房间的信息socket.join(msg.roomId); // 加入房间let roomIds = onlineUsers.get(msg.userId)?.roomId; // 获取用户的房间ID列表if (!roomIds.includes(msg.roomId)) {onlineUsers.get(msg.userId)?.roomId.push(msg.roomId); // 将房间ID添加到用户的房间ID列表中}updateRoomMembers(msg.roomId, socket); // 更新房间成员列表} catch (error) { // 捕获异常console.error('加入房间失败:', error); // 打印错误信息}})// 监听客户端消息socket.on('send-message', async (message) => {try {let userId = message.senderId, roomId = message.roomId; // 获取用户IDif (!userId) { // 如果没有用户ID,返回错误信息console.log('没有用户ID,无法发送消息'); // 打印错误信息return; // 返回错误信息}// 在这里处理消息,例如保存到数据库或进行其他操作let saveResult = await saveMessage(message, socket);if (saveResult) { // 保存成功,向房间内的所有用户发送消息// 向房间内的其他用户发送消息IO.to(roomId).emit('room update message', {type: 'ohter_message', // 消息类型userId: userId, // 用户IDmessage: saveResult // 消息内容}); }} catch (error) {console.error('处理消息时出错:', error);let userId = message.senderId; // 获取用户IDconst toSocketId = onlineUsers.get(userId); // 获取用户的socket IDif (toSocketId) { // 如果有socket ID,向用户发送错误信息socket.to(toSocketId).emit('message-error', { message: '处理消息时出错', tempId: message.senderId }); // 向用户发送错误信息}}});// 断开连接处理socket.on('disconnect', () => {for (const [userId, socketId] of onlineUsers.entries()) {if (socketId === socket.id) {onlineUsers.delete(userId);console.log(`用户断开连接: ${userId}`);// 处理用户离开房间的逻辑let roomIds = onlineUsers.get(userId)?.roomId; // 获取用户的房间ID列表if (roomIds) { // 如果有房间ID列表,遍历房间ID列表,离开房间roomIds.forEach(roomId => { // 遍历房间ID列表,离开房间socket.leave(roomId); // 离开房间});updateRoomMembers(roomId, socket); // 更新房间成员列表}break;}}clearInterval(heartbeatTimer); // 清除心跳检测定时器});// 心跳检测const resetHeartbeat = () => {clearTimeout(heartbeatTimer);heartbeatTimer = setTimeout(() => {console.log(`心跳超时,断开连接: ${socket.id}`);socket.disconnect();}, heartBeat_interval);};socket.on('heartbeat', (token) => { // 监听心跳事件socketAuthMiddleware(socket);resetHeartbeat(); // 重置心跳socket.emit('pong');});});
};/*** 更新聊天室内成员列表*/
const updateRoomMembers = (roomId, socket) => { // 传入房间ID和成员列表if (!socket || !socket.adapter) {console.error('无效的socket对象或缺少adapter属性');return;}const room = socket.adapter.rooms.get(roomId);if (room) { // 如果房间存在const members = Array.from(room);console.log(`房间 ${roomId} 成员列表: ${members}`);chatRooms.set(roomId, members); // 更新房间成员列表}
}/*** 验证 token 的中间件* @param {Object} req - 请求对象* @param {Object} res - 响应对象* @param {Function} next - 下一步函数
*/
const socketAuthMiddleware = (socket) => {const token = socket.handshake.auth.token; // 从客户端传递的token中获取if (!token) { // 没有token,不连接socket.disconnect(); // 断开连接console.log('没有token,不连接');return;}try {const user = verifyToken(token.replace('Bearer ', '')); // 验证tokenif (!user) { // 验证失败,断开连接socket.disconnect(); // 断开连接console.log('token验证失败,不连接');return;}// 将用户信息附加到socket对象socket.user = user;onlineUsers.set(user.userid, { socketId: socket.id, roomId: []}); // 存储用户ID与socket ID的映射} catch (error) { // 验证失败,断开连接socket.disconnect(); // 断开连接return;}
}/*** 存储消息到数据库中*/
const saveMessage = async (message) => {try {// 获取请求参数const { senderId, username, content, roomId } = message;// 处理content中的@用户信息,提取出用户名,存入mentions数组中let { mentions, messageType, attachments } = handleMessageContent(content);// 调用服务层方法,创建消息并保存到数据库中,返回保存后的消息对象,包括result, message, data, code等属性let getResult = await messageService.createMessage(senderId, username, content, mentions, messageType, attachments, roomId);return getResult.data;} catch (error) {console.error('保存消息时出错:', error); return null;}
}module.exports = socketHandler;
socketUtils.js实现代码:
/*** socket连接*/
import { io } from 'socket.io-client';class SocketIOService {socket = null; // 存储socket实例constructor() { }setupSocketConnection() {let token = localStorage.getItem('token')if (!token) { // 没有token,不连接console.log('No token found, not connecting to socket')return;}this.socket = io('http://localhost:10086', {auth: {token: token, // 携带认证token},transports: ['websocket'], // 指定传输方式reconnection: true, // 自动重连reconnectionAttempts: 5, // 重连尝试次数reconnectionDelay: 1000, // 重连延迟timeout: 20000 // 连接超时时间})this.socket.on('connect', () => { // 连接成功console.log('Socket connected:', this.socket.id);// 每25秒发送一次心跳this.heartbeatInterval = setInterval(() => {// console.log('发送心跳'); // 打印日志,方便调试this.socket.emit('heartbeat', Date.now());}, 20000);});this.socket.on('pong', () => {console.log('收到服务端心跳响应');});this.socket.on('disconnect', (reason) => {console.log('Socket disconnected:', reason);// 清除心跳定时器clearInterval(this.heartbeatInterval);if (reason === 'io server disconnect') {// 清除无效tokenlocalStorage.removeItem('token');ElMessage.warning('与服务器心跳断开,重新登录');// 跳转到登录页setTimeout(() => {window.location.href = '/';}, 1000);// 服务器主动断开需要手动重连// this.socket.connect();}})}// 发送emitEvent(eventName, data) {if (!this.socket) return; // 如果socket未连接,不发送消息this.socket.emit(eventName, data);}// 监听onEvent(eventName, callback) {if (!this.socket) return; // 如果socket未连接,不监听消息this.socket.on(eventName, callback);}// 取消监听offEvent(eventName, callback) {if (!this.socket) return; // 如果socket未连接,不取消监听消息this.socket.off(eventName, callback);}// 断开连接disconnect() {if (!this.socket) returnthis.socket.disconnect()}
}export default new SocketIOService();