文章目录

      • 示例:电商秒杀系统中的流量削峰
        • 1. 依赖引入(Maven)
        • 2. 消息队列配置(RabbitMQ)
        • 3. 生产者:订单服务(接收高并发请求)
        • 4. 消费者:库存服务(按系统容量处理订单)
        • 5. 模拟高并发测试
      • 关键技术点解析
        • 1. 流量削峰的实现
        • 2. 消息可靠性保障
        • 3. 削峰前后对比
      • 生产环境优化建议
      • 其他 MQ 选型参考

以下是一个基于 Java 和 RabbitMQ 实现流量削峰的示例,展示如何通过消息队列处理高并发下单请求:

示例:电商秒杀系统中的流量削峰

1. 依赖引入(Maven)
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.14.2</version>
</dependency>
2. 消息队列配置(RabbitMQ)
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class MQConfig {private static final String HOST = "localhost";private static final int PORT = 5672;private static final String USERNAME = "guest";private static final String PASSWORD = "guest";public static final String QUEUE_NAME = "order_queue";// 创建连接工厂public static ConnectionFactory getConnectionFactory() {ConnectionFactory factory = new ConnectionFactory();factory.setHost(HOST);factory.setPort(PORT);factory.setUsername(USERNAME);factory.setPassword(PASSWORD);return factory;}
}
3. 生产者:订单服务(接收高并发请求)
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class OrderService {private final Connection connection;public OrderService() throws IOException, TimeoutException {this.connection = MQConfig.getConnectionFactory().newConnection();}// 处理下单请求(削峰前)public void createOrderDirectly(Long productId, Integer count) {// 传统模式:直接处理订单(高并发时会压垮数据库)System.out.println("直接处理订单:商品ID=" + productId + ", 数量=" + count);// 模拟数据库操作try {Thread.sleep(200); // 假设处理一个订单需要200ms} catch (InterruptedException e) {e.printStackTrace();}}// 处理下单请求(削峰后)public void createOrderWithMQ(Long productId, Integer count) throws IOException {try (Channel channel = connection.createChannel()) {// 声明队列(如果不存在则创建)channel.queueDeclare(MQConfig.QUEUE_NAME, false, false, false, null);// 封装订单信息为JSONString orderInfo = "{\"productId\":" + productId + ",\"count\":" + count + "}";// 发送消息到队列channel.basicPublish("", MQConfig.QUEUE_NAME, null, orderInfo.getBytes());System.out.println("订单已放入队列:" + orderInfo);}}
}
4. 消费者:库存服务(按系统容量处理订单)
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class InventoryService {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = MQConfig.getConnectionFactory();try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明队列channel.queueDeclare(MQConfig.QUEUE_NAME, false, false, false, null);// 设置消费者每次只处理1条消息(限流)channel.basicQos(1);System.out.println("库存服务已启动,等待订单消息...");// 创建消费者DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("收到订单:" + message);try {// 模拟处理订单(扣库存、更新数据库等)processOrder(message);// 手动确认消息已处理channel.basicAck(envelope.getDeliveryTag(), false);} catch (Exception e) {e.printStackTrace();// 处理失败,拒绝消息并重新入队channel.basicNack(envelope.getDeliveryTag(), false, true);}}};// 启动消费者(手动确认模式)channel.basicConsume(MQConfig.QUEUE_NAME, false, consumer);}}private static void processOrder(String orderInfo) {try {// 模拟处理订单耗时(如扣减库存、写入订单表)Thread.sleep(500);System.out.println("订单处理完成:" + orderInfo);} catch (InterruptedException e) {e.printStackTrace();}}
}
5. 模拟高并发测试
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;public class TrafficPeakTest {public static void main(String[] args) throws IOException, TimeoutException {OrderService orderService = new OrderService();ExecutorService executor = Executors.newFixedThreadPool(100); // 模拟100个并发用户// 模拟1000个并发下单请求(流量峰值)for (int i = 0; i < 1000; i++) {final int orderId = i;executor.submit(() -> {try {// 削峰前:直接处理订单(可能导致系统崩溃)// orderService.createOrderDirectly(1001L, 1);// 削峰后:通过MQ异步处理订单orderService.createOrderWithMQ(1001L, 1);} catch (Exception e) {e.printStackTrace();}});}executor.shutdown();}
}

关键技术点解析

1. 流量削峰的实现
  • 生产者端:将订单请求快速放入队列后立即返回,避免请求堆积
  • 消费者端:
    • 通过 channel.basicQos(1) 限制每次只处理 1 条消息
    • 单线程消费(可扩展为多线程),每秒处理约 2 个订单(500ms / 订单)
  • 效果:1000 个并发请求被队列缓冲,系统按自身容量(2TPS)平稳处理
2. 消息可靠性保障
  • 持久化:RabbitMQ 默认将消息存储在内存中,可配置持久化到磁盘
  • 手动确认:消费者处理完成后手动 basicAck,失败则 basicNack 并重试
  • 死信队列:可配置死信队列存储多次处理失败的消息
3. 削峰前后对比
指标无 MQ(传统模式)有 MQ(流量削峰)
最大并发处理量受数据库连接数限制(如 100)队列可缓冲无限量请求
响应时间平均 200ms(直接处理)立即返回(<10ms)
系统稳定性峰值时易崩溃平稳处理,无崩溃风险
资源利用率峰值时资源耗尽,平时闲置按固定速率使用资源

生产环境优化建议

  1. 队列监控

    • 监控队列长度,设置告警阈值(如超过 10 万条未处理消息)
    • 使用 RabbitMQ Management 插件或 Prometheus + Grafana 监控
  2. 消费者扩容

    • 垂直扩容:增加消费者机器配置
    • 水平扩容:增加消费者实例数(需注意幂等性)
  3. 降级策略

    • 队列过长时,拒绝新请求并返回 “系统繁忙”
    • 非核心业务降级(如暂时关闭短信通知)
  4. 持久化配置

    // 设置队列持久化
    boolean durable = true;
    channel.queueDeclare(MQConfig.QUEUE_NAME, durable, false, false, null);// 设置消息持久化
    AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2) // 2表示持久化.build();
    channel.basicPublish("", MQConfig.QUEUE_NAME, properties, message.getBytes());
    

其他 MQ 选型参考

消息队列吞吐量优势场景示例项目
RabbitMQ万级 TPS强一致性、支持事务、灵活路由金融系统订单处理
Kafka百万级 TPS大数据实时处理日志收集、实时数据流处理
RocketMQ十万级 TPS高可用、顺序消息电商订单、物流系统
Pulsar百万级 TPS云原生、多租户分布式微服务架构

根据业务场景选择合适的 MQ,本例使用 RabbitMQ 是因其易用性和可靠性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/84505.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/84505.shtml
英文地址,请注明出处:http://en.pswp.cn/web/84505.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【二进制安全作业】250616课上作业2 - 栈溢出漏洞利用

文章目录 前言一、使用环境二、程序源码1. C语言源码2. 编译方式 三、源码分析四、反汇编分析1. 检查文件安全性2. 查找目标函数3. 计算偏移量4. 绕过 strlen5. 绕过 if 五、编写EXP结语 前言 直接进入正题 一、使用环境 处理器架构&#xff1a;x86_64 操作系统&#xff1a;U…

Python类型处理与推导式

欢迎来到啾啾的博客&#x1f431;。 记录学习点滴。分享工作思考和实用技巧&#xff0c;偶尔也分享一些杂谈&#x1f4ac;。 有很多很多不足的地方&#xff0c;欢迎评论交流&#xff0c;感谢您的阅读和评论&#x1f604;。 目录 1 引言2 类型处理3 高效操作AI开发常见数据结构3…

数据库char字段做trim之后查询很慢的解决方式

select * from TABLE0 where trim(column1):param 当表数据量大时&#xff0c;即使给column1字段加上索引&#xff0c;这条查询语句也会很慢。 因为使用trim函数后&#xff0c;column1的索引会失效&#xff0c;有两种处理方法&#xff1a; 1.给表加上trim(column1)函数索引 …

Kafka核心架构解析:从CAP理论到消息可靠性的设计哲学

摘要 本文从分布式系统CAP理论和消息可靠性两个视角深入解析Kafka的架构设计&#xff0c;通过概念关系图和组件交互图揭示其核心设计思想&#xff0c;并详细拆解各组件功能与协作机制。文章包含完整的交互流程分析和配置参数说明&#xff0c;是理解Kafka设计精髓的实用指南。 一…

LeetCode 275.H指数 II

题目&#xff1a; 给你一个整数数组 citations &#xff0c;其中 citations[i] 表示研究者的第 i 篇论文被引用的次数&#xff0c;citations 已经按照 非降序排列 。计算并返回该研究者的 h 指数。 h 指数的定义&#xff1a;h 代表“高引用次数”&#xff08;high citations&…

OV汽车摄像头cmos sensor 相关情况介绍

OV汽车摄像头cmos sensor 相关情况介绍 文章目录 OV汽车摄像头cmos sensor 相关情况介绍**1. 汽车摄像头三大场景应用****2. 车载CMOS SENSOR的核心技术****3. 两大车规认证:实现真正的车规可靠性****4. 最新产品**2022年,汽车智能化加码提速,被誉为“智能驾驶之眼”的车载摄…

Pinia在多步骤表单中的实践应用

引言 Pinia是Vue 3推荐的状态管理库&#xff0c;相比Vuex提供了更简洁的API、更好的TypeScript支持和更灵活的组合式风格。本文基于实际项目代码&#xff0c;详细介绍Pinia在多步骤表单场景中的应用方法。 1. Pinia Store的创建与设计 1.1 基础Store结构 在src/store/modul…

目标检测之YOLOV11的环境搭建

1 创建虚拟环境 conda create -n yolov11 python3.9 conda activate yolov112 安装ultralytics 默认是有cuda的情况下 # Install all packages together using conda conda install pytorch torchvision conda 还不能直接安装ultralytics&#xff0c;需要通过pip进行安装 …

Android 构建配置中的变量(通常在设备制造商或定制 ROM 的 AndroidProducts.mk 或产品配置文件中定义)

以下是 Android 构建系统中常见的用于产品配置、资源复制和构建规则的变量 1. PRODUCT_COPY_FILES 作用&#xff1a;指定需要从源码树复制到镜像的文件。示例&#xff1a;PRODUCT_COPY_FILES \device/manufacturer/device_name/file.conf:$(TARGET_COPY_OUT_VENDOR)/etc/file…

火山引擎项亮:机器学习与智能推荐平台多云部署解决方案正式发布

资料来源&#xff1a;火山引擎-开发者社区 2022年7月20日&#xff0c;火山引擎2022 Force原动力大会在北京诺金酒店成功举办。在上午的议程中&#xff0c;《推荐系统实践》一书的作者、同时也是火山引擎机器学习系统负责人——项亮&#xff0c;展开了题目为《开放AI基建&#x…

NVR的方法多种取决于应用场景

摄像头接入NVR&#xff08;网络视频录像机&#xff09;的方法通常取决于具体的应用场景和设备支持的功能。 一、通过局域网接入 设备连接 &#xff1a; 将摄像机通过网络线缆连接到NVR的对应端口&#xff0c;或者将摄像机和NVR都连接到同一个路由器/交换机上&#xff0c;确保它…

JAVA从入门到精通一文搞定

博主介绍&#xff1a; 大家好&#xff0c;我是想成为Super的Yuperman&#xff0c;互联网宇宙厂经验&#xff0c;17年医疗健康行业的码拉松奔跑者&#xff0c;曾担任技术专家、架构师、研发总监负责和主导多个应用架构。 近期专注&#xff1a; DeepSeek应用&#xff0c;RPA应用研…

火山引擎发布大模型生态广场MCP Servers,LAS MCP助力AI数据湖构建

资料来源&#xff1a;火山引擎-开发者社区 近日&#xff0c;火山引擎发布大模型生态广场—— MCP Servers&#xff0c;借助字节跳动生态能力&#xff0c;通过“MCP Market&#xff08;工具广场&#xff09; 火山方舟&#xff08;大模型服务&#xff09;Trae&#xff08;应用开…

NodeJS 对接 Outlook 发信服务器实现发信功能

示例代码&#xff1a; const express require(express); const nodemailer require(nodemailer); const querystring require(querystring); const axios require(axios);const app express(); app.use(express.json());const transporter nodemailer.createTransport({…

【同声传译】RealtimeSTT:超低延迟语音转文字,支持唤醒词与中译英

把你说的话实时变成文字&#xff1a;RealtimeSTT 上手体验 想找一个真正好用的语音转文字工具吗&#xff1f;不用等说完一整段才出结果&#xff0c;也不用反复点击按钮。RealtimeSTT 这个开源项目能做到​​实时​​转录&#xff0c;你说一句&#xff0c;屏幕上几乎同时出现文…

【大模型lora微调】关于推理时如何使用 LoRA Adapter

假设你有两部分&#xff1a; 一个是原始大模型&#xff08;base model&#xff09; 一个是保存的 LoRA Adapter&#xff08;adapter_config.json adapter_model.bin&#xff09; 不合并的情况下推理方法 你可以用 peft 的方式加载 LoRA Adapter&#xff0c;推理时这样写&a…

谷歌时间序列算法:零样本预测如何重塑行业决策?

谷歌时间序列算法&#xff1a;零样本预测如何重塑行业决策&#xff1f; TimesFM 你是否曾面临这样的困境&#xff1f;—— ▸ 需要预测新产品销量&#xff0c;却苦于缺乏历史数据&#xff1b; ▸ 依赖传统模型&#xff08;如ARIMA&#xff09;&#xff0c;但调参耗时且泛化能力…

国产服务器【银河麒麟v10】【CPU鲲鹏920】部署Minio文件服务器

目录 准备工作操作步骤1. 确认挂载点状态2. 创建专用用户和目录3. 下载ARM版Minio到挂在盘4. 环境变量配置5. 更新Systemd服务配置6. 启动、重启7. 防火墙8. 访问验证9. 故障排查&#xff08;如服务未启动&#xff09;​ 结束 准备工作 环境要求&#xff1a;Linux虚拟机 操作…

解决: React Native android webview 空白页

Android react-native-webview 之前是正常的, 升级了 react-native / react-native-webview 等 之后, 就变成了空白页. 通过下面的修改, 可以修复, 回到正常的状态. 来源: https://github.com/react-native-webview/react-native-webview/issues/3697 注意 ts 文件一定要改,…

高中编程教学中教师专业发展的困境与突破:基于实践与理论的双重审视

一、引言 1.1 研究背景 在数字化时代&#xff0c;编程已成为一项基本技能&#xff0c;其重要性日益凸显。编程不仅是计算机科学领域的核心能力&#xff0c;更是培养学生逻辑思维、创新能力和问题解决能力的有效途径。高中阶段作为学生成长和发展的关键时期&#xff0c;开展编…