导语

本文将系统阐述 Apache RocketMQ 消息过滤机制的技术架构与实践要点。首先从业务应用场景切入,解析消息过滤的核心价值;接着介绍 Apache RocketMQ 支持的两种消息过滤实现方式,帮助读者建立基础认知框架;随后深入剖析 SQL 语法过滤与标签(Tag)过滤的技术实现的核心原理以及规则限制;最后介绍腾讯云在消息过滤性能优化方面的具体实践。

消息过滤的应用场景

消息过滤功能指消息生产者向 Topic 中发送消息时,设置消息属性对消息进行分类,消费者订阅 Topic 时,根据消息属性设置过滤条件对消息进行过滤,只有符合过滤条件的消息才会被投递到消费端进行消费。

消费者订阅 Topic 时若未设置过滤条件,无论消息发送时是否有设置过滤属性,Topic 中的所有消息都将被投递到消费端进行消费。

消息过滤功能可以应用在如下场景:

用户想使用一个 Topic,但是被多个 Group 订阅,每个 Group 只想消费其中一部分消息。

默认情况下,消费组1和消费组2 会全量消费 Topic 里面的所有消息。但如果我们想选择性的消费里面一些消息的时候,就可以使用消息过滤功能对消息进行区分过滤。

消息过滤原理介绍

目前消息过滤主要支持两种过滤方式,分别是 SQL 过滤和 Tag 过滤。其核心逻辑都是在发送消息的时候,设置一些自定义字段,然后通过消费组订阅的时候指定对应的过滤表达式,消息在服务端进行过滤后,才被消费组消费。

Tag 过滤

代码示例

在介绍原理之前,我们先直观的看一下用法,以 RocketMQ 4.x 的 SDK 为例:

// 创建消息生产者
DefaultMQProducer producer = ClientCreater.createProducer(GROUP_NAME);
// 创建消息实例,设置topic和消息内容,设置Tag1
Message msg = new Message(TOPIC_NAME, "Tag1", "Hello RocketMQ.".getBytes(StandardCharsets.UTF_8));
// 发送消息
SendResult sendResult = producer.send(msg, 3000);
System.out.println(sendResult + ":" + new String(msg.getBody()));
producer.shutdown();

在发送消息的时候,我们可以给消息指定 Tag。而在消费组这一侧,我们可以订阅不同的 Tag,例如使用星号(*)匹配全部 Tag。

// 创建消息消费者
DefaultMQPushConsumer pushConsumer = ClientCreater.createPushConsumer(GROUP_NAME);
// 订阅topic 订阅所有的TAG
pushConsumer.subscribe(TOPIC_NAME, "*");
//订阅指定的TAG
pushConsumer.subscribe(TOPIC_NAME, "Tag1");
//订阅多个TAG
pushConsumer.subscribe(TOPIC_NAME, "Tag1||Tag2");
// 省略其他代码
核心原理

那么,这个功能在底层究竟是怎么实现的呢?我们都知道,RocketMQ 在存储消息的时候,会先把消息写入 CommitLog。CommitLog 可以看作是 RocketMQ 存储消息的一个大日志,所有消息都会先写到这里。

写完 CommitLog 之后,RocketMQ 还会把消息的相关信息再写入ConsumeQueue。ConsumeQueue 可以理解为是 CommitLog 的一个索引,它里面存储的不是完整的消息内容,而是消息的一些关键信息,方便消费者快速找到和读取消息。

具体来说,当 RocketMQ 写入一条原始消息到 CommitLog 之后,它会提取这条消息的一些重要信息,比如这条消息在 CommitLog 里的物理偏移量(Offset)、消息的大小,还有这条消息 Tag 的哈希码(Hashcode),然后把这些信息写入到 ConsumeQueue 中。这样一来,消费者就可以通过 ConsumeQueue 快速定位到 CommitLog 里对应的消息,而不需要每次都去遍历整个 CommitLog,大大提高了消息消费的效率。

在发送消息的的时候,RocketMQ 会将消息 Tag 的 Hashcode 写入到 ConsumeQueue 字段中。

msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), msgInner.getTags()));
//上面这个方法
public static long tagsString2tagsCode(final TopicFilterType filter, final String tags) {if (Strings.isNullOrEmpty(tags)) {return 0; }return tags.hashCode();
}

当用户在消费消息时,服务端会从 ConsumeQueue 中一条一条地检查消息,并对这些消息进行过滤。过滤的时候,服务端会用 ConsumeQueue 中存储的消息 Tag 的 Hashcode,和当前订阅组所订阅的 Tag 进行匹配。这个匹配的过程,是在 RocketMQ 的核心代码 org.apache.rocketmq.store.DefaultMessageStore#getMessage 方法里实现的。简单来说,就是服务端会根据订阅组订阅的 Tag,从 ConsumeQueue 中找出符合条件的消息,然后交给用户去消费。

if (messageFilter != null && !messageFilter.isMatchedByConsumeQueue(cqUnit.getValidTagsCodeAsLong(), cqUnit.getCqExtUnit())) {//省略其他代码continue;
}

匹配逻辑

@Override
public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
if (null == tagsCode || null == subscriptionData) {return true;
}
if (subscriptionData.isClassFilterMode()) {return true;
}
return subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)||     subscriptionData.getCodeSet().contains(tagsCode.intValue());
}

可以看到,只有当订阅的 Tag 是 “*”(表示订阅所有消息),或者消息的 Tag 和订阅的 Tag 匹配上的时候,消费者才能消费到这条消息。这里您可能会有疑问,为啥要用 CodeSet 呢?其实是因为这里会把用户订阅组订阅的 Tag 进行拆分,然后把这些拆分后的 Tag 放到 CodeSet 里。这样在匹配的时候,就可以快速判断消息的 Tag 是否在 CodeSet 中,从而决定这条消息能不能被消费。

org.apache.rocketmq.remoting.protocol.filter.FilterAPI#buildSubscriptionData(java.lang.String, java.lang.String)
String[] tags = subString.split("\\|\\|");
if (tags.length > 0) {Arrays.stream(tags).map(String::trim).filter(tag -> !tag.isEmpty()).forEach(tag -> {subscriptionData.getTagsSet().add(tag);subscriptionData.getCodeSet().add(tag.hashCode());});
} else {throw new Exception("subString split error");
}

然后,CodeSet 里面存的就是多个 Hashcode 。

这里您可能又会有疑问:要是在服务端通过哈希码来匹配,那万一两个不同的 Tag 经过哈希运算之后,得到的哈希码是一样的,这样不就匹配错了吗?没错,这种情况确实有可能发生。

为了避免这种问题,客户端还会再做一层过滤,使用真正的 Tag 字符串再过滤一次,这样就能保证最终消费到的消息一定是符合订阅要求的,不会出现因为哈希冲突而导致的匹配错误。

org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#processPullResult
List<MessageExt> msgListFilterAgain = msgList;
if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {msgListFilterAgain = new ArrayList<>(msgList.size());for (MessageExt msg : msgList) {if (msg.getTags() != null) {if (subscriptionData.getTagsSet().contains(msg.getTags())) {msgListFilterAgain.add(msg);}}}
}

综上所述,我们就得到了这样一个 Tag 过滤的工作流程。

规则限制
  • 发送消息只能设置一个 Tag。

  • 多个 Tag 之间为或的关系,不同 Tag 间使用两个竖线(||)隔开。例如,Tag1||Tag2||Tag3,表示标签为 Tag1 或 Tag2 或 Tag3 的消息都满足匹配条件,都会被发送给消费者进行消费。

  • 多个 Tag 的顺序也要保持一致,否则会导致订阅关系不一致,例如 Tag1||Tag2 和 Tag2||Tag1 就是不同的。

SQL 过滤

从上面可以看到,Tag 过滤比较简单,通过在 ConsumeQueue 直接进行匹配,效率比较高,但是能支持的消息过滤比较简单,如果想通过消息的某个扩展字段来进行匹配,做一些更复杂的逻辑,就需要使用 SQL 过滤了。

代码示例

在发送方我们可以设置一下 putUserProperty,来扩展字段。

public static void main(String[] args) throws Exception {// 创建消息生产者DefaultMQProducer producer = ClientCreater.createProducer(GROUP_NAME);// 创建消息实例,设置topic和消息内容,设置自定义属性Message msg = new Message(TOPIC_NAME, "Hello RocketMQ.".getBytes(StandardCharsets.UTF_8));msg.putUserProperty("key1","value1");// 发送消息SendResult sendResult = producer.send(msg, 3000);System.out.println(sendResult + ":" + new String(msg.getBody()));producer.shutdown();
}

当消费组去消费消息的时候,它可以用 SQL 表达式来精准地筛选消息。例如我们可以设定条件,像 “Key1 必须等于 Value1”,或者设置更复杂的条件,用 “AND” 和 “OR” 这些逻辑运算符把多个条件组合起来。SQL 过滤能按照设置的条件,精确地过滤出符合条件的消息。

public static void main(String[] args) throws Exception {// 创建消息消费者DefaultMQPushConsumer pushConsumer = ClientCreater.createPushConsumer(GROUP_NAME);//订阅所有消息pushConsumer.subscribe(TOPIC_NAME, MessageSelector.bySql("True"));// 订阅topic 订阅单个key的sqlpushConsumer.subscribe(TOPIC_NAME,MessageSelector.bySql("key1 IS NOT NULL AND key1='value1'"));//订阅多个属性pushConsumer.subscribe(TOPIC_NAME,MessageSelector.bySql("key1 IS NOT NULL AND key2 IS NOT NULL  AND key1='value1' AND key2='value2'"));// 省略其他代码
}
核心原理

从前面的介绍可以知道,ConsumeQueue 里并没有存储用于 SQL 表达式匹配的相关信息。所以要是想根据 SQL 表达式来匹配消息,就只能把 CommitLog 里的消息读取出来,然后进行运算处理。实现这部分功能的代码,同样是在 org.apache.rocketmq.store.DefaultMessageStore#getMessage 这个方法里。

if (messageFilter != null && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {                                              // 省略部分代码continue;
}
//匹配逻辑。          
if (tempProperties == null && msgBuffer != null) {tempProperties = MessageDecoder.decodeProperties(msgBuffer);
}
Object ret = null;
try {MessageEvaluationContext context = new MessageEvaluationContext(tempProperties);ret = realFilterData.getCompiledExpression().evaluate(context);
} catch (Throwable e) {log.error("Message Filter error, " + realFilterData + ", " + tempProperties, e);
}

可以看到,这里是把所有消息的属性字段都放到 MessageEvaluationContext 里面,然后根据用户写的 SQL 表达式来对消息进行过滤。这里用到的 getCompiledExpression() 方法,其实就是通过 SqlFilter 把用户写的 SQL 表达式编译成一个 BooleanExpression,最终计算结果就是匹配或者不匹配。

如果您想了解这个功能的完整 SQL 实现细节,可以去深入研究 Rocketmq-filter 模块。

@Override
public Expression compile(final String expr) throws MQFilterException {return SelectorParser.parse(expr);
}

和 Tag 的过滤方式不太一样,BooleanExpression 需要读取真实的消息内容,并且是基于消息的实际字符串去做匹配。这种方式的好处是客户端不需要额外做什么配置,但缺点是性能相对会差一些,因为每次都要读取消息内容来匹配。

为了提升性能,社区想了个办法,就是在 ConsumeQueue 里增加一个扩展字段。不过要使用这个功能,需要先打开 enableConsumeQueueExt 这个开关。打开之后,就可以利用布隆过滤器(Bloom Filter)来做优化了。

布隆过滤器的原理大概是这样的:它会根据消息的属性生成一个序列化的布隆过滤器数据。在过滤的时候,如果布隆过滤器判断消息不符合条件,那这条消息肯定是不符合的,就可以直接过滤掉;如果布隆过滤器判断消息符合条件,那还需要进一步做精确匹配。

综上所述,我们就得到了这样一个 SQL 过滤的工作流程。

规则限制

由于 SQL 属性过滤是生产者定义消息属性,消费者设置 SQL 过滤条件,计算之后,可能有不同的结果,因此服务端的处理方式如下:

  • 异常情况处理:如果过滤条件的表达式计算抛异常,消息默认被过滤,不会被投递给消费者。例如比较数字和非数字类型的值。

  • 空值情况处理:如果过滤条件的表达式计算值为 null 或不是布尔类型(true 和 false),则消息默认被过滤,不会被投递给消费者。例如发送消息时不存在某个属性,在订阅时过滤条件中直接使用该属性,则过滤条件的表达式计算结果为 null。

  • 类型不符处理:如果消息自定义属性为浮点型,但过滤条件中使用整数进行判断,则消息默认被过滤,不会被投递给消费者。

虽然这种方式是灵活的,但是在消息头中还是不建议设置太多的值,因为总的消息头部属性有大小限制(32k),内置的已经占用了不少。超长之后,可能导致消息发送或者消费异常。

两种过滤方式的对比总结

过滤方式TAG过滤SQL过滤
是否支持多个过滤条件
性能
处理方客户端+服务端服务端
易用性一般

使用建议

合理划分主题和 Tag 标签。

从消息的过滤机制和主题的原理机制可以看出,业务消息的拆分可以基于主题进行筛选,也可以基于主题内消息的 Tag 标签及属性进行筛选。关于拆分方式的选择,应遵循以下原则:

  • 消息类型是否一致:不同类型的消息,如顺序消息和普通消息需要使用不同的主题进行拆分,无法通过 Tag 标签进行分类。

  • 业务域是否相同:不同业务域和部门的消息应该拆分不同的主题。例如物流消息和支付消息应该使用两个不同的主题;同样是一个主题内的物流消息,普通物流消息和加急物流消息则可以通过不同的 Tag 进行区分。

  • 消息量级和重要性是否一致:如果消息的量级规模存在巨大差异,或者说消息的链路重要程度存在差异,则应该使用不同的主题进行隔离拆分。

腾讯云消息过滤轨迹展示

从上述消息过滤的原理介绍可以发现,如果消息被过滤掉了,用户收不到这条消息,和消息本身没有被消费的情况看起来是一样的。这时候用户可能会有一些疑惑:在 RocketMQ 的管理控制台(dashboard)上,消息显示是“已消费”状态,可实际上自己并没收到。

在腾讯云 TDMQ RocketMQ 版上,我们针对过滤条件的查询进行了优化。通过这个优化,能够区分展示消息过滤和真正被消费两种情况的消息轨迹展示。这样一来,用户就能很直观地看到消息到底是被过滤掉了,还是真正被消费了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/pingmian/85475.shtml
繁体地址,请注明出处:http://hk.pswp.cn/pingmian/85475.shtml
英文地址,请注明出处:http://en.pswp.cn/pingmian/85475.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

安卓JetPack篇——LifeCycle原理

LifeCycle 一、什么是Lifecycle 具备宿主生命周期感知能力的组件。它能持有组件&#xff08;如Activity或Fragment&#xff09;生命周期状态的信息&#xff0c;并且允许其他观察者监听宿主的状态。 二、基本原理 1、安卓10以下版本 隐形的Fragment注入在LifecycleOwner&am…

CSS 圆角边框属性(`border-radius`)笔记

一、作用&#xff1a; 用于设置元素四个角的圆角效果&#xff0c;让元素不再死板&#xff0c;更加柔和。 二、基本语法&#xff1a; border-radius: 圆角大小; 单位&#xff1a;px&#xff08;像素&#xff09;或 %&#xff08;百分比&#xff09; 示例&#xff1a; div {  …

python自助棋牌室管理系统

目录 技术栈介绍具体实现截图系统设计研究方法&#xff1a;设计步骤设计流程核心代码部分展示研究方法详细视频演示试验方案论文大纲源码获取/详细视频演示 技术栈介绍 Django-SpringBoot-php-Node.js-flask 本课题的研究方法和研究步骤基本合理&#xff0c;难度适中&#xf…

计算机——硬盘分区和格式化

硬盘驱动器 硬盘驱动器&#xff08;HDD&#xff09;是一种成熟、经济的大容量存储解决方案。它的核心优势在于每GB成本低和超大容量。然而&#xff0c;其机械结构带来的速度瓶颈、噪音、功耗和对物理冲击的敏感性是其主要的缺点。随着 SSD 价格的持续下降和性能的绝对领先&…

从IEC到UL:技术主权竞争下的断路器合规性战略

1 国际标准体系割裂的现状 在全球低压电器领域&#xff0c;国际标准体系呈现出日益明显的割裂态势。当前主要存在四大标准体系&#xff1a;国际通用的​​IEC标准体系​​、欧洲采用的​​EN标准体系​​、北美实施的​​UL与CSA标准体系​​&#xff0c;以及具有地域特色的​…

第十六届蓝桥杯_省赛B组(D).产值调整

题目如下 这道题看似很简单&#xff0c;其实还是得观察一下&#xff0c;要不然就会… 话不多说回到题目&#xff0c;这个题的坑就在于当A,B,C三个产值相同的时候&#xff0c;再怎么变还是之前的产值&#xff0c;或者也可以通过另外一种方法理解&#xff1a; 通过一个案例来举…

设计模式 | 单例模式——饿汉模式 懒汉模式

单例模式 文章目录 单例模式一、饿汉模式&#xff08;Eager Initialization&#xff09;1. 定义2. 特点3. 饿汉单例模式&#xff08;定义时-类外初始化&#xff09;4. 实现细节 二、懒汉模式&#xff08;Lazy Initialization&#xff09;1. 定义2. 特点3. 懒汉单例模式&#xf…

dify本地部署及添加ollama模型(ubuntu24.04)

说明&#xff1a;ubuntu是虚拟机的&#xff0c;用的桥接模式&#xff0c;与本地同局域网不同ip地址。 参考VM虚拟机网络配置&#xff08;ubuntu24桥接模式&#xff09;&#xff1a;配置静态IP前提&#xff1a;需要有docker及docker-compose环境 参考ubuntu24安装docker及docker…

Python爬虫实战:研究multiprocessing相关技术

一、引言 1.1 研究背景与意义 随着互联网信息的爆炸式增长,网络爬虫已成为获取海量数据的重要工具。传统的单线程爬虫在面对大规模数据采集任务时效率低下,无法充分利用现代计算机多核 CPU 的优势。多线程爬虫虽然在一定程度上提高了效率,但受限于 Python 的全局解释器锁(…

6.18 redis面试题 日志 缓存淘汰过期删除 集群

Redis有哪2种持久化方式&#xff1f;分别的优缺点是什么&#xff1f; Redis 的重写 AOF 过程是由后台子进程 bgrewriteaof 来完成的。 过期删除策略和内存淘汰策略有什么区别&#xff1f; 内存淘汰策略是在内存满了的时候&#xff0c;redis 会触发内存淘汰策略&#xff0c;来淘…

什么时候会发生内存泄漏?

1. 内存泄漏是什么&#xff1f; 定义&#xff1a;内存泄漏是指程序中的对象已经不再需要&#xff0c;但由于被其他对象错误引用&#xff0c;导致垃圾回收器&#xff08;GC&#xff09;无法回收它&#xff0c;从而长期占用内存空间的现象。 2. 内存泄漏的危害 问题具体表现内存…

用RSA算法模拟类的适配器模式

“RAS算法”这个术语本身并不常见或标准&#xff0c;它可能指向两个主要领域的不同概念&#xff0c;具体取决于上下文&#xff1a; 更可能是拼写错误&#xff1a;指 RSA 算法&#xff08;密码学&#xff09; 这是最常见的情况。 “RAS” 极有可能是 “RSA” 的拼写错误。RSA 算…

CARSIM-与C#自动化测试方案

using System; using System.Runtime.InteropServices; using System.Collections.Generic;namespace CarSimAutomation {/// <summary>/// CarSim COM 自动化测试接口/// 封装所有 CarSim COM 功能用于自动化测试/// </summary>[ComVisible(true)][ClassInterface…

企微CRM系统中的任务分配与效率提升技巧

在数字化管理时代&#xff0c;企业微信(企微)与CRM系统的深度融合&#xff0c;为企业提供了更高效的客户管理与团队协作方案。企微CRM软件不仅整合了客户沟通、销售跟进、数据分析等功能&#xff0c;还能通过智能任务分配优化团队效率。本文将深入探讨企微CRM管理系统的任务分配…

day66—BFS—最短的桥(LeetCode-934)

题目描述 给你一个大小为 n x n 的二元矩阵 grid &#xff0c;其中 1 表示陆地&#xff0c;0 表示水域。 岛 是由四面相连的 1 形成的一个最大组&#xff0c;即不会与非组内的任何其他 1 相连。grid 中 恰好存在两座岛 。 你可以将任意数量的 0 变为 1 &#xff0c;以使两座…

FramePack 安装指南(中文)

FramePack 安装指南&#xff08;中文&#xff09; -Windows FramePack 是最前沿的 AI 视频生成框架&#xff0c;以极小的硬件需求颠覆视频创作&#xff01;它能在仅 6GB 笔记本 GPU 内存上&#xff0c;驱动 13B 模型以 30 FPS 生成超长 120 秒视频&#xff0c;几乎无内容限制&…

Redis Sentinel 非集群模式高可用部署指南

1. Sentinel 在非集群模式的定位 一句话&#xff1a;在单主多从架构中&#xff0c;用 Sentinel 替你盯哨——探测故障、选举新主、通知客户端。 核心四职能&#xff1a; 职能作用点Monitoring定时 PING 主从&#xff0c;自身也互相探测Notification通过日志/PubSub/外部调用报…

2025Java面试八股文

文章目录 Java基础JVM多线程SpringSpring Boot数据库与SQL分布式系统其他 Java基础 自动装箱与拆箱&#xff1a;Java中基础数据类型与包装类之间的转换。例如&#xff0c;Integer x 1; 是装箱&#xff0c;int y x; 是拆箱。Object类常用方法&#xff1a;如clone()、getClass…

宝塔安装nginx-rtmp,音视频直播

前置&#xff1a;需要自己开发音视频直播&#xff0c; 注意不是实时音视频&#xff0c;不是一对一视频聊天&#xff0c;不是视频会议 方案有 srs &#xff0c;nginx-rtmp&#xff0c;live555&#xff0c;node-media-server&#xff0c;EasyDarwin等 今天是说 nginx-rtmp 怎么…

基于微信小程序和深度学习的宠物照片拍摄指导平台的设计与实现

文章目录 摘要前言绪论1. 课题背景2. 国内外现状与趋势2.1 国内研究现状2.2 国外研究现状2.3 发展趋势3. 课题内容相关技术与方法介绍1. 微信小程序开发技术2. 深度学习模型选型2.1 MobileNetV22.2 ResNet-503. 系统架构设计4. 关键技术实现4.1 实时拍摄指导4.2 多模态建议生成…