文章目录

  • 1. 前言
  • 2. BrokerOuterAPI
    • 2.1 NettyRemotingClient
    • 2.2 start 启动
      • 2.2.1 NettyRemotingClient#start
  • 3. NettyRemotingServer
    • 3.1 ClientHousekeepingService
    • 3.2 ProducerManager#doChannelCloseEvent
    • 3.3 ConsumerManager#doChannelCloseEvent
      • 3.3.1 DefaultConsumerIdsChangeListener#handle
      • 3.3.2 ConsumerFilterManager#unRegister
      • 3.3.3 Broker2Client#notifyConsumerIdsChanged
    • 3.4 FilterServerManager#doChannelCloseEvent
  • 4. 小结


本文章基于 RocketMQ 4.9.3

1. 前言

  • 【RocketMQ】- 源码系列目录
  • 【RocketMQ NameServer】- NameServer 启动源码

在前面的 NameServer 启动源码,我们探讨了 NettyRemotingServer 的启动源码,NameServer 作为服务端通过 NettyRemotingServer 处理 Broker 上报的信息,当然了 NettyRemotingServer 不单单是 NameServer 独有的,Broker 既可以作为服务端和生产者、消费者通信,又可以作为客户端和 NameServer 通信。

Broker 作为客户端通信时,将一些方法比如 fetchNameServerAddr 拉取 NameServer 地址、注册 Broker 信息到 NameServer、取消注册 Broker 信息都封装到了 BrokerOuterAPI 中。


2. BrokerOuterAPI

在这里插入图片描述BrokerOuterAPI 在 BrokerController 构造器中被初始化,可以看到传入的就是 NettyClientConfig,也就是 Netty 客户端的配置,下面就来看下构造器的初始化。

public BrokerOuterAPI(final NettyClientConfig nettyClientConfig) {this(nettyClientConfig, null);
}public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook rpcHook) {this.remotingClient = new NettyRemotingClient(nettyClientConfig);this.remotingClient.registerRPCHook(rpcHook);
}

可以看到 BrokerOuterAPI 的构造器就是初始化了 NettyRemotingClient 这个客户端通信类,同时注册进去的 RPC 钩子为 null,也就是在跟 NameServer 通信的前后不会调用 RPC 钩子的前置和后置方法,所以核心还是 NettyRemotingClient。


2.1 NettyRemotingClient

public NettyRemotingClient(final NettyClientConfig nettyClientConfig) {this(nettyClientConfig, null);
}public NettyRemotingClient(final NettyClientConfig nettyClientConfig,final ChannelEventListener channelEventListener) {// 首先设置服务器单向、异步发送请求的信号量, 这个是为了防止同一时间发送太多单向请求或者异步请求, 默认都是 65536super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());// 客户端配置this.nettyClientConfig = nettyClientConfig;// 连接事件监听器this.channelEventListener = channelEventListener;// 公开线程池 publicExecutor 的核心线程数, 默认就是 4int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads();if (publicThreadNums <= 0) {publicThreadNums = 4;}// 创建一个默认线程数为 4 的线程池, 这个线程池可以用于处理请求回调this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());}});// 单线程 I/O 事件处理器(线程数 1)this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));}});if (nettyClientConfig.isUseTLS()) {try {sslContext = TlsHelper.buildSslContext(true);log.info("SSL enabled for client");} catch (IOException e) {log.error("Failed to create SSLContext", e);} catch (CertificateException e) {log.error("Failed to create SSLContext", e);throw new RuntimeException("Failed to create SSLContext", e);}}
}

这里的逻辑跟 NettyRemotingServer 差不多,只是作为客户端不需要创建 Accept 事件 Reactor 组,也就是 bossGroup。


2.2 start 启动

创建出 BrokerOuterAPI 之后,会在 BrokerController#start 方法中启动。
在这里插入图片描述
在这里插入图片描述
这里面也是启动了 remotingClient,所以下面就来看下 NettyRemotingClient 的 start 方法。


2.2.1 NettyRemotingClient#start

/*** 客户端 Netty 服务启动*/
@Override
public void start() {// 创建默认的事件处理组, 专门用于处理一些执行前的编解码、连接空闲状态检测、网络连接管理等操作, 不管是接收到消息还是发送消息(入站或者出站处理器)// 都可以通过这个线程池去做处理, 这样就能避免 I/O 线程浪费资源在这些状态检测上, 这里默认创建的是 4 个线程大小的线程池this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(),new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {// 创建出来的线程是 NettyClientWorkerThread_0、NettyClientWorkerThread_1 ...return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());}});// I/O 事件处理器(线程数 1)Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)// 禁用 Nagle 算法, Nagle 算法是说当应用程序发送的数据量较小时,TCP 不会立即将这些数据发送出去,而是会等待一段时间,将后续的数据合并// 成一个较大的数据包后再发送。这样可以减少网络上的数据包数量,降低网络拥塞的可能性,提高传输效率.option(ChannelOption.TCP_NODELAY, true)// 用于 TCP 连接心跳检测, 检测 TCP 连接是否活跃, 这里可能是 Netty 自己已经存在更精细的 IdleStateHandler 处理器就行了.option(ChannelOption.SO_KEEPALIVE, false)// 连接超时时间.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()).handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();if (nettyClientConfig.isUseTLS()) {if (null != sslContext) {pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));log.info("Prepend SSL handler");} else {log.warn("Connections are insecure as SSLContext is null!");}}pipeline.addLast(defaultEventExecutorGroup,// 编码处理器new NettyEncoder(),// 解码处理器new NettyDecoder(),// 维护心跳连接的处理器, 指定的三个参数是值读空闲时间、写空闲时间、总空闲时间, 如果服务端一定时间内没有读写就会出发不同的事件,这三个参数对应的// 事件分别是: IdleStateEvent.READER_IDLE、IdleStateEvent.WRITER_IDLE、IdleStateEvent.ALL_IDLE, 总之这玩意就是用来检测有没有长时间不// 读写的, 这样可以判断一个连接是不是空闲连接new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),// 连接管理器, 主要是管理连接的活跃、非活跃、注册、取消注册 ... 事件new NettyConnectManageHandler(),// 客户端业务处理器new NettyClientHandler());}});// 设置 Netty 客户端的消息接收缓冲区, 默认是 0if (nettyClientConfig.getClientSocketSndBufSize() > 0) {log.info("client set SO_SNDBUF to {}", nettyClientConfig.getClientSocketSndBufSize());handler.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize());}// 设置 Netty 客户端的消息发送缓冲区, 默认也是 0if (nettyClientConfig.getClientSocketRcvBufSize() > 0) {log.info("client set SO_RCVBUF to {}", nettyClientConfig.getClientSocketRcvBufSize());handler.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());}// 写缓冲区的低水位和高水位if (nettyClientConfig.getWriteBufferLowWaterMark() > 0 && nettyClientConfig.getWriteBufferHighWaterMark() > 0) {log.info("client set netty WRITE_BUFFER_WATER_MARK to {},{}",nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark());handler.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark()));}// 定时任务, 初始化之后 3s 执行, 之后 1s 执行一次this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {// 扫描 responseTable, 将超时的 ResponseFuture 删掉, 然后执行回调逻辑NettyRemotingClient.this.scanResponseTable();} catch (Throwable e) {log.error("scanResponseTable exception", e);}}}, 1000 * 3, 1000);// 如果连接监听器设置了, 就启动 NettyEventExecutor 去处理 NettyEventif (this.channelEventListener != null) {// broker 启动的时候是不会设置连接监听器的, 生产者和消费者启动才会设置this.nettyEventExecutor.start();}
}

这里的方法跟 NettyRemotingServer 也是差不多的,首先是创建 defaultEventExecutorGroup 默认的事件处理组,专门用于处理一些执行前的编解码、连接空闲状态检测、网络连接管理等操作,不管是接收到消息还是发送消息(入站或者出站处理器) 都可以通过这个线程池去做处理,这样就能避免 I/O 线程浪费资源在这些状态检测上, 这里默认创建的是 4 个线程大小的线程池。

接下来就是创建 Bootstrap,因为客户端不涉及处理 Accept 连接事件,所以只需要设置从 Reactor 组就可以,不过这里面的 Reactor 线程组大小是 1,所以是单线程处理 IO 读写事件?很久没看了也不是很清楚。

接着往下看,下面 initChannel 方法中往 Channel 的 Pipeline 里面设置的处理器包括编码处理器,解码处理器,心跳连接处理器,连接管理器和客户端业务处理器。参看 NettyRemotingServer 的处理器。
在这里插入图片描述
可以看到的是跟 NettyRemotingServer 对比,入站处理器就少了一个 HandshakeHandler,因为不需要处理连接事件,同时最后的 NettyServerHandler 换成了 NettyClientHandler。出站处理器则是一样的。
在这里插入图片描述
最后就是启动定时任务,初始化之后 3s 执行, 之后 1s 执行一次,扫描 responseTable,将超时的 ResponseFuture 删掉,然后执行回调逻辑。

由于 BrokerOuterAPI 并没有设置 channelEventListener,所以就不会启动 nettyEventExecutor,broker 启动的时候是不会设置连接监听器的, 生产者和消费者启动才会设置,关于 nettyEventExecutor 可以看这篇文章:【RocketMQ NameServer】- NettyEventExecutor 处理 Netty 事件。


3. NettyRemotingServer

在第二小节我们说了,Broker 可以作为服务端跟生产者、消费者、过滤服务通信,也可以作为客户端向 NameServer 发送请求,作为服务端,Broker 会在 initialize 初始化的时候创建出 fastRemotingServer 和 remotingServer。
在这里插入图片描述
fastRemotingServer 和 remotingServer 都是作为 Netty 服务端通信用的,只是一个监听 10909,一个监听 10911,fastRemotingServer 所建立的连接通道也是 VIP 通道,VIP 意思就是性能比较高的,一般都是用于内部使用,比如超时同步加锁请求、获取配置等等,而 remotingServer 监听端口 10911,专门处理来自生产者和消费者的请求,比如消息发送、消息拉取,因为生产者和消费者的交互是很频繁的,所以自热而然请求的性能就会慢一点。

当然了,创建 NettyRemotingServer 和启动 NettyRemotingServer 的流程在 NameServer 启动的文章中已经有讲解过源码,在 NettyRemotingServer 启动的源码中,我们也知道了 NettyRemotingServer 启动的时候会启动 nettyEventExecutor 线程,专门从 eventQueue 获取不同的连接事件然后做不同的处理,关于连接事件 NettyEvent,后面我也写了一篇文章 【RocketMQ NameServer】- NettyEventExecutor 处理 Netty 事件 来讲述 NettyEventExecutor 是如何处理 Netty 事件的,NettyEvent 又是怎么来的。

当时写上面这篇文章的时候可以看到 NettyEventExecutor#run 方法实际上是调用了 BrokerHousekeepingService 去处理连接事件,而 broker 的 remotingServer 看源码也能看到是传入了 clientHousekeepingService 去处理生产者和消费者的连接事件的,所以这里我们就来看下 clientHousekeepingService 这个类的逻辑。


3.1 ClientHousekeepingService

其实看名字就能看出来了,NameServer 通过 BrokerHousekeepingService 是专门处理 Broker 的连接事件,而 Broker 通过 ClientHousekeepingService 是专门处理生产者、消费者、过滤服务(已废弃)的连接通道。

【RocketMQ NameServer】- NettyEventExecutor 处理 Netty 事件 这篇文章我们也知道了

  • NettyEventExecutor 会调用 ClientHousekeepingService#onChannelIdle 处理连接空闲事件(IDLE)
  • NettyEventExecutor 会调用 ClientHousekeepingService#onChannelClose 处理连接关闭事件(CLOSE)
  • NettyEventExecutor 会调用 ClientHousekeepingService#onChannelConnect 处理生产者和消费者连接事件(CONNECT)
  • NettyEventExecutor 会调用 ClientHousekeepingService#onChannelException 处理连接异常事件(EXCEPTION)

那下面来看下这几个方法,由于源码不多,就直接贴出来。

@Override
public void onChannelConnect(String remoteAddr, Channel channel) {}/*** 连接关闭* @param remoteAddr* @param channel*/
@Override
public void onChannelClose(String remoteAddr, Channel channel) {this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
}/*** 连接处理发生异常* @param remoteAddr* @param channel*/
@Override
public void onChannelException(String remoteAddr, Channel channel) {this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
}/*** 空闲连接处理* @param remoteAddr* @param channel*/
@Override
public void onChannelIdle(String remoteAddr, Channel channel) {this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
}

可以看到,onChannelConnect 方法是一个空实现,应该是因为生产者和消费者都会定时上报心跳给 broker,所以连接不需要额外处理。

然后其他三个类型的事件都是调用同一个方法 doChannelCloseEvent 去处理,只是生产者、消费者、过滤服务器的逻辑不同,下面就一个一个来看下这里面的处理逻辑。


3.2 ProducerManager#doChannelCloseEvent

/*** broker 关闭和生产者的连接* @param remoteAddr* @param channel*/
public synchronized void doChannelCloseEvent(final String remoteAddr, final Channel channel) {if (channel != null) {// 遍历所有生产者连接for (final Map.Entry<String, ConcurrentHashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable.entrySet()) {// 生产者组final String group = entry.getKey();// 生产者组下面的生产者连接final ConcurrentHashMap<Channel, ClientChannelInfo> clientChannelInfoTable =entry.getValue();// 删除这个连接final ClientChannelInfo clientChannelInfo =clientChannelInfoTable.remove(channel);if (clientChannelInfo != null) {// 删除成功之后, 将这个生产者的客户端 ID 也从 clientChannelTable 集合中删掉clientChannelTable.remove(clientChannelInfo.getClientId());log.info("NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}",clientChannelInfo.toString(), remoteAddr, group);}}}
}

生产者管理类 ProducerManager#groupChannelTable 集合会存储生产者组下面的所有生产者连接,clientChannelTable 存储了【客户端 ID -> 连接】映射关系。

/*** 生产者组 -> 生产者组下面的生产者连接*/
private final ConcurrentHashMap<String /* group name */, ConcurrentHashMap<Channel, ClientChannelInfo>> groupChannelTable =new ConcurrentHashMap<>();
/*** 客户端 ID -> 生产者连接*/
private final ConcurrentHashMap<String, Channel> clientChannelTable = new ConcurrentHashMap<>();

上面的 doChannelCloseEvent 方法就是在处理这两个集合,由于参数只有一个 channel,所以不知道是哪个生产者组下面的,就需要遍历所有生产者组,一个一个去判断删除。


3.3 ConsumerManager#doChannelCloseEvent

/*** 关闭消费者连接* @param remoteAddr* @param channel*/
public void doChannelCloseEvent(final String remoteAddr, final Channel channel) {// 遍历消费者组下面的消费者Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();while (it.hasNext()) {Entry<String, ConsumerGroupInfo> next = it.next();// 获取消费者组消息, 里面包括这个消费者组的订阅信息、连接信息ConsumerGroupInfo info = next.getValue();// 删除, 因为参数只有连接通道, 不确定是哪个消费者组下面的, 所以会遍历所有消费组一个一个去判断删除boolean removed = info.doChannelCloseEvent(remoteAddr, channel);if (removed) {// 删除成功了, 判断下这个消费者组下面还有没有消费者连接if (info.getChannelInfoTable().isEmpty()) {// 如果没有了, 把这个消费者组也删掉ConsumerGroupInfo remove = this.consumerTable.remove(next.getKey());if (remove != null) {// 如果删除成功了log.info("unregister consumer ok, no any connection, and remove consumer group, {}",next.getKey());// 处理消费者组 UNREGISTER 事件, 这里面不回去通知消费者重平衡, 因为这个消费者组下面的消费者都删掉了this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, next.getKey());}}// 通知其他所有消费者去重平衡, 但是由于上面 handle 之后没有返回, 下面这里就会有两种情况// 1.info.getChannelInfoTable().isEmpty() = true, 这里 getAllChannel 获取到的就是空集合, CHANGE 事件对于空集合就是不处理// 2.info.getChannelInfoTable().isEmpty() = false, 意思就是只删除了消费者组下面的一个消费者, 所以就通知其余消费者进行重平衡this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, next.getKey(), info.getAllChannel());}}
}/*** 关闭连接* @param remoteAddr* @param channel* @return*/
public boolean doChannelCloseEvent(final String remoteAddr, final Channel channel) {// 从 channelInfoTable 中删除消费者连接final ClientChannelInfo info = this.channelInfoTable.remove(channel);if (info != null) {// 删除成功log.warn("NETTY EVENT: remove not active channel[{}] from ConsumerGroupInfo groupChannelTable, consumer group: {}",info.toString(), groupName);return true;}// 删除失败, 就是没找到消费连接return false;
}

消费者组会稍微复杂一点,由于消费者的信息比较多(消费点位、topic 订阅消息),因此消费者的管理使用了 consumerTable 去管理消费者组下面的消费者信息。

private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable =new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);public class ConsumerGroupInfo {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);// 消费者组名private final String groupName;// 消费者组下面的消费者的订阅情况, 不过一个消费者组一般都只会订阅消费一个 topic 吧private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =new ConcurrentHashMap<String, SubscriptionData>();// 消费者组下面的连接信息private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =new ConcurrentHashMap<Channel, ClientChannelInfo>(16);// 消费者组的消费类型, PULL 或者 PUSHprivate volatile ConsumeType consumeType;// 消费模式, 广播还是集群private volatile MessageModel messageModel;// 从哪个位置开始消费private volatile ConsumeFromWhere consumeFromWhere;// 上一次上报心跳的事件private volatile long lastUpdateTimestamp = System.currentTimeMillis();...
}

因此这里面的处理逻辑就是遍历 consumerTable 集合,使用 doChannelCloseEvent 方法去判断这个消费者组下面的 channelInfoTable 集合是否包含要删除的 channel,如果包含就删掉。删除成功之后如果这个消费者组已经没有消费者了,那么这个消费者组也可以删掉了。

不过要注意一下,消费者涉及到重平衡的逻辑,但是重平衡是针对消费者组下面的消费者对 topic 下面的队列去重平衡,如果消费者组都没了就没必要重平衡,因此可以看到如果说删除消费者组成功了,通过 handle 处理的是 UNREGISTER 事件。而如果没有删除消费者组,只是删除了消费者,那么通过 handle 处理的是 CHANGE 事件。


3.3.1 DefaultConsumerIdsChangeListener#handle

这个方法就是处理消费者变更的方法,只需要关注 CHANGE 和 UNREGISTER 就行。

@Override
public void handle(ConsumerGroupEvent event, String group, Object... args) {if (event == null) {return;}switch (event) {case CHANGE:// 消费者变更, 但是消费者组还在, 同时消费者组下面的其余消费者进行重平衡if (args == null || args.length < 1) {return;}// 消费者组下面的消费者连接List<Channel> channels = (List<Channel>) args[0];// 如果参数里面传入了连接且配置是设置了当消费者变更就通知其他消费者重平衡if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) {for (Channel chl : channels) {// 通知消费者重平衡this.brokerController.getBroker2Client().notifyConsumerIdsChanged(chl, group);}}break;case UNREGISTER:// 这里就是整个消费者组从 consumerTable 集合中删掉了, 由于消费者组下面没有消费者, 因此也不需要重平衡this.brokerController.getConsumerFilterManager().unRegister(group);break;case REGISTER:// 消费者注册if (args == null || args.length < 1) {return;}Collection<SubscriptionData> subscriptionDataList = (Collection<SubscriptionData>) args[0];this.brokerController.getConsumerFilterManager().register(group, subscriptionDataList);break;default:throw new RuntimeException("Unknown event " + event);}
}

3.3.2 ConsumerFilterManager#unRegister

UNREGISTER 就是整个消费者组从 consumerTable 集合中删掉了, 由于消费者组下面没有消费者, 因此也不需要重平衡,所以 unRegister 就是简单处理下消费者组的消费者过滤信息。

/*** 删除消费者组过滤信息* @param consumerGroup*/
public void unRegister(String consumerGroup) {// 如果不包括, 直接返回if (!this.groupFilterData.containsKey(consumerGroup)) {return;}// 获取消费者组的过滤信息ConsumerFilterData data = this.groupFilterData.get(consumerGroup);// 如果过滤信息过期了, 就是 dead > bornTimeif (data == null || data.isDead()) {return;}long now = System.currentTimeMillis();log.info("Unregister consumer filter: {}, deadTime: {}", data, now);// 设置删除时间是当前时间data.setDeadTime(now);
}

当然了这里也是简单看下里面的逻辑,等到后面讲消费者的时候也会讲消息过滤这块的内容。


3.3.3 Broker2Client#notifyConsumerIdsChanged

/*** 通知消费者重平衡* @param channel* @param consumerGroup*/
public void notifyConsumerIdsChanged(final Channel channel,final String consumerGroup) {if (null == consumerGroup) {log.error("notifyConsumerIdsChanged consumerGroup is null");return;}// 构建请求头NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader();requestHeader.setConsumerGroup(consumerGroup);// 请求类型是 NOTIFY_CONSUMER_IDS_CHANGEDRemotingCommand request =RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader);try {// 发送 OneWay 请求this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);} catch (Exception e) {log.error("notifyConsumerIdsChanged exception. group={}, error={}", consumerGroup, e.toString());}
}

这里就是直接发送单向请求,请求 Code 为 NOTIFY_CONSUMER_IDS_CHANGED,意思是消费者组消费者发生变化,发送请求进行重平衡。

消费者的重平衡我们后面讲消费者的时候再详细介绍,这里先不说,先知道 Broker 怎么处理的就行。


3.4 FilterServerManager#doChannelCloseEvent

过滤服务的 doChannelCloseEvent 就是简单讲过滤服务连接从 filterServerTable 中删掉。

/*** 删除过滤服务连接, 直接从 filterServerTable 集合中删除* @param remoteAddr* @param channel*/
public void doChannelCloseEvent(final String remoteAddr, final Channel channel) {FilterServerInfo old = this.filterServerTable.remove(channel);if (old != null) {log.warn("The Filter Server<{}> connection<{}> closed, remove it", old.getFilterServerAddr(),remoteAddr);}
}

4. 小结

这篇文章中我们主要是讲述了 broker 的 NettyRemotingClient 和 NettyRemotingServer,其中 NettyRemotingServer 重点是讲了 ClientHousekeepingServer 这个连接处理类,根据前面的探讨我们也知道了 broker 既可以作为服务端和生产者、消费者、消息过滤服务通信,也可以作为客户端和 NameServer 通信。





如有错误,欢迎指出!!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/80316.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/80316.shtml
英文地址,请注明出处:http://en.pswp.cn/web/80316.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++性能测试工具——AMD CodeAnalyst及其新工具的使用

一、CodeAnalyst及其新的替代工具 与VTune相比&#xff0c;AMD也有自己的性能测试工具&#xff0c;也就是CodeAnalyst。不过目前看&#xff0c;其应该已经有些过时&#xff0c;目前AMD提供了更新的性能测试工具uProf或CodeXL&#xff0c;这些新工具的优点在于对新的硬件架构和…

ProfibusDP主站转modbusTCP网关与ABB电机保护器数据交互

ProfibusDP主站转modbusTCP网关与ABB电机保护器数据交互 在工业自动化领域&#xff0c;Profibus DP&#xff08;Process Field Bus&#xff09;和Modbus TCP是两种常见的通讯协议&#xff0c;它们各自在不同的场合发挥着重要作用。然而&#xff0c;随着技术的发展和应用需求的…

2025.05.17淘天机考笔试真题第三题

&#x1f4cc; 点击直达笔试专栏 &#x1f449;《大厂笔试突围》 &#x1f4bb; 春秋招笔试突围在线OJ &#x1f449; 笔试突围OJ 03. 奇偶平衡树分割问题 问题描述 K小姐是一位园林设计师&#xff0c;她设计了一个由多个花坛组成的树形公园。每个花坛中种植了不同数量的花…

第三十五节:特征检测与描述-ORB 特征

1. 引言:为什么需要ORB? 在计算机视觉领域,特征检测与描述是许多任务(如图像匹配、目标跟踪、三维重建等)的核心基础。传统的算法如SIFT(尺度不变特征变换)和SURF(加速稳健特征)因其优异的性能被广泛应用,但它们存在两个显著问题: 专利限制:SIFT和SURF受专利保护,…

深入解读WPDRRC信息安全模型:构建中国特色的信息安全防护体系

目录 前言1 WPDRRC模型概述2 模型结构详解2.1 预警&#xff08;Warning&#xff09;2.2 保护&#xff08;Protect&#xff09;2.3 检测&#xff08;Detect&#xff09;2.4 响应&#xff08;React&#xff09;2.5 恢复&#xff08;Restore&#xff09;2.6 反击&#xff08;Count…

《算法导论(第4版)》阅读笔记:p82-p82

《算法导论(第4版)》学习第 17 天&#xff0c;p82-p82 总结&#xff0c;总计 1 页。 一、技术总结 1. Matrix Matrices(矩阵) (1)教材 因为第 4 章涉及到矩阵&#xff0c;矩阵属于线性代数(linear algebra)范畴&#xff0c;如果不熟悉&#xff0c;可以看一下作者推荐的两本…

基于Spring Boot和Vue的在线考试系统架构设计与实现(源码+论文+部署讲解等)

源码项目获取联系 请文末卡片dd我获取更详细的演示视频 系统介绍 基于Spring Boot和Vue的在线考试系统。为学生和教师/管理员提供一个高效、便捷的在线学习、考试及管理平台。系统采用前后端分离的架构&#xff0c;后端基于成熟稳定的Spring Boot框架&#xff0c;负责数据处理…

Codeforces Round 1024 (Div.2)

比赛链接&#xff1a;CF1024 A. Dinner Time 只有当 n n n 是 p p p 的倍数而且 n ⋅ q p ̸ m \frac{n \cdot q}{p} \not m pn⋅q​m 时输出 NO&#xff0c;其余情况均满足条件。 时间复杂度&#xff1a; O ( 1 ) O(1) O(1)。 #include <bits/stdc.h> using na…

【LeetCode 热题 100】二叉树的最大深度 / 翻转二叉树 / 二叉树的直径 / 验证二叉搜索树

⭐️个人主页&#xff1a;小羊 ⭐️所属专栏&#xff1a;LeetCode 热题 100 很荣幸您能阅读我的文章&#xff0c;诚请评论指点&#xff0c;欢迎欢迎 ~ 目录 二叉树的中序遍历二叉树的最大深度翻转二叉树对称二叉树二叉树的直径二叉树的层序遍历将有序数组转换为二叉搜索树验…

Tomcat发布websocket

一、tomcal的lib放入文件 tomcat-websocket.jar websocket-api.jar 二、代码示例 package com.test.ws;import com.test.core.json.Jmode;import javax.websocket.*; import javax.websocket.server.ServerEndpoint; import java.util.concurrent.CopyOnWriteArraySet; imp…

LLM笔记(二)LLM数据基础-分词算法(2)

文章目录 1. 分词算法概述1.1 基于词典的&#xff08;或基于规则的&#xff09;分词算法1.2 基于统计的&#xff08;或基于机器学习的&#xff09;分词算法1.3 基于深度学习的分词算法1.4 子词&#xff08;Subword&#xff09;分词算法1.5 混合分词算法1.6 针对不同语言的特点 …

Uniapp开发鸿蒙应用时如何运行和调试项目

经过前几天的分享&#xff0c;大家应该应该对uniapp开发鸿蒙应用的开发语法有了一定的了解&#xff0c;可以进行一些简单的应用开发&#xff0c;今天分享一下在使用uniapp开发鸿蒙应用时怎么运行到鸿蒙设备&#xff0c;并且在开发中怎么调试程序。 运行 Uniapp项目支持运行到…

数据湖与数据仓库融合:Hudi、Iceberg、Delta Lake 实践对比

在实时与离线一体化的今天,数据湖与数据仓库边界不断融合,越来越多企业选用如 Hudi、Iceberg、Delta Lake 等开源方案实现统一的数据存储、计算、分析平台。本篇将围绕以下关键点,展开实战对比与解决方案分享: ✅ 实时写入能力 ✅ ACID 保证 ✅ 增量数据处理能力 ✅ 流批一…

Python爬虫(29)Python爬虫高阶:动态页面处理与云原生部署全链路实践(Selenium、Scrapy、K8s)

目录 引言&#xff1a;动态爬虫的技术挑战与云原生机遇一、动态页面处理&#xff1a;Selenium与Scrapy的协同作战1.1 Selenium的核心价值与局限1.2 Scrapy-Selenium中间件开发1.3 动态分页处理实战&#xff1a;京东商品爬虫 二、云原生部署&#xff1a;Kubernetes架构设计与优化…

数据结构(十)——排序

一、选择排序 1.简单选择排序 基本思想&#xff1a;假设排序表为[1,…,n]&#xff0c;第i趟排序即从[i,…,n]中选择关键字最小的元素与L[i]交换 eg&#xff1a;给定关键字序列{87&#xff0c;45&#xff0c;78&#xff0c;32&#xff0c;17&#xff0c;65&#xff0c;53&…

小结:jvm 类加载过程

类加载过程 是Java虚拟机&#xff08;JVM&#xff09;将字节码文件&#xff08;.class文件&#xff09;加载到内存中&#xff0c;并转换为运行时数据结构的过程。这个过程可以分为多个步骤&#xff0c;每个步骤都有其特定的任务和目的。根据你提供的信息&#xff0c;以下是类加…

2024 山东省ccpc省赛

目录 I&#xff08;签到&#xff09; 题目简述&#xff1a; 思路&#xff1a; 代码&#xff1a; A&#xff08;二分答案&#xff09; 题目简述&#xff1a; 思路&#xff1a; 代码&#xff1a; K&#xff08;构造&#xff09; 题目&#xff1a; 思路&#xff1a; 代…

turn.js与 PHP 结合使用来实现 PDF 文件的页面切换效果

将 Turn.js 与 PHP 结合使用来实现 PDF 文件的页面切换效果&#xff0c;你需要一个中间步骤将 PDF 转换为 Turn.js 可以处理的格式&#xff08;如 HTML 页面或图片&#xff09;。以下是实现这一功能的步骤和示例代码&#xff1a; 步骤 1: 安装必要的库 首先&#xff0c;你需要…

Python实现NOA星雀优化算法优化卷积神经网络CNN回归模型项目实战

说明&#xff1a;这是一个机器学习实战项目&#xff08;附带数据代码文档视频讲解&#xff09;&#xff0c;如需数据代码文档视频讲解可以直接到文章最后关注获取。 1.项目背景 在当今数据驱动的时代&#xff0c;卷积神经网络&#xff08;CNN&#xff09;不仅在图像分类任务中…

(面试)View相关知识

1、View绘制流程 onMeasure() 确定View的测量宽高。onLayout() 确定View的最终宽高和四个顶点的位置。onDraw() 将View 绘制到屏幕上。 2、MeasureSpec有三种测量模式&#xff1a; 2.1. EXACTLY&#xff08;精确模式&#xff09; 含义&#xff1a;父容器明确指定了子View的精…