一、前言
再JAVA系统开发中,再高并发的场景经常需要使用到消息队列,有时候是不得不使用到消息对了。特别是大数据量的并发处理。对数据实时性要求又没那么高的情况下。
用户请求 → 接入层(Nginx) → 限流 → 消息队列 → 订单服务 → 库存服务 → 支付服务
↑ ↓
结果缓存 ←───────────────┘
在高并发场景下,消息队列(MQ)作为系统解耦、流量削峰和异步处理的核心组件,其性能优化和稳定性保障至关重要。下面我将从架构设计、性能优化、可靠性保障等方面详细解析高并发场景下的MQ使用策略。
高并发场景下的MQ选型策略
1. 主流MQ性能对比
特性 | RabbitMQ | Kafka | RocketMQ | Pulsar |
---|---|---|---|---|
吞吐量 | 万级 | 百万级 | 十万级 | 百万级 |
延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒级 |
持久化 | 支持 | 支持 | 支持 | 支持 |
事务消息 | 支持 | 支持(0.11+) | 支持 | 支持 |
高可用 | 镜像队列 | 分区复制 | 主从复制 | 分层存储 |
适用场景 | 业务解耦/复杂路由 | 日志/流处理 | 订单/交易 | 多租户/流计算 |
2. 选型建议
电商秒杀:RocketMQ(事务消息+高吞吐)
日志收集:Kafka(超高吞吐+分区存储)
金融支付:RabbitMQ(强一致性+复杂路由)
物联网IoT:Pulsar(多租户+低延迟)
二、MQ主要使用,一是数据产生,第二是消费
消息队列(MQ)是分布式系统中常用的异步通信机制,Java中常用的MQ实现包括RabbitMQ、Apache Kafka、ActiveMQ、RocketMQ等。下面我将介绍这些MQ在Java中的基本使用方法。
三、直接上代码示例
生产者:
/*** 消息队列的生产者*/ package cn.xxx.module.member.mq.producer;
@Slf4j
@Component
public class MemberUserProducer {@Resourceprivate ApplicationContext applicationContext;/*** 发送 {@link MemberUserCreateMessage} 消息** @param userId 用户编号*/public void sendUserCreateMessage(Long userId) {applicationContext.publishEvent(new MemberUserCreateMessage().setUserId(userId));}}
消费者:
/*** 消息队列的消费者*/ package cn.xxx.module.member.mq.consumer;
@Slf4j
@Component
public class MemberRegisterPointIssueConsumer implements ApplicationRunner {@Resourceprivate RocketTXMqService rocketTXMqService;@Resourceprivate MemberPointIssueApi memberPointIssueApi;@Value("${rocketmq.producer2.topic}")private String memberRegisterPointIssueTopic;@Overridepublic void run(ApplicationArguments args) throws Exception {DefaultMQPushConsumer pushConsumer = rocketTXMqService.getPushConsumer2();if (null != pushConsumer) {try {pushConsumer.subscribe(memberRegisterPointIssueTopic, "*");// 注册回调实现类来处理从broker拉取回来的消息pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {// 消息处理逻辑IssueRegisterTaskPointReqVO issueRegisterTaskPointReqVO = JSONObject.parseObject(msgs.get(0).getBody(), IssueRegisterTaskPointReqVO.class);log.info("%s Receive Topic %s New Messages: %s issueRegisterTaskPointReqVO: %s %n", Thread.currentThread().getName(), memberRegisterPointIssueTopic, msgs, JSONObject.toJSONString(issueRegisterTaskPointReqVO));memberPointIssueApi.issueRegisterTaskPoint(issueRegisterTaskPointReqVO);// 标记该消息已经被成功消费, 根据消费情况,返回处理状态return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// 启动消费者实例pushConsumer.start();log.info("push topic{} consumer start success", memberRegisterPointIssueTopic);} catch (MQClientException e) {log.error("push topic{} MQClientException:{}", memberRegisterPointIssueTopic, e.getMessage());}}}
}
可靠性保障机制
1. 消息不丢失设计
环节 | 保障措施 |
---|---|
生产者 | 事务消息/confirm机制+本地消息表+定时任务补偿 |
Broker | 同步刷盘+多副本同步复制(ISR)+ RAID磁盘阵列 |
消费者 | 手动ACK+消费幂等设计+死信队列+消息轨迹追踪 |
消息积压处理方案
// 动态扩容消费者实例(Kafka示例)
// 1. 监控积压量
long lag = getConsumerLag("order-topic", "consumer-group");
// 2. 自动扩容规则
if (lag > 100000) { // 积压超过10万scaleConsumerInstances(2); // 双倍扩容
} else if (lag < 1000) {scaleConsumerInstances(0.5); // 缩容50%
}// 3. 紧急处理方案
if (lag > 500000) {// 启动降级处理服务startDegradeService();// 消息转存至冷存储transferToColdStorage();
}