一、客户端使用MQ基本代码示例

1、添加maven依赖

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>5.3.0</version>
</dependency>

2、生产者代码示例

public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {//初始化一个消息生产者DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");// 指定nameserver地址producer.setNamesrvAddr("192.168.65.112:9876");// 启动消息生产者服务producer.start();for (int i = 0; i < 2; i++) {try {// 创建消息。消息由Topic,Tag和body三个属性组成,其中Body就是消息内容Message msg = new Message("TopicTest","TagA",("Hello RocketMQ " +i).getBytes(RemotingHelper.DEFAULT_CHARSET));//发送消息,获取发送结果SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}//消息发送完后,停止消息生产者服务。producer.shutdown();}
}

3、消费者代码示例

public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {//构建一个消息消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");//指定nameserver地址consumer.setNamesrvAddr("192.168.65.112:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);// 订阅一个感兴趣的话题,这个话题需要与消息的topic一致consumer.subscribe("TopicTest", "*");// 注册一个消息回调函数,消费到消息后就会触发回调。consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {msgs.forEach(messageExt -> {try {System.out.println("收到消息:"+new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET));} catch (UnsupportedEncodingException e) {}});return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者服务consumer.start();System.out.print("Consumer Started");}
}

4、代码逻辑解读

生产者:

1. 创建消息生产者producer,并指定生产者组名
2. 指定Nameserver地址
3. 启动producer。可以认为这是消息生产者与服务端建立连接的过程。
4. 创建消息对象,指定Topic、Tag和消息体
5. 发送消息
6. 关闭生产者producer,释放资源。

消费者:

1. 创建消费者Consumer,必须指定消费者组名
2. 指定Nameserver地址
3. 订阅主题Topic和Tag
4. 设置回调函数,处理消息
5. 启动消费者consumer。消费者会一直挂起,持续处理消息。

二、消息确认机制

1、生产者确认机制

        生产者发送消息的方式有三种:

(1)单向发送:消息生产者只管往Broker发送消息,而全然不关心Broker端有没有成功接收到消息。

public class OnewayProducer {public static void main(String[] args)throws Exception{DefaultMQProducer producer = new DefaultMQProducer("producerGroup");producer.start();Message message = new Message("Order","tag","order info : orderId = xxx".getBytes(StandardCharsets.UTF_8));producer.sendOneway(message);Thread.sleep(50000);producer.shutdown();}
}

        sendOneway方法没有返回值,如果发送失败,生产者无法补救。

        单向发送有一个好处,就是发送消息的效率更高。适用于一些追求消息发送效率,而允许消息丢失的业务场景。比如日志。

(2)同步发送:消息生产者在往Broker端发送消息后,会阻塞当前线程,等待Broker端的相应结果。

 SendResult sendResult = producer.send(msg);

        SendResult来自于Broker的反馈。producer在send发出消息,到Broker返回SendResult的过程中,无法做其他的事情。在SendResult中有一个SendStatus属性,这个SendStatus是一个枚举类型,其中包含了Broker端的各种情况。

public enum SendStatus {SEND_OK,FLUSH_DISK_TIMEOUT,FLUSH_SLAVE_TIMEOUT,SLAVE_NOT_AVAILABLE,
}

        在这几种枚举值中,SEND_OK表示消息已经成功发送到Broker上。至于其他几种枚举值,都是表示消息在Broker端处理失败了。使用同步发送的机制,我们就可以在消息生产者发送完消息后,对发送失败的消息进行补救。例如重新发送。

        但是此时要注意,如果Broker端返回的SendStatus不是SEND_OK,也并不表示消息就一定不会推送给下游的消费者。仅仅只是表示Broker端并没有完全正确的处理这些消息。因此,如果要重新发送消息,最好要带上唯一的系统标识,这样在消费者端,才能自行做幂等判断。也就是用具有业务含义的OrderID这样的字段来判断消息有没有被重复处理。

        这种同步发送的机制能够很大程度上保证消息发送的安全性。但是,这种同步发送机制的发送效率比较低。毕竟,send方法需要消息在生产者和Broker之间传输一个来回后才能结束。如果网速比较慢,同步发送的耗时就会很长。

(3)异步发送:生产者在向Broker发送消息时,会同时注册一个回调函数。接下来生产者并不等待Broker的响应。当Broker端有响应数据过来时,自动触发回调函数进行对应的处理。

	producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {countDownLatch.countDown();System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {countDownLatch.countDown();System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();}});

        在SendCallback接口中有两个方法,onSuccess和onException。当Broker端返回消息处理成功的响应信息SendResult时,就会调用onSuccess方法。当Broker端处理消息超时或者失败时,就会调用onExcetion方法,生产者就可以在onException方法中进行补救措施。

        此时同样有几个问题需要注意。一是与同步发送机制类似,触发了SendCallback的onException方法同样并不一定就表示消息不会向消费者推送,例如:如果Broker端返回响应信息太慢,超过了超时时间,也会触发onException方法。二是在SendCallback的对应方法被触发之前,生产者不能调用shutdown()方法。如果消息处理完之前,生产者线程就关闭了,生产者的SendCallback对应方法就不会触发。这是因为使用异步发送机制后,生产者虽然不用阻塞下来等待Broker端响应,但是SendCallback还是需要附属于生产者的主线程才能执行。

2、消费者确认机制

        消费者收到消息后,向 Broker 响应消息来进行确认。

consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});

        这个返回值是一个枚举值,有两个选项 CONSUME_SUCCESS和RECONSUME_LATER。如果消费者返回CONSUME_SUCCESS,那么消息自然就处理结束了。但是如果消费者没有处理成功,返回的是RECONSUME_LATER,Broker就会过一段时间再发起消息重试。

        为了兼顾重试机制的成功率和性能,RocketMQ设计了一套非常完善的消息重试机制:

(1)失败重试

        当消费者没能正常消费消息时,Broker 会进行消息重试,也就是将消息移到重试Topic中。

        为了让这些重试的消息不会影响Topic下其他正常的消息,Broker会给每个消费者组设计对应的重试Topic,名称为 %RETRY%+ConsumeGroup。这是因为,MessageQueue是一个具有严格FIFO特性的数据结构,如果需要重试的这些消息还是放在原来的MessageQueue中,就会对当前MessageQueue产生阻塞,让其他正常的消息无法处理。

        RocketMQ默认的最大重试次数是16次。重试的间隔时间如下图所示,是延迟消息的后16个延迟级别。

        如果我们修改了重试次数为20次,那么超过16次后每次重试间隔为2小时。同一个消费者组中,如果多个消费者都设置了重试次数,那么后设置的会覆盖先设置的。

(2)死信队列

        Broker不可能无限制的向消费失败的消费者推送消息,当超过最大重试次数后,消息会移到死信队列,它相当于windows当中的回收站。我们可以人工介入对死信队列中的消息进行补救,也可以直接彻底删除这些消息。

        死信队列的 topic 名称为 %DLQ%+ConsumGroup,一个消费者组只有一个死信队列,且只有死信消息产生时,才会生成死信队列。

        当我们对死信队列中的消息进行补救时,通常会创建一个新的消费者组获取死信队列中的消息,对消息内容进行修正后,重新发送到正常的 topic 中。

        需要注意的是,死信队列被创建出来后,它的权限 perm 被设置为 2(2:禁读,4:禁写,6:可读可写),所以它里面的消息是无法读取的。在补救前,需要将死信队列的权限修改为 6。

        死信队列的有效期跟正常消息相同,默认3天,对应broker.conf中的fileReservedTime属性。超过这个最长时间的消息都会被删除,而不管消息是否消费过。

(3)尽量保证同一消费者组具有相同的逻辑

        RocketMQ中设定的消费者组都是订阅主题和消费逻辑相同的服务备份,所以当消息重试时,Broker会往消费者组中任意一个实例推送。因此,我们在编码时,尽量要保证一个消费者组处理业务的逻辑相同。

(4)消费逻辑尽量避免异步

        Broker端最终只通过消费者组返回的状态来确定消息有没有处理成功。至于消费者组自己的业务执行是否正常,Broker端是没有办法知道的。因此,在实现消费者的业务逻辑时,应该要尽量使用同步实现方式,保证在自己业务处理完成之后再向Broker端返回状态。

3、幂等性保证

        在 MQ 系统中,幂等性有三种实现语义:

1、at most once:每条消息最多被消费一次。对于 at most once,生产者使用 sendOneWay 发送消息即可。

2、at least once:每条消息至少被消费一次。对于 at least once,利用生产者和消费者的消息确认机制,即可确保消息成功发送和接收。

3、exactly once:每条消息正好被消费一次。对于 exactly once,难以通过 MQ 本身直接实现。通常的方法是利用消息确认机制确保 at least once,再通过对消息设置业务主键进行消息去重来确保 at most once,两者组合实现 exactly once。

        如何使用消息的业务主键去重呢?

        当消息发送到 RocketMQ 时,RocketMQ 会为消息生成唯一的 msgId,该 msgId 在消息重复生产和消费时都不会发生改变,通常可用于区分每条消息。但这个 msgId 并不能完全确保全局唯一,在对幂等性要求严格的场景,可以在发送消息时设置全局唯一的 message key,并在获取消息时根据 message key 来去重(MQ 会对 message key 进行索引,我们除了可以使用 message key 保证幂等性,还能用它来快速查找消息)。

        什么时候会出现消息重复呢?

1、发送时重复:生产者客户端已成功发送消息且消息已在服务端持久化,但由于网络阻塞或客户端宕机,导致服务端向客户端应答失败。故障恢复后,客户端由于未收到应答,会认为消息发送失败而重新发送,服务端就会存在两条内容相同并且 msgId 也相同的消息。

2、接收时重复:消费者客户端已成功收到消息并完成业务处理,但由于网络阻塞或客户端宕机,导致客户端向服务端应答失败。故障恢复后,服务端由于未收到应带,会认为消息投递失败而重新投递,客户端就会收到两条内容相同并且 msgId 也相同的消息。

3、Rebalance 导致重复:当 Broker 或消费者出现重启、扩容和缩容时,会触发 Rebalance,此时可能导致消息重复。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/91216.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/91216.shtml
英文地址,请注明出处:http://en.pswp.cn/bicheng/91216.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[leetcode] 组合总和

39. 组合总和 - 力扣&#xff08;LeetCode&#xff09; i class Solution {int aim;vector<vector<int>> ret;vector<int> path; public:vector<vector<int>> combinationSum(vector<int>& nums, int target) {aim target;dfs(nums…

新能源行业B端极简设计:碳中和目标下的交互轻量化实践

新能源行业B端极简设计&#xff1a;碳中和目标下的交互轻量化实践内容摘要在新能源行业&#xff0c;碳中和目标正推动着企业追求更高的运营效率和更低的资源消耗。然而&#xff0c;传统的B端交互设计往往复杂繁琐&#xff0c;不仅增加了用户的操作成本&#xff0c;还可能导致资…

减速机:自动化生产线的“精密传动心脏”

减速机作为自动化生产线的核心传动部件&#xff0c;通过调节转速与扭矩实现设备精准控制&#xff0c;其在自动化生产线中发挥着关键作用。以下是其具体应用方式&#xff1a;输送线驱动在自动化生产线中&#xff0c;输送线用于运输物料、半成品或成品&#xff0c;通过减速机可以…

从0到1学PHP(五):PHP 数组:高效存储与处理数据

目录一、数组的定义与分类1.1 索引数组1.2 关联数组1.3 多维数组二、数组的基本操作2.1 数组元素的添加、删除、修改和访问2.2 数组指针的操作三、数组处理函数3.1 数组排序函数3.2 数组统计函数3.3 数组过滤与转换函数一、数组的定义与分类 在 PHP 中&#xff0c;数组是一种非…

vscode 字体的跟换

打开vscode 左下角输入电脑中已经有的字体&#xff1a;有想要用的可以自己进行安装刷新这样就可改变了

墨者:SQL过滤字符后手工注入漏洞测试(第3题)

1. 墨者学院&#xff1a;SQL过滤字符后手工注入漏洞测试(第3题)&#x1f680; 因为练习过太多的sql注入&#xff0c;废话不多介绍&#xff0c;我会通过围绕手动注入和工具爆破的方式达到靶场目标&#xff0c;开练&#xff01;&#xff01;&#xff01; 2. 手工注入方式&#x1…

【Spring AI实战】实现仿DeepSeek页面对话机器人(支持多模态上传)

一、前言 二、实现效果 三、代码实现 3.1 后端代码 3.2 前端代码 一、前言 Spring AI详解&#xff1a;【Spring AI详解】开启Java生态的智能应用开发新时代(附不同功能的Spring AI实战项目)-CSDN博客 二、实现效果 可上传图片或音频数据给大模型分析 三、代码实现 3.1 后…

Vue 正在热映模块

Vue 渐进式JavaScript 框架 基于Vue2的移动端项目&#xff1a;正在热映模块 目录 正在热映 数据修改 导入axios 配置反向代理 正在热映渲染 赋值数据 渲染列表 显示图片 优化列表 设置列表样式 主演 定义过滤器 使用过滤器 主演过长处理 无主演情况处理 观众评…

阿里云上进行k8s集群的配置

在阿里云容器服务Kubernetes&#xff08;ACK&#xff09;中配置集群的核心步骤可分为以下六大关键环节&#xff0c;涵盖架构设计到运维管理&#xff1a;1. 集群规划与基础配置 集群类型选择 托管版&#xff1a;Master节点由阿里云托管&#xff08;推荐生产环境&#xff09;专有…

页面性能优化

优化点解决方案效果双向绑定数量过多竞对设置单元格内部涉及双向绑定的输入组件过多&#xff0c;线上页面最多有88个该和抽屉中的编辑表格一样的组件&#xff0c;共计930个&#xff08;按每行最少6个来计算的&#xff09;双向绑定的组件&#xff0c;严重拖累页面性能。数据计算…

详细说明零拷贝

详细说明零拷贝【一】零拷贝介绍【1】说明【2】为什么需要零拷贝&#xff1f;—— 传统数据传输的问题【3】零拷贝的核心优化【4】零拷贝的实现方式&#xff08;1&#xff09;mmap&#xff08;内存映射&#xff09;&#xff08;2&#xff09;sendfile&#xff08;Linux 系统调用…

docker部署自己写的c++http服务器教程

我用的是ubuntu 22.04环境下 qt c 写的应用程序&#xff0c;是终端程序&#xff0c;不是界面&#xff0c;然后用linuxdeployqt工具将其打包成了AppImage可执行文件&#xff0c;以上是部署前的准备工作&#xff0c;需要确保AppImage可执行文件在自己的ubuntu上可以运行才能执行以…

Caffeine 缓存库的常用功能使用介绍

&#x1f9d1; 博主简介&#xff1a;CSDN博客专家&#xff0c;历代文学网&#xff08;PC端可以访问&#xff1a;https://literature.sinhy.com/#/?__c1000&#xff0c;移动端可微信小程序搜索“历代文学”&#xff09;总架构师&#xff0c;15年工作经验&#xff0c;精通Java编…

C# _列表(List<T>)_ 字典(Dictionary<TKey, TValue>)

目录 列表&#xff08;List&#xff09;特点 创建列表 RemoveAll 删除与之条件相匹配的数据 会返回删除的个数 Capacity 获取或设置列表的容量 更多方法可参照上篇文章&#xff1a;C#_ArrayList动态数组 字典&#xff08;Dictionary&#xff09;特点 定义一个字典 向字…

【实时Linux实战系列】实时网络控制与调度

在实时控制系统中&#xff0c;网络调度是确保实时数据流传输和处理不受延迟影响的关键。实时网络控制与调度技术对于工业自动化、金融交易、多媒体流等领域至关重要。通过合理设计网络调度策略&#xff0c;可以显著提高系统的实时性和可靠性。本文将介绍如何在实时控制系统中实…

Qwen3-Coder:介绍及使用 -- 超强AI编程助手

更多内容&#xff1a;XiaoJ的知识星球 目录一、Qwen3-Coder模型介绍1.预训练阶段&#xff08;Pre-Training&#xff09;2.后训练阶段&#xff08;Post-Training&#xff09;1&#xff09;Scaling Code RL: Hard to Solve, Easy to Verify2&#xff09;Scaling Long-Horizon RL二…

uniapp 如果进入页面输入框自动聚焦,此时快速返回页面或者跳转到下一个页面,输入法顶上来的页面出现半屏的黑屏问题。

如果进入页面输入框自动聚焦&#xff0c;此时快速返回页面或者跳转到下一个页面&#xff0c;输入法顶上来的页面出现半屏的黑屏问题。输入法出来后&#xff0c;设置了自动将页面顶上来的配置&#xff1a;pages.json"softinputMode": "adjustResize""g…

深入了解 Kubernetes(k8s):从概念到实践

目录 一、k8s 核心概念 二、k8s 的优势 三、k8s 架构组件 控制平面组件 节点组件 四、k8s docker 运行前后端分离项目的例子 1. 准备前端项目 2. 准备后端项目 3. 创建 k8s 部署配置文件 4. 部署应用到 k8s 集群 在当今云计算和容器化技术飞速发展的时代&#xff0c…

Android User版本默认用test-keys,如何改用release-keys

Android User版本 默认用test-keys&#xff0c; 如何改用release-keys 开发云 - 一站式云服务平台 --- build/core/Makefile | 5 1 file changed, 5 insertions() diff --git a/build/core/Makefile b/build/core/Makefile index --- a/build/core/Makefile b/build/core…

从零开始学习Dify-数据库数据可视化(五)

概述上一篇文章我们围绕 Excel 文件展开数据可视化教学&#xff0c;逐步掌握了数据导入、图表构建和 AI 智能分析。在实际业务环境中&#xff0c;很多数据并不是保存在表格中&#xff0c;而是存储于数据库系统中&#xff0c;尤其是最常见的 MySQL。本篇作为本系列的第五篇&…