Java中的RabbitMQ完全指南

1. 引言

什么是RabbitMQ

RabbitMQ是一个开源的消息代理和队列服务器,实现了高级消息队列协议(AMQP)。它充当应用程序之间的消息中间件,允许分布式系统中的不同组件进行异步通信。RabbitMQ使用Erlang语言开发,以其高性能、可靠性和可扩展性而闻名。

消息队列的核心概念

消息队列是一种异步通信机制,它允许应用程序通过发送和接收消息来进行通信,而不需要直接连接。这种模式带来了以下优势:

  • 解耦:生产者和消费者不需要同时在线
  • 可扩展性:可以独立扩展生产者和消费者
  • 可靠性:消息可以持久化存储,确保不丢失
  • 灵活性:支持多种消息传递模式

RabbitMQ的优势和应用场景

RabbitMQ在企业级应用中具有以下优势:

  • 多协议支持:支持AMQP、STOMP、MQTT等多种协议
  • 灵活的路由:支持多种Exchange类型和复杂的路由规则
  • 集群支持:可以构建高可用的集群架构
  • 管理界面:提供Web管理控制台
  • 丰富的客户端库:支持多种编程语言

常见应用场景包括:微服务解耦、异步任务处理、系统集成、削峰填谷等。

2. RabbitMQ基础概念

Exchange(交换器)类型详解

Exchange是RabbitMQ的核心组件,负责接收生产者发送的消息并将其路由到相应的队列。主要有四种类型:

Direct Exchange(直连交换器)

  • 根据routing key精确匹配路由消息
  • 适用于单播消息传递
  • 默认的交换器类型

Fanout Exchange(扇形交换器)

  • 将消息广播到所有绑定的队列
  • 忽略routing key
  • 适用于广播场景

Topic Exchange(主题交换器)

  • 基于通配符模式匹配routing key
  • 支持"*“(单个单词)和”#"(零个或多个单词)
  • 灵活的路由规则

Headers Exchange(头交换器)

  • 基于消息头属性进行路由
  • 较少使用,性能相对较低

Queue(队列)和消息持久化

Queue是存储消息的容器,具有以下特性:

  • FIFO原则:先进先出的消息处理顺序
  • 持久化:可以配置队列和消息的持久化
  • 排他性:可以设置队列只能被一个连接使用
  • 自动删除:当没有消费者时自动删除队列

Routing Key和Binding

  • Routing Key:生产者发送消息时指定的路由键
  • Binding:Exchange和Queue之间的绑定关系
  • Binding Key:绑定时指定的键,用于匹配routing key

Virtual Host(虚拟主机)

Virtual Host提供了逻辑隔离,类似于网络中的虚拟主机概念:

  • 不同vhost中的Exchange、Queue等资源完全隔离
  • 每个vhost有独立的权限控制
  • 默认vhost为"/"

3. Java环境准备

RabbitMQ服务器安装配置

Docker方式安装(推荐):

# 拉取RabbitMQ镜像(包含管理插件)
docker pull rabbitmq:3-management# 运行RabbitMQ容器
docker run -d --name rabbitmq \-p 5672:5672 \-p 15672:15672 \-e RABBITMQ_DEFAULT_USER=admin \-e RABBITMQ_DEFAULT_PASS=admin123 \rabbitmq:3-management

安装完成后,可以通过 http://localhost:15672 访问管理界面。

Maven/Gradle依赖配置

Maven配置:

<dependencies><!-- RabbitMQ Java客户端 --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.18.0</version></dependency><!-- Spring Boot RabbitMQ Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- 日志依赖 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>2.0.7</version></dependency>
</dependencies>

Gradle配置:

dependencies {implementation 'com.rabbitmq:amqp-client:5.18.0'implementation 'org.springframework.boot:spring-boot-starter-amqp'implementation 'org.slf4j:slf4j-simple:2.0.7'
}

连接工厂和连接管理

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class RabbitMQConnection {public static Connection getConnection() throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("admin123");factory.setVirtualHost("/");// 连接超时设置factory.setConnectionTimeout(30000);factory.setRequestedHeartbeat(60);return factory.newConnection();}
}

4. 基础消息模式实现

Simple Queue(简单队列)

最基本的消息模式,一个生产者发送消息到队列,一个消费者从队列接收消息。

生产者代码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class SimpleProducer {private final static String QUEUE_NAME = "simple_queue";public static void main(String[] args) throws Exception {try (Connection connection = RabbitMQConnection.getConnection();Channel channel = connection.createChannel()) {// 声明队列channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 发送消息String message = "Hello, RabbitMQ!";channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));System.out.println("Sent: " + message);}}
}

消费者代码:

import com.rabbitmq.client.*;public class SimpleConsumer {private final static String QUEUE_NAME = "simple_queue";public static void main(String[] args) throws Exception {Connection connection = RabbitMQConnection.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 创建消费者DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Received: " + message);};// 开始消费channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});System.out.println("Waiting for messages...");}
}

Work Queues(工作队列)

工作队列模式用于在多个消费者之间分发耗时任务,实现负载均衡。

生产者代码:

public class WorkQueueProducer {private final static String QUEUE_NAME = "work_queue";public static void main(String[] args) throws Exception {try (Connection connection = RabbitMQConnection.getConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 发送多个任务for (int i = 1; i <= 10; i++) {String message = "Task " + i;channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));System.out.println("Sent: " + message);}}}
}

消费者代码:

public class WorkQueueConsumer {private final static String QUEUE_NAME = "work_queue";public static void main(String[] args) throws Exception {Connection connection = RabbitMQConnection.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 设置每次只处理一个消息(公平分发)channel.basicQos(1);DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Processing: " + message);try {// 模拟耗时处理Thread.sleep(2000);} catch (InterruptedException e) {Thread.currentThread().interrupt();}// 手动确认消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);System.out.println("Completed: " + message);};// 关闭自动确认channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});System.out.println("Worker waiting for tasks...");}
}

Publish/Subscribe(发布订阅)

发布订阅模式使用fanout类型的Exchange将消息广播到所有绑定的队列。

发布者代码:

public class PublishSubscribeProducer {private final static String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) throws Exception {try (Connection connection = RabbitMQConnection.getConnection();Channel channel = connection.createChannel()) {// 声明fanout类型的交换器channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);String message = "Broadcast message to all subscribers!";channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));System.out.println("Published: " + message);}}
}

订阅者代码:

public class PublishSubscribeConsumer {private final static String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) throws Exception {Connection connection = RabbitMQConnection.getConnection();Channel channel = connection.createChannel();// 声明交换器channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);// 创建临时队列String queueName = channel.queueDeclare().getQueue();// 将队列绑定到交换器channel.queueBind(queueName, EXCHANGE_NAME, "");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Received: " + message);};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});System.out.println("Subscriber waiting for messages...");}
}

Routing(路由模式)

路由模式使用direct类型的Exchange根据routing key将消息路由到特定队列。

生产者代码:

public class RoutingProducer {private final static String EXCHANGE_NAME = "direct_exchange";public static void main(String[] args) throws Exception {try (Connection connection = RabbitMQConnection.getConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);// 发送不同级别的日志消息String[] levels = {"info", "warning", "error"};for (String level : levels) {String message = "This is a " + level + " message";channel.basicPublish(EXCHANGE_NAME, level, null, message.getBytes("UTF-8"));System.out.println("Sent [" + level + "]: " + message);}}}
}

消费者代码:

public class RoutingConsumer {private final static String EXCHANGE_NAME = "direct_exchange";public static void main(String[] args) throws Exception {Connection connection = RabbitMQConnection.getConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);String queueName = channel.queueDeclare().getQueue();// 只接收error级别的消息channel.queueBind(queueName, EXCHANGE_NAME, "error");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");String routingKey = delivery.getEnvelope().getRoutingKey();System.out.println("Received [" + routingKey + "]: " + message);};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});System.out.println("Consumer waiting for error messages...");}
}

Topics(主题模式)

主题模式使用topic类型的Exchange支持通配符路由。

生产者代码:

public class TopicProducer {private final static String EXCHANGE_NAME = "topic_exchange";public static void main(String[] args) throws Exception {try (Connection connection = RabbitMQConnection.getConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);// 发送不同主题的消息String[] routingKeys = {"user.info.create","user.warning.update", "order.error.payment","system.info.startup"};for (String routingKey : routingKeys) {String message = "Message for " + routingKey;channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));System.out.println("Sent [" + routingKey + "]: " + message);}}}
}

消费者代码:

public class TopicConsumer {private final static String EXCHANGE_NAME = "topic_exchange";public static void main(String[] args) throws Exception {Connection connection = RabbitMQConnection.getConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);String queueName = channel.queueDeclare().getQueue();// 绑定多个模式channel.queueBind(queueName, EXCHANGE_NAME, "user.*.*");  // 所有用户相关消息channel.queueBind(queueName, EXCHANGE_NAME, "*.error.*"); // 所有错误消息DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");String routingKey = delivery.getEnvelope().getRoutingKey();System.out.println("Received [" + routingKey + "]: " + message);};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});System.out.println("Consumer waiting for messages matching patterns...");}
}

RPC模式

RPC模式实现远程过程调用,客户端发送请求并等待服务端响应。

RPC服务端:

public class RPCServer {private final static String QUEUE_NAME = "rpc_queue";public static void main(String[] args) throws Exception {Connection connection = RabbitMQConnection.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.basicQos(1);DeliverCallback deliverCallback = (consumerTag, delivery) -> {AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(delivery.getProperties().getCorrelationId()).build();String response = "";try {String message = new String(delivery.getBody(), "UTF-8");int n = Integer.parseInt(message);System.out.println("Computing fibonacci(" + n + ")");response = String.valueOf(fibonacci(n));} catch (RuntimeException e) {System.out.println("Error: " + e.toString());response = "Error: " + e.toString();} finally {channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}};channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});System.out.println("RPC Server waiting for requests...");}private static int fibonacci(int n) {if (n <= 1) return n;return fibonacci(n - 1) + fibonacci(n - 2);}
}

RPC客户端:

import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;public class RPCClient implements AutoCloseable {private Connection connection;private Channel channel;private String requestQueueName = "rpc_queue";public RPCClient() throws Exception {connection = RabbitMQConnection.getConnection();channel = connection.createChannel();}public String call(String message) throws Exception {final String corrId = UUID.randomUUID().toString();String replyQueueName = channel.queueDeclare().getQueue();AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {if (delivery.getProperties().getCorrelationId().equals(corrId)) {response.offer(new String(delivery.getBody(), "UTF-8"));}}, consumerTag -> {});String result = response.take();channel.basicCancel(ctag);return result;}@Overridepublic void close() throws Exception {connection.close();}public static void main(String[] args) throws Exception {try (RPCClient client = new RPCClient()) {System.out.println("Requesting fibonacci(10)");String response = client.call("10");System.out.println("Got: " + response);}}
}

5. Spring Boot集成RabbitMQ

Spring AMQP配置

application.yml配置:

spring:rabbitmq:host: localhostport: 5672username: adminpassword: admin123virtual-host: /connection-timeout: 30000publisher-confirm-type: correlatedpublisher-returns: truelistener:simple:acknowledge-mode: manualconcurrency: 2max-concurrency: 10

配置类:

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {public static final String DIRECT_EXCHANGE = "spring.direct.exchange";public static final String TOPIC_EXCHANGE = "spring.topic.exchange";public static final String DIRECT_QUEUE = "spring.direct.queue";public static final String TOPIC_QUEUE = "spring.topic.queue";// 声明Direct Exchange@Beanpublic DirectExchange directExchange() {return new DirectExchange(DIRECT_EXCHANGE, true, false);}// 声明Topic Exchange  @Beanpublic TopicExchange topicExchange() {return new TopicExchange(TOPIC_EXCHANGE, true, false);}// 声明队列@Beanpublic Queue directQueue() {return QueueBuilder.durable(DIRECT_QUEUE).build();}@Beanpublic Queue topicQueue() {return QueueBuilder.durable(TOPIC_QUEUE).build();}// 绑定关系@Beanpublic Binding directBinding() {return BindingBuilder.bind(directQueue()).to(directExchange()).with("direct.routing.key");}@Beanpublic Binding topicBinding() {return BindingBuilder.bind(topicQueue()).to(topicExchange()).with("topic.*.key");}
}

RabbitTemplate使用

消息生产者服务:

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class MessageProducerService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendDirectMessage(String message) {rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE, "direct.routing.key", message);System.out.println("Sent direct message: " + message);}public void sendTopicMessage(String routingKey, String message) {rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE, routingKey, message);System.out.println("Sent topic message with key " + routingKey + ": " + message);}// 发送对象消息public void sendObjectMessage(Object obj) {rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE,"direct.routing.key",obj);}
}

@RabbitListener注解详解

消息消费者服务:

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import com.rabbitmq.client.Channel;@Service
public class MessageConsumerService {// 基础消费者@RabbitListener(queues = RabbitMQConfig.DIRECT_QUEUE)public void handleDirectMessage(String message) {System.out.println("Received direct message: " + message);}// 手动确认消息@RabbitListener(queues = RabbitMQConfig.TOPIC_QUEUE)public void handleTopicMessage(@Payload String message,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,Channel channel) throws Exception {try {System.out.println("Processing topic message: " + message);// 模拟业务处理Thread.sleep(1000);// 手动确认channel.basicAck(deliveryTag, false);} catch (Exception e) {System.err.println("Error processing message: " + e.getMessage());// 拒绝消息并重新入队channel.basicNack(deliveryTag, false, true);}}// 接收完整消息对象@RabbitListener(queues = RabbitMQConfig.DIRECT_QUEUE)public void handleCompleteMessage(Message message, Channel channel) throws Exception {String body = new String(message.getBody());String routingKey = message.getMessageProperties().getReceivedRoutingKey();System.out.println("Received message: " + body + " with routing key: " + routingKey);// 手动确认channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}
}

消息转换器配置

JSON消息转换器:

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MessageConverterConfig {@Beanpublic Jackson2JsonMessageConverter jsonMessageConverter() {return new Jackson2JsonMessageConverter();}@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate template = new RabbitTemplate(connectionFactory);template.setMessageConverter(jsonMessageConverter());// 配置确认回调template.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {System.out.println("Message sent successfully");} else {System.out.println("Message send failed: " + cause);}});// 配置返回回调template.setReturnsCallback(returned -> {System.out.println("Message returned: " + returned.getMessage());});return template;}
}

使用JSON转换器发送对象:

// 定义消息对象
public class UserMessage {private Long id;private String name;private String email;// 构造函数、getter、setterpublic UserMessage() {}public UserMessage(Long id, String name, String email) {this.id = id;this.name = name;this.email = email;}// getter和setter方法...
}// 发送和接收对象消息
@Service
public class UserMessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendUserMessage(UserMessage user) {rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE,"user.routing.key",user);}@RabbitListener(queues = "user.queue")public void handleUserMessage(UserMessage user) {System.out.println("Received user: " + user.getName() + " (" + user.getEmail() + ")");}
}

6. 高级特性

消息确认机制(ACK)

RabbitMQ提供了多种消息确认机制来保证消息的可靠传递:

自动确认(Auto ACK):

// 消息被消费者接收后立即确认
@RabbitListener(queues = "auto.ack.queue", ackMode = "AUTO")
public void handleAutoAck(String message) {System.out.println("Auto ACK: " + message);
}

手动确认(Manual ACK):

@RabbitListener(queues = "manual.ack.queue", ackMode = "MANUAL")
public void handleManualAck(String message,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,Channel channel) throws Exception {try {// 处理业务逻辑processMessage(message);// 确认消息channel.basicAck(deliveryTag, false);} catch (Exception e) {// 拒绝消息,重新入队channel.basicNack(deliveryTag, false, true);}
}

发布确认:

@Configuration
public class PublisherConfirmConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate template = new RabbitTemplate(connectionFactory);// 启用发布确认template.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {System.out.println("Message published successfully");} else {System.out.println("Message publish failed: " + cause);}});// 启用消息返回template.setReturnsCallback(returned -> {System.out.println("Message returned: " + returned.getMessage().toString());System.out.println("Reply code: " + returned.getReplyCode());System.out.println("Reply text: " + returned.getReplyText());});return template;}
}

死信队列(DLX)处理

死信队列用于处理无法正常消费的消息,常见的死信场景包括:

  • 消息被拒绝且不重新入队
  • 消息TTL过期
  • 队列达到最大长度

死信队列配置:

@Configuration
public class DeadLetterConfig {public static final String BUSINESS_EXCHANGE = "business.exchange";public static final String BUSINESS_QUEUE = "business.queue";public static final String DEAD_LETTER_EXCHANGE = "dlx.exchange";public static final String DEAD_LETTER_QUEUE = "dlx.queue";// 业务交换器@Beanpublic DirectExchange businessExchange() {return new DirectExchange(BUSINESS_EXCHANGE);}// 死信交换器@Beanpublic DirectExchange deadLetterExchange() {return new DirectExchange(DEAD_LETTER_EXCHANGE);}// 业务队列(配置死信交换器)@Beanpublic Queue businessQueue() {return QueueBuilder.durable(BUSINESS_QUEUE).withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE).withArgument("x-dead-letter-routing-key", "dead.letter.routing.key").withArgument("x-message-ttl", 10000) // 消息TTL 10秒.build();}// 死信队列@Beanpublic Queue deadLetterQueue() {return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();}// 绑定关系@Beanpublic Binding businessBinding() {return BindingBuilder.bind(businessQueue()).to(businessExchange()).with("business.routing.key");}@Beanpublic Binding deadLetterBinding() {return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("dead.letter.routing.key");}
}

死信处理服务:

@Service
public class DeadLetterService {@Autowiredprivate RabbitTemplate rabbitTemplate;// 发送业务消息public void sendBusinessMessage(String message) {rabbitTemplate.convertAndSend(DeadLetterConfig.BUSINESS_EXCHANGE,"business.routing.key",message);}// 业务消息处理@RabbitListener(queues = DeadLetterConfig.BUSINESS_QUEUE)public void handleBusinessMessage(String message,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,Channel channel) throws Exception {try {System.out.println("Processing business message: " + message);// 模拟处理失败if (message.contains("error")) {throw new RuntimeException("Business processing failed");}channel.basicAck(deliveryTag, false);} catch (Exception e) {System.err.println("Business processing failed: " + e.getMessage());// 拒绝消息,不重新入队,进入死信队列channel.basicNack(deliveryTag, false, false);}}// 死信消息处理@RabbitListener(queues = DeadLetterConfig.DEAD_LETTER_QUEUE)public void handleDeadLetterMessage(String message) {System.out.println("Handling dead letter message: " + message);// 记录日志、发送告警、人工处理等logDeadLetterMessage(message);}private void logDeadLetterMessage(String message) {// 实现日志记录逻辑System.out.println("Dead letter logged: " + message);}
}

消息TTL和队列过期

消息TTL配置:

@Service
public class TTLMessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;// 发送带TTL的消息public void sendTTLMessage(String message, int ttlSeconds) {MessageProperties properties = new MessageProperties();properties.setExpiration(String.valueOf(ttlSeconds * 1000)); // 毫秒Message msg = new Message(message.getBytes(), properties);rabbitTemplate.send("ttl.exchange", "ttl.routing.key", msg);}// 使用MessagePostProcessor设置TTLpublic void sendTTLMessageWithProcessor(String message, int ttlSeconds) {rabbitTemplate.convertAndSend("ttl.exchange","ttl.routing.key", message,msg -> {msg.getMessageProperties().setExpiration(String.valueOf(ttlSeconds * 1000));return msg;});}
}

队列TTL配置:

@Bean
public Queue ttlQueue() {return QueueBuilder.durable("ttl.queue").withArgument("x-message-ttl", 60000) // 队列中消息的默认TTL.withArgument("x-expires", 300000)    // 队列没有消费者时的过期时间.build();
}

优先级队列

优先级队列配置:

@Bean
public Queue priorityQueue() {return QueueBuilder.durable("priority.queue").withArgument("x-max-priority", 10) // 最大优先级为10.build();
}

发送优先级消息:

@Service
public class PriorityMessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendPriorityMessage(String message, int priority) {rabbitTemplate.convertAndSend("priority.exchange","priority.routing.key",message,msg -> {msg.getMessageProperties().setPriority(priority);return msg;});}@RabbitListener(queues = "priority.queue")public void handlePriorityMessage(String message, @Header("priority") Integer priority) {System.out.println("Received priority " + priority + " message: " + message);}
}

7. 性能优化和最佳实践

连接池管理

连接池配置:

@Configuration
public class RabbitConnectionConfig {@Beanpublic CachingConnectionFactory connectionFactory() {CachingConnectionFactory factory = new CachingConnectionFactory("localhost");factory.setUsername("admin");factory.setPassword("admin123");// 连接池配置factory.setChannelCacheSize(50);           // 缓存的Channel数量factory.setConnectionCacheSize(10);        // 缓存的Connection数量factory.setChannelCheckoutTimeout(30000);  // Channel获取超时时间// 连接超时配置factory.setConnectionTimeout(30000);factory.setRequestedHeartBeat(60);// 发布确认factory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);factory.setPublisherReturns(true);return factory;}
}

批量处理消息

批量发送消息:

@Service
public class BatchMessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendBatchMessages(List<String> messages) {// 使用事务批量发送rabbitTemplate.execute(channel -> {channel.txSelect(); // 开启事务try {for (String message : messages) {channel.basicPublish("batch.exchange","batch.routing.key",null,message.getBytes("UTF-8"));}channel.txCommit(); // 提交事务} catch (Exception e) {channel.txRollback(); // 回滚事务throw new RuntimeException("Batch send failed", e);}return null;});}// 使用发布确认批量发送public void sendBatchWithConfirm(List<String> messages) {rabbitTemplate.execute(channel -> {channel.confirmSelect(); // 开启发布确认模式for (String message : messages) {channel.basicPublish("batch.exchange","batch.routing.key", null,message.getBytes("UTF-8"));}// 等待所有消息确认boolean allConfirmed = channel.waitForConfirms(5000);if (!allConfirmed) {throw new RuntimeException("Not all messages were confirmed");}return null;});}
}

批量消费消息:

@Component
public class BatchConsumerService {private final List<String> messageBuffer = new ArrayList<>();private final int BATCH_SIZE = 100;@RabbitListener(queues = "batch.queue", ackMode = "MANUAL")public void handleBatchMessage(String message,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,Channel channel) throws Exception {synchronized (messageBuffer) {messageBuffer.add(message);if (messageBuffer.size() >= BATCH_SIZE) {processBatch(new ArrayList<>(messageBuffer));messageBuffer.clear();// 批量确认消息channel.basicAck(deliveryTag, true);}}}private void processBatch(List<String> messages) {System.out.println("Processing batch of " + messages.size() + " messages");// 批量处理逻辑for (String message : messages) {// 处理单个消息System.out.println("Processing: " + message);}}// 定时处理剩余消息@Scheduled(fixedDelay = 5000)public void processRemainingMessages() {synchronized (messageBuffer) {if (!messageBuffer.isEmpty()) {processBatch(new ArrayList<>(messageBuffer));messageBuffer.clear();}}}
}

消费者并发控制

并发配置:

spring:rabbitmq:listener:simple:concurrency: 5          # 初始消费者数量max-concurrency: 20     # 最大消费者数量prefetch: 10           # 预取消息数量acknowledge-mode: manualretry:enabled: trueinitial-interval: 1000max-attempts: 3multiplier: 2

动态并发控制:

@Component
public class DynamicConcurrencyService {@Autowiredprivate SimpleRabbitListenerContainerFactory factory;@EventListenerpublic void handleHighLoad(HighLoadEvent event) {// 动态调整并发数factory.setConcurrentConsumers(10);factory.setMaxConcurrentConsumers(50);}@EventListener  public void handleLowLoad(LowLoadEvent event) {factory.setConcurrentConsumers(2);factory.setMaxConcurrentConsumers(10);}
}

监控和日志记录

监控配置:

@Component
public class RabbitMQMetrics {private final MeterRegistry meterRegistry;private final Counter messagesSent;private final Counter messagesReceived;private final Timer messageProcessingTime;public RabbitMQMetrics(MeterRegistry meterRegistry) {this.meterRegistry = meterRegistry;this.messagesSent = Counter.builder("rabbitmq.messages.sent").description("Number of messages sent").register(meterRegistry);this.messagesReceived = Counter.builder("rabbitmq.messages.received").description("Number of messages received").register(meterRegistry);this.messageProcessingTime = Timer.builder("rabbitmq.message.processing.time").description("Message processing time").register(meterRegistry);}public void incrementMessagesSent() {messagesSent.increment();}public void incrementMessagesReceived() {messagesReceived.increment();}public Timer.Sample startProcessingTimer() {return Timer.start(meterRegistry);}
}

日志配置:

@Component
@Slf4j
public class MessageLoggingService {@Autowiredprivate RabbitMQMetrics metrics;@RabbitListener(queues = "monitored.queue")public void handleMonitoredMessage(String message) {Timer.Sample sample = metrics.startProcessingTimer();try {log.info("Processing message: {}", message);// 业务处理逻辑processMessage(message);metrics.incrementMessagesReceived();log.info("Message processed successfully: {}", message);} catch (Exception e) {log.error("Error processing message: {}", message, e);throw e;} finally {sample.stop(metrics.getMessageProcessingTime());}}private void processMessage(String message) {// 实际业务处理}
}

8. 实战案例

订单处理系统

在电商系统中,订单处理涉及多个步骤:库存检查、支付处理、物流安排等。使用RabbitMQ可以实现异步处理和系统解耦。

订单消息定义:

public class OrderMessage {private Long orderId;private Long userId;private List<OrderItem> items;private BigDecimal totalAmount;private String status;private Date createTime;// 构造函数、getter、setterpublic OrderMessage() {}public OrderMessage(Long orderId, Long userId, List<OrderItem> items, BigDecimal totalAmount) {this.orderId = orderId;this.userId = userId;this.items = items;this.totalAmount = totalAmount;this.status = "CREATED";this.createTime = new Date();}// getter和setter方法...
}public class OrderItem {private Long productId;private String productName;private Integer quantity;private BigDecimal price;// 构造函数、getter、setter...
}

订单处理配置:

@Configuration
public class OrderProcessingConfig {public static final String ORDER_EXCHANGE = "order.exchange";public static final String ORDER_CREATED_QUEUE = "order.created.queue";public static final String INVENTORY_CHECK_QUEUE = "inventory.check.queue";public static final String PAYMENT_PROCESS_QUEUE = "payment.process.queue";public static final String SHIPPING_ARRANGE_QUEUE = "shipping.arrange.queue";@Beanpublic TopicExchange orderExchange() {return new TopicExchange(ORDER_EXCHANGE);}@Beanpublic Queue orderCreatedQueue() {return QueueBuilder.durable(ORDER_CREATED_QUEUE).build();}@Beanpublic Queue inventoryCheckQueue() {return QueueBuilder.durable(INVENTORY_CHECK_QUEUE).build();}@Beanpublic Queue paymentProcessQueue() {return QueueBuilder.durable(PAYMENT_PROCESS_QUEUE).build();}@Beanpublic Queue shippingArrangeQueue() {return QueueBuilder.durable(SHIPPING_ARRANGE_QUEUE).build();}// 绑定关系@Beanpublic Binding orderCreatedBinding() {return BindingBuilder.bind(orderCreatedQueue()).to(orderExchange()).with("order.created");}@Beanpublic Binding inventoryCheckBinding() {return BindingBuilder.bind(inventoryCheckQueue()).to(orderExchange()).with("order.inventory.check");}@Beanpublic Binding paymentProcessBinding() {return BindingBuilder.bind(paymentProcessQueue()).to(orderExchange()).with("order.payment.process");}@Beanpublic Binding shippingArrangeBinding() {return BindingBuilder.bind(shippingArrangeQueue()).to(orderExchange()).with("order.shipping.arrange");}
}

订单处理服务:

@Service
@Slf4j
public class OrderProcessingService {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate InventoryService inventoryService;@Autowiredprivate PaymentService paymentService;@Autowiredprivate ShippingService shippingService;// 创建订单public void createOrder(OrderMessage order) {log.info("Creating order: {}", order.getOrderId());// 发送订单创建消息rabbitTemplate.convertAndSend(OrderProcessingConfig.ORDER_EXCHANGE,"order.created",order);}// 处理订单创建@RabbitListener(queues = OrderProcessingConfig.ORDER_CREATED_QUEUE)public void handleOrderCreated(OrderMessage order) {log.info("Processing created order: {}", order.getOrderId());// 触发库存检查rabbitTemplate.convertAndSend(OrderProcessingConfig.ORDER_EXCHANGE,"order.inventory.check",order);}// 处理库存检查@RabbitListener(queues = OrderProcessingConfig.INVENTORY_CHECK_QUEUE)public void handleInventoryCheck(OrderMessage order) {log.info("Checking inventory for order: {}", order.getOrderId());try {boolean inventoryAvailable = inventoryService.checkInventory(order.getItems());if (inventoryAvailable) {order.setStatus("INVENTORY_CONFIRMED");// 触发支付处理rabbitTemplate.convertAndSend(OrderProcessingConfig.ORDER_EXCHANGE,"order.payment.process",order);} else {order.setStatus("INVENTORY_INSUFFICIENT");log.warn("Insufficient inventory for order: {}", order.getOrderId());// 发送库存不足通知}} catch (Exception e) {log.error("Error checking inventory for order: {}", order.getOrderId(), e);order.setStatus("INVENTORY_CHECK_FAILED");}}// 处理支付@RabbitListener(queues = OrderProcessingConfig.PAYMENT_PROCESS_QUEUE)public void handlePaymentProcess(OrderMessage order) {log.info("Processing payment for order: {}", order.getOrderId());try {boolean paymentSuccess = paymentService.processPayment(order.getUserId(), order.getTotalAmount());if (paymentSuccess) {order.setStatus("PAID");// 触发物流安排rabbitTemplate.convertAndSend(OrderProcessingConfig.ORDER_EXCHANGE,"order.shipping.arrange",order);} else {order.setStatus("PAYMENT_FAILED");log.warn("Payment failed for order: {}", order.getOrderId());}} catch (Exception e) {log.error("Error processing payment for order: {}", order.getOrderId(), e);order.setStatus("PAYMENT_ERROR");}}// 处理物流安排@RabbitListener(queues = OrderProcessingConfig.SHIPPING_ARRANGE_QUEUE)public void handleShippingArrange(OrderMessage order) {log.info("Arranging shipping for order: {}", order.getOrderId());try {String trackingNumber = shippingService.arrangeShipping(order);order.setStatus("SHIPPED");log.info("Order {} shipped with tracking number: {}", order.getOrderId(), trackingNumber);} catch (Exception e) {log.error("Error arranging shipping for order: {}", order.getOrderId(), e);order.setStatus("SHIPPING_FAILED");}}
}

异步任务处理

对于耗时的任务,如图片处理、报表生成、邮件发送等,可以使用RabbitMQ实现异步处理。

任务处理配置:

@Configuration
public class TaskProcessingConfig {public static final String TASK_EXCHANGE = "task.exchange";public static final String IMAGE_PROCESS_QUEUE = "task.image.process.queue";public static final String EMAIL_SEND_QUEUE = "task.email.send.queue";public static final String REPORT_GENERATE_QUEUE = "task.report.generate.queue";@Beanpublic DirectExchange taskExchange() {return new DirectExchange(TASK_EXCHANGE);}@Beanpublic Queue imageProcessQueue() {return QueueBuilder.durable(IMAGE_PROCESS_QUEUE).withArgument("x-max-priority", 10) // 支持优先级.build();}@Beanpublic Queue emailSendQueue() {return QueueBuilder.durable(EMAIL_SEND_QUEUE).build();}@Beanpublic Queue reportGenerateQueue() {return QueueBuilder.durable(REPORT_GENERATE_QUEUE).withArgument("x-message-ttl", 300000) // 5分钟TTL.build();}// 绑定关系@Beanpublic Binding imageProcessBinding() {return BindingBuilder.bind(imageProcessQueue()).to(taskExchange()).with("task.image.process");}@Beanpublic Binding emailSendBinding() {return BindingBuilder.bind(emailSendQueue()).to(taskExchange()).with("task.email.send");}@Beanpublic Binding reportGenerateBinding() {return BindingBuilder.bind(reportGenerateQueue()).to(taskExchange()).with("task.report.generate");}
}

任务消息定义:

public class TaskMessage {private String taskId;private String taskType;private Map<String, Object> parameters;private String status;private Date createTime;private Date processTime;// 构造函数、getter、setter...
}public class ImageProcessTask extends TaskMessage {private String imageUrl;private String targetFormat;private Map<String, Object> processingOptions;// 构造函数、getter、setter...
}public class EmailTask extends TaskMessage {private String to;private String subject;private String content;private List<String> attachments;// 构造函数、getter、setter...
}

任务处理服务:

@Service
@Slf4j
public class TaskProcessingService {@Autowiredprivate RabbitTemplate rabbitTemplate;// 提交图片处理任务public void submitImageProcessTask(ImageProcessTask task, int priority) {rabbitTemplate.convertAndSend(TaskProcessingConfig.TASK_EXCHANGE,"task.image.process",task,message -> {message.getMessageProperties().setPriority(priority);return message;});log.info("Submitted image process task: {}", task.getTaskId());}// 处理图片处理任务@RabbitListener(queues = TaskProcessingConfig.IMAGE_PROCESS_QUEUE, concurrency = "2-10") // 动态并发public void handleImageProcessTask(ImageProcessTask task) {log.info("Processing image task: {}", task.getTaskId());try {task.setStatus("PROCESSING");task.setProcessTime(new Date());// 图片处理逻辑processImage(task);task.setStatus("COMPLETED");log.info("Image task completed: {}", task.getTaskId());} catch (Exception e) {task.setStatus("FAILED");log.error("Image task failed: {}", task.getTaskId(), e);}}// 处理邮件发送任务@RabbitListener(queues = TaskProcessingConfig.EMAIL_SEND_QUEUE)public void handleEmailSendTask(EmailTask task) {log.info("Sending email task: {}", task.getTaskId());try {sendEmail(task);task.setStatus("SENT");log.info("Email sent successfully: {}", task.getTaskId());} catch (Exception e) {task.setStatus("FAILED");log.error("Email send failed: {}", task.getTaskId(), e);// 重试逻辑或进入死信队列throw new AmqpRejectAndDontRequeueException("Email send failed", e);}}private void processImage(ImageProcessTask task) {// 实现图片处理逻辑log.info("Processing image: {} -> {}", task.getImageUrl(), task.getTargetFormat());// 模拟处理时间try {Thread.sleep(2000);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}private void sendEmail(EmailTask task) {// 实现邮件发送逻辑log.info("Sending email to: {}, subject: {}", task.getTo(), task.getSubject());// 模拟发送时间try {Thread.sleep(1000);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
}

微服务间通信

在微服务架构中,RabbitMQ可以作为服务间异步通信的消息总线。

事件驱动架构配置:

@Configuration
public class EventDrivenConfig {public static final String EVENT_EXCHANGE = "event.exchange";public static final String USER_SERVICE_QUEUE = "user.service.queue";public static final String ORDER_SERVICE_QUEUE = "order.service.queue";public static final String NOTIFICATION_SERVICE_QUEUE = "notification.service.queue";@Beanpublic TopicExchange eventExchange() {return new TopicExchange(EVENT_EXCHANGE);}@Beanpublic Queue userServiceQueue() {return QueueBuilder.durable(USER_SERVICE_QUEUE).build();}@Beanpublic Queue orderServiceQueue() {return QueueBuilder.durable(ORDER_SERVICE_QUEUE).build();}@Beanpublic Queue notificationServiceQueue() {return QueueBuilder.durable(NOTIFICATION_SERVICE_QUEUE).build();}// 用户服务监听用户相关事件@Beanpublic Binding userServiceBinding() {return BindingBuilder.bind(userServiceQueue()).to(eventExchange()).with("user.*");}// 订单服务监听订单和用户事件@Beanpublic Binding orderServiceBinding1() {return BindingBuilder.bind(orderServiceQueue()).to(eventExchange()).with("order.*");}@Beanpublic Binding orderServiceBinding2() {return BindingBuilder.bind(orderServiceQueue()).to(eventExchange()).with("user.registered");}// 通知服务监听所有事件@Beanpublic Binding notificationServiceBinding() {return BindingBuilder.bind(notificationServiceQueue()).to(eventExchange()).with("*.*");}
}

事件处理服务:

@Service
@Slf4j
public class EventService {@Autowiredprivate RabbitTemplate rabbitTemplate;// 发布用户注册事件public void publishUserRegisteredEvent(UserRegisteredEvent event) {rabbitTemplate.convertAndSend(EventDrivenConfig.EVENT_EXCHANGE,"user.registered",event);log.info("Published user registered event: {}", event.getUserId());}// 发布订单创建事件public void publishOrderCreatedEvent(OrderCreatedEvent event) {rabbitTemplate.convertAndSend(EventDrivenConfig.EVENT_EXCHANGE,"order.created",event);log.info("Published order created event: {}", event.getOrderId());}// 用户服务处理用户事件@RabbitListener(queues = EventDrivenConfig.USER_SERVICE_QUEUE)public void handleUserEvents(@Payload Object event,@Header("amqp_receivedRoutingKey") String routingKey) {log.info("User service received event: {} with routing key: {}", event.getClass().getSimpleName(), routingKey);switch (routingKey) {case "user.registered":handleUserRegistered((UserRegisteredEvent) event);break;case "user.updated":handleUserUpdated((UserUpdatedEvent) event);break;default:log.warn("Unknown user event: {}", routingKey);}}// 订单服务处理订单和用户事件@RabbitListener(queues = EventDrivenConfig.ORDER_SERVICE_QUEUE)public void handleOrderEvents(@Payload Object event,@Header("amqp_receivedRoutingKey") String routingKey) {log.info("Order service received event: {} with routing key: {}", event.getClass().getSimpleName(), routingKey);switch (routingKey) {case "user.registered":// 为新用户创建优惠券createWelcomeCoupon((UserRegisteredEvent) event);break;case "order.created":handleOrderCreated((OrderCreatedEvent) event);break;case "order.paid":handleOrderPaid((OrderPaidEvent) event);break;default:log.warn("Unknown order event: {}", routingKey);}}// 通知服务处理所有事件@RabbitListener(queues = EventDrivenConfig.NOTIFICATION_SERVICE_QUEUE)public void handleNotificationEvents(@Payload Object event,@Header("amqp_receivedRoutingKey") String routingKey) {log.info("Notification service received event: {} with routing key: {}", event.getClass().getSimpleName(), routingKey);// 根据事件类型发送不同的通知switch (routingKey) {case "user.registered":sendWelcomeNotification((UserRegisteredEvent) event);break;case "order.created":sendOrderConfirmationNotification((OrderCreatedEvent) event);break;case "order.shipped":sendShippingNotification((OrderShippedEvent) event);break;default:log.debug("No notification needed for event: {}", routingKey);}}private void handleUserRegistered(UserRegisteredEvent event) {log.info("Handling user registration: {}", event.getUserId());// 用户服务内部处理逻辑}private void handleUserUpdated(UserUpdatedEvent event) {log.info("Handling user update: {}", event.getUserId());// 用户更新处理逻辑}private void createWelcomeCoupon(UserRegisteredEvent event) {log.info("Creating welcome coupon for user: {}", event.getUserId());// 创建新用户优惠券逻辑}private void handleOrderCreated(OrderCreatedEvent event) {log.info("Handling order creation: {}", event.getOrderId());// 订单创建处理逻辑}private void handleOrderPaid(OrderPaidEvent event) {log.info("Handling order payment: {}", event.getOrderId());// 订单支付处理逻辑}private void sendWelcomeNotification(UserRegisteredEvent event) {log.info("Sending welcome notification to user: {}", event.getUserId());// 发送欢迎通知逻辑}private void sendOrderConfirmationNotification(OrderCreatedEvent event) {log.info("Sending order confirmation notification for order: {}", event.getOrderId());// 发送订单确认通知逻辑}private void sendShippingNotification(OrderShippedEvent event) {log.info("Sending shipping notification for order: {}", event.getOrderId());// 发送发货通知逻辑}
}// 事件类定义
public class UserRegisteredEvent {private Long userId;private String email;private String username;private Date registrationTime;// 构造函数、getter、setter...
}public class UserUpdatedEvent {private Long userId;private Map<String, Object> updatedFields;private Date updateTime;// 构造函数、getter、setter...
}public class OrderCreatedEvent {private Long orderId;private Long userId;private BigDecimal totalAmount;private Date createTime;// 构造函数、getter、setter...
}public class OrderPaidEvent {private Long orderId;private Long userId;private BigDecimal paidAmount;private String paymentMethod;private Date paidTime;// 构造函数、getter、setter...
}public class OrderShippedEvent {private Long orderId;private String trackingNumber;private String shippingCompany;private Date shippedTime;// 构造函数、getter、setter...
}

总结

通过本文的详细介绍,我们深入了解了RabbitMQ在Java应用中的使用。从基础概念到高级特性,从简单的点对点通信到复杂的事件驱动架构,RabbitMQ都能提供强大的支持。

关键要点回顾

  1. 基础概念掌握:理解Exchange、Queue、Binding等核心概念是使用RabbitMQ的基础。

  2. 消息模式选择:根据业务场景选择合适的消息模式,如简单队列用于任务分发,发布订阅用于广播通知。

  3. Spring Boot集成:使用Spring AMQP可以大大简化RabbitMQ的使用,提供了丰富的注解和配置选项。

  4. 可靠性保证:通过消息确认、死信队列、消息持久化等机制确保消息的可靠传递。

  5. 性能优化:合理配置连接池、批量处理、并发控制等参数来优化系统性能。

  6. 监控运维:建立完善的监控和日志体系,及时发现和解决问题。

最佳实践建议

  • 设计原则:遵循单一职责原则,每个队列处理特定类型的消息
  • 错误处理:建立完善的错误处理和重试机制
  • 资源管理:合理管理连接和通道资源,避免资源泄露
  • 安全考虑:使用适当的认证和授权机制保护消息安全
  • 测试策略:编写完善的单元测试和集成测试

RabbitMQ作为一个成熟的消息中间件,在微服务架构、异步处理、系统解耦等场景中发挥着重要作用。掌握其使用方法和最佳实践,将有助于构建更加可靠、可扩展的分布式系统。


本文涵盖了RabbitMQ在Java中的主要使用场景和实践方法。在实际应用中,还需要根据具体的业务需求和系统架构进行适当的调整和优化。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/917652.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/917652.shtml
英文地址,请注明出处:http://en.pswp.cn/news/917652.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【MCAL】AUTOSAR架构下SPI数据异步DMA收发具体实现

目录 前言 正文 1.依赖的硬件特性 1.1.SPI硬件特性 1.1.1. TXFIFO Single Move Mode 1.1.2. RXFIFO Single Move Mode 1.1.3. Move Counter模式 1.1.4. PT中断 1.2.IR硬件特性 1.3.DMA硬件特性 1.3.1. DMA通道硬件请求 1.3.2. DMA循环Buffer 1.3.3. DMA Link List …

【Unity】协程 Async

协程 协程是 Unity 内置的异步机制&#xff0c;通过 yield 暂停执行&#xff0c;实现任务在多帧中分段执行。与普通函数不同&#xff0c;协程可在执行过程中挂起和恢复&#xff0c;呈现"并发"效果&#xff0c;但本质上仍运行于主线程。若在协程中进行耗时操作&#…

《揭秘!10 分钟洞悉 Prompt、Function Calling、MCP 与 AI agent 奥秘》

Prompt、Function Calling、MCP、AI agent这些术语频繁闯入我们的视野&#xff0c;它们到底都是什么、有啥关系。只需十分钟&#xff0c;咱们抽丝剥茧&#xff0c;揭开它们的神秘面纱&#xff0c;轻松掌握这些关键概念 并了解AI agent 完整执行流程。 一、提示词&#xff08;P…

决策树(回归树)全解析:原理、实践与应用

文章目录一、概述1.1 介绍1.2 回归树和分类树区别二、重要参数、属性及接口2.1 criterion&#xff08;不纯度衡量指标&#xff09;2.2 回归树如何工作&#xff08;核心流程拆解&#xff09;三、用回归树拟合正弦曲线&#xff08;实战案例&#xff09;3.1 绘制正弦曲线3.2 为正弦…

【盘古100Pro+开发板实验例程】FPGA学习 | HDMI 回环实验

本原创文章由深圳市小眼睛科技有限公司创作&#xff0c;版权归本公司所有&#xff0c;如需转载&#xff0c;需授权并注明出处&#xff08;www.meyesemi.com) 1. 实验简介 实验目的&#xff1a; 完成 HDMI 回环实验 实验环境&#xff1a; Window11 PDS2022.2-SP6.4 硬件环境…

鸿蒙系统PC安装指南

鸿蒙系统PC安装指南一、安装DevEco Studio集成开发环境二、下载鸿蒙系统PC三、启动鸿蒙系统及使用一、安装DevEco Studio集成开发环境首先访问华为官网上&#xff0c;注册并登录华为账号&#xff0c;以开始下载所需的软件。若尚未注册&#xff0c;请先注册一个。在官网页面中&a…

三十九、【扩展工具篇】Allpairspy 组合用例生成器:智能设计高效测试集

三十九、【扩展工具篇】Allpairspy 组合用例生成器:智能设计高效测试集 前言 准备工作 第一部分:后端实现 - `allpairspy` API 1. 创建 `allpairspy` 服务 2. 创建 `allpairspy` API 视图 3. 注册 API 路由 第二部分:前端实现 - `Allpairspy` 工具界面 1. 创建 API 服务 (`s…

ZooKeeper 深度实践:从原理到 Spring Boot 全栈落地

在 Kubernetes 为主流注册发现的今天&#xff0c;给出如何在 Spring Boot 中基于 ZooKeeper 实现服务注册/发现、分布式锁、配置中心以及集群协调的完整代码与最佳实践。所有示例均可直接复制运行。 1. ZooKeeper 架构与核心原理 1.1 角色 Leader&#xff1a;处理写请求&…

可验证随机函数-VRF

可验证随机函数&#xff08;Verifiable Random Function, VRF&#xff09;是一种结合密码学技术的伪随机数生成器&#xff0c;其核心特点是生成的随机数可被公开验证&#xff0c;且具有不可预测性和唯一性。以下是VRF的详细解析&#xff1a;1. 基本定义与核心特性 可验证性&…

极客大挑战2020(部分wp)

Roamphp1-Welcome 405请求方法不允许&#xff0c;改一下请求方法 数组绕过&#xff0c;在页面搜索flag即可&#xff01;本题&#xff1a;就是知道了405是请求方法不允许&#xff01; Roamphp2-Myblog&#xff08;zip协议加文件包含&#xff09; 首先进来就是一个博客页面&…

ESP32 外设驱动开发指南 (ESP-IDF框架)——GPIO篇:基础配置、外部中断与PWM(LEDC模块)应用

目录 一、前言 二、GPIO 2.1 GPIO简介 2.2 GPIO函数解析 2.3 LED驱动 2.4 KEY驱动 三、EXIT 3.1 EXIT简介 3.2 EXIT函数解析 3.3 EXIT驱动 四、LEDC 4.1 PWM原理解析 4.2 ESP32的LED PWM控制器介绍 4.3 LEDC函数解析 4.3.1 SW_PWM 4.3.2 HW_PWM 4.4 LEDC驱动 …

鸿蒙 ArkWeb 加载优化方案详解(2025 最佳实践)

适用平台&#xff1a;HarmonyOS NEXT / API 10 关键词&#xff1a;ArkWeb、WebviewController、NodeController、预加载、预连接、预渲染、性能优化一、前言&#xff1a;为什么必须优化 ArkWeb 加载&#xff1f;在鸿蒙生态中&#xff0c;ArkWeb 是系统级的 Web 容器引擎&#x…

JavaScript案例(乘法答题游戏)

项目概述 使用原生JavaScript实现一个乘法答题游戏&#xff0c;随机生成乘法题目&#xff0c;判断答案正误并记录分数&#xff0c;通过localStorage实现分数持久化存储。 核心功能需求 随机题目生成&#xff1a;动态生成1-10之间的乘法题答题交互&#xff1a;输入答案并提交…

EXCEL删除数据透视表

wps版 点击红框内任意区域 在顶部工具栏选择删除Excel 版 1.点击红框内任意区域2. 点击Enable Selection,再按住键盘上的Delete键&#xff0c;记住不是Backspace键

Python 飞机大战:从零开发经典 2D 射击游戏

引言&#xff1a;重温经典游戏开发 飞机大战作为经典的 2D 射击游戏&#xff0c;承载了许多人的童年回忆。使用 Python 和 Pygame 开发这样一款游戏不仅能重温经典&#xff0c;更是学习游戏开发绝佳的实践项目。本文将带你从零开始&#xff0c;一步步实现一个完整的飞机大战游…

Vue项目中实现浏览器串口通信:Web Serial API完整指南

前言 在现代Web开发中&#xff0c;随着IoT设备和硬件交互需求的增长&#xff0c;浏览器与串口设备的通信变得越来越重要。本文将详细介绍如何在Vue项目中使用Web Serial API实现串口通信功能&#xff0c;为开发者提供一个完整的解决方案。 技术背景 传统方案的局限性 传统的串口…

Github怎么只下载某个目录文件?(Git稀疏检出、GitZip for Github插件、在线工具DownGit)Github下载目录

文章目录**方法一&#xff1a;使用 Git 的稀疏检出&#xff08;Sparse Checkout&#xff09;**&#xff08;略&#xff09;**步骤&#xff1a;****方法二&#xff1a;使用 SVN 下载特定目录**&#xff08;略&#xff09;**步骤&#xff1a;****方法三&#xff1a;使用浏览器插件…

把“多视图融合、深度传感”组合在一起,今天分享3篇3D传感技术干货

关注gongzhonghao【计算机sci论文精选】3D传感技术起源于工业领域高精度测量需求&#xff0c;早期以激光三角测量、结构光等技术为主&#xff0c;主要服务于制造业的零部件检测与形变分析。随着消费电子智能化升级&#xff0c;苹果iPhone X的Face ID将结构光技术推向大众市场&a…

dubbo源码之消费端启动的高性能优化方案

一、序言 dubbo作为一款最流行的服务治理框架之一,在底层做了很多的优化,比如消费端在启动的时候做了很多性能提升的设计,接下来从连接的层面、序列化功能的层面进行介绍下。 二、优化点 1、消费端在服务启动的时候会调用DubboProtocol类的protocolBindingRefer方法来创建…

zookeeper常见命令和常见应用

前言 ZooKeeper自带一个交互式命令行工具&#xff08;通过zkCli.sh或zkCli.cmd启动&#xff09;&#xff0c;提供了一系列操作ZooKeeper数据节点的命令 下面我们对zookeeper常用命令进行介绍 使用prettyZoo命令行窗口 使用prettyZoo客户端链接zookeeper 打开zookeeper命令…