目录

1.添加包

2. 连接配置

        2.1.连接字符串

   2.2.连接对象

3.创建连接服务

        3.1.添加配置获取方法

        3.2.服务实现类

        3.3.服务接口

4.创建生产者服务

        4.1.生产者实现类

         4.2.生产者接口

5.创建消费者服务

        5.1.消费者服务接口

5.2.消费者接口

6.注册

7.简单使用案例

        7.1.实现

        7.2.接口

        7.3.控制器


        在 .NET Core 应用程序中使用 RabbitMQ 有许多好处,主要体现在其作为消息队列系统所带来的灵活性、可靠性和扩展性等方面,还能促进微服务架构的实施,是构建现代分布式应用的理想选择之一

1.添加包

        添加 RabbitMQ.Client 包。

2. 连接配置

        2.1.连接字符串

        dbsettings.json文件添加 RabbitMQ 连接配置

//RabbitMQ配置
"RabbitMQSettings": {"HostName": "ip地址",             //地址"Port": "端口",                   //端口"UserName": "RabbitMQ用户名",     //用户名"Password": "RabbitMQ密码",       //密码"VirtualHost": "/",              //本地虚拟地址"RetryCount": 5,                 //最大重试次数"RetryInterval": 5,              //断开重连次数"PrefetchCount": 5,              //预取消息数量"ConsumerCount": 5               //消费者数量
}

   2.2.连接对象

namespace Frame3_DataRepository.RabbitMQRepository.BaseMQ
{/// <summary>/// 消息队列配置类/// </summary>public class RabbitMQSettings{/// <summary>/// RabbitMQ 服务器地址/// </summary>public string HostName { get; set; }/// <summary>/// 端口号,默认5672/// </summary>public int Port { get; set; } = 5672;/// <summary>/// 用户名/// </summary>public string UserName { get; set; }/// <summary>/// 密码/// </summary>public string Password { get; set; }/// <summary>/// 虚拟主机,默认为//// </summary>public string VirtualHost { get; set; } = "/";/// <summary>/// 连接重试次数/// </summary>public int RetryCount { get; set; } = 5;/// <summary>/// 重试间隔(秒)/// </summary>public int RetryInterval { get; set; } = 5;/// <summary>/// 预取消息数量/// </summary>public ushort PrefetchCount { get; set; }/// <summary>/// 消费者数量/// </summary>public int ConsumerCount { get; set; }}/// <summary>/// 持久化/// </summary>public enum DeliveryMode : byte{NonPersistent = 1,Persistent = 2}/// <summary>/// 消费者状态信息/// </summary>public class ConsumerStatus{/// <summary>/// 当前活跃消费者数量/// </summary>public int CurrentCount { get; set; }/// <summary>/// 最大允许消费者数量/// </summary>public int MaxCount { get; set; }/// <summary>/// 活跃消费者标签列表/// </summary>public List<string> ActiveConsumers { get; set; } = new();}
}

        案例如下

3.创建连接服务

        先创建配置获取方法,再创建 RabbitMqClient 服务实现类和 IRabbitMqClient 服务接口来MQ连接。

        3.1.添加配置获取方法

using Microsoft.Extensions.Configuration;namespace Frame4_LibraryCore.BaseConfig
{/// <summary>/// 全局配置/// </summary>public static class Config{/// <summary>/// 从指定的 JSON 配置文件中读取配置,并反序列化为指定类型/// </summary>/// <typeparam name="T">目标配置类型(如 RedisSettings、DatabaseSettings 等)</typeparam>/// <param name="fileName">JSON 配置文件名(如 "appsettings.json")</param>/// <param name="sessions">配置节点名称(如 "RedisSettings")</param>/// <returns>返回绑定后的强类型配置对象</returns>public static T GetSetting<T>(string fileName, string sessions){//创建 ConfigurationBuilder 实例,用于构建配置var builder = new ConfigurationBuilder()//设置配置文件的基础路径为当前程序运行目录.SetBasePath(Directory.GetCurrentDirectory())//添加 JSON 文件作为配置源://- fileName: 指定要加载的 JSON 文件//- optional: false 表示文件必须存在,否则抛出异常//- reloadOnChange: true 表示文件修改时自动重新加载.AddJsonFile(fileName, optional: false, reloadOnChange: true);//构建配置对象(IConfigurationRoot)IConfigurationRoot config = builder.Build();//获取指定配置节点(sessions),并将其反序列化为类型 Tvar conn = config.GetSection(sessions).Get<T>();//返回反序列化后的配置对象return conn;}}
}

         案例如下

 

        3.2.服务实现类

using Frame3_DataRepository.RabbitMQRepository.BaseMQ;
using Frame4_LibraryCore.BaseConfig;
using Frame6_LibraryUtility;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;namespace Frame3_DataRepository.RabbitMQRepository
{/// <summary>/// 队列服务实现类/// 提供消息队列连接和通道管理功能/// </summary>public sealed class RabbitMqClient : BaseServiceSingleton, IRabbitMqClient{/// <summary>/// RabbitMQ 连接工厂实例/// </summary>private readonly IConnectionFactory _connectionFactory;/// <summary>/// 日志记录器/// </summary>private readonly ILogger<RabbitMqClient> _logger;/// <summary>/// 连接重试最大次数/// </summary>private readonly int _retryCount;/// <summary>/// 重试间隔时间(秒)/// </summary>private readonly int _retryInterval;/// <summary>/// RabbitMQ 连接对象/// </summary>private IConnection _connection;/// <summary>/// 标识对象是否已被释放/// </summary>private bool _disposed;/// <summary>/// 连接操作的线程锁/// </summary>private readonly SemaphoreSlim _connectionLock = new(1, 1);/// <summary>/// 心跳检测定时器,用于定期检查连接状态/// </summary>private Timer _heartbeatTimer;/// <summary>/// 心跳检测间隔(秒),默认30秒/// </summary>private const int HeartbeatInterval = 30;/// <summary>/// 预取消息数量/// </summary>private readonly ushort _prefetchCount;/// <summary>/// 最大允许的消费者数量/// </summary>private readonly int _maxConsumerCount;/// <summary>/// 构造函数,初始化RabbitMQ服务/// </summary>/// <param name="logger">日志记录器,从DI容器注入</param>/// <exception cref="ArgumentNullException">当必需参数为null时抛出</exception>public RabbitMqClient(ILogger<RabbitMqClient> logger){//参数校验,确保依赖注入的参数不为null_logger = logger ?? throw new ArgumentNullException(nameof(logger));//读取配置//var settingsValue = settings?.Value ?? throw new ArgumentNullException(nameof(settings));var settingsValue = Config.GetSetting<RabbitMQSettings>("dbsettings.json", "RabbitMQSettings");//从配置中初始化重试参数_retryCount = settingsValue.RetryCount;_retryInterval = settingsValue.RetryInterval;//配置连接工厂参数_connectionFactory = new ConnectionFactory{HostName = settingsValue.HostName,                  // 主机地址Port = settingsValue.Port,                          // 端口号UserName = settingsValue.UserName,                  // 用户名Password = settingsValue.Password,                  // 密码VirtualHost = settingsValue.VirtualHost,            // 虚拟主机AutomaticRecoveryEnabled = true,                    // 启用自动恢复NetworkRecoveryInterval = TimeSpan.FromSeconds(10), // 网络恢复间隔10秒RequestedHeartbeat = TimeSpan.FromSeconds(60)        // 设置心跳间隔60秒};// 初始化心跳检测定时器_heartbeatTimer = new Timer(HeartbeatCheck, null, Timeout.Infinite, Timeout.Infinite);//初始化预取消息数量_prefetchCount = settingsValue.PrefetchCount;//初始化消费者数量_maxConsumerCount = settingsValue.ConsumerCount;}/// <summary>/// 检查当前是否已建立有效连接/// </summary>public bool IsConnected => _connection?.IsOpen == true && !_disposed;/// <summary>/// 预取消息数量/// </summary>public ushort prefetchCount => _prefetchCount;/// <summary>/// 消费者数量/// </summary>public int ConsumerCount => _maxConsumerCount;/// <summary>/// 尝试建立RabbitMQ连接/// </summary>/// <returns>是否连接成功</returns>public async Task<bool> TryConnectAsync(){// 已连接则直接返回成功if (IsConnected) return true;// 加锁防止多线程同时创建连接await _connectionLock.WaitAsync();try{// 双重检查锁定模式if (IsConnected) return true;//记录建立连接_logger.LogInformation("正在建立RabbitMQ连接...");// 带重试机制的连接逻辑for (int i = 0; i < _retryCount; i++){try{//创建新连接_connection = await _connectionFactory.CreateConnectionAsync().ConfigureAwait(false);//订阅连接关闭事件_connection.ConnectionShutdownAsync += OnConnectionShutdown;//验证连接是否成功建立if (IsConnected){//记录连接成功_logger.LogInformation("RabbitMQ连接已成功建立");//连接成功后启动心跳检测 不用心跳检测可注释//StartHeartbeatCheck();return true;}}catch (BrokerUnreachableException ex){//专门处理Broker不可达异常_logger.LogWarning(ex, $"RabbitMQ服务不可达,第{i}次重试...");}catch (Exception ex){//处理其他类型的异常_logger.LogWarning(ex, $"RabbitMQ连接异常,第{i}次重试...");}//如果未达到最大重试次数,等待间隔时间后重试if (i < _retryCount){//等待间隔时间后重试await Task.Delay(_retryInterval * 1000).ConfigureAwait(false);}}//记录连接到最大重试数_logger.LogError($"RabbitMQ连接失败,已达到最大重试次数{_retryCount}");return false;}finally{//确保锁被释放_connectionLock.Release();}}/// <summary>/// 创建通道/// </summary>/// <returns>创建的通道对象</returns>/// <exception cref="InvalidOperationException">当连接不可用时抛出</exception>public async Task<IChannel> CreateChannelAsync(){//确保连接可用if (!IsConnected && !await TryConnectAsync().ConfigureAwait(false)){throw new InvalidOperationException("没有可用的RabbitMQ连接");}try{return await _connection.CreateChannelAsync().ConfigureAwait(false);}catch (OperationInterruptedException ex){//处理RabbitMQ操作中断异常_logger.LogError(ex, "创建RabbitMQ通道时操作被中断");throw;}catch (Exception ex){//处理其他创建通道时的异常_logger.LogError(ex, "创建RabbitMQ通道失败");throw;}}/// <summary>/// 连接关闭事件处理程序,自动尝试重新连接/// </summary>private async Task OnConnectionShutdown(object sender, ShutdownEventArgs args){//记录连接关闭事件,包括关闭原因_logger.LogWarning($"RabbitMQ连接已关闭,原因: {args}");//如果服务未被释放,尝试自动重新连接if (!_disposed){try{//异步尝试重新连接,不阻塞当前线程await TryConnectAsync().ConfigureAwait(false);}catch (Exception ex){//记录重连失败异常_logger.LogError(ex, "重连失败");}}}/// <summary>/// 心跳检测回调方法,定期检查连接状态/// </summary>private async void HeartbeatCheck(object state){try{// 如果连接不存在或已关闭,尝试重新连接if (!IsConnected){//记录检测断开重新连接_logger.LogWarning("心跳检测发现连接断开,尝试重新连接...");//建立连接await TryConnectAsync().ConfigureAwait(false);}}catch (Exception ex){// 记录心跳检测过程中的异常_logger.LogError(ex, "心跳检测异常");}}/// <summary>/// 启动心跳检测定时器/// </summary>private void StartHeartbeatCheck(){// 设置定时器,定期执行心跳检测_heartbeatTimer.Change(TimeSpan.FromSeconds(HeartbeatInterval),TimeSpan.FromSeconds(HeartbeatInterval));}/// <summary>/// 停止心跳检测定时器/// </summary>private void StopHeartbeatCheck(){// 禁用定时器_heartbeatTimer.Change(Timeout.Infinite, Timeout.Infinite);}/// <summary>/// 释放资源/// </summary>public void Dispose(){//释放Dispose(true);//优化垃圾回收GC.SuppressFinalize(this);}/// <summary>/// 受保护的释放方法/// </summary>/// <param name="disposing">是否主动释放</param>private void Dispose(bool disposing){//如果已经释放,直接返回if (_disposed) return;//如果是主动释放,处理托管资源if (disposing){try{//停止心跳检测StopHeartbeatCheck();//释放定时器资源_heartbeatTimer?.Dispose();//取消事件订阅if (_connection != null){_connection.ConnectionShutdownAsync -= OnConnectionShutdown;}//释放连接资源_connection?.Dispose();//释放线程锁资源_connectionLock.Dispose();//记录连接关闭日志_logger.LogInformation("RabbitMQ连接已关闭");}catch (Exception ex){//记录资源释放过程中的异常_logger.LogError(ex, "关闭RabbitMQ连接时出错");}finally{//标记为已释放状态_disposed = true;}}}}
}

        案例如下

        3.3.服务接口

using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;namespace Frame3_DataRepository.RabbitMQRepository
{/// <summary>/// 队列服务接口/// 供生产者和消费者调用/// </summary>public interface IRabbitMqClient //: IDisposable{/// <summary>/// 获取当前连接状态/// true表示已建立有效连接,false表示连接不可用/// </summary>bool IsConnected { get; }/// <summary>/// 预取消息数量/// </summary>ushort prefetchCount { get; }/// <summary>/// 消费者数量/// </summary>int ConsumerCount { get; }/// <summary>/// 尝试建立RabbitMQ连接/// </summary>/// <returns>是否连接成功</returns>Task<bool> TryConnectAsync();/// <summary>/// 异步创建RabbitMQ通道/// 用于消息发布、消费等操作/// </summary>/// <returns>RabbitMQ通道实例</returns>/// <exception cref="InvalidOperationException">当无法创建连接时抛出</exception>/// <exception cref="OperationInterruptedException">当RabbitMQ操作被中断时抛出</exception>Task<IChannel> CreateChannelAsync();void Dispose();}
}

         案例如下

4.创建生产者服务

        创建生产者服务实现类 MQProducerService 和接口 IMQProducerService

        4.1.生产者实现类

using Frame6_LibraryUtility;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using System.Text;
using System.Text.Json;namespace Frame3_DataRepository.RabbitMQRepository.Producer
{/// <summary>/// 生产者服务实现类/// RabbitMQ/// </summary>public sealed class MQProducerService : BaseService, IMQProducerService{/// <summary>/// 基础连接服务/// </summary>private readonly IRabbitMqClient _iRabbitMQService;/// <summary>/// 日志记录器/// </summary>private readonly ILogger<MQProducerService> _logger;/// <summary>/// 构造函数,注入依赖/// </summary>/// <param name="iRabbitMQService"></param>/// <param name="logger"></param>public MQProducerService(IRabbitMqClient iRabbitMQService, ILogger<MQProducerService> logger){//参数校验,确保依赖注入的参数不为null_iRabbitMQService = iRabbitMQService ?? throw new ArgumentNullException(nameof(iRabbitMQService));_logger = logger ?? throw new ArgumentNullException(nameof(logger));}/// <summary>/// 发布消息-点对点模式(Point-to-Point)/// </summary>/// <typeparam name="T">消息类型,必须是class类型</typeparam>/// <param name="queueName">目标队列名称</param>/// <param name="message">要发布的消息对象</param>/// <param name="messageId">可选的消息ID,未提供时自动生成</param>/// <param name="exchange">可选交换机名称,默认使用直接交换机</param>/// <param name="headers">可选的消息头字典</param>/// <param name="withDLX">是否启用死信队列</param>/// <param name="maxRetryCount">最大重试次数(仅当启用死信队列时有效)</param>/// <returns>异步任务</returns>/// <exception cref="ArgumentNullException">当必要参数为空时抛出</exception>/// <exception cref="InvalidOperationException">当消息序列化失败时抛出</exception>public async Task PublishByPTPAsync<T>(string queueName, T message, string messageId = null, string exchange = null, IDictionary<string, object> headers = null, bool withDLX = true, int maxRetryCount = 3) where T : class{//参数校验if (string.IsNullOrWhiteSpace(queueName))throw new ArgumentNullException(nameof(queueName), "队列名称不能为空");if (message == null)throw new ArgumentNullException(nameof(message), "消息内容不能为空");//生成消息ID(如果未提供则使用Guid)messageId = messageId.IsEmpty() ? messageId = Guid.NewGuid().ToString() : messageId;//声明变量用于存储序列化后的消息,便于错误处理var jsonMessage = string.Empty;//声明RabbitMQ通道IChannel? channel = null;try{//创建RabbitMQ通道channel = await _iRabbitMQService.CreateChannelAsync();// 死信队列配置var arguments = new Dictionary<string, object>();if (withDLX){// 死信交换机配置var dlxExchange = $"{queueName}.DLX";var dlxQueue = $"{queueName}.DLQ";// 声明死信交换机和队列await channel.ExchangeDeclareAsync(dlxExchange, ExchangeType.Direct, durable: true);await channel.QueueDeclareAsync(queue: dlxQueue,durable: true,exclusive: false,autoDelete: false,arguments: null);await channel.QueueBindAsync(dlxQueue, dlxExchange, dlxQueue);// 设置死信队列参数arguments.Add("x-dead-letter-exchange", dlxExchange);arguments.Add("x-dead-letter-routing-key", dlxQueue);arguments.Add("x-max-retry-count", maxRetryCount); // 自定义属性,记录最大重试次数arguments.Add("x-max-length", 100000);arguments.Add("x-queue-mode", "lazy");}// 添加消费者数量限制arguments["x-max-consumers"] = _iRabbitMQService.ConsumerCount;//声明队列(确保队列存在)var queueDeclareOk = await channel.QueueDeclareAsync(queue: queueName,       //队列名称durable: true,          //队列持久化(服务器重启后仍然存在)exclusive: false,       //非独占队列autoDelete: false,      //不自动删除arguments: arguments//new Dictionary<string, object>//{//    // 只允许一个活跃消费者//    //["x-single-active-consumer"] = true,//    ["x-max-consumers"] = _iRabbitMQService.ConsumerCount,//});//序列化消息为JSON格式jsonMessage = JsonSerializer.Serialize(message);//将消息内容转换为UTF-8字节数组var body = Encoding.UTF8.GetBytes(jsonMessage);//创建消息属性var properties = new BasicProperties{Persistent = true,                                                          //消息持久化(需要队列也是持久化的才有效)MessageId = messageId,                                                      //设置唯一消息ID用于追踪ContentType = "application/json",                                           //明确指定内容类型为JSONTimestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds())    //添加时间戳};//设置消息头(如果提供)if (headers != null && headers.Any()){properties.Headers = new Dictionary<string, object>(headers);}// 发布消息到队列await channel.BasicPublishAsync(exchange: exchange ?? string.Empty,         //交换机名称(空字符串表示默认direct交换机)routingKey: queueName,                      //路由键(对于默认交换机就是队列名)mandatory: false,                           //不强制要求消息被路由到队列basicProperties: properties,                //消息属性body: body                                  //消息体);//记录成功日志(结构化日志)_logger.LogInformation("消息发布成功。\r\n交换机: {Exchange}\r\n消息ID: {MessageId}\r\n队列: {queueName}\r\n消息: {jsonMessage}", exchange ?? "(默认)", messageId, queueName, jsonMessage);}catch (JsonException jsonEx){// 专门处理JSON序列化错误_logger.LogError(jsonEx, "消息序列化失败。\r\n类型: {MessageType}", typeof(T).Name);throw new InvalidOperationException("消息序列化失败", jsonEx);}catch (OperationInterruptedException opEx){// 处理RabbitMQ操作中断异常_logger.LogError(opEx, "RabbitMQ操作中断。\r\n消息: {jsonMessage}\r\n队列: {queueName}", jsonMessage, queueName);throw;}catch (Exception ex){// 处理其他所有异常_logger.LogError(ex, "消息发布失败。\r\n交换机: {Exchange}\r\n队列: {queueName}\r\n消息: {jsonMessage}", exchange ?? "(默认)", queueName, jsonMessage);throw;}finally{// 确保通道资源被释放await channel?.CloseAsync();}}/// <summary>/// 发布消息-发布/订阅模式(Pub/Sub)/// </summary>/// <typeparam name="T">消息类型,必须是 class 类型</typeparam>/// <param name="exchangeName">目标 Exchange 名称</param>/// <param name="message">要发布的消息对象</param>/// <param name="messageId">可选的消息唯一标识符,默认自动生成</param>/// <param name="headers">可选的消息头字典</param>/// <returns>异步任务</returns>public async Task PublishByPubSubAsync<T>(string exchangeName, T message, string messageId = null, IDictionary<string, object> headers = null) where T : class{//校验 Exchange 名称是否为空或空白字符串if (string.IsNullOrWhiteSpace(exchangeName))throw new ArgumentNullException(nameof(exchangeName), "Exchange名称不能为空");//校验消息内容是否为 nullif (message == null)throw new ArgumentNullException(nameof(message), "消息内容不能为空");//如果未提供消息ID,则使用 Guid 生成唯一的 IDmessageId = messageId.IsEmpty() ? Guid.NewGuid().ToString() : messageId;//用于存储序列化后的 JSON 消息(便于日志和异常处理)var jsonMessage = string.Empty;//声明 RabbitMQ 通道变量,初始为 nullIChannel? channel = null;try{//创建 RabbitMQ 通道channel = await _iRabbitMQService.CreateChannelAsync();//声明 Fanout 类型的 Exchange(广播模式)await channel.ExchangeDeclareAsync(exchange: exchangeName,type: ExchangeType.Fanout,   //扇出类型,广播给所有绑定队列durable: true,               //可持久化autoDelete: false);          //不自动删除//将消息对象序列化为 JSON 字符串jsonMessage = JsonSerializer.Serialize(message);//将 JSON 消息转换为 UTF-8 编码的字节数组var body = Encoding.UTF8.GetBytes(jsonMessage);//创建消息属性var properties = new BasicProperties{Persistent = true,                                                //消息持久化//DeliveryMode = (DeliveryModes)DeliveryMode.Persistent,MessageId = messageId,                                            //设置唯一消息IDContentType = "application/json",                                 //内容类型为 JSONTimestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds()) //添加时间戳};//如果提供了 Headers,则设置到消息属性中if (headers != null && headers.Any()){properties.Headers = new Dictionary<string, object>(headers);}//向 Exchange 发送消息(不指定 Routing Key,Fanout 类型忽略此参数)await channel.BasicPublishAsync(exchange: exchangeName,      //目标 Exchange 名称routingKey: string.Empty,    //Fanout 类型不需要路由键mandatory: false,            //不要求消息必须被投递basicProperties: properties, //消息属性body: body);                 //消息体字节数据//记录消息发布成功的日志信息_logger.LogInformation("消息已发布到Exchange。\r\nExchange: {Exchange}\r\n消息ID: {MessageId}\r\n消息: {jsonMessage}", exchangeName, messageId, jsonMessage);}catch (JsonException jsonEx){//捕获 JSON 序列化异常并记录错误日志_logger.LogError(jsonEx, "消息序列化失败。\r\n类型: {MessageType}", typeof(T).Name);//抛出自定义异常,包含原始异常信息throw new InvalidOperationException("消息序列化失败", jsonEx);}catch (Exception ex){//捕获其他所有异常并记录错误日志_logger.LogError(ex, "消息发布到Exchange失败。\r\nExchange: {Exchange}\r\n消息: {jsonMessage}", exchangeName, jsonMessage);//抛出异常throw;}finally{// 确保通道资源被释放await channel?.CloseAsync();}}/// <summary>/// 发布消息-路由模式(Routing)/// </summary>/// <typeparam name="T">消息体的类型,必须为引用类型</typeparam>/// <param name="exchangeName">要发布的交换机名称。不能为空或空白字符串。</param>/// <param name="routingKey">消息的路由键,用于匹配绑定队列。不能为空或空白字符串。</param>/// <param name="message">要发送的消息对象,将被序列化为 JSON 格式。</param>/// <param name="messageId">消息的唯一标识符。如果未提供,则自动生成 Guid 字符串。</param>/// <param name="headers">可选的消息头部信息,用于携带额外元数据。</param>/// <returns>异步任务</returns>public async Task PublishByRoutingAsync<T>(string exchangeName, string routingKey, T message, string messageId = null, IDictionary<string, object> headers = null) where T : class{//检查 Exchange 名称是否为空或空白字符if (string.IsNullOrWhiteSpace(exchangeName))throw new ArgumentNullException(nameof(exchangeName), "Exchange名称不能为空");//检查路由键是否为空或空白字符if (string.IsNullOrWhiteSpace(routingKey))throw new ArgumentNullException(nameof(routingKey), "路由键不能为空");//检查消息对象是否为 nullif (message == null)throw new ArgumentNullException(nameof(message), "消息内容不能为空");//如果没有提供 MessageId,则生成一个 Guid 字符串作为唯一标识messageId = messageId.IsEmpty() ? Guid.NewGuid().ToString() : messageId;//用于记录日志的消息 JSON 字符串var jsonMessage = string.Empty;//声明一个 IChannel 对象,初始为 nullIChannel? channel = null;try{//创建 RabbitMQ Channel(通道)channel = await _iRabbitMQService.CreateChannelAsync();//声明一个 Direct 类型的 Exchange(如果不存在则创建)await channel.ExchangeDeclareAsync(exchange: exchangeName,     //指定 Exchange 的名称type: ExchangeType.Direct,  //指定 Exchange 的类型为 Direct(直连模式)durable: true,              //设置为持久化 Exchange,RabbitMQ 重启后不会丢失autoDelete: false           //不自动删除 Exchange,即使最后一个队列被解绑也不会自动删除);//将消息对象序列化为 JSON 字符串jsonMessage = JsonSerializer.Serialize(message);//将 JSON 字符串转换为 UTF-8 编码的字节数组var body = Encoding.UTF8.GetBytes(jsonMessage);//创建并初始化 BasicProperties(消息属性)var properties = new BasicProperties{Persistent = true, //设置消息持久化MessageId = messageId, //设置消息 IDContentType = "application/json", //内容类型为 JSONTimestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds()) //时间戳};//如果提供了自定义 Headers,则复制到消息属性中if (headers != null && headers.Any()){properties.Headers = new Dictionary<string, object>(headers);}//发布消息到指定 Exchange 和路由键await channel.BasicPublishAsync(exchange: exchangeName,      //指定消息要发送到的 Exchange 名称routingKey: routingKey,      //指定消息的路由键,用于 Exchange 路由决策mandatory: false,            //如果为 true,当消息无法路由到任何队列时会返回给生产者;false 则直接丢弃basicProperties: properties, //消息的属性,如持久化、内容类型、消息 ID、时间戳等body: body                   //消息的实际内容(字节数组),通常是序列化后的 JSON 数据);//记录日志:消息发送成功_logger.LogInformation("消息已通过路由键发送。\r\nExchange: {exchangeName}\r\n路由键: {routingKey}\r\n消息ID: {MessageId}\r\n消息: {jsonMessage}",exchangeName, routingKey, messageId, jsonMessage);}catch (Exception ex){//捕获异常并记录日志_logger.LogError(ex, "消息发送失败。\r\nExchange: {exchangeName}\r\n路由键: {routingKey}\r\n消息: {jsonMessage}",exchangeName, routingKey, jsonMessage);//抛出异常以便上层处理throw;}finally{// 确保通道资源被释放await channel?.CloseAsync();}}/// <summary>/// 发布消息-主题模式(Topic)/// </summary>/// <typeparam name="T"></typeparam>/// <param name="exchangeName">要发布的交换机名称。不能为空或空白字符串。</param>/// <param name="routingKey">消息的路由键,用于匹配 Topic 类型 Exchange 的绑定规则。不能为空或空白字符串。</param>/// <param name="message">要发送的消息对象,将被序列化为 JSON 格式。</param>/// <param name="messageId">消息的唯一标识符。如果未提供,则自动生成 Guid 字符串。</param>/// <param name="headers">可选的消息头部信息,用于携带额外元数据。</param>/// <returns>异步任务</returns>public async Task PublishByTopicAsync<T>(string exchangeName, string routingKey, T message, string messageId = null, IDictionary<string, object> headers = null) where T : class{//检查 Exchange 名称是否为空或空白字符if (string.IsNullOrWhiteSpace(exchangeName))throw new ArgumentNullException(nameof(exchangeName), "Exchange名称不能为空");//检查路由键是否为空或空白字符if (string.IsNullOrWhiteSpace(routingKey))throw new ArgumentNullException(nameof(routingKey), "路由键不能为空");//检查消息对象是否为 nullif (message == null)throw new ArgumentNullException(nameof(message), "消息内容不能为空");//如果没有提供 MessageId,则生成一个 Guid 字符串作为唯一标识messageId = messageId.IsEmpty() ? Guid.NewGuid().ToString() : messageId;//用于记录日志的消息 JSON 字符串var jsonMessage = string.Empty;//声明一个 IChannel 对象,初始为 nullIChannel? channel = null;try{//创建 RabbitMQ Channel(通道)channel = await _iRabbitMQService.CreateChannelAsync();// 删除已存在的 Exchange(如果不需要保留消息)//await channel.ExchangeDeleteAsync("TopicTest");//声明一个 Topic 类型的 Exchange(如果不存在则创建)await channel.ExchangeDeclareAsync(exchange: exchangeName,     //指定要声明的 Exchange 名称,名称由外部传入的 exchangeName 变量指定type: ExchangeType.Topic,   //设置 Exchange 的类型为 Topic(主题模式),支持通配符路由键匹配durable: true,              //设置 Exchange 为持久化,即使 RabbitMQ 重启也不会丢失autoDelete: false           //设置 Exchange 不自动删除,即使最后一个绑定被移除后仍保留);//将消息对象序列化为 JSON 字符串jsonMessage = JsonSerializer.Serialize(message);//将 JSON 字符串转换为 UTF-8 编码的字节数组var body = Encoding.UTF8.GetBytes(jsonMessage);//创建并初始化 BasicProperties(消息属性)var properties = new BasicProperties{Persistent = true, //设置消息持久化MessageId = messageId, //设置消息 IDContentType = "application/json", //内容类型为 JSONTimestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds()) //时间戳};//如果提供了自定义 Headers,则复制到消息属性中if (headers != null && headers.Any()){properties.Headers = new Dictionary<string, object>(headers);}//发布消息到指定 Exchange 和路由键await channel.BasicPublishAsync(exchange: exchangeName,      //指定消息要发送到的 Exchange 名称routingKey: routingKey,      //指定消息的路由键,用于 Exchange 路由决策mandatory: false,            //如果为 true,当消息无法路由到任何队列时会返回给生产者;false 则直接丢弃basicProperties: properties, //消息的属性,如持久化、内容类型、消息 ID、时间戳等body: body                   //消息的实际内容(字节数组),通常是序列化后的 JSON 数据);//记录日志:消息发送成功_logger.LogInformation("消息已通过路由键发送。\r\nExchange: {exchangeName}\r\n路由键: {routingKey}\r\n消息ID: {MessageId}\r\n消息: {jsonMessage}",exchangeName, routingKey, messageId, jsonMessage);}catch (Exception ex){//捕获异常并记录日志_logger.LogError(ex, "消息发送失败。\r\nExchange: {exchangeName}\r\n路由键: {routingKey}\r\n消息: {jsonMessage}",exchangeName, routingKey, jsonMessage);//抛出异常以便上层处理throw;}finally{// 确保通道资源被释放await channel?.CloseAsync();}}/// <summary>/// 发布消息-请求/响应(RPC)/// </summary>/// <typeparam name="TRequest">请求消息的类型,必须为引用类型</typeparam>/// <typeparam name="TResponse">期望的响应消息类型,必须为引用类型</typeparam>/// <param name="exchangeName">要发送请求的目标 Exchange 名称。不能为空或空白字符串。</param>/// <param name="routingKey">用于路由请求消息的路由键。不能为空或空白字符串。</param>/// <param name="request">请求对象,将被序列化为 JSON 并作为消息体发送。</param>/// <param name="timeout">等待响应的超时时间。默认为 default(可能无限期等待)。</param>/// <returns>异步任务</returns>public async Task<TResponse> PublishByPRCAsync<TRequest, TResponse>(string exchangeName, string routingKey, TRequest request, TimeSpan timeout = default) where TRequest : class where TResponse : class{// 参数校验if (string.IsNullOrWhiteSpace(exchangeName))throw new ArgumentException("Exchange名称不能为空", nameof(exchangeName));if (string.IsNullOrWhiteSpace(routingKey))throw new ArgumentException("路由键不能为空", nameof(routingKey));if (request == null)throw new ArgumentNullException(nameof(request));// 设置默认超时时间(30秒)var actualTimeout = timeout == default ? TimeSpan.FromSeconds(5) : timeout;if (actualTimeout <= TimeSpan.Zero)throw new ArgumentException("超时时间必须大于0", nameof(timeout));// 生成唯一 CorrelationIdvar correlationId = Guid.NewGuid().ToString();// 创建 TaskCompletionSourcevar tcs = new TaskCompletionSource<TResponse>();// 创建独立 Channelvar channel = await _iRabbitMQService.CreateChannelAsync();try{// 在 PublishByPRCAsync 方法中,发送请求前添加:await channel.ExchangeDeclareAsync(exchange: exchangeName,type: ExchangeType.Direct, // RPC通常使用Directdurable: true,            // 持久化autoDelete: false);// 声明临时队列用于接收响应var replyQueue = await channel.QueueDeclareAsync(queue: "",durable: false,exclusive: true,autoDelete: true);// 创建消费者var consumer = new AsyncEventingBasicConsumer(channel);consumer.ReceivedAsync += (model, ea) =>{try{if (ea.BasicProperties?.CorrelationId == correlationId){var response = JsonSerializer.Deserialize<TResponse>(ea.Body.Span);tcs.TrySetResult(response);}}catch (Exception ex){tcs.TrySetException(ex);}return Task.CompletedTask;};// 开始监听回复队列var consumerTag = await channel.BasicConsumeAsync(queue: replyQueue,autoAck: true,consumer: consumer);// 构建消息属性var props = new BasicProperties();props.ReplyTo = replyQueue;props.CorrelationId = correlationId;props.ContentType = "application/json";// 序列化请求体var body = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(request));// 发送请求await channel.BasicPublishAsync(exchange: exchangeName,routingKey: routingKey,mandatory: false,basicProperties: props,body: body);// 设置超时取消using var cts = new CancellationTokenSource(actualTimeout);cts.Token.Register(() =>{if (!tcs.Task.IsCompleted){tcs.TrySetException(new TimeoutException($"RPC请求超时({actualTimeout.TotalSeconds}秒)"));channel.BasicCancelAsync(consumerTag);}});return await tcs.Task;}finally{// 确保通道资源被释放await channel?.CloseAsync();}}/// <summary>/// 将死信队列中的消息重新发布到原始队列(泛型版本)/// </summary>/// <typeparam name="T">消息体的类型(如 DTO 类)</typeparam>/// <param name="queueName">原始队列名称</param>/// <param name="batchSize">每次处理的消息批大小</param>/// <param name="delay">重发延迟时间(毫秒)</param>/// <returns>成功处理的消息数量</returns>public async Task<int> RepublishDeadLetterMessagesAsync<T>(string queueName, int batchSize = 100, int delay = 0) where T : class{// 检查传入的队列名是否为空或空白字符串,若为空则抛出异常if (string.IsNullOrWhiteSpace(queueName))throw new ArgumentNullException(nameof(queueName));// 构造死信队列(DLQ)名称,格式为:{原始队列名}.DLQvar dlxQueueName = $"{queueName}.DLQ";// 声明一个 RabbitMQ 的 Channel 对象,用于后续操作IChannel? channel = null;// 记录已处理的消息数量int processedCount = 0;try{// 创建一个新的 Channel 实例(通过服务注入的 _iRabbitMQService)channel = await _iRabbitMQService.CreateChannelAsync();// 检查死信队列是否存在(被动声明方式)try{// 如果不存在会抛出异常,catch 中捕获并记录日志后返回 0await channel.QueueDeclarePassiveAsync(dlxQueueName);}catch{// 日志记录:如果 DLQ 不存在,则直接返回 0_logger.LogWarning("死信队列 {DLXQueueName} 不存在", dlxQueueName);return 0;}// 循环获取最多 batchSize 条消息for (int i = 0; i < batchSize; i++){// 使用 BasicGet 从 DLQ 获取一条消息(不自动确认)var result = await channel.BasicGetAsync(dlxQueueName, autoAck: false);// 如果没有更多消息了,跳出循环if (result == null)break;// 获取消息体内容,并转为 byte[] 数组var body = result.Body.ToArray();// 获取原始消息属性(BasicProperties),用于后续操作var originalProperties = result.BasicProperties;// 获取当前消息的 DeliveryTag,用于确认或拒绝消息var deliveryTag = result.DeliveryTag;try{// 如果设置了 delay > 0,则等待指定时间(模拟延迟重试)if (delay > 0)await Task.Delay(delay);// 生成新的唯一 MessageId,用于追踪消息var messageId = Guid.NewGuid().ToString();// 创建新的 BasicProperties 实例,用于新消息的属性设置var properties = new BasicProperties{Persistent = true,                                                          // 设置消息持久化(需队列也持久化才生效)MessageId = messageId,                                                      // 设置唯一消息 IDContentType = "application/json",                                           // 明确内容类型为 JSONTimestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds())    // 添加当前时间戳};// 将消息重新发布到原始队列,使用默认交换机(exchange 为空)await channel.BasicPublishAsync(exchange: string.Empty,routingKey: queueName,mandatory: false,basicProperties: properties,body: body);// 确认 DLQ 中该消息已被正确处理(ACK)await channel.BasicAckAsync(deliveryTag, multiple: false);// 成功处理计数器 +1processedCount++;// 日志记录:消息已成功重新发布_logger.LogInformation("已重新发布死信消息 {MessageId} 到队列 {QueueName}",properties.MessageId ?? "未知ID", queueName);}catch (Exception ex){// 日志记录:消息重发失败_logger.LogError(ex, "重新发布死信消息失败,DeliveryTag={DeliveryTag}", deliveryTag);// 拒绝消息并重新入队(回到 DLQ),requeue: true 表示重新入队await channel.BasicNackAsync(deliveryTag, multiple: false, requeue: true);}}// 返回成功处理的消息数量return processedCount;}catch (Exception ex){// 日志记录:整个处理过程中发生错误_logger.LogError(ex, "处理死信队列 {DLXQueueName} 时发生错误", dlxQueueName);// 抛出异常,供上层调用者捕获处理throw;}finally{// 确保通道资源被释放await channel?.CloseAsync();}}}
}

        案例如下

         4.2.生产者接口

namespace Frame3_DataRepository.RabbitMQRepository.Producer
{/// <summary>/// 生产者服务接口/// RabbitMQ/// </summary>public interface IMQProducerService{/// <summary>/// 发布消息-点对点模式(Point-to-Point)/// </summary>/// <param name="queue">队列名称</param>/// <typeparam name="T">消息类型,必须是引用类型</typeparam>/// <param name="message">要发布的消息对象</param>/// <param name="messageId">消息ID(可选,未提供时自动生成GUID)</param>/// <param name="exchange">交换机名称(空字符串表示默认交换机)</param>/// <returns>异步任务</returns>/// <exception cref="ArgumentNullException">当队列名或消息为空时抛出</exception>Task PublishByPTPAsync<T>(string queue, T message, string messageId = null, string exchange = null, IDictionary<string, object> headers = null, bool withDLX = true, int maxRetryCount = 3) where T : class;/// <summary>/// 发布消息-发布/订阅模式(Pub/Sub)/// </summary>/// <typeparam name="T">消息类型,必须是 class 类型</typeparam>/// <param name="exchangeName">目标 Exchange 名称</param>/// <param name="message">要发布的消息对象</param>/// <param name="messageId">可选的消息唯一标识符,默认自动生成</param>/// <param name="headers">可选的消息头字典</param>/// <returns>异步任务</returns>Task PublishByPubSubAsync<T>(string exchangeName, T message, string messageId = null, IDictionary<string, object> headers = null) where T : class;/// <summary>/// 发布消息-路由模式(Routing)/// </summary>/// <typeparam name="T">消息体的类型,必须为引用类型</typeparam>/// <param name="exchangeName">要发布的交换机名称。不能为空或空白字符串。</param>/// <param name="routingKey">消息的路由键,用于匹配绑定队列。不能为空或空白字符串。</param>/// <param name="message">要发送的消息对象,将被序列化为 JSON 格式。</param>/// <param name="messageId">消息的唯一标识符。如果未提供,则自动生成 Guid 字符串。</param>/// <param name="headers">可选的消息头部信息,用于携带额外元数据。</param>/// <returns>异步任务</returns>Task PublishByRoutingAsync<T>(string exchangeName, string routingKey, T message, string messageId = null, IDictionary<string, object> headers = null) where T : class;/// <summary>/// 发布消息-主题模式(Topic)/// </summary>/// <typeparam name="T"></typeparam>/// <param name="exchangeName">要发布的交换机名称。不能为空或空白字符串。</param>/// <param name="routingKey">消息的路由键,用于匹配 Topic 类型 Exchange 的绑定规则。不能为空或空白字符串。</param>/// <param name="message">要发送的消息对象,将被序列化为 JSON 格式。</param>/// <param name="messageId">消息的唯一标识符。如果未提供,则自动生成 Guid 字符串。</param>/// <param name="headers">可选的消息头部信息,用于携带额外元数据。</param>/// <returns>异步任务</returns>Task PublishByTopicAsync<T>(string exchangeName, string routingKey, T message, string messageId = null, IDictionary<string, object> headers = null) where T : class;/// <summary>/// 发布消息-请求/响应(RPC)/// </summary>/// <typeparam name="TRequest">请求消息的类型,必须为引用类型</typeparam>/// <typeparam name="TResponse">期望的响应消息类型,必须为引用类型</typeparam>/// <param name="exchangeName">要发送请求的目标 Exchange 名称。不能为空或空白字符串。</param>/// <param name="routingKey">用于路由请求消息的路由键。不能为空或空白字符串。</param>/// <param name="request">请求对象,将被序列化为 JSON 并作为消息体发送。</param>/// <param name="timeout">等待响应的超时时间。默认为 default(可能无限期等待)。</param>/// <returns>异步任务</returns>Task<TResponse> PublishByPRCAsync<TRequest, TResponse>(string exchangeName, string routingKey, TRequest request, TimeSpan timeout = default) where TRequest : class where TResponse : class;/// <summary>/// 将死信队列中的消息重新发布到原始队列(泛型版本)/// </summary>/// <typeparam name="T">消息体的类型(如 DTO 类)</typeparam>/// <param name="queueName">原始队列名称</param>/// <param name="batchSize">每次处理的消息批大小</param>/// <param name="delay">重发延迟时间(毫秒)</param>/// <returns>成功处理的消息数量</returns>Task<int> RepublishDeadLetterMessagesAsync<T>(string queueName, int batchSize = 100, int delay = 0) where T : class;}
}

        案例如下

5.创建消费者服务

         创建消费者服务实现类 MQConsumerService 和接口 IMQConsumerService

        5.1.消费者服务接口

using Frame3_DataRepository.RabbitMQRepository.BaseMQ;
using Frame6_LibraryUtility;
using Microsoft.Extensions.Logging;
using MongoDB.Bson;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using System.Collections.Concurrent;
using System.Text;
using System.Text.Json;namespace Frame3_DataRepository.RabbitMQRepository.Consumer
{/// <summary>/// 消费者服务实现类/// 提供可靠的消息消费功能,支持自动重试和错误处理/// </summary>public sealed class MQConsumerService : BaseServiceSingleton, IMQConsumerService{/// <summary>/// RabbitMQ 基础服务/// </summary>private readonly IRabbitMqClient _rabbitMQService;/// <summary>/// 日志记录器/// </summary>private readonly ILogger<MQConsumerService> _logger;/// <summary>/// 最大消费者数量限制/// </summary>private readonly int _maxConsumerCount;/// <summary>/// 当前消费者数量计数器(线程安全)/// </summary>private int _currentConsumerCount;/// <summary>/// 用于限制消费者数量的信号量/// </summary>private readonly SemaphoreSlim _consumerLimitSemaphore;/// <summary>/// 当前活跃的消费者字典,线程安全集合/// Key: 消费者标签/// Value: (通道对象, 消费者对象)/// </summary>private readonly ConcurrentDictionary<string, (IChannel Channel, AsyncEventingBasicConsumer Consumer)> _activeConsumers;/// <summary>/// 构造函数,依赖注入初始化/// </summary>/// <param name="rabbitMQService">RabbitMQ基础服务</param>/// <param name="logger">日志记录器</param>/// <exception cref="ArgumentNullException">当参数为null时抛出</exception>public MQConsumerService(IRabbitMqClient rabbitMQService, ILogger<MQConsumerService> logger){_rabbitMQService = rabbitMQService ?? throw new ArgumentNullException(nameof(rabbitMQService));_logger = logger ?? throw new ArgumentNullException(nameof(logger));_activeConsumers = new ConcurrentDictionary<string, (IChannel, AsyncEventingBasicConsumer)>();_maxConsumerCount = _rabbitMQService.ConsumerCount;_currentConsumerCount = 0;_consumerLimitSemaphore = new SemaphoreSlim(_maxConsumerCount, _maxConsumerCount);}/// <summary>/// 消费消息-点对点(Point-to-Point)/// </summary>/// <typeparam name="T">消息类型</typeparam>/// <param name="queueName">要消费的队列名称</param>/// <param name="messageHandler">消息处理委托</param>/// <param name="prefetchCount">预取消息数量,控制消费者负载</param>/// <param name="autoAck">是否自动确认消息,建议设为false实现可靠消费</param>/// <param name="withDLX">是否启用死信队列</param>/// <returns>取消令牌源,用于停止消费</returns>public async Task<CancellationTokenSource> StartConsumingByPTPAsync<T>(string queueName, Func<T, Task> messageHandler, bool withDLX = true, ushort prefetchCount = 0, bool autoAck = false) where T : class{//参数校验if (string.IsNullOrWhiteSpace(queueName))throw new ArgumentNullException(nameof(queueName), "队列名称不能为空");if (messageHandler == null)throw new ArgumentNullException(nameof(messageHandler), "消息处理器不能为空");//等待获取消费者槽位(带超时防止死锁)if (!await _consumerLimitSemaphore.WaitAsync(TimeSpan.FromSeconds(30))){throw new InvalidOperationException($"等待消费者槽位超时,当前已有 {_maxConsumerCount} 个活跃消费者");}//当前消费者数量+1Interlocked.Increment(ref _currentConsumerCount);//创建取消令牌源var cancellationTokenSource = new CancellationTokenSource();//创建通道IChannel? channel = null;try{//赋值预读取数量if (prefetchCount == 0){prefetchCount = _rabbitMQService.prefetchCount;}//创建通道channel = await _rabbitMQService.CreateChannelAsync();//设置QoS(服务质量),控制预取消息数量await channel.BasicQosAsync(prefetchSize: 0,                //不限制预取消息总大小prefetchCount: prefetchCount,   //prefetchCount > 0 ? prefetchCount : _rabbitMQService.prefetchCount,   //每次预取的消息数量global: false                   //应用于当前消费者而非整个通道);//检查队列是否存在try{await channel.QueueDeclarePassiveAsync(queueName);}catch{_logger.LogError($"队列 {queueName} 不存在");throw;}//创建消费者var consumer = new AsyncEventingBasicConsumer(channel);//注册消息接收事件consumer.ReceivedAsync += async (model, ea) =>{try{//反序列化消息var message = JsonSerializer.Deserialize<T>(Encoding.UTF8.GetString(ea.Body.Span));//记录接收日志_logger.LogInformation("收到消息。\r\n消息ID: {MessageId}\r\n队列: {QueueName}\r\n消息体:{message}", ea.BasicProperties.MessageId, queueName, message?.ToJson());//处理消息await messageHandler(message);//如果不是自动确认模式,手动确认消息if (!autoAck){await channel.BasicAckAsync(deliveryTag: ea.DeliveryTag,    //消息投递标签multiple: false                 //不批量确认);}}catch (JsonException jsonEx){//处理反序列化错误_logger.LogError(jsonEx, "消息反序列化失败。\r\n队列: {QueueName}", queueName);//拒绝消息,不重新入队if (!autoAck){await channel.BasicRejectAsync(deliveryTag: ea.DeliveryTag, requeue: false);}}catch (Exception ex){//处理业务逻辑错误_logger.LogError(ex, "消息处理失败。\r\n队列: {QueueName}", queueName);//如果不是自动确认模式(autoAck=false),需要手动处理消息确认和重试逻辑if (!autoAck){//获取当前消息的属性对象var properties = ea.BasicProperties;//获取消息头,如果headers为null则创建新的字典var headers = properties.Headers ?? new Dictionary<string, object>();//获取当前重试次数,如果不存在x-retry-count头则默认为0int retryCount = headers.TryGetValue("x-retry-count", out var retryObj) ? Convert.ToInt32(retryObj) : 0;//获取最大重试次数,如果不存在x-max-retry-count头则默认为1int maxRetryCount = headers.TryGetValue("x-max-retry-count", out var maxRetryObj) ? Convert.ToInt32(maxRetryObj) : 1;//如果启用了死信队列(withDLX=true)且当前重试次数已达最大重试次数if (withDLX && retryCount >= maxRetryCount){//记录警告日志,说明消息已达到最大重试次数_logger.LogWarning("消息已达到最大重试次数 {MaxRetryCount},将被移入死信队列", maxRetryCount);//拒绝消息,requeue=false表示不重新入队,消息将被路由到死信队列await channel.BasicRejectAsync(ea.DeliveryTag, requeue: false);}else{//创建新的消息属性对象,复制原始消息的所有属性var newProperties = new BasicProperties{ContentType = properties.ContentType,       //复制内容类型ContentEncoding = properties.ContentEncoding,   //复制内容编码DeliveryMode = properties.DeliveryMode,         //复制投递模式(1-非持久化,2-持久化)Priority = properties.Priority,                 //复制消息优先级CorrelationId = properties.CorrelationId,       //复制关联ID(用于请求 - 响应模式)ReplyTo = properties.ReplyTo,                   //复制回复队列名称Expiration = properties.Expiration,             //复制消息过期时间MessageId = properties.MessageId,               //复制消息IDTimestamp = properties.Timestamp,               //复制时间戳Type = properties.Type,                         //复制消息类型UserId = properties.UserId,                     //复制用户IDAppId = properties.AppId,                       //复制应用IDClusterId = properties.ClusterId,               //复制集群ID//复制消息头,并更新重试次数Headers = new Dictionary<string, object>(headers){["x-retry-count"] = retryCount + 1          //重试次数+1}};//重新发布消息到原始队列await channel.BasicPublishAsync(exchange: string.Empty,             //exchange: 空字符串表示默认direct交换机routingKey: queueName,              //使用原始队列名称mandatory: false,                   //false表示如果无法路由则丢弃消息basicProperties: newProperties,     //使用更新后的属性(包含新的重试次数)body: ea.Body                       //原始消息体);//确认原始消息已被处理(从队列中移除)//- deliveryTag: 消息投递标签//- multiple: false表示只确认单条消息await channel.BasicAckAsync(ea.DeliveryTag, multiple: false);}}}};//开始消费队列消息var consumerTag = await channel.BasicConsumeAsync(queue: queueName,   //队列名称autoAck: autoAck,   //自动确认设置consumer: consumer  //消费者实例);//将消费者添加到活跃集合_activeConsumers.TryAdd(consumerTag, (channel, consumer));//注册取消令牌回调,当取消时停止消费者cancellationTokenSource.Token.Register(() => StopConsuming(consumerTag).ConfigureAwait(false));//记录消费者/总消费者数量_logger.LogInformation("成功启动消费者,当前消费者数: {_currentConsumerCount}/{_maxConsumerCount},当前消费者标记: {consumerTag}", _currentConsumerCount, _maxConsumerCount, consumerTag);return cancellationTokenSource;}catch (Exception ex){//清理创建失败的通道资源channel?.Dispose();//当前消费者数量-1Interlocked.Decrement(ref _currentConsumerCount);_consumerLimitSemaphore.Release();//记录启动失败日志_logger.LogError(ex, "启动消费者失败。队列: {QueueName}", queueName);throw;}//finally//{//    // 如果出错或任务完成,确保 channel 被释放//    channel?.Dispose();//}}/// <summary>/// 消费消息-发布/订阅模式(Pub/Sub)/// </summary>/// <typeparam name="T">消息类型</typeparam>/// <param name="exchangeName">要订阅的Exchange名称</param>/// <param name="messageHandler">消息处理委托</param>/// <param name="prefetchCount">预取消息数量,控制消费者负载</param>/// <param name="autoAck">是否自动确认消息</param>/// <returns>取消令牌源,用于停止消费</returns>public async Task<CancellationTokenSource> StartConsumingByPubSubAsync<T>(string exchangeName, Func<T, Task> messageHandler, ushort prefetchCount = 0, bool autoAck = false) where T : class{//校验exchange名称是否为空if (string.IsNullOrWhiteSpace(exchangeName))throw new ArgumentNullException(nameof(exchangeName), "Exchange名称不能为空");//校验消息处理器是否为空if (messageHandler == null)throw new ArgumentNullException(nameof(messageHandler), "消息处理器不能为空");//等待获取消费者槽位(防止并发消费者过多)if (!await _consumerLimitSemaphore.WaitAsync(TimeSpan.FromSeconds(30))){//获取失败则抛出超时异常throw new InvalidOperationException($"等待消费者槽位超时,当前已有 {_maxConsumerCount} 个活跃消费者");}//原子增加当前消费者数量Interlocked.Increment(ref _currentConsumerCount);//创建取消令牌源用于后续停止消费var cancellationTokenSource = new CancellationTokenSource();IChannel? channel = null;try{//如果未指定prefetchCount,则使用默认值if (prefetchCount == 0){prefetchCount = _rabbitMQService.prefetchCount;}//创建 RabbitMQ 通道channel = await _rabbitMQService.CreateChannelAsync();//设置QoS(服务质量),限制预取的消息数量await channel.BasicQosAsync(prefetchSize: 0,prefetchCount: prefetchCount,global: false);//声明一个 fanout 类型的 Exchange(广播模式)await channel.ExchangeDeclareAsync(exchange: exchangeName,type: ExchangeType.Fanout,durable: true,   //可持久化autoDelete: false); //不自动删除//自定义临时队列名称var queueName = "PubSub-" + DateTime.Now.ToString("yyyyMMddHHmmssfff");//创建临时队列(由RabbitMQ自动生成名字)//var queueResult = await channel.QueueDeclareAsync();var queueResult = await channel.QueueDeclareAsync(queue: queueName,   //队列名称durable: false,     //队列是否持久化       false:队列仅存于内存,RabbitMQ 重启后队列丢失(适合临时队列)           true:队列会持久化到磁盘,RabbitMQ 重启后仍存在(适合重要消息)exclusive: true,    //队列是否排他         true:队列仅对当前连接可见,连接关闭后队列自动删除(适合临时私有队列)    false:队列可被多个消费者共享(默认值,适合常规队列)autoDelete: true    //队列是否自动删除     true:当最后一个消费者断开连接后,队列自动删除(适合临时队列)            false:队列不会自动删除,需手动调用 QueueDelete 删除(默认值));//将队列绑定到Exchange(fanout类型忽略routingKey)await channel.QueueBindAsync(queue: queueName,exchange: exchangeName,routingKey: "");//创建异步消费者对象var consumer = new AsyncEventingBasicConsumer(channel);//注册消息接收事件处理逻辑consumer.ReceivedAsync += async (model, ea) =>{try{//反序列化消息体为泛型对象Tvar message = JsonSerializer.Deserialize<T>(Encoding.UTF8.GetString(ea.Body.Span));//记录收到消息的日志_logger.LogInformation("收到消息。\r\n消息ID: {MessageId}\r\nExchange: {ExchangeName}\r\n消息体:{message}", ea.BasicProperties.MessageId, exchangeName, message?.ToJson());//调用用户定义的消息处理方法await messageHandler(message);//如果不是自动确认,则手动发送 Ack 确认消息已处理if (!autoAck){await channel.BasicAckAsync(ea.DeliveryTag, multiple: false);}}catch (JsonException jsonEx){//消息反序列化失败,记录错误日志_logger.LogError(jsonEx, "消息反序列化失败。\r\nExchange: {ExchangeName}", exchangeName);//非自动确认模式下拒绝消息,不重入队列if (!autoAck){await channel.BasicRejectAsync(ea.DeliveryTag, requeue: false);}}catch (Exception ex){//消息处理过程中发生其他异常,记录错误日志_logger.LogError(ex, "消息处理失败。\r\nExchange: {ExchangeName}", exchangeName);//非自动确认模式下拒绝消息,并尝试重新入队if (!autoAck){await channel.BasicRejectAsync(ea.DeliveryTag, requeue: true);}}};//启动消费,开始监听消息var consumerTag = await channel.BasicConsumeAsync(queue: queueName,autoAck: autoAck,consumer: consumer);//将消费者和通道保存起来以便后续取消操作_activeConsumers.TryAdd(consumerTag, (channel, consumer));//注册取消回调,当取消令牌被触发时调用StopConsumingcancellationTokenSource.Token.Register(() => StopConsuming(consumerTag).ConfigureAwait(false));//记录启动成功日志_logger.LogInformation("成功启动订阅者,当前消费者数: {_currentConsumerCount}/{_maxConsumerCount}", _currentConsumerCount, _maxConsumerCount);//返回取消令牌源,供外部控制消费终止return cancellationTokenSource;}catch (Exception ex){//出现异常时释放资源channel?.Dispose();//原子减少消费者计数Interlocked.Decrement(ref _currentConsumerCount);//释放信号量槽位_consumerLimitSemaphore.Release();//记录启动失败日志_logger.LogError(ex, "启动订阅者失败。Exchange: {ExchangeName}", exchangeName);//抛出异常throw;}}/// <summary>/// 消费消息-路由模式(Routing)/// </summary>/// <typeparam name="T">消息反序列化的目标类型</typeparam>/// <param name="exchangeName">要绑定的 Exchange 名称</param>/// <param name="routingKey">用于绑定队列和 Exchange 的路由键</param>/// <param name="messageHandler">处理接收到的消息的异步回调函数</param>/// <param name="prefetchCount">预取消息数量,默认为0(未使用)</param>/// <param name="autoAck">是否自动确认消息</param>/// <returns>CancellationTokenSource,用于取消消费操作</returns>public async Task<CancellationTokenSource> StartConsumingByRoutingAsync<T>(string exchangeName, string routingKey, Func<T, Task> messageHandler, ushort prefetchCount = 0, bool autoAck = false) where T : class{//校验exchange名称是否为空if (string.IsNullOrWhiteSpace(exchangeName))throw new ArgumentNullException(nameof(exchangeName), "Exchange名称不能为空");//校验路由键是否为空if (routingKey == null)throw new ArgumentNullException(nameof(routingKey), "路由键不能为空");//校验消息处理器是否为空if (messageHandler == null)throw new ArgumentNullException(nameof(messageHandler), "消息处理器不能为空");//等待获取消费者槽位(防止并发消费者过多)if (!await _consumerLimitSemaphore.WaitAsync(TimeSpan.FromSeconds(30))){//获取失败则抛出超时异常throw new InvalidOperationException($"等待消费者槽位超时,当前已有 {_maxConsumerCount} 个活跃消费者");}//原子增加当前消费者数量Interlocked.Increment(ref _currentConsumerCount);// 创建 CancellationTokenSource,用于后续控制取消消费var cancellationTokenSource = new CancellationTokenSource();IChannel? channel = null;try{// 创建一个新的 RabbitMQ Channelchannel = await _rabbitMQService.CreateChannelAsync();// 声明一个 Direct 类型的 Exchange(如果不存在则创建)await channel.ExchangeDeclareAsync(exchange: exchangeName,type: ExchangeType.Direct,durable: true,          // Exchange 持久化autoDelete: false);     // 不自动删除//自定义临时队列名称var queueName = "Routing-" + DateTime.Now.ToString("yyyyMMddHHmmssfff");//创建临时队列(由RabbitMQ自动生成名字)//var queueResult = await channel.QueueDeclareAsync();var queueResult = await channel.QueueDeclareAsync(queue: queueName,   //队列名称durable: false,     //队列是否持久化       false:队列仅存于内存,RabbitMQ 重启后队列丢失(适合临时队列)           true:队列会持久化到磁盘,RabbitMQ 重启后仍存在(适合重要消息)exclusive: true,    //队列是否排他         true:队列仅对当前连接可见,连接关闭后队列自动删除(适合临时私有队列)    false:队列可被多个消费者共享(默认值,适合常规队列)autoDelete: true    //队列是否自动删除     true:当最后一个消费者断开连接后,队列自动删除(适合临时队列)            false:队列不会自动删除,需手动调用 QueueDelete 删除(默认值));// 将队列绑定到指定的 Exchange,并使用给定的 routingKeyawait channel.QueueBindAsync(queueName, exchangeName, routingKey);// 创建异步消费者var consumer = new AsyncEventingBasicConsumer(channel);// 注册消息接收事件处理逻辑consumer.ReceivedAsync += async (model, ea) =>{try{// 反序列化消息体为泛型 T 对象var msg = JsonSerializer.Deserialize<T>(Encoding.UTF8.GetString(ea.Body.Span));//记录收到消息的日志_logger.LogInformation("收到消息。\r\n消息ID: {MessageId}\r\nExchange: {ExchangeName}\r\n消息体:{message}", ea.BasicProperties.MessageId, exchangeName, msg?.ToJson());// 调用用户定义的消息处理函数await messageHandler(msg);// 如果不是自动确认,则手动发送 Ack 确认消息已处理成功if (!autoAck)await channel.BasicAckAsync(ea.DeliveryTag, multiple: false);}catch (JsonException jsonEx){//消息反序列化失败,记录错误日志_logger.LogError(jsonEx, "消息反序列化失败。\r\nExchange: {ExchangeName}", exchangeName);//非自动确认模式下拒绝消息,不重入队列if (!autoAck){await channel.BasicRejectAsync(ea.DeliveryTag, requeue: false);}}catch (Exception ex){//消息处理过程中发生其他异常,记录错误日志_logger.LogError(ex, "消息处理失败。\r\nExchange: {ExchangeName}", exchangeName);//非自动确认模式下拒绝消息,并尝试重新入队if (!autoAck){await channel.BasicRejectAsync(ea.DeliveryTag, requeue: true);}}};// 开始消费队列中的消息var consumerTag = await channel.BasicConsumeAsync(queueName, autoAck, consumer);// 记录当前消费者信息以便后续取消或释放资源_activeConsumers.TryAdd(consumerTag, (channel, consumer));// 注册取消令牌,在取消时调用 StopConsuming 方法停止消费cancellationTokenSource.Token.Register(() => StopConsuming(consumerTag).ConfigureAwait(false));//记录启动成功日志_logger.LogInformation("成功启动订阅者,当前消费者数: {_currentConsumerCount}/{_maxConsumerCount}", _currentConsumerCount, _maxConsumerCount);// 返回 CancellationTokenSource,供外部控制取消return cancellationTokenSource;}catch (Exception ex){//出现异常时释放资源channel?.Dispose();//原子减少消费者计数Interlocked.Decrement(ref _currentConsumerCount);//释放信号量槽位_consumerLimitSemaphore.Release();//记录启动失败日志_logger.LogError(ex, "启动订阅者失败。Exchange: {ExchangeName}", exchangeName);//抛出异常throw;}}/// <summary>/// 消费消息-主题模式(Topic)/// </summary>/// <typeparam name="T">消息反序列化的目标类型</typeparam>/// <param name="exchangeName">要绑定的 Exchange 名称</param>/// <param name="topicPattern">用于绑定队列的 Topic 匹配规则(如 user.*)</param>/// <param name="messageHandler">处理接收到的消息的异步回调函数</param>/// <param name="prefetchCount">预取消息数量,默认为0(未使用)</param>/// <param name="autoAck">是否自动确认消息</param>/// <returns>CancellationTokenSource,用于取消消费操作</returns>public async Task<CancellationTokenSource> StartConsumingByTopicAsync<T>(string exchangeName, string topicPattern, Func<T, Task> messageHandler, ushort prefetchCount = 0, bool autoAck = false) where T : class{//校验exchange名称是否为空if (string.IsNullOrWhiteSpace(exchangeName))throw new ArgumentNullException(nameof(exchangeName), "Exchange名称不能为空");//校验匹配规则是否为空if (topicPattern == null)throw new ArgumentNullException(nameof(topicPattern), "topicPattern不能为空");//校验消息处理器是否为空if (messageHandler == null)throw new ArgumentNullException(nameof(messageHandler), "消息处理器不能为空");//等待获取消费者槽位(防止并发消费者过多)if (!await _consumerLimitSemaphore.WaitAsync(TimeSpan.FromSeconds(30))){//获取失败则抛出超时异常throw new InvalidOperationException($"等待消费者槽位超时,当前已有 {_maxConsumerCount} 个活跃消费者");}//原子增加当前消费者数量Interlocked.Increment(ref _currentConsumerCount);// 创建 CancellationTokenSource,用于后续控制取消消费var cancellationTokenSource = new CancellationTokenSource();IChannel? channel = null;try{// 创建一个新的 RabbitMQ Channelchannel = await _rabbitMQService.CreateChannelAsync();// 声明一个 Topic 类型的 Exchange(如果不存在则创建)await channel.ExchangeDeclareAsync(exchange: exchangeName,type: ExchangeType.Topic,durable: true,          // Exchange 持久化autoDelete: false);     // 不自动删除//自定义临时队列名称var queueName = "Topic-" + DateTime.Now.ToString("yyyyMMddHHmmssfff");//创建临时队列(由RabbitMQ自动生成名字)//var queueResult = await channel.QueueDeclareAsync();var queueResult = await channel.QueueDeclareAsync(queue: queueName,   //队列名称durable: false,     //队列是否持久化       false:队列仅存于内存,RabbitMQ 重启后队列丢失(适合临时队列)           true:队列会持久化到磁盘,RabbitMQ 重启后仍存在(适合重要消息)exclusive: true,    //队列是否排他         true:队列仅对当前连接可见,连接关闭后队列自动删除(适合临时私有队列)    false:队列可被多个消费者共享(默认值,适合常规队列)autoDelete: true    //队列是否自动删除     true:当最后一个消费者断开连接后,队列自动删除(适合临时队列)            false:队列不会自动删除,需手动调用 QueueDelete 删除(默认值));// 将队列绑定到指定的 Exchange,并使用 Topic 模式匹配规则await channel.QueueBindAsync(queueName, exchangeName, topicPattern);// 创建异步消费者var consumer = new AsyncEventingBasicConsumer(channel);// 注册消息接收事件处理逻辑consumer.ReceivedAsync += async (model, ea) =>{try{// 反序列化消息体为泛型 T 对象var msg = JsonSerializer.Deserialize<T>(Encoding.UTF8.GetString(ea.Body.Span));//记录收到消息的日志_logger.LogInformation("收到消息。\r\n消息ID: {MessageId}\r\nExchange: {ExchangeName}\r\n消息体:{message}", ea.BasicProperties.MessageId, exchangeName, msg?.ToJson());// 调用用户定义的消息处理函数await messageHandler(msg);// 如果不是自动确认,则手动发送 Ack 确认消息已处理成功if (!autoAck)await channel.BasicAckAsync(ea.DeliveryTag, multiple: false);}catch (JsonException jsonEx){//消息反序列化失败,记录错误日志_logger.LogError(jsonEx, "消息反序列化失败。\r\nExchange: {ExchangeName}", exchangeName);//非自动确认模式下拒绝消息,不重入队列if (!autoAck){await channel.BasicRejectAsync(ea.DeliveryTag, requeue: false);}}catch (Exception ex){//消息处理过程中发生其他异常,记录错误日志_logger.LogError(ex, "消息处理失败。\r\nExchange: {ExchangeName}", exchangeName);//非自动确认模式下拒绝消息,并尝试重新入队if (!autoAck){await channel.BasicRejectAsync(ea.DeliveryTag, requeue: true);}}};// 开始消费队列中的消息var consumerTag = await channel.BasicConsumeAsync(queueName, autoAck, consumer);// 记录当前消费者信息以便后续取消或释放资源_activeConsumers.TryAdd(consumerTag, (channel, consumer));// 注册取消令牌,在取消时调用 StopConsuming 方法停止消费cancellationTokenSource.Token.Register(() => StopConsuming(consumerTag).ConfigureAwait(false));//记录启动成功日志_logger.LogInformation("成功启动订阅者,当前消费者数: {_currentConsumerCount}/{_maxConsumerCount}", _currentConsumerCount, _maxConsumerCount);// 返回 CancellationTokenSource,供外部控制取消return cancellationTokenSource;}catch (Exception ex){//出现异常时释放资源channel?.Dispose();//原子减少消费者计数Interlocked.Decrement(ref _currentConsumerCount);//释放信号量槽位_consumerLimitSemaphore.Release();//记录启动失败日志_logger.LogError(ex, "启动订阅者失败。Exchange: {ExchangeName}", exchangeName);//抛出异常throw;}}/// <summary>/// 消费消息-请求/响应(RPC)/// </summary>/// <typeparam name="TRequest">请求消息的类型</typeparam>/// <typeparam name="TResponse">响应消息的类型</typeparam>/// <param name="exchangeName">Exchange 名称,通常为空字符串表示默认 Exchange</param>/// <param name="routingKey">用于监听的队列名称(同时也是 routingKey)</param>/// <param name="handler">处理请求并返回响应的异步回调函数</param>/// <param name="prefetchCount">预取消息数量,默认为0(未使用)</param>/// <returns>CancellationTokenSource,用于取消消费操作</returns>public async Task<CancellationTokenSource> StartConsumingByPRCAsync<TRequest, TResponse>(string exchangeName, string routingKey, Func<TRequest, Task<TResponse>> handler, ushort prefetchCount = 0) where TRequest : class where TResponse : class{// 参数校验if (string.IsNullOrWhiteSpace(exchangeName))throw new ArgumentException("Exchange名称不能为空", nameof(exchangeName));if (string.IsNullOrWhiteSpace(routingKey))throw new ArgumentException("路由键不能为空", nameof(routingKey));if (handler == null)throw new ArgumentNullException(nameof(handler), "消息处理器不能为空");// 等待获取消费者槽位if (!await _consumerLimitSemaphore.WaitAsync(TimeSpan.FromSeconds(30))){throw new InvalidOperationException($"等待消费者槽位超时,当前已有 {_maxConsumerCount} 个活跃消费者");}try{Interlocked.Increment(ref _currentConsumerCount);// 创建新的 RabbitMQ Channelvar channel = await _rabbitMQService.CreateChannelAsync();try{// 设置预取数量(控制并发处理能力)if (prefetchCount > 0){await channel.BasicQosAsync(0, prefetchCount, false);}// 声明队列(与生产者保持一致)var queueDeclareOk = await channel.QueueDeclareAsync(queue: routingKey,   //队列名称durable: true,       //队列是否持久化       false:队列仅存于内存,RabbitMQ 重启后队列丢失(适合临时队列)           true:队列会持久化到磁盘,RabbitMQ 重启后仍存在(适合重要消息)exclusive: false,    //队列是否排他         true:队列仅对当前连接可见,连接关闭后队列自动删除(适合临时私有队列)    false:队列可被多个消费者共享(默认值,适合常规队列)autoDelete: false,   //队列是否自动删除     true:当最后一个消费者断开连接后,队列自动删除(适合临时队列)            false:队列不会自动删除,需手动调用 QueueDelete 删除(默认值)arguments: null);// 绑定队列到Exchangeawait channel.QueueBindAsync(queue: routingKey,exchange: exchangeName,routingKey: routingKey);// 创建异步消费者var consumer = new AsyncEventingBasicConsumer(channel);// 消息处理逻辑consumer.ReceivedAsync += async (model, ea) =>{try{// 反序列化请求var request = JsonSerializer.Deserialize<TRequest>(ea.Body.Span);// 处理请求var response = await handler(request);// 准备响应属性var replyProps = new BasicProperties();replyProps.CorrelationId = ea.BasicProperties.CorrelationId;replyProps.ContentType = "application/json";// 发送响应await channel.BasicPublishAsync(exchange: "", // 默认ExchangeroutingKey: ea.BasicProperties.ReplyTo,mandatory: false,basicProperties: replyProps,body: Encoding.UTF8.GetBytes(JsonSerializer.Serialize(response)));// 确认消息await channel.BasicAckAsync(ea.DeliveryTag, false);}catch (Exception ex){_logger.LogError(ex, "处理RPC请求失败: {CorrelationId}",ea.BasicProperties?.CorrelationId);// 拒绝消息且不重新入队await channel.BasicNackAsync(ea.DeliveryTag, false, false);}};// 开始消费var consumerTag = await channel.BasicConsumeAsync(queue: routingKey,autoAck: false, // 手动确认consumer: consumer);// 创建取消令牌var cts = new CancellationTokenSource();// 注册取消回调cts.Token.Register(async () =>{try{await channel.BasicCancelAsync(consumerTag);await channel.CloseAsync();}catch (Exception ex){_logger.LogWarning(ex, "取消消费者时发生错误");}finally{channel.Dispose();Interlocked.Decrement(ref _currentConsumerCount);_consumerLimitSemaphore.Release();}});return cts;}catch{// 发生异常时确保通道被关闭channel?.Dispose();throw;}}catch{// 发生异常时释放信号量Interlocked.Decrement(ref _currentConsumerCount);_consumerLimitSemaphore.Release();throw;}}/// <summary>/// 停止指定消费者的消息消费/// </summary>/// <param name="consumerTag">消费者标签</param>/// <returns>异步任务</returns>public async Task StopConsuming(string consumerTag){//从活跃集合中移除消费者if (_activeConsumers.TryRemove(consumerTag, out var consumerInfo)){try{//取消消费者订阅await consumerInfo.Channel.BasicCancelAsync(consumerTag);//异步释放通道资源await consumerInfo.Channel.DisposeAsync();//记录停止成功日志_logger.LogInformation("已停止消费者。消费者标签: {ConsumerTag}", consumerTag);}catch (OperationInterruptedException opEx){//记录操作中断警告日志_logger.LogWarning(opEx, "消费者取消操作被中断。消费者标签: {ConsumerTag}", consumerTag);}catch (Exception ex){//记录停止失败错误日志_logger.LogError(ex, "停止消费者时出错。消费者标签: {ConsumerTag}", consumerTag);throw;}finally{Interlocked.Decrement(ref _currentConsumerCount);_consumerLimitSemaphore.Release();_logger.LogInformation("当前消费者数: {CurrentCount}/{MaxCount}", _currentConsumerCount, _maxConsumerCount);}}else{// 记录未找到消费者警告日志_logger.LogWarning("未找到对应的消费者。消费者标签: {ConsumerTag}", consumerTag);}}/// <summary>/// 停止所有消费者的消息消费/// </summary>/// <returns>异步任务</returns>public async Task StopAllConsuming(){// 遍历所有消费者标签并停止foreach (var consumerTag in _activeConsumers.Keys.ToList()){try{await StopConsuming(consumerTag).ConfigureAwait(false);}catch (Exception ex){// 记录单个消费者停止失败日志,继续停止其他消费者_logger.LogError(ex, "停止消费者时出错。消费者标签: {ConsumerTag}", consumerTag);}}}/// <summary>/// 获取当前消费者状态/// </summary>public ConsumerStatus GetConsumerStatus(){// 创建并返回一个新的 ConsumerStatus 对象,用于封装当前消费者的运行状态return new ConsumerStatus{// 设置当前消费者数量CurrentCount = _currentConsumerCount,// 设置最大消费者数量MaxCount = _maxConsumerCount,// 获取当前所有活跃消费者的标识符列表ActiveConsumers = _activeConsumers.Keys.ToList()};}}}

        案例如下

5.2.消费者接口

using Frame3_DataRepository.RabbitMQRepository.BaseMQ;namespace Frame3_DataRepository.RabbitMQRepository.Consumer
{/// <summary>/// 消费者服务接口/// RabbitMQ/// </summary>public interface IMQConsumerService{/// <summary>/// 消费消息-点对点(Point-to-Point)/// </summary>/// <typeparam name="T">消息类型</typeparam>/// <param name="queueName">要消费的队列名称</param>/// <param name="messageHandler">消息处理委托</param>/// <param name="prefetchCount">预取消息数量,控制消费者负载</param>/// <param name="autoAck">是否自动确认消息,建议设为false实现可靠消费</param>/// <returns>取消令牌源,用于停止消费</returns>/// <exception cref="ArgumentNullException">当必要参数为空时抛出</exception>Task<CancellationTokenSource> StartConsumingByPTPAsync<T>(string queueName, Func<T, Task> messageHandler, bool withDLX = true, ushort prefetchCount = 0, bool autoAck = false) where T : class;/// <summary>/// 消费消息-发布/订阅模式(Pub/Sub)/// </summary>/// <typeparam name="T">消息类型</typeparam>/// <param name="exchangeName">要订阅的Exchange名称</param>/// <param name="messageHandler">消息处理委托</param>/// <param name="prefetchCount">预取消息数量,控制消费者负载</param>/// <param name="autoAck">是否自动确认消息</param>/// <returns>取消令牌源,用于停止消费</returns>Task<CancellationTokenSource> StartConsumingByPubSubAsync<T>(string exchangeName, Func<T, Task> messageHandler, ushort prefetchCount = 0, bool autoAck = false) where T : class;/// <summary>/// 消费消息-路由模式(Routing)/// </summary>/// <typeparam name="T">消息反序列化的目标类型</typeparam>/// <param name="exchangeName">要绑定的 Exchange 名称</param>/// <param name="routingKey">用于绑定队列和 Exchange 的路由键</param>/// <param name="messageHandler">处理接收到的消息的异步回调函数</param>/// <param name="prefetchCount">预取消息数量,默认为0(未使用)</param>/// <param name="autoAck">是否自动确认消息</param>/// <returns>CancellationTokenSource,用于取消消费操作</returns>Task<CancellationTokenSource> StartConsumingByRoutingAsync<T>(string exchangeName, string routingKey, Func<T, Task> messageHandler, ushort prefetchCount = 0, bool autoAck = false) where T : class;/// <summary>/// 消费消息-主题模式(Topic)/// </summary>/// <typeparam name="T">消息反序列化的目标类型</typeparam>/// <param name="exchangeName">要绑定的 Exchange 名称</param>/// <param name="topicPattern">用于绑定队列的 Topic 匹配规则(如 user.*)</param>/// <param name="messageHandler">处理接收到的消息的异步回调函数</param>/// <param name="prefetchCount">预取消息数量,默认为0(未使用)</param>/// <param name="autoAck">是否自动确认消息</param>/// <returns>CancellationTokenSource,用于取消消费操作</returns>Task<CancellationTokenSource> StartConsumingByTopicAsync<T>(string exchangeName, string topicPattern, Func<T, Task> messageHandler, ushort prefetchCount = 0, bool autoAck = false) where T : class;/// <summary>/// 消费消息-请求/响应(RPC)/// </summary>/// <typeparam name="TRequest">请求消息的类型</typeparam>/// <typeparam name="TResponse">响应消息的类型</typeparam>/// <param name="exchangeName">Exchange 名称,通常为空字符串表示默认 Exchange</param>/// <param name="routingKey">用于监听的队列名称(同时也是 routingKey)</param>/// <param name="handler">处理请求并返回响应的异步回调函数</param>/// <param name="prefetchCount">预取消息数量,默认为0(未使用)</param>/// <returns>CancellationTokenSource,用于取消消费操作</returns>Task<CancellationTokenSource> StartConsumingByPRCAsync<TRequest, TResponse>(string exchangeName, string routingKey, Func<TRequest, Task<TResponse>> handler, ushort prefetchCount = 0) where TRequest : class where TResponse : class;/// <summary>/// 停止指定消费者的消息消费/// </summary>/// <param name="consumerTag">消费者标签</param>/// <returns>异步任务</returns>Task StopConsuming(string consumerTag);/// <summary>/// 停止所有消费者的消息消费/// </summary>/// <returns>异步任务</returns>Task StopAllConsuming();/// <summary>/// 获取当前消费者状态/// </summary>/// <returns></returns>ConsumerStatus GetConsumerStatus();}
}

        案例如下

6.注册

         在 Program 或 Startup 中注册队列。

 // 注册 RabbitMQ 连接服务为单例(Singleton)// IRabbitMqClient 是一个接口,代表 RabbitMQ 客户端连接的抽象// RabbitMqClient 是其具体实现类// 单例生命周期意味着在整个应用程序生命周期中只创建一次该实例,所有请求共享同一个实例builder.Services.AddSingleton<IRabbitMqClient, RabbitMqClient>();// 注册 RabbitMQ 消息生产者服务为作用域(Scoped)// IMQProducerService 是用于发送消息的接口// MQProducerService 是其实现类// Scoped 生命周期表示在同一个请求上下文中使用同一个实例(适用于 Web 应用场景)builder.Services.AddScoped<IMQProducerService, MQProducerService>();// 注册 RabbitMQ 消息消费者服务为单例(Singleton)// IMQConsumerService 是用于消费消息(接收并处理消息)的接口// MQConsumerService 是其实现类// 使用 Singleton 是因为消费者通常需要长时间运行、持续监听队列,适合整个应用周期内保持一个实例builder.Services.AddSingleton<IMQConsumerService, MQConsumerService>();

         案例如下

7.简单使用案例

        下面是 实现、接口和控制器的使用案例

        7.1.实现

using Frame1_Service.IService.Product;
using Frame2_DataModel.Entity.Products;
using Frame3_DataRepository.RabbitMQRepository.Consumer;
using Frame3_DataRepository.RabbitMQRepository.Producer;
using Frame6_LibraryUtility;
using RabbitMQ.Client.Exceptions;namespace Frame1_Service.Service.Product
{public class RabbitMQTestSvr : BaseService, IRabbitMQTestSvr{/// <summary>/// 生产者/// </summary>private readonly IMQProducerService _iRabbitMQProducer;/// <summary>/// 消费者/// </summary>private readonly IMQConsumerService _iRabbitMQConsumer;/// <summary>/// 构造/// </summary>/// <param name="iRabbitMQProducer"></param>/// <param name="iRabbitMQConsumer"></param>public RabbitMQTestSvr(IMQProducerService iRabbitMQProducer, IMQConsumerService iRabbitMQConsumer){_iRabbitMQConsumer = iRabbitMQConsumer;_iRabbitMQProducer = iRabbitMQProducer;}/// <summary>/// 模拟消费逻辑/// </summary>/// <param name="model"></param>/// <returns></returns>private async Task ProcessOrderAsync(ProductsEntity model){Console.WriteLine("消费成功:" + model.ToJson());}/// <summary>/// 生产者-点对点(Point-to-Point)/// </summary>/// <param name="model"></param>/// <returns></returns>public async Task<ResultModel<bool>> ProducerTest(ProductsEntity model){var result = new ResultModel<bool>() { Data = false };// 创建 Random 实例Random random = new Random();model.Id = Guid.NewGuid().ToString();model.ProductName = "测试" + (random.Next(1, 999)).ToString();model.Price = random.Next(1000, 9999);model.Stock = random.Next(1, 99);await _iRabbitMQProducer.PublishByPTPAsync<ProductsEntity>("ProducerTest", model);result.Code = ResultCodeEnum.Success;result.Msg = "操作成功";result.Data = true;return result;}/// <summary>/// 消费者-点对点(Point-to-Point)/// </summary>/// <returns></returns>public async Task<ResultModel<bool>> ConsumerTest(){var result = new ResultModel<bool>() { Data = false };await _iRabbitMQConsumer.StartConsumingByPTPAsync<ProductsEntity>("ProducerTest", ProcessOrderAsync);result.Code = ResultCodeEnum.Success;result.Msg = "操作成功";result.Data = true;return result;}/// <summary>/// 生产者-发布订阅(Pub/Sub)/// </summary>/// <param name="model"></param>/// <returns></returns>public async Task<ResultModel<bool>> ProducerPubSub(ProductsEntity model){var result = new ResultModel<bool>() { Data = false };// 创建 Random 实例Random random = new Random();model.Id = Guid.NewGuid().ToString();model.ProductName = "测试" + (random.Next(1, 999)).ToString();model.Price = random.Next(1000, 9999);model.Stock = random.Next(1, 99);await _iRabbitMQProducer.PublishByPubSubAsync<ProductsEntity>("PubSubTest", model);result.Code = ResultCodeEnum.Success;result.Msg = "操作成功";result.Data = true;return result;}/// <summary>/// 消费者-发布订阅(Pub/Sub)/// </summary>/// <returns></returns>public async Task<ResultModel<bool>> ConsumerPubSub(){var result = new ResultModel<bool>() { Data = false };await _iRabbitMQConsumer.StartConsumingByPubSubAsync<ProductsEntity>("PubSubTest", ProcessOrderAsync);result.Code = ResultCodeEnum.Success;result.Msg = "操作成功";result.Data = true;return result;}/// <summary>/// 生产者-路由模式(Routing)/// </summary>/// <param name="model"></param>/// <returns></returns>public async Task<ResultModel<bool>> ProducerRouting(ProductsEntity model){var result = new ResultModel<bool>() { Data = false };// 创建 Random 实例Random random = new Random();model.Id = Guid.NewGuid().ToString();model.ProductName = "测试" + (random.Next(1, 999)).ToString();model.Price = random.Next(1000, 9999);model.Stock = random.Next(1, 99);await _iRabbitMQProducer.PublishByRoutingAsync<ProductsEntity>("RoutingTest", "Routing", model);result.Code = ResultCodeEnum.Success;result.Msg = "操作成功";result.Data = true;return result;}/// <summary>/// 消费者-路由模式(Routing)/// </summary>/// <returns></returns>public async Task<ResultModel<bool>> ConsumerRouting(){var result = new ResultModel<bool>() { Data = false };await _iRabbitMQConsumer.StartConsumingByRoutingAsync<ProductsEntity>("RoutingTest", "Routing", ProcessOrderAsync);result.Code = ResultCodeEnum.Success;result.Msg = "操作成功";result.Data = true;return result;}/// <summary>/// 生产者-主题模式(Topic)/// </summary>/// <param name="model"></param>/// <returns></returns>public async Task<ResultModel<bool>> ProducerTopic(ProductsEntity model){var result = new ResultModel<bool>() { Data = false };// 创建 Random 实例Random random = new Random();model.Id = Guid.NewGuid().ToString();model.ProductName = "测试" + (random.Next(1, 999)).ToString();model.Price = random.Next(1000, 9999);model.Stock = random.Next(1, 99);await _iRabbitMQProducer.PublishByTopicAsync<ProductsEntity>("TopicTest", "Topic.test", model);result.Code = ResultCodeEnum.Success;result.Msg = "操作成功";result.Data = true;return result;}/// <summary>/// 消费者-主题模式(Topic)/// </summary>/// <returns></returns>public async Task<ResultModel<bool>> ConsumerTopic(){var result = new ResultModel<bool>() { Data = false };await _iRabbitMQConsumer.StartConsumingByTopicAsync<ProductsEntity>("TopicTest", "Topic.*", ProcessOrderAsync);result.Code = ResultCodeEnum.Success;result.Msg = "操作成功";result.Data = true;return result;}/// <summary>/// 生产者-请求响应模式(RPC)/// </summary>/// <param name="model"></param>/// <returns></returns>public async Task<ResultModel<CalculateResponse>> ProducerRPC(ProductsEntity model){var result = new ResultModel<CalculateResponse>();var request = new CalculateRequest { X = 5, Y = 7 };// 创建 Random 实例Random random = new Random();model.Id = Guid.NewGuid().ToString();model.ProductName = "测试" + (random.Next(1, 999)).ToString();model.Price = random.Next(1000, 9999);model.Stock = random.Next(1, 99);var response = await _iRabbitMQProducer.PublishByPRCAsync<CalculateRequest, CalculateResponse>("RPCTest", "RPC", request);result.Code = ResultCodeEnum.Success;result.Msg = "操作成功";result.Data = response;return result;}/// <summary>/// 消费者-请求响应模式(RPC)/// </summary>/// <returns></returns>public async Task<ResultModel<bool>> ConsumerRPC(){var result = new ResultModel<bool> { Data = false };try{//示例:模拟一个计算器服务(可替换为真实的 ICalculatorService)Func<CalculateRequest, Task<CalculateResponse>> handler = async req =>{await Task.Delay(10); //模拟异步处理return new CalculateResponse { Result = req.X + req.Y };};//启动消费者var cts = await _iRabbitMQConsumer.StartConsumingByPRCAsync<CalculateRequest, CalculateResponse>(exchangeName: "RPCTest", routingKey: "RPC", handler);result.Data = true;result.Code = ResultCodeEnum.Success;result.Msg = "RPC消费者已启动";}catch (OperationInterruptedException ex){result.Msg = "消息队列服务不可用";}catch (Exception ex){result.Msg = "消费者初始化失败";}return result;}/// <summary>/// 死信队列重抛/// </summary>/// <returns></returns>public async Task<ResultModel<bool>> Republish(string queueName){var result = new ResultModel<bool>() { Data = false };await _iRabbitMQProducer.RepublishDeadLetterMessagesAsync<ProductsEntity>(queueName);result.Code = ResultCodeEnum.Success;result.Msg = "操作成功";result.Data = true;return result;}/// <summary>/// 停止消费者/// </summary>/// <returns></returns>public async Task<ResultModel<bool>> StopAllConsumer(string consumerTag){var result = new ResultModel<bool>() { Data = false };if (consumerTag.Equals("0")){await _iRabbitMQConsumer.StopAllConsuming();}else{await _iRabbitMQConsumer.StopConsuming(consumerTag);}result.Code = ResultCodeEnum.Success;result.Msg = "操作成功";result.Data = true;return result;}}
}

        案例如下

 

        7.2.接口

using Frame2_DataModel.Entity.Products;
using Frame6_LibraryUtility;namespace Frame1_Service.IService.Product
{public interface IRabbitMQTestSvr{/// <summary>/// 生产者-点对点(Point-to-Point)/// </summary>/// <param name="model"></param>/// <returns></returns>Task<ResultModel<bool>> ProducerTest(ProductsEntity model);/// <summary>/// 消费者-点对点(Point-to-Point)/// </summary>/// <returns></returns>Task<ResultModel<bool>> ConsumerTest();/// <summary>/// 生产者-发布订阅(Pub/Sub)/// </summary>/// <param name="model"></param>/// <returns></returns>Task<ResultModel<bool>> ProducerPubSub(ProductsEntity model);/// <summary>/// 消费者-发布订阅(Pub/Sub)/// </summary>/// <returns></returns>Task<ResultModel<bool>> ConsumerPubSub();/// <summary>/// 生产者-路由模式(Routing)/// </summary>/// <param name="model"></param>/// <returns></returns>Task<ResultModel<bool>> ProducerRouting(ProductsEntity model);/// <summary>/// 消费者-路由模式(Routing)/// </summary>/// <returns></returns>Task<ResultModel<bool>> ConsumerRouting();/// <summary>/// 生产者-主题模式(Topic)/// </summary>/// <param name="model"></param>/// <returns></returns>Task<ResultModel<bool>> ProducerTopic(ProductsEntity model);/// <summary>/// 消费者-主题模式(Topic)/// </summary>/// <returns></returns>Task<ResultModel<bool>> ConsumerTopic();/// <summary>/// 生产者-请求响应模式(RPC)/// </summary>/// <param name="model"></param>/// <returns></returns>Task<ResultModel<CalculateResponse>> ProducerRPC(ProductsEntity model);/// <summary>/// 消费者-请求响应模式(RPC)/// </summary>/// <returns></returns>Task<ResultModel<bool>> ConsumerRPC();/// <summary>/// 死信队列重抛/// </summary>/// <returns></returns>Task<ResultModel<bool>> Republish(string queueName);/// <summary>/// 停止消费者/// </summary>/// <returns></returns>Task<ResultModel<bool>> StopAllConsumer(string consumerTag);}
}

        案例如下

        7.3.控制器

using Frame1_Service.IService.Product;
using Frame1_Service.Service.Product;
using Frame2_DataModel.Entity.Products;
using Frame4_LibraryCore.BaseConfig;
using Frame6_LibraryUtility;
using Microsoft.AspNetCore.Mvc;namespace DemoAPI.Controllers
{/// <summary>/// 消息队列控制器 -RabbitMQ/// </summary>//[Authorize]// 保护整个控制器[Route("api/[controller]/[action]")]//标记路由地址规格[ApiController] // 标记该类为 API 控制器,启用一些默认的行为,如模型绑定、输入验证等[ApiExplorerSettings(GroupName = nameof(ApiVersionInfo.V1))]//设置控制器的API版本public class RabbitMQTestController : BaseController{private IRabbitMQTestSvr _iRabbitMQTestSvr;/// <summary>/// 构造/// </summary>/// <param name="iRabbitMQTestSvr"></param>public RabbitMQTestController(IRabbitMQTestSvr iRabbitMQTestSvr) {_iRabbitMQTestSvr = iRabbitMQTestSvr;}/// <summary>/// 生产者-点对点(Point-to-Point)/// </summary>/// <param name="model"></param>/// <returns></returns>[HttpPost]public async Task<ResultModel<bool>> ProducerTest(ProductsEntity model) => await _iRabbitMQTestSvr.ProducerTest(model);/// <summary>/// 消费者-点对点(Point-to-Point)/// </summary>/// <returns></returns>[HttpGet]public async Task<ResultModel<bool>> ConsumerTest() => await _iRabbitMQTestSvr.ConsumerTest();/// <summary>/// 生产者-发布订阅(Pub/Sub)/// </summary>/// <param name="model"></param>/// <returns></returns>[HttpPost]public async Task<ResultModel<bool>> ProducerPubSub(ProductsEntity model) => await _iRabbitMQTestSvr.ProducerPubSub(model);/// <summary>/// 消费者-发布订阅(Pub/Sub)/// </summary>/// <returns></returns>[HttpGet]public async Task<ResultModel<bool>> ConsumerPubSub() => await _iRabbitMQTestSvr.ConsumerPubSub();/// <summary>/// 生产者-路由模式(Routing)/// </summary>/// <param name="model"></param>/// <returns></returns>[HttpPost]public async Task<ResultModel<bool>> ProducerRouting(ProductsEntity model) => await _iRabbitMQTestSvr.ProducerRouting(model);/// <summary>/// 消费者-路由模式(Routing)/// </summary>/// <returns></returns>[HttpGet]public async Task<ResultModel<bool>> ConsumerRouting() => await _iRabbitMQTestSvr.ConsumerRouting();/// <summary>/// 生产者-主题模式(Topic)/// </summary>/// <param name="model"></param>/// <returns></returns>[HttpPost]public async Task<ResultModel<bool>> ProducerTopic(ProductsEntity model) => await _iRabbitMQTestSvr.ProducerTopic(model);/// <summary>/// 消费者-主题模式(Topic)/// </summary>/// <returns></returns>[HttpGet]public async Task<ResultModel<bool>> ConsumerTopic() => await _iRabbitMQTestSvr.ConsumerTopic();/// <summary>/// 生产者-请求响应模式(RPC)/// </summary>/// <param name="model"></param>/// <returns></returns>[HttpPost]public async Task<ResultModel<CalculateResponse>> ProducerRPC(ProductsEntity model) => await _iRabbitMQTestSvr.ProducerRPC(model);/// <summary>/// 消费者-请求响应模式(RPC)/// </summary>/// <returns></returns>[HttpGet]public async Task<ResultModel<bool>> ConsumerRPC() => await _iRabbitMQTestSvr.ConsumerRPC();/// <summary>/// 死信队列重抛/// </summary>/// <returns></returns>[HttpGet]public async Task<ResultModel<bool>> Republish(string queueName) => await _iRabbitMQTestSvr.Republish(queueName);/// <summary>/// 停止消费者/// </summary>/// <returns></returns>[HttpGet]public async Task<ResultModel<bool>> StopAllConsumer(string consumerTag) => await _iRabbitMQTestSvr.StopAllConsumer(consumerTag);}
}

        案例如下

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/89515.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/89515.shtml
英文地址,请注明出处:http://en.pswp.cn/bicheng/89515.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Apache SeaTunnel配置使用案例

前置操作 Apache SeaTunnel详解与部署&#xff08;最新版本2.3.11&#xff09;-CSDN博客 mkdir /usr/local/soft/apache-seatunnel-2.3.11/job/ 一、MySQL to HDFS 官方配置参考&#xff1a; MySQL | Apache SeaTunnel Hdfs文件 | Apache SeaTunnel 1、配置确认 将mysq…

GitCode 使用高频问题及解决方案

GitCode 作为一款强大的版本控制系统&#xff0c;在软件开发流程中起着举足轻重的作用。然而&#xff0c;在使用过程中&#xff0c;开发者们常常会遇到各种各样的问题。本文将汇总 GitCode 使用中的高频问题&#xff0c;并提供详细的解决方案&#xff0c;帮助开发者们更顺畅地使…

在FreeBSD系统使用chroot进入Ubuntu仿真环境使用Localsend软件发送和接受文件

LocalSend是一款非常实用的在不同系统&#xff08;Windows、MacOS、Linux、Android和IOS&#xff09;传递文件的程序。我们这次的实践&#xff0c;就是要在FreeBSD下也能发送和接收文件。 安装LocalSend 跟在Ubuntu下安装非常类似&#xff0c;只是不需要下面的第一步&#xf…

交叉熵损失F.cross_entropy在分类模型中的应用

一、核心思想&#xff1a;通过概率分布惩罚错误交叉熵损失的本质是&#xff1a; 比较模型预测的概率分布 vs 真实标签的概率分布&#xff0c;惩罚两者之间的差异。例如&#xff1a;真实标签&#xff1a;图像 0 → 文本 0&#xff08;独热编码 [1, 0, 0, ...]&#xff09;模型预…

测试学习之——Pytest Day3

引言Pytest 作为 Python 中最受欢迎的测试框架之一&#xff0c;以其简洁的语法、强大的功能和丰富的插件生态系统&#xff0c;极大地提升了自动化测试的效率和可维护性。在本文中&#xff0c;我们将深入探讨 Pytest 的两大核心特性&#xff1a;Fixture 和插件管理&#xff0c;帮…

控制Vue对话框显示隐藏

正确做法 — 使用 Vue 数据驱动控制显隐你不需要手动设置 display: block&#xff0c;因为 Element Plus 的 <el-dialog> 是基于 v-model 或 :visible.sync 控制的。&#x1f527; 修改模板部分&#xff1a;将原来的&#xff1a;<el-dialog title"报文详情"…

直播带货与开源AI智能名片链动2+1模式S2B2C商城小程序:重塑电商营销新格局

摘要&#xff1a;本文聚焦于直播带货对互联网供需关系的深刻影响&#xff0c;分析其如何改变传统电商营销模式&#xff0c;实现从“人找货”到“货找人”的转变。同时&#xff0c;引入开源AI智能名片链动21模式S2B2C商城小程序这一创新概念&#xff0c;探讨其在直播带货背景下的…

Jmeter 性能测试响应时间过长怎么办?

当 JMeter 性能测试中出现 响应时间过长 的问题时&#xff0c;需要从 测试脚本、服务器、网络、JMeter配置 等多方面排查和优化。以下是详细的解决步骤和思路&#xff1a; B站最新性能进阶&#xff0c;学会这些jmeter性能测试技能&#xff0c;更助于正确设计、执行和分析性能测…

COZE官方文档基础知识解读第三期 —— prompt(提示词)

COZE官方文档基础知识解读第三期 —— prompt&#xff08;提示词&#xff09; 对于初步接触PE&#xff08;prompt engineering&#xff09; 的小伙伴们&#xff0c;你们可以去火山方舟提供的prompt工具&#xff0c;用工具&#xff08;其余的prompt网站https://www.promptinggu…

代码随想录算法训练营第三十二天|动态规划理论基础、LeetCode 509. 斐波那契数、70. 爬楼梯、746. 使用最小花费爬楼梯

目录 LeetCode 509. 斐波那契数 70. 爬楼梯 746. 使用最小花费爬楼梯 感想 文档讲解&#xff1a;代码随想录 动态规划&#xff0c;英文&#xff1a;Dynamic Programming&#xff0c;简称DP&#xff0c;如果某一问题有很多重叠子问题&#xff0c;使用动态规划是最有效的。 …

SpringMVC3

一、JSON 与参数传递1.1JSON 是什么- JSON 是字符串&#xff1a;比如 {"name":"zhangsan","password":"123456","age":15} 就是一个 JSON 字符串&#xff0c;它用来在前后端、服务间传递数据。- JSON 库&#xff1a;Fastj…

查看.bin二进制文件的方式(HxD十六进制编辑器的安装)

文章目录Windows 系统上安装 HxD 十六进制编辑器的步骤。**HxD 是一款免费、轻量级的工具&#xff0c;适合查看和编辑 .bin 等二进制文件。****PS:实际安装过程中会发现找不到Windows11的版本&#xff0c;安装windows10的即可&#xff0c;并且没有区别setup版和portable版**安装…

Linux系统性能优化与监控

系统性能优化与监控是保障 Linux 服务器稳定运行的核心技术&#xff0c;涉及 ​​CPU、内存、磁盘 I/O、网络、进程​​ 等多维度的指标分析、问题定位与优化策略。以下从​​监控工具与指标​​、​​常见问题诊断​​、​​优化方法​​三个层面详细讲解&#xff0c;并结合​…

如何在 React + TypeScript 中实现 JSON 格式化功能

如何在 React TypeScript 中实现 JSON 格式化功能 作为前端开发者&#xff0c;我们经常需要处理 JSON 数据。无论是 API 调试、配置文件编辑还是数据转换&#xff0c;能够格式化 JSON 是一项基本但非常有用的技能。本文将详细介绍如何在 React 和 TypeScript 环境中实现 JSON…

Mac连接服务器Docker容器全攻略

苹果电脑( macOS 系统 )连接服务器、配置容器,整体思路和 Linux 终端操作更贴近,以下结合 macOS 特点,详细分步说明,以 Docker 容器 + 常见 Linux 服务器( 如 CentOS、Ubuntu )为例: 一、连接服务器(SSH 方式, macOS 终端原生支持 ) 1. 准备信息 找运维或云平台…

【字节跳动】数据挖掘面试题0019:带货直播间推荐:现在有一个带货的直播间,怎么把它精准地推送给有需要的用户

文章大纲 带货直播间推荐系统:原理、算法与实践 一、推荐系统在带货直播中的重要性 二、数据收集与处理 1. 用户数据 2. 直播间数据 3. 用户行为数据 4. 数据处理与特征工程 三、推荐算法实现 1. 基于内容的推荐 2. 基于协同过滤的推荐 3. 基于知识图谱的推荐 4. 混合推荐算法…

Windows10笔记本电脑开启BIOS

文章目录什么是BIOS一、方案一&#xff1a;快捷键进入二、方案二&#xff08;推荐&#xff09;各品牌快捷键大全什么是BIOS BIOS 全拼为 BasicInputOutputSystem, 即基本输入/输出系统,是计算机中非常基础而且重要的程序。把这一段程序存放在一个不需要电源的记忆体(芯片)中,就…

NFS、iSCSI 和lnmp部署操作

目录 &#xff08;一&#xff09;基础配置 1.NFS服务安装 2.修改配置文件 3.重载配置文件 4.查看共享目录 5.客户端挂载 6.更换共享目录 7.基础实验 &#xff08;二&#xff09;布置lnmp平台 1.php 安装软件 检测 2.连接MySQL 测试 3.软件实施 软件安装配置 &…

Redis深度解析:从缓存原理到高并发实战

第一部分&#xff1a;Redis核心概念与架构设计1.1 Redis本质解析Redis&#xff08;Remote Dictionary Server&#xff09;作为开源的内存数据结构存储系统&#xff0c;其核心价值在于&#xff1a;内存优先架构&#xff1a;数据主要存储在内存中&#xff0c;读写性能达到10万 QP…

【NLP舆情分析】基于python微博舆情分析可视化系统(flask+pandas+echarts) 视频教程 - 微博类别信息爬取

大家好&#xff0c;我是java1234_小锋老师&#xff0c;最近写了一套【NLP舆情分析】基于python微博舆情分析可视化系统(flaskpandasecharts)视频教程&#xff0c;持续更新中&#xff0c;计划月底更新完&#xff0c;感谢支持。今天讲解架构搭建 视频在线地址&#xff1a; 2026…