RocketMQ、RabbitMQ与Kafka对比及常见问题解决方案

一、概述

消息队列(Message Queue, MQ)是企业IT系统内部通信的核心手段,用于提升性能、实现系统解耦和流量削峰。它具有低耦合、可靠投递、广播、流量控制、最终一致性等功能,是异步RPC的主要手段之一。当前主流消息中间件包括ActiveMQ、RabbitMQ、Kafka和RocketMQ等。本文对比RocketMQ、RabbitMQ和Kafka,并总结常见问题的解决方案。

Kafka是一种分布式流处理平台,最初由LinkedIn开发,现由Apache维护,专注于高吞吐量、持久化和实时数据流处理,广泛用于大数据和日志聚合场景。


二、特性对比

1. RocketMQ

RocketMQ是阿里巴巴自主研发的分布式消息中间件,具有高性能、高可靠性和高可扩展性,广泛应用于高并发场景。

  • NameServer:轻量级服务协调与治理组件,负责记录和维护Topic、Broker信息,监控Broker运行状态。NameServer几乎无状态,可集群部署,节点间无信息同步,类似注册中心。
  • Broker:消息服务器,提供核心消息服务。每个Broker与NameServer集群中的所有节点保持长连接,定时注册Topic信息。
  • Producer:消息生产者,负责生成消息并发送至Broker。
  • Consumer:消息消费者,从Broker获取消息并处理业务逻辑。

2. RabbitMQ

RabbitMQ是基于AMQP协议的开源消息中间件,注重灵活的路由机制和易用性,适合中小型企业或复杂路由场景。

  • Exchange:交换机,根据路由规则将消息转发到对应队列。
  • Broker:消息服务器,提供核心消息服务。
  • Channel:基于TCP连接的虚拟连接,用于消息传输。
  • Routing Key:生产者发送消息时携带的键,指定消息路由规则。
  • Binding Key:绑定Exchange与Queue时指定的键,Routing Key与Binding Key匹配时,消息被路由到对应Queue。

3. Kafka

Kafka是分布式、持久化的消息系统,设计为高吞吐量的日志流处理平台,支持分区和副本机制,适合大数据管道和实时分析。

  • Topic:消息主题,可分为多个分区(Partition),每个分区是一个有序的日志序列,支持并行消费。
  • Broker:Kafka服务器节点,负责存储和转发消息。多个Broker组成集群。
  • Producer:消息生产者,将消息发送到指定Topic的分区,支持分区策略(如轮询、键哈希)。
  • Consumer:消息消费者,通过Consumer Group实现负载均衡,每个消费者订阅Topic并消费分区消息。
  • Zookeeper/KRaft:早期依赖Zookeeper进行元数据管理和选举,新版使用KRaft(Kafka Raft)模式实现内置共识,无需Zookeeper。

三、常见问题及解决方案

1. 重复消费

问题描述:消费者消费消息后需发送确认消息(ACK)给消息队列,通知消息已被消费。若确认消息因网络故障等原因未送达,消息队列可能重复分发消息给其他消费者。

解决方案

  • 保证消息幂等性

    :确保消息多次消费不影响结果。常见方法:

    • 使用唯一消息ID,消费者检查是否已处理。
    • 数据库操作使用唯一约束或版本号控制。
  • RocketMQ

    • 消费者在业务逻辑处理完成后发送ACK,确保消息被正确消费。
    • 使用事务性消息或本地事务状态表,防止重复消费影响业务。
  • RabbitMQ

    • 采用手动确认模式(Manual ACK),处理消息成功后再回复确认。
    • 消费者通过检查消息的唯一标识(如Message ID)避免重复处理。
  • Kafka

    • 消费者管理Offset(消费偏移量),手动提交Offset(disable auto-commit)。
    • 如果消费者崩溃未提交Offset,重启后从上次Offset消费,可能重复;通过幂等操作或Exactly-Once语义(启用idempotence)处理。
    • Consumer Group中,Rebalance可能导致重复消费,使用唯一ID或状态存储(如数据库)确保幂等。

2. 数据丢失

问题描述:消息可能在生产者、消息队列或消费者端丢失,导致业务异常。

RocketMQ
  • 生产者丢数据
    • 使用同步发送(send()),同步感知发送结果,失败可重试(默认重试3次)。
    • 失败消息存储在CommitLog中,支持后续重试。
  • 消息队列丢数据
    • 消息持久化到CommitLog,即使Broker宕机后重启,未消费消息可恢复。
    • 支持同步刷盘(确保消息写入磁盘)和异步刷盘(高性能但可能丢失少量数据)。
    • Broker集群支持1主N从,同步复制确保主节点磁盘故障不丢失消息,异步复制性能更高但有毫秒级延迟。
  • 消费者丢数据
    • 完全消费成功后发送ACK。
    • 维护持久化的Offset记录消费进度,防止因故障丢失消费状态。
RabbitMQ
  • 生产者丢数据
    • 使用事务模式(支持回滚)或Confirm模式(ACK确认),确保生产者可靠发送。
  • 消息队列丢数据
    • 开启消息持久化,消息写入磁盘后通知生产者ACK。
    • 配合Confirm机制,确保消息持久化到磁盘。
  • 消费者丢数据
    • 禁用自动确认模式,改为手动确认(Manual ACK),确保消息处理成功后再确认。
    • 消费者维护消费状态,避免因故障重复消费或丢失。
Kafka
  • 生产者丢数据
    • 配置acks参数:acks=0(不确认,高性能可能丢失)、acks=1(Leader确认)、acks=all(所有副本确认,确保不丢失)。
    • 启用重试和幂等生产者(enable.idempotence=true),防止重复发送。
  • 消息队列丢数据
    • 消息持久化到日志文件(Log Segments),支持配置保留策略(时间或大小)。
    • 通过Replication Factor(副本因子)设置分区副本数,Leader-Follower机制确保高可用;min.insync.replicas配置最小同步副本数。
    • Broker宕机时,副本可选举新Leader,消息不丢失(视副本配置)。
  • 消费者丢数据
    • 禁用自动提交Offset(enable.auto.commit=false),手动提交确保处理成功。
    • 如果自动提交,处理中崩溃可能丢失消息;使用Exactly-Once语义结合事务处理。

3. 消费顺序

问题描述:某些业务场景要求消息按顺序消费,但分布式系统或多线程消费可能导致乱序。

解决方案

  • 单线程消费:保证队列内消息按顺序处理,但可能影响性能。
  • 消息编号:为消息分配序列号,消费者根据编号判断顺序。
  • Queue有序性
    • 消息队列内部数据天然有序。
    • 消费者端通过单线程消费或内存队列排序,确保顺序处理。
  • RocketMQ
    • 使用顺序消息(Sequential Message),将相关消息发送到同一分区,保证分区内顺序。
    • 消费者单线程拉取并处理分区消息。
  • RabbitMQ
    • 利用Queue的FIFO特性,单线程消费确保顺序。
    • 多线程消费时,消费者内部维护内存队列进行排序。
  • Kafka
    • 分区内消息严格有序(append-only日志),但多分区无全局顺序。
    • 对于顺序需求,将相关消息发送到同一分区(基于键哈希)。
    • 消费者组中,每个分区分配给单一消费者,确保分区内顺序;多线程消费需消费者内部协调。

4. 高可用

问题描述:消息队列需保证高可用,防止单点故障导致服务不可用。

RocketMQ
  • 多Master模式
    • 配置简单,性能最高。
    • 单机宕机或重启期间,该机器未消费消息不可用,影响实时性。
    • 可能有少量消息丢失(视配置)。
  • 多Master多Slave异步模式
    • 每Master配一个Slave,消息写入Master,异步复制到Slave。
    • 性能接近多Master,实时性高,主备切换对应用透明。
    • Master宕机或磁盘损坏可能丢失少量消息(毫秒级延迟)。
  • 多Master多Slave同步模式
    • 每Master配一个Slave,消息同步写入主备,成功后返回。
    • 服务和数据可用性高,但性能略低于异步模式。
    • 主节点宕机后,备节点无法自动切换为主,需人工干预。
RabbitMQ
  • 普通集群模式
    • 多台机器运行RabbitMQ实例,Queue仅存储在一个实例上,其他实例同步元数据。
    • 消费时,若连接到非Queue所在实例,会从Queue所在实例拉取数据。
    • 若Queue所在实例宕机,需等待其恢复(持久化消息不丢失),影响实时性。
  • 镜像集群模式
    • Queue的元数据和消息同步到多个实例,写入消息时自动同步到所有实例的Queue。
    • 优点:高可用,单节点宕机不影响服务。
    • 缺点:
      • 数据同步导致性能开销大。
      • 无法线性扩容,因每个节点存储全量数据,单节点容量受限。
Kafka
  • 分布式Broker集群
    • 通过分区和副本机制实现高可用,每个分区有多个副本分布在不同Broker。
    • Leader选举由Controller(基于Zookeeper或KRaft)管理,Broker宕机时自动切换到Follower副本。
    • 支持水平扩展,添加Broker可重新分配分区,提高吞吐量。
  • 优点:高吞吐(百万级TPS),数据持久化强,适合大规模数据流。
  • 缺点:配置复杂,依赖外部协调(如Zookeeper,KRaft缓解);实时性不如RabbitMQ,但延迟低(毫秒级)。

四、总结

  • RocketMQ:适合高并发交易场景,强调性能和分布式架构,NameServer和Broker设计支持大规模集群。数据丢失防护完善,适合对实时性要求高的场景,但在同步模式下主备切换需人工干预。
  • RabbitMQ:基于AMQP协议,灵活的路由机制适合复杂路由场景,易用性强。但镜像集群性能开销大,扩展性受限,适合中小规模应用。
  • Kafka:专注于高吞吐量和数据流处理,分区机制支持并行消费和扩展,适合日志、大数据管道。但不原生支持复杂路由,顺序消费限于分区内,配置较复杂。
  • 常见问题解决方案
    • 重复消费:三者均通过幂等性和手动确认/提交Offset解决。
    • 数据丢失:RocketMQ和Kafka通过主从/副本复制,RabbitMQ通过持久化和Confirm。
    • 消费顺序:利用分区/队列有序性,结合单线程或键分区。
    • 高可用:Kafka的分布式副本最强扩展性,RabbitMQ镜像集群数据一致性高,RocketMQ平衡性能与可靠性。

市场上几大消息队列对比如下:

对比项RabbitMQActiveMQRocketMQKafka
公司RabbitApache阿里Apache
语言ErlangJavaJavaScala & Java
协议支持AMQPOpenWire、STOMP、REST、XMPP、AMQP自定义自定义协议,社区封装了 HTTP 协议支持
客户端支持语言官方支持 Erlang、Java、Ruby 等,社区产出多种 API,几乎支持所有语言Java、C、C++、Python、PHP、Perl、.NET 等Java、C++(不成熟)官方支持 Java,社区产出多种 API,如 PHP、Python 等
单机吞吐量万级(约 3 万)万级(约 4 万)十万级(约 10 万)十万级(约 10 万)
消息延迟微秒级毫秒级毫秒级毫秒以内
可用性高,基于主从架构实现可用性高,基于主从架构实现可用性非常高,分布式架构非常高,分布式架构,一个数据多副本
消息可靠性-有较低概率丢失数据经过参数优化配置,可做到零丢失经过参数配置,可做到零丢失
功能支持基于 Erlang 开发,并发性能极强,性能极好,延时低MQ 领域功能极其完备MQ 功能较为完备,分布式扩展性好功能较为简单,主要支持基本 MQ 功能
优势Erlang 开发,性能极好、延时低,吞吐量万级,功能完备,管理界面优秀,社区活跃,互联网公司使用多成熟稳定,功能强大,业内大量应用接口简单易用,阿里出品,吞吐量大,分布式扩展方便,社区活跃,支持大规模 Topic 和复杂业务场景,可定制开发超高吞吐量,毫秒级延时,极高可用性和可靠性,分布式扩展方便
劣势吞吐量较低,Erlang 开发不易定制,集群动态扩展麻烦偶尔有低概率消息丢失,社区活跃度不高不遵循 JMS 规范,系统迁移需改大量代码,存在被替代风险可能发生消息重复消费
应用各类场景均有使用主要用于解耦和异步,较少用于大规模吞吐适用于大规模吞吐、复杂业务场景大数据实时计算、日志采集等场景的业界标准

选择中间件的可以从这些维度来考虑:可靠性,性能,功能,可运维行,可拓展性,社区活跃度。目前常用的几个中间件,ActiveMQ作为“老古董”,市面上用的已经不多,其它几种:

RabbitMQ:

优点:轻量,迅捷,容易部署和使用,拥有灵活的路由配置
缺点:性能和吞吐量不太理想,不易进行二次开发
RocketMQ:

优点:性能好,高吞吐量,稳定可靠,有活跃的中文社区
缺点:兼容性上不是太好
Kafka:

优点:拥有强大的性能及吞吐量,兼容性很好
缺点:由于“攒一波再处理”导致延迟比较高

RocketMQ专栏

1. 推模式(Push)与拉模式(Pull)的区别与实现
推模式:RocketMQ 的 PushConsumer 实际基于长轮询(Long Polling)实现,Broker 收到请求后若队列无消息,会挂起请求并在新消息到达时立即响应。
拉模式:消费者主动拉取,需自行控制频率(如 DefaultLitePullConsumer),适用于需精准控制消费速率的场景。
对比:
推模式实时性高,但需 Broker 维护连接状态,可能因消费能力不足导致积压。
拉模式灵活性高,但需处理消息延迟与空轮询问题。

2. 如何保证消息顺序性?
生产者:通过 MessageQueueSelector 将同一业务 ID 的消息发送至固定队列(如哈希取模)。
消费者:使用 MessageListenerOrderly 监听器,锁定队列并单线程消费。
源码关键点:RebalanceLockManager 管理队列锁,确保同一队列仅被一个线程消费。

3. 事务消息的实现机制
两阶段提交:
发送 Half 消息(预提交),Broker 存储但暂不投递。
执行本地事务,返回 Commit/Rollback 状态。
Broker 根据状态投递或删除消息,若未收到确认则发起事务回查。
应用场景:跨系统分布式事务(如订单创建与库存扣减)。

4. 消息积压的解决方案
临时扩容:增加 Consumer 实例或线程数,提升消费能力。
批量消费:调整 consumeMessageBatchMaxSize 参数,一次处理多条消息。
跳过非关键消息:若允许部分消息丢失,可重置消费位点(resetOffsetByTime)。
异步处理:将耗时操作(如 DB 写入)异步化,减少消费阻塞。

5. 消息的存储结构是怎样的?CommitLog 和 ConsumeQueue 的关系?
CommitLog 存储原始消息,ConsumeQueue 存储逻辑队列的偏移量,通过偏移量快速定位消息。

6. Consumer 的负载均衡策略是什么?
平均分配、一致性 Hash 等,通过 RebalanceService 定时调整队列分配。

7. 如何实现消息的精准一次投递?
RocketMQ 不保证,需业务端结合事务消息 + 幂等性实现。

8. Broker 的刷盘机制如何选择?
高可靠性场景用 SYNC_FLUSH,高性能场景用 ASYNC_FLUSH。

9. NameServer 宕机后,Producer 和 Consumer 还能工作吗?
可以,客户端会缓存路由信息,但无法感知新 Broker 或 Topic 变化。

10. 性能调优
Broker 参数:
sendMessageThreadPoolNums:发送线程数。
pullMessageThreadPoolNums:拉取线程数。
零拷贝技术:通过 MappedFile 内存映射文件减少数据拷贝。

11. Broker 如何处理拉取请求?
长轮询机制:Consumer 拉取请求无消息时,Broker 挂起请求(默认 30s),新消息到达后立即响应。
源码关键点:PullRequestHoldService 管理挂起请求,通过 checkHoldRequest 周期性检查消息到达。

12. RocketMQ 消息存储结构:CommitLog 与 ConsumeQueue 的关系
CommitLog:所有 Topic 的消息按顺序追加写入,文件名格式为 {文件起始偏移量}.log,固定大小 1GB(可配置)。
ConsumeQueue:逻辑队列索引,存储消息在 CommitLog 中的偏移量、大小、Tag HashCode,文件名格式为 {Topic}/{QueueId}/{ConsumeQueueOffset}。
关系:消费者通过 ConsumeQueue 快速定位 CommitLog 中的消息,实现高效检索。

13. 主从同步机制(SYNC/ASYNC)的区别与选型
SYNC_MASTER:
生产者收到 Slave 写入成功 ACK 后才返回,保证数据强一致。
适用场景:金融交易、资金扣减。
ASYNC_MASTER:
主节点写入成功即返回,Slave 异步复制,性能更高。
适用场景:日志传输、允许短暂不一致。

14. 消息重试与死信队列(DLQ)机制
重试队列:消费失败的消息进入重试队列(命名格式:%RETRY%{ConsumerGroup}),按延迟等级(1s, 5s, 10s…)重试。
死信队列:重试 16 次后仍失败,消息进入死信队列(%DLQ%{ConsumerGroup}),需人工处理。
配置参数:maxReconsumeTimes(默认 16 次)。

15. 如何实现消息轨迹(Trace)追踪?
开启方式:Broker 配置 traceTopicEnable=true,Producer/Consumer 设置 enableMsgTrace=true。
原理:消息发送/消费时,额外生成轨迹数据写入内部 Topic RMQ_SYS_TRACE_TOPIC。
查询工具:RocketMQ Console 或自定义消费者订阅轨迹 Topic。

16. Rebalance 机制如何工作?
触发条件:Consumer 数量变化、Broker 上下线、Topic 路由变更。
流程:
客户端定时向 Broker 发送心跳,上报 Consumer Group 信息。
Broker 通过 RebalanceService 计算队列分配策略(平均分配、一致性 Hash)。
Consumer 根据新分配结果调整拉取队列。
源码入口:RebalanceImpl#rebalanceByTopic。

17. RocketMQ 5.0 新特性(如 Proxy 模式)
Proxy 模式:解耦 Broker 与客户端协议,支持多语言客户端(如 HTTP/gRPC),增强云原生兼容性。
事务增强:支持 TCC 模式,提供更灵活的事务解决方案。
轻量级 SDK:简化客户端依赖,提升启动速度。
三、高级特性与源码原理

18. 零拷贝技术
RocketMQ:使用 mmap 内存映射文件,减少用户态与内核态数据拷贝。
Kafka:采用 sendfile 系统调用,实现更高吞吐但灵活性较低。

19. DLedger 高可用机制
基于 Raft 协议实现主从选举,主节点故障时自动切换,保障数据一致性。

20. 消息过滤
Tag 过滤:Broker 端过滤,减少网络传输。
SQL 过滤:需开启 enablePropertyFilter=true,支持复杂条件匹配。

21. 事务消息实现细节
两阶段提交:
发送 Half 消息(预提交),Broker 存储但暂不投递。
执行本地事务,返回 Commit/Rollback 状态。
Broker 根据状态投递或删除消息,若未收到确认则发起事务回查。
源码分析:TransactionMQProducer 处理本地事务回调,TransactionalMessageService 管理事务状态。

22. 消息索引文件(IndexFile)的作用
存储结构:哈希索引(Key: Message Key, Value: CommitLog Offset)。
用途:通过 Message Key 或 Unique Key 快速查询消息,支持按时间范围检索。
源码类:IndexService, IndexFile。

23. PageCache 与 Mmap 如何提升性能?
PageCache:利用操作系统缓存,将磁盘文件映射到内存,加速读写。
Mmap:通过内存映射文件,避免 read()/write() 系统调用的数据拷贝,提升 CommitLog 写入效率。
刷盘策略:SYNC_FLUSH(同步刷盘)依赖 FileChannel.force(),ASYNC_FLUSH 使用后台线程批量刷盘。

24. 消息消费位点(Offset)管理机制
本地存储:Consumer 默认将 Offset 存储在本地文件(~/.rocketmq_offsets)。
远程存储:集群模式下,Offset 上报至 Broker(ConsumerOffsetManager)。
重置方式:
CONSUME_FROM_LAST_OFFSET:从最大位点开始消费。
CONSUME_FROM_FIRST_OFFSET:从最小位点开始消费。

25. 消息索引文件(IndexFile)的作用
存储结构:哈希索引(Key: Message Key, Value: CommitLog Offset)。
用途:通过 Message Key 或 Unique Key 快速查询消息,支持按时间范围检索。
源码类:IndexService, IndexFile。

26. PageCache 与 Mmap 如何提升性能?
PageCache:利用操作系统缓存,将磁盘文件映射到内存,加速读写。
Mmap:通过内存映射文件,避免 read()/write() 系统调用的数据拷贝,提升 CommitLog 写入效率。
刷盘策略:SYNC_FLUSH(同步刷盘)依赖 FileChannel.force(),ASYNC_FLUSH 使用后台线程批量刷盘。

27. 消息消费位点(Offset)管理机制
本地存储:Consumer 默认将 Offset 存储在本地文件(~/.rocketmq_offsets)。
远程存储:集群模式下,Offset 上报至 Broker(ConsumerOffsetManager)。
重置方式:
CONSUME_FROM_LAST_OFFSET:从最大位点开始消费。
CONSUME_FROM_FIRST_OFFSET:从最小位点开始消费。

场景设计题

1 .设计一个高并发秒杀系统,如何利用 RocketMQ 优化?

    流量削峰:将秒杀请求写入 RocketMQ 队列,异步处理订单创建与库存扣减。顺序消息:使用哈希选择器将同一用户请求路由到固定队列,避免超卖。事务消息:扣减库存与生成订单通过事务消息保证最终一致性。动态扩容:根据监控指标(如堆积消息数)自动扩容 Consumer,快速消化积压

2 . 设计一个秒杀系统,如何用 RocketMQ 解决超卖问题?

    消息队列削峰填谷 + 数据库乐观锁 + 事务消息保证最终库存一致。

3 . 如何实现分布式事务(订单扣库存+生成订单)?

    事务消息:半消息预扣库存,本地事务生成订单,失败则回滚库存。

4.如何设计一个异地多活消息队列系统?

    跨城同步:Broker 集群分机房部署,通过 Async replication 同步消息。单元化路由:Producer 根据用户 ID 哈希选择本地机房 Broker,减少跨城延迟。容灾切换:监控机房状态,自动切换消息路由至可用机房。

5.消息丢失的可能原因与解决方案

    生产者丢失:原因:异步发送未处理 SendCallback 异常。解决:使用同步发送 + 重试机制。Broker 丢失:原因:异步刷盘时宕机,PageCache 数据未落盘。解决:SYNC_FLUSH 刷盘 + 主从同步。消费者丢失:原因:消费成功但 Offset 未提交。解决:先处理业务逻辑,再手动提交 Offset。、
Demo实操
  1. 引入依赖
<dependencies><!-- RocketMQ 客户端 --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.4.0</version></dependency><!-- Spring Boot Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>
</dependencies>

2. 配置文件 application.yml

yamlrocketmq:namesrvAddr: 127.0.0.1:9876producer:group: test-producer-groupconsumer:group: test-consumer-grouptopic: TestTopic

3. 生产者配置类

javapackage com.example.demo.config;import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RocketMQProducerConfig {@Value("${rocketmq.namesrvAddr}")private String namesrvAddr;@Value("${rocketmq.producer.group}")private String producerGroup;@Beanpublic DefaultMQProducer mqProducer() throws MQClientException {DefaultMQProducer producer = new DefaultMQProducer(producerGroup);producer.setNamesrvAddr(namesrvAddr);producer.start();System.out.println("🚀 RocketMQ Producer 启动成功");return producer;}
}

4. 生产者发送接口

javapackage com.example.demo.controller;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.*;import java.nio.charset.StandardCharsets;@RestController
@RequestMapping("/mq")
public class ProducerController {@Autowiredprivate DefaultMQProducer producer;@Value("${rocketmq.topic}")private String topic;@PostMapping("/send")public String sendMessage(@RequestParam String msg) throws Exception {Message message = new Message(topic, msg.getBytes(StandardCharsets.UTF_8));SendResult result = producer.send(message);return "发送成功: " + result;}
}

5. 消费者配置类

java复制编辑
package com.example.demo.config;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.nio.charset.StandardCharsets;
import java.util.List;@Configuration
public class RocketMQConsumerConfig {@Value("${rocketmq.namesrvAddr}")private String namesrvAddr;@Value("${rocketmq.consumer.group}")private String consumerGroup;@Value("${rocketmq.topic}")private String topic;@Beanpublic DefaultMQPushConsumer mqConsumer() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);consumer.setNamesrvAddr(namesrvAddr);consumer.subscribe(topic, "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {String body = new String(msg.getBody(), StandardCharsets.UTF_8);System.out.println("📩 收到消息: " + body);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("✅ RocketMQ Consumer 启动成功");return consumer;}
}

最后有兴趣可以尝试自动重试,TraceId 追踪, 异步发送, 批量发送

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/93049.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/93049.shtml
英文地址,请注明出处:http://en.pswp.cn/bicheng/93049.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【CSS 变量】让你的 CSS “活”起来:深入理解 CSS 自定义属性与主题切换

【CSS 变量】让你的 CSS “活”起来&#xff1a;深入理解 CSS 自定义属性与主题切换 所属专栏&#xff1a; 《前端小技巧集合&#xff1a;让你的代码更优雅高效》 上一篇&#xff1a; 【CSS 视觉】无需JS&#xff0c;纯 CSS 实现酷炫视觉效果&#xff08;clip-path, filter, b…

RAG初步实战:从 PDF 到问答:我的第一个轻量级 RAG 系统(附详细项目代码内容与说明)

RAG初步实战&#xff1a;从 PDF 到问答&#xff1a;我的第一个轻量级 RAG 系统 项目背景与目标 在大模型逐渐普及的今天&#xff0c;Retrieval-Augmented Generation&#xff08;RAG&#xff0c;检索增强生成&#xff09;作为连接“知识库”和“大语言模型”的核心范式&#…

自主泊车算法

看我的git 在 open space 空间下规划出⼀条⾃⻋到停⻋位的⽆碰撞轨迹 满⾜平滑约束 可跟踪 考虑动态障碍物约束 在路径不可⽤的情况下 具备重规划能⼒ 重规划时能够做到⽆缝切换 即从原路径⽆缝切换到重规划路径 ⽆明显体感 规划频率 10HZ

USB 2.0 学习(2)- 连接

上回说到 usb的信号 k 状态和 j 状态&#xff0c;补充一下 usb的一些电气小知识。 1.USB设备有四根线 电源线VBus、 D、 D-、 地线GND 2.USB主机端的 D 和 D-各有1个15k下拉电阻&#xff0c;这是为了准确检测 D还是D-线上电平的变化 因为USB总线检测USB设备是低速还是全速设备…

解锁 Appium Inspector:移动端 UI 自动化定位的利器

​ 在移动端 UI 自动化测试中&#xff0c;元素定位是绕不开的核心环节。无论是 Android 还是 iOS 应用&#xff0c;能否精准、高效地定位到界面元素&#xff0c;直接决定了自动化脚本的稳定性和可维护性。而 Appium Inspector 作为 Appium 生态中专门用于元素定位的工具&#…

机器学习概念1

了解机器学习1、什么是机器学习机器学习是一门通过编程让计算机从数据中进行学习的科学 通用定义&#xff1a;机器学习是一个研究领域让计算机无须进行明确编程就具备学习能力 工程化定义&#xff1a;一个计算机程序利用经验E来学习任务T&#xff0c;性能是P&#xff0c;如果针…

前端html学习笔记5:框架、字符实体与 HTML5 新增标签

本文为个人学习总结&#xff0c;如有谬误欢迎指正。前端知识众多&#xff0c;后续将继续记录其他知识点&#xff01; 目录 前言 一、框架标签 作用&#xff1a; 语法&#xff1a; 属性&#xff1a; 二、字符实体 作用&#xff1a; 三、html5新增标签 语义化 状态 列…

Day05 店铺营业状态设置 Redis

Redis 入门 Redis 简介 Redis 是一个基于内存的 key-value 结构数据库。 基于内存存储&#xff0c;读写性能高 适合存储热点数据&#xff08;热点商品&#xff0c;资讯&#xff0c;新闻&#xff09; 企业应用广泛 redis 中文网&#xff1a;Redis中文网 Redis 下载与安装 R…

Linux驱动开发probe字符设备的完整创建流程

一、 设备号分配1.静态分配通过register_chrdev_region预先指定设备号&#xff08;需要确保未被占用&#xff09;2.动态分配通过alloc_chrdev_region由内核自动分配主设备号&#xff0c;一般都是动态分配以避免冲突。3316 xxxx_dev.major 0; 3317 3318 if (xx…

生产环境中Spring Cloud Sleuth与Zipkin分布式链路追踪实战经验分享

生产环境中Spring Cloud Sleuth与Zipkin分布式链路追踪实战经验分享 在复杂的微服务架构中&#xff0c;服务调用链路繁杂&#xff0c;单点故障或性能瓶颈往往难以定位。本文结合真实生产环境案例&#xff0c;分享如何基于Spring Cloud Sleuth与Zipkin构建高可用、低开销的分布…

基于Python的《红楼梦》文本分析与机器学习应用

本文将详细介绍如何使用Python和机器学习技术对《红楼梦》进行深入的文本分析和处理&#xff0c;包括文本分卷、分词、停用词处理、TF-IDF特征提取以及文本可视化等关键技术。一、项目概述本项目的目标是对中国古典文学名著《红楼梦》进行全面的自动化处理和分析&#xff0c;主…

Bevy渲染引擎核心技术深度解析:架构、体积雾与Meshlet渲染

本文将深入探讨Bevy游戏引擎的渲染架构&#xff0c;重点分析其体积雾实现原理、Meshlet渲染技术以及基于物理的渲染&#xff08;PBR&#xff09;系统。内容严格基于技术实现细节&#xff0c;覆盖从底层渲染管线到高级特效的全套解决方案。一、Bevy渲染架构深度解析1.1 核心架构…

CASS11计算斜面面积

1.生成三角网2.工程应用--计算表面积--根据三角网

借助Rclone快速从阿里云OSS迁移到AWS S3

本文作者: 封磊 Eclicktech SA | AWS Community Builder DevTool | AWS UGL | 亚马逊云科技云博主 阿里云&InfoQ&CSDN签约作者 概述 随着企业云战略的调整和多云架构的普及&#xff0c;数据迁移成为了一个常见需求。本文将详细介绍如何使用Rclone工具&#xff0c;高效…

【入门系列】图像算法工程师如何入门计算机图形学?

作为图像算法工程师&#xff0c;入门计算机图形学&#xff08;CG&#xff09;有天然优势——你熟悉图像处理的像素级操作、数学工具&#xff08;如矩阵运算&#xff09;和优化思维&#xff0c;而图形学的核心目标&#xff08;从3D信息生成2D图像&#xff09;与图像处理有很强的…

淘宝API列表:高效获取商品详情图主图商品视频参数item_get

淘宝商品详情信息基本都是用图片展示的&#xff0c;制作精美&#xff0c;能更好的展示商品信息。如何通过API实现批量获取商品详情信息呢&#xff1f;1、在API平台注册账号&#xff0c;获取调用API的key和密钥。2、查看API文档&#xff0c;了解相关请求参数和返回参数。item_ge…

第23章,景深:技术综述

一&#xff0c;定义&#xff1a; 中景&#xff1a;物体聚焦的范围&#xff08;即清晰成像的范围&#xff09;。 景深&#xff1a;在中景之外&#xff0c;都会成像模糊&#xff0c;即景深。景深通常用来指示对场景的注意范围&#xff0c;并提供场景深度的感觉。 背景&#xff1a…

飞算 JavaAI -智慧城市项目实践:从交通协同到应急响应的全链路技术革新

免责声明&#xff1a;此篇文章所有内容都是本人实验&#xff0c;并非广告推广&#xff0c;并非抄袭&#xff0c;如有侵权&#xff0c;请联系。 目录 一、智慧城市核心场景的技术攻坚 1.1 交通信号智能优化系统的实时决策 1.1.1 实时车流数据处理与分析 1.1.2 动态信号配时…

GM3568JHF快速入门教程【二】FPGA+ARM异构开发板环境编译教程

SDK 可通过搭建好的 Docker 镜像环境进行编译。 具体参可考该部分文档内容。1 Docker镜像环境编译SDK1.1 SDK 自动编译命令切换到 Docker 内需要编译的 SDK 根目录&#xff0c;全自动编译默认是 Buildroot&#xff0c; 可以通过设置环境变量 RK_ROOTFS_SYSTEM 指定不同 rootfs.…

Vue3 整合高德地图完成搜索、定位、选址功能,已封装为组件开箱即用(最新)

Vue3 整合高德地图完成搜索、定位、选址功能&#xff08;最新&#xff09;1、效果演示2、前端代码2.1 .env.development2.2 GaodeMap.vue2.3使用示例1、效果演示 2、前端代码 2.1 .env.development https://console.amap.com/dev/key/app# 地图配置 VITE_AMAP_KEY "您的…