前言

在 Apache RocketMQ 的消息消费体系中,RocketMQ 提供了DefaultMQPushConsumer(推送消费)和DefaultMQPullConsumer(拉取消费)两种主要消费方式。DefaultMQPushConsumer与DefaultMQPullConsumer在消息获取方式,消息处理逻辑,负载均衡与消费管理都有着不同的处理逻辑,这篇博文主要进行分析DefaultMQPushConsumer。

一、DefaultMQPushConsumerImpl 的主要属性

/*** consumer重平衡的组件*/private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);/*** 过滤消息的钩子*/private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();/*** 消费者启动的时间戳*/private final long consumerStartTimestamp = System.currentTimeMillis();/*** 消费消息的钩子list*/private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();/*** rpc的钩子*/private final RPCHook rpcHook;/*** 服务状态*/private volatile ServiceState serviceState = ServiceState.CREATE_JUST;/*** 网络客户端通信*/private MQClientInstance mQClientFactory;/*** 消息拉取的Api*/private PullAPIWrapper pullAPIWrapper;/*** 是否暂停拉取消息的标识*/private volatile boolean pause = false;/*** 是否顺序消费的标识*/private boolean consumeOrderly = false;/*** 消息处理监听器*/private MessageListener messageListenerInner;/*** 消费偏移量的存储组件*/private OffsetStore offsetStore;/*** 消费消息的服务*/private ConsumeMessageService consumeMessageService;/*** 队列流量控制次数*/private long queueFlowControlTimes = 0;/*** 队列最大跨度流量控制次数*/private long queueMaxSpanFlowControlTimes = 0;

其中比较重要的属性位:

  • pullAPIWrapper:负责与Broker进行消息拉取的交互,封装了底层的网络请求与响应处理。

  • rebalanceImpl:实现消费者的负载均衡逻辑,动态分配消息队列给消费者实例

  • offsetStore:负责消费者消费进度(偏移量)的存储与管理,确保消息不重复消费、不丢失

  • consumeMessageService:管理消息消费的线程池,执行具体的消息消费任务

二、DefaultMQPushConsumerImpl 启动方法

//启动方法public synchronized void start() throws MQClientException {switch (this.serviceState) {case CREATE_JUST:log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());this.serviceState = ServiceState.START_FAILED;// 检查配置合法性this.checkConfig();// 复制订阅信息this.copySubscription();if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {this.defaultMQPushConsumer.changeInstanceNameToPID();}// 获取或创建MQClientInstancethis.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory,this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);if (this.defaultMQPushConsumer.getOffsetStore() != null) {this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();} else {switch (this.defaultMQPushConsumer.getMessageModel()) {case BROADCASTING:this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;case CLUSTERING:this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;default:break;}this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);}//从磁盘加载数据this.offsetStore.load();if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {this.consumeOrderly = true;this.consumeMessageService =new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {this.consumeOrderly = false;this.consumeMessageService =new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());}this.consumeMessageService.start();boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);if (!registerOK) {this.serviceState = ServiceState.CREATE_JUST;this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);}mQClientFactory.start();log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());this.serviceState = ServiceState.RUNNING;break;case RUNNING:case START_FAILED:case SHUTDOWN_ALREADY:throw new MQClientException("The PushConsumer service state not OK, maybe started once, "+ this.serviceState+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null);default:break;}this.updateTopicSubscribeInfoWhenSubscriptionChanged();this.mQClientFactory.checkClientInBroker();//向broker发送心跳this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();//立即进行rebalancethis.mQClientFactory.rebalanceImmediately();}

从DefaultMQPushConsumerImpl的start方法中可以看出来主要作用为:

1. 配置校验与状态管理

  • 配置合法性检查:验证消费者组名、NameServer 地址、订阅关系等核心配置是否正确,确保后续操作的基础条件满足。

  • 状态机控制:通过状态枚举(ServiceState)确保消费者只能从初始状态(CREATE_JUST)启动,防止重复启动或非法状态转换。

2. 核心组件初始化

  • 复制订阅信息:将用户配置的主题(Topic)和过滤表达式(Tag)复制到内部数据结构,用于后续消息过滤。

  • 获取或创建 MQClientInstance:通过单例模式获取或创建与 RocketMQ 集群通信的客户端实例,该实例负责管理网络连接、心跳机制和请求路由。

  • 注册消费者:将当前消费者注册到 MQClientInstance 的消费者注册表中,便于统一管理和协调。

3. 启动核心服务

  • 负载均衡服务:初始化并启动RebalanceImpl,定期(默认 20 秒)执行负载均衡算法,动态分配消息队列给消费者实例,确保集群内负载均匀。

  • 消息拉取服务:启动PullAPIWrapper,初始化拉取线程池,为后续从 Broker 拉取消息做准备。

  • 消息消费服务:根据消费模式(并发 / 顺序)启动对应的ConsumeMessageService,初始化消费线程池,处理拉取到的消息。

4. 状态更新与资源准备

  • 更新服务状态:将消费者状态从START_FAILED切换为RUNNING,标志启动成功。

  • 订阅信息同步:向 NameServer 同步订阅信息,确保 Broker 知晓消费者的订阅关系。

三、消息拉取PullAPIWrapper

消息拉取由pullAPIWrapper主导。pullKernelImpl方法构建请求并发送到Broker,Broker在无消息时挂起请求,超时后返回结果。消费者接收到响应后,解析PullResult。

主要属性

 /*** 网路通信组件*/private final MQClientInstance mQClientFactory;/*** 消费组*/private final String consumerGroup;/*** 单元*/private final boolean unitMode;/*** 消息队列和broker机器的映射关系*/private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =new ConcurrentHashMap<MessageQueue, AtomicLong>(32);/*** 是否连接broker*/private volatile boolean connectBrokerByUser = false;private volatile long defaultBrokerId = MixAll.MASTER_ID;private Random random = new Random(System.currentTimeMillis());/*** 过滤消息的钩子*/private ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();

拉取方法

 /*** 拉取消息的核心方法* @param mq 从哪个MessageQueue中拉取消息* @param subExpression 子表达式* @param expressionType 表达式类型* @param subVersion 子版本号* @param offset 拉取偏移量* @param maxNums 拉取的最大数量* @param sysFlag 系统标志* @param commitOffset 我们已经拉取处理完毕的数据偏移量 做一个提交* @param brokerSuspendMaxTimeMillis broker挂起最大的时间戳* @param timeoutMillis 拉取消息的超时时间* @param communicationMode 通信模式* @param pullCallback 回调的接口* @return* @throws MQClientException* @throws RemotingException* @throws MQBrokerException* @throws InterruptedException*/public PullResult pullKernelImpl(final MessageQueue mq,final String subExpression,final String expressionType,final long subVersion,final long offset,final int maxNums,final int sysFlag,final long commitOffset,final long brokerSuspendMaxTimeMillis,final long timeoutMillis,final CommunicationMode communicationMode,final PullCallback pullCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {//根据messageQueue获取broker地址 可以针对针对这个MessageQueue重新计算FindBrokerResult findBrokerResult =this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),this.recalculatePullFromWhichNode(mq), false);//未找到broker 从nameServer中进行更新数据 然后进行再次寻找if (null == findBrokerResult) {this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());findBrokerResult =this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),this.recalculatePullFromWhichNode(mq), false);}if (findBrokerResult != null) {{// check versionif (!ExpressionType.isTagType(expressionType)&& findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {throw new MQClientException("The broker[" + mq.getBrokerName() + ", "+ findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);}}int sysFlagInner = sysFlag;if (findBrokerResult.isSlave()) {sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);}//构建拉取消息的请求头PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();requestHeader.setConsumerGroup(this.consumerGroup);requestHeader.setTopic(mq.getTopic());requestHeader.setQueueId(mq.getQueueId());requestHeader.setQueueOffset(offset);requestHeader.setMaxMsgNums(maxNums);requestHeader.setSysFlag(sysFlagInner);requestHeader.setCommitOffset(commitOffset);requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);requestHeader.setSubscription(subExpression);requestHeader.setSubVersion(subVersion);requestHeader.setExpressionType(expressionType);requestHeader.setBname(mq.getBrokerName());String brokerAddr = findBrokerResult.getBrokerAddr();if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {brokerAddr = computePullFromWhichFilterServer(mq.getTopic(), brokerAddr);}//拉取消息PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(brokerAddr,requestHeader,timeoutMillis,communicationMode,pullCallback);return pullResult;}throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);}

四、RebalanceImpl负载均衡

RebalanceImpl#rebalanceByTopic 是 RocketMQ 实现消费者负载均衡的核心方法,其主要作用是根据当前集群中消费者实例和消息队列的状态,动态分配每个消费者应负责消费的队列。这个过程确保了消息消费的均匀性和高可用性,避免了部分消费者过载而其他消费者空闲的情况。

 private void rebalanceByTopic(final String topic, final boolean isOrder) {switch (messageModel) {case BROADCASTING: {Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);if (mqSet != null) {boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);if (changed) {this.messageQueueChanged(topic, mqSet, mqSet);log.info("messageQueueChanged {} {} {} {}",consumerGroup,topic,mqSet,mqSet);}} else {log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);}break;}case CLUSTERING: {//订阅的topic 有哪些queuesSet<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);//查询consumer分组中有哪些consumer实例List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);if (null == mqSet) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);}}if (null == cidAll) {log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);}if (mqSet != null && cidAll != null) {List<MessageQueue> mqAll = new ArrayList<MessageQueue>();mqAll.addAll(mqSet);Collections.sort(mqAll);Collections.sort(cidAll);//获取配置的负载均衡策略(默认:平均分配)AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;List<MessageQueue> allocateResult = null;try {//执行负载均衡算法,计算当前消费者应分配的队列allocateResult = strategy.allocate(this.consumerGroup,this.mQClientFactory.getClientId(),mqAll,cidAll);} catch (Throwable e) {log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),e);return;}Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();if (allocateResult != null) {allocateResultSet.addAll(allocateResult);}//更新consumer要进行处理的消息队列boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);if (changed) {log.info("rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),allocateResultSet.size(), allocateResultSet);this.messageQueueChanged(topic, mqSet, allocateResultSet);}}break;}default:break;}}

 核心功能概述

rebalanceByTopic 方法会针对指定主题(Topic)执行以下操作:

  • 获取当前消费组内所有活跃的消费者实例列表

  • 获取该主题下的所有消息队列(MessageQueue)

  • 根据负载均衡策略(如平均分配、轮询等)计算每个消费者应分配的队列

  • 更新本地队列分配信息,并触发消息拉取或停止不必要的拉取任务

负载均衡会在以下情况触发

  • 消费者启动时

  • 消费者实例数量变化(新实例加入或旧实例退出)

  • 主题的队列数量变化(如 Broker 动态调整队列数)

  • 定时任务(默认每 20 秒执行一次)

五、总结

通过对DefaultMQPushConsumerImpl了解,我们清晰地看到了 RocketMQ 消息推送消费的完整实现逻辑。从类结构设计到各个核心流程的源码细节,每一部分都紧密协作,共同保障了消息消费的高效性和可靠性。理解这些源码,开发者能够更好地进行性能优化、问题排查和功能扩展,让 RocketMQ 在实际项目中发挥更大价值。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/82894.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/82894.shtml
英文地址,请注明出处:http://en.pswp.cn/web/82894.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux编程:2、进程基础知识

一、进程基本概念 1、进程与程序的区别 程序&#xff1a;静态的可执行文件&#xff08;如电脑中的vs2022安装程序&#xff09;。进程&#xff1a;程序的动态执行过程&#xff08;如启动后的vs2022实例&#xff09;&#xff0c;是操作系统分配资源的单位&#xff08;如 CPU 时…

React Router 中 navigate 后浏览器返回按钮不起作用的问题记录

React Router 中 navigate 后浏览器返回按钮不起作用的问题记录 在使用 React Router&#xff08;v6&#xff09;开发项目时&#xff0c;我遇到了一个让人困惑的问题&#xff1a; 当我从 /article 页面使用 navigate("/article/next") 进行跳转后&#xff0c;点击浏…

[面试精选] 0094. 二叉树的中序遍历

文章目录 1. 题目链接2. 题目描述3. 题目示例4. 解题思路5. 题解代码6. 复杂度分析 1. 题目链接 94. 二叉树的中序遍历 - 力扣&#xff08;LeetCode&#xff09; 2. 题目描述 给定一个二叉树的根节点 root &#xff0c;返回 它的 中序 遍历 。 3. 题目示例 示例 1 : 输入&…

Addressable-配置相关

1、Profile 概述窗口配置 主要用于配置Addressable打包&#xff08;构建&#xff09;加载AB包时使用的一些变量,这些变量定义了 在哪里保存打包&#xff08;构建&#xff09;的AB包运行时在哪里加载AB包 可以添加自定义变量&#xff0c;以便在打包加载时使用,之后在设置 组中…

aws(学习笔记第四十三课) s3_sns_sqs_lambda_chain

文章目录 aws(学习笔记第四十三课) s3_sns_sqs_lambda_chain学习内容&#xff1a;1. 整体架构1.1 代码链接1.2 整体架构1.3 测试代码需要的修改1.3.1 unit test代码中引入stack的修改1.3.2 test_outputs_created代码中把错误的去掉 2. 代码解析2.1 生成dead_letter_queue死信队…

Python训练营打卡Day43

kaggle找到一个图像数据集&#xff0c;用cnn网络进行训练并且用grad-cam做可视化 进阶&#xff1a;并拆分成多个文件 config.py import os# 基础配置类 class Config:def __init__(self):# Kaggle配置self.kaggle_username "" # Kaggle用户名self.kaggle_key &quo…

hive 3集成Iceberg 1.7中的Java版本问题

hive 3.1.3 集成iceberg 1.7.2创建Iceberg表报错如下&#xff1a; Exception in thread "main" java.lang.UnsupportedClassVersionError: org/apache/iceberg/mr/hive/HiveIcebergStorageHandler has been compiled by a more recent version of the Java Runtime …

文本切块技术(Splitter)

为什么要分块&#xff1f; 将长文本分解成适当大小的片段&#xff0c;以便于嵌入、索引和存储&#xff0c;并提高检索的精确度。 用ChunkViz工具可视化分块 在线使用 ChunkViz github https://github.com/gkamradt/ChunkViz 如何确定大模型所能接受的最长上下文 可以从…

C++:用 libcurl 发送一封带有附件的邮件

编写mingw C 程序&#xff0c;用 libcurl 发送一封带有附件的邮件 下面是一个使用 MinGW 编译的 C 程序&#xff0c;使用 libcurl 发送带附件的邮件。这个程序完全通过代码实现 SMTP 邮件发送&#xff0c;不依赖外部邮件客户端&#xff1a; // send_email.cpp #include <i…

tensorflow image_dataset_from_directory 训练数据集构建

以数据集 https://www.kaggle.com/datasets/vipoooool/new-plant-diseases-dataset 为例 目录结构 训练图像数据集要求&#xff1a; 主目录下包含多个子目录&#xff0c;每个子目录代表一个类别。每个子目录中存储属于该类别的图像文件。 例如 main_directory/ ...cat/ ...…

遨游Spring AI:第一盘菜Hello World

Spring AI的正式版已经发布了&#xff0c;很显然&#xff0c;接下来我们要做的事情就是写一个Hello World。 总体思路就是在本地搭建一个简单的大模型&#xff0c;然后编写Spring AI代码与模型进行交互。 分五步&#xff1a; 1. 安装Ollama&#xff1b; 2. 安装DeepSeek&…

华为云Flexus+DeepSeek征文|基于华为云Flexus X和DeepSeek-R1打造个人知识库问答系统

目录 前言 1 快速部署&#xff1a;一键搭建Dify平台 1.1 部署流程详解 1.2 初始配置与登录 2 构建专属知识库 2.1 进入知识库模块并创建新库 2.2 选择数据源导入内容 2.3 上传并识别多种文档格式 2.4 文本处理与索引构建 2.5 保存并完成知识库创建 3接入ModelArts S…

Java优化:双重for循环

在工作中&#xff0c;经常性的会出现在两张表中查找相同ID的数据&#xff0c;许多开发者会使用两层for循环嵌套&#xff0c;虽然实现功能没有问题&#xff0c;但是效率极低&#xff0c;一下是一个简单的优化过程&#xff0c;代码耗时凑从26856ms优化到了748ms。 功能场景 有两…

Prompt Tuning:生成的模型文件有什么构成

一、为什么Prompt Tuning会生成模型文件? 1. Prompt Tuning的本质:优化可训练的「提示参数」 核心逻辑:Prompt Tuning(提示调优)是一种轻量级的微调技术,仅优化模型输入层的提示向量(Prompt Embedding)或少量额外参数,而非更新整个预训练模型的权重。生成模型文件的原…

ARM SMMUv3简介(一)

1.概述 SMMU&#xff08;System Memory Management Unit&#xff0c;系统内存管理单元&#xff09;是ARM架构中用于管理设备访问系统内存的硬件模块。SMMU和MMU的功能类似&#xff0c;都是将虚拟地址转换成物理地址&#xff0c;不同的是MMU转换的虚拟地址来自CPU&#xff0c;S…

在 Windows 系统上运行 Docker 容器中的 Ubuntu 镜像并显示 GUI

在 Windows 上安装一个 X Server&#xff08;如 VcXsrv 或 X410&#xff09;&#xff0c;Ubuntu 容器通过网络将图形界面转发到 Windows。 步骤&#xff1a; 安装 X Server&#xff1a; 推荐使用VcXsrv&#xff0c;免费开源。 安装后运行 XLaunch&#xff0c;选择&#xff1…

Vue3学习(4)- computed的使用

1. 简述与使用 作用&#xff1a;computed 用于基于响应式数据派生出新值&#xff0c;其值会自动缓存并在依赖变化时更新。 ​缓存机制​&#xff1a;依赖未变化时直接返回缓存值&#xff0c;避免重复计算&#xff08;通过 _dirty 标志位实现&#xff09;。​响应式更新​&…

【HarmonyOS 5】出行导航开发实践介绍以及详细案例

以下是 ‌HarmonyOS 5‌ 出行导航的核心能力详解&#xff08;无代码版&#xff09;&#xff0c;聚焦智能交互、多端协同与场景化创新&#xff1a; 一、交互革新&#xff1a;从被动响应到主动服务 ‌意图驱动导航‌ ‌自然语义理解‌&#xff1a;用户通过语音指令&#xff08;如…

csrf攻击学习

原理 csrf又称跨站伪造请求攻击&#xff0c;现代网站利用Cookie、Session 或 Token 等机制识别用户身份&#xff0c;一旦用户访问某个网站&#xff0c;浏览器在之后请求会自动带上这些信息来识别用户身份。用户在网站进行请求或者操作时服务器会给出对应的内容&#xff0c;比如…

深入剖析MySQL锁机制,多事务并发场景锁竞争

一、隐藏字段对 InnoDB 的行锁&#xff08;Record Lock&#xff09;与间隙锁&#xff08;Gap Lock&#xff09;的影响 1. 隐藏字段与锁的三大核心影响 类型影响维度描述DB_TRX_IDMVCC 可见性控制决定是否读取当前版本&#xff0c;或在加锁时避开不可见版本&#xff08;影响加锁…