定义
队列 Queue 是一种先进先出的数据结构,所以消费消息时也是按照顺序来消费的
消息队列看作是一个存放消息的容器,需要使用消息的时候,直接从容器中取出消息供自己使用即可
参与消息传递的双方称为 生产者 和 消费者
- 生产者负责发送消息
- 消费者负责处理消息
操作系统中的进程通信的一种很重要的方式就是消息队列
当前我们说的是:指的是各个服务以及系统内部各个组件/模块之前的通信
,属于一种 中间件
中间件
就是一类为应用软件服务的软件
,应用软件是为用户服务的,用户不会接触或者使用到中间件
常见的中间件有
- 消息队列
- RPC 框架
- 分布式组件
- HTTP 服务器
- 任务调度框架
- 配置中心
- 数据库层的分库分表工具
- 数据迁移工具
消息队列可以降低系统耦合性、实现任务异步、有效地进行流量削峰
,是分布式和微服务系统中重要的组件之一
作用
三点好处:
-
异步处理
:将用户请求中包含的耗时操作,通过消息队列实现异步处理,将对应的消息发送到消息队列之后就立即返回结果
,减少响应时间,提高用户体验。随后,系统再对消息进行消费
-
削峰/限流
:先将短时间高并发产生的事务消息存储在消息队列
中,然后后端服务再慢慢根据自己的能力去消费
这些消息,这样就避免直接把后端服务打垮掉
-
降低系统耦合性
:生产者(客户端)发送消息到消息队列中去,消费者(服务端)处理消息,需要消费的系统直接去消息队列取消息进行消费即可而不需要和其他系统有耦合
,这显然也提高了系统的扩展性 -
其他的如下
- 实现分布式事务
- 顺序保证
- 数据流处理
异步处理
异步处理
:将用户请求中包含的耗时操作,通过消息队列实现异步处理,将对应的消息发送到消息队列之后就立即返回结果
,减少响应时间,提高用户体验。随后,系统再对消息进行消费
因为用户请求数据写入消息队列之后就立即返回给用户了,但是请求数据在后续的业务校验、写数据库等操作中可能失败。因此,使用消息队列进行异步处理之后
,需要适当修改业务流程进行配合
比如用户在提交订单之后,
- 订单数据写入消息队列,不能立即返回用户订单提交成功,
- 需要在消息队列的订单消费者进程真正处理完该订单之后,甚至出库后,再通过电子邮件或短信通知用户订单成功,以免交易纠纷。
- 这就类似我们平时手机订火车票和电影票。
削峰/限流
削峰/限流
:先将短时间高并发产生的事务消息存储在消息队列
中,然后后端服务再慢慢根据自己的能力去消费
这些消息,这样就避免直接把后端服务打垮掉
削峰(Peak Shaving)实现方案:
- 异步缓冲机制:
生产者突发高并发请求写入消息队列
,避免直接冲击下游服务,消费者按恒定速率从队列拉取消息
,将“脉冲流量”转为“稳定流” - 队列容量控制:设置队列最大长度(如 RabbitMQ 的 x-max-length),超出时拒绝新消息或转入死信队列
限流分为:
- 生产者限流
- 消费者限流
降低系统耦合性
降低系统耦合性
:生产者(客户端)发送消息到消息队列中去,消费者(服务端)处理消息,需要消费的系统直接去消息队列取消息进行消费即可而不需要和其他系统有耦合
,这显然也提高了系统的扩展性
消息队列使用发布-订阅模式工作,消息发送者(生产者)发布消息,一个或多个消息接受者(消费者)订阅消息
消息发送者(生产者)和消息接受者(消费者)之间没有直接耦合
对新增业务,只要对该类消息感兴趣,即可订阅该消息
,对原有系统和业务没有任何影响,从而实现网站业务的可扩展性设计
消息队列不只能利用发布-订阅模式工作,只是在解耦这个特定业务环境下是使用发布-订阅模式的
。
除了发布-订阅模式,还有点对点订阅模式(一个消息只有一个消费者)
,我们比较常用的是发布-订阅模式。
这两种消息模型是 JMS 提供的,AMQP 协议还提供了另外 5 种消息模型
实现分布式事务
事务允许事件流应用将消费,处理,生产消息整个过程定义为一个原子操作
核心实现原理:事务消息机制
半消息(Prepared Message)
:生产者发送消息到消息队列,但该消息对消费者不可见
(处于"待确认"状态),避免消费者过早处理未完成的事务执行本地事务
:生产者执行本地业务操作(如订单创建),并根据结果决定提交或回滚事务消息
-
提交
:消息变为可见状态,供消费者消费。 回滚
:消息直接被丢弃
-
事务状态回查:若生产者未明确提交/回滚(如宕机),消息队列会主动回查生产者本地事务状态,确保事务完整性
典型流程(以订单+积分场景为例)
顺序保证
在很多应用场景中,处理数据的顺序至关重要。
消息队列保证数据按照特定的顺序被处理
,适用于那些对数据顺序有严格要求的场景。
大部分消息队列,例如 RocketMQ、RabbitMQ、Pulsar、Kafka,都支持顺序消息。
消息队列主要通过队列机制、分区策略和消费端控制实现消息顺序保证
,核心是确保同一业务标识的消息在同一个处理通道内串行化
顺序性保证本质是牺牲并发度换取业务一致性
。主流方案均采用 “分区有序”(同一 Key 同队列同线程
),避免全局有序的性能瓶颈
延时/定时处理
消息发送后不会立即被消费,而是指定一个时间,到时间后再消费
。
大部分消息队列,例如 RocketMQ、RabbitMQ、Pulsar,都支持定时/延时消息。
即时通讯
消息队列实现即时通讯(IM)主要通过异步消息传递协议和实时推送机制实现,核心依赖长连接、消息路由和状态管理
MQTT(消息队列遥测传输协议)
是一种轻量级的通讯协议,采用发布/订阅模式,非常适合于物联网(IoT)等需要在低带宽、高延迟或不可靠网络环境下工作的应用
。
它支持即时消息传递,即使在网络条件较差的情况下也能保持通信的稳定性
。
RabbitMQ 内置了 MQTT 插件用于实现 MQTT 功能(默认不启用,需要手动开
典型工作流程(以私聊为例)
数据流处理
针对分布式系统产生的海量数据流,如业务日志、监控数据、用户行为等,消息队列可以实时或批量收集这些数据,并将其导入到大数据处理引擎中,实现高效的数据流管理和处理
。
问题注意
系统可用性降低
:在加入 MQ 之前,你不用考虑消息丢失或者说 MQ 挂掉等等
的情况,但是,引入 MQ 之后你就需要去考虑
系统复杂性提高
: 加入 MQ 之后,需要保证消息没有被重复消费、处理消息丢失的情况、保证消息传递的顺序性等等问题
一致性问题
: 消息队列可以实现异步
,消息队列带来的异步确实可以提高系统响应速度。但是,万一消息的真正消费者并没有正确消费消息就会导致数据不一致的情况
JMS 规范
基本概念
JMS(JAVA Message Service,java 消息服务)是 Java 的消息服务,JMS 的客户端之间可以通过 JMS 服务进行异步的消息传输
JMS 是一个消息服务的标准或者说是规范,允许应用程序组件基于 JavaEE 平台创建、发送、接收和读取消息
支持消息类型
JMS 定义了五种不同的消息正文格式以及调用的消息类型
,允许发送并接收以一些不同形式的数据:
- StreamMessage:Java 原始值的数据流
- MapMessage:一套名称-值对
- TextMessage:一个字符串对象
- ObjectMessage:一个序列化的 Java 对象
- BytesMessage:一个字节的数据流
JMS 两种消息模型
点到点(P2P)模型
使用队列(Queue)作为消息通信载体
;满足生产者与消费者模式,一条消息只能被一个消费者使用
,未被消费的消息在队列中保留直到被消费或超时
发布/订阅(Pub/Sub)模型
发布订阅模型(Pub/Sub) 使用主题(Topic)作为消息通信载体
,类似于广播模式;发布者发布一条消息,该消息通过主题传递给所有的订阅者
AMQP协议
AMQP,即 Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准 高级消息队列协议
(二进制应用层协议),是应用层协议
的一个开放标准,为面向消息的中间件设计,兼容 JMS
基于此协议
的客户端与消息中间件可传递消息,并不受客户端/中间件同产品,不同的开发语言等条件的限制。
RabbitMQ 就是基于 AMQP 协议实现的
JMS vs AMQP
AMQP 为消息定义了线路层(wire-level protocol)的协议
,JMS 定义的是 API 规范
。
在 Java 体系中,多个 client 均可以通过 JMS 进行交互,不需要应用修改代码,但是其对跨平台的支持较差。
而 AMQP 天然具有跨平台、跨语言特性
JMS 支持 TextMessage、MapMessage 等复杂的消息类型
;而 AMQP 仅支持 byte[] 消息类型
(复杂的类型可序列化后发送)
由于 Exchange 提供的路由算法,AMQP 可以提供多样化的路由方式
来传递消息到消息队列,而 JMS 仅支持 队列 和 主题/订阅
方式两种
对比方向 | JMS | AMQP |
---|---|---|
定义 | Java API | 协议 |
跨语言 | 否 | 是 |
跨平台 | 否 | 是 |
支持消息模型 | 提供两种消息模型:①Peer-2-Peer;②Pub/sub | 提供了五种消息模型:①direct exchange;②fanout exchange;③topic change;④headers exchange;⑤system exchange 。本质来讲,后四种和 JMS 的 pub/sub 模型没有太大差别,仅是在路由机制上做了更详细的划分; |
支持消息类型 | 支持多种消息类型 | byte[](二进制) |
RPC 和消息队列的区别
RPC 和消息队列本质上是网络通讯的两种不同的实现机制,两者的用途不同
从用途
来看:
RPC 主要用来解决两个服务的远程通信问题
,不需要了解底层网络的通信机制。通过 RPC 可以帮助我们调用远程计算机上某个服务的方法,这个过程就像调用本地方法一样简单。消息队列主要用来降低系统耦合性、实现任务异步、有效地进行流量削峰。
从通信方式
来看:
RPC 是双向直接网络通讯
消息队列是单向引入
中间载体的网络通讯。
从架构上来看:
消息队列需要把消息存储
起来RPC 不需要存储消息
,因为前面也说了 RPC 是双向直接网络通讯。
从请求处理的时效性
来看:
- 通过
RPC 发出的调用一般会立即被处理
, - 存放在
消息队列中的消息并不一定会立即
被处理。