文章目录
- 1. 前言
- 2. BrokerOuterAPI
- 2.1 NettyRemotingClient
- 2.2 start 启动
- 2.2.1 NettyRemotingClient#start
- 3. NettyRemotingServer
- 3.1 ClientHousekeepingService
- 3.2 ProducerManager#doChannelCloseEvent
- 3.3 ConsumerManager#doChannelCloseEvent
- 3.3.1 DefaultConsumerIdsChangeListener#handle
- 3.3.2 ConsumerFilterManager#unRegister
- 3.3.3 Broker2Client#notifyConsumerIdsChanged
- 3.4 FilterServerManager#doChannelCloseEvent
- 4. 小结
本文章基于 RocketMQ 4.9.3
1. 前言
- 【RocketMQ】- 源码系列目录
- 【RocketMQ NameServer】- NameServer 启动源码
在前面的 NameServer 启动源码,我们探讨了 NettyRemotingServer 的启动源码,NameServer 作为服务端通过 NettyRemotingServer 处理 Broker 上报的信息,当然了 NettyRemotingServer 不单单是 NameServer 独有的,Broker 既可以作为服务端和生产者、消费者通信,又可以作为客户端和 NameServer 通信。
Broker 作为客户端通信时,将一些方法比如 fetchNameServerAddr 拉取 NameServer 地址、注册 Broker 信息到 NameServer、取消注册 Broker 信息都封装到了 BrokerOuterAPI 中。
2. BrokerOuterAPI
BrokerOuterAPI 在 BrokerController 构造器中被初始化,可以看到传入的就是 NettyClientConfig,也就是 Netty 客户端的配置,下面就来看下构造器的初始化。
public BrokerOuterAPI(final NettyClientConfig nettyClientConfig) {this(nettyClientConfig, null);
}public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook rpcHook) {this.remotingClient = new NettyRemotingClient(nettyClientConfig);this.remotingClient.registerRPCHook(rpcHook);
}
可以看到 BrokerOuterAPI 的构造器就是初始化了 NettyRemotingClient 这个客户端通信类,同时注册进去的 RPC 钩子为 null,也就是在跟 NameServer 通信的前后不会调用 RPC 钩子的前置和后置方法,所以核心还是 NettyRemotingClient。
2.1 NettyRemotingClient
public NettyRemotingClient(final NettyClientConfig nettyClientConfig) {this(nettyClientConfig, null);
}public NettyRemotingClient(final NettyClientConfig nettyClientConfig,final ChannelEventListener channelEventListener) {// 首先设置服务器单向、异步发送请求的信号量, 这个是为了防止同一时间发送太多单向请求或者异步请求, 默认都是 65536super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());// 客户端配置this.nettyClientConfig = nettyClientConfig;// 连接事件监听器this.channelEventListener = channelEventListener;// 公开线程池 publicExecutor 的核心线程数, 默认就是 4int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads();if (publicThreadNums <= 0) {publicThreadNums = 4;}// 创建一个默认线程数为 4 的线程池, 这个线程池可以用于处理请求回调this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());}});// 单线程 I/O 事件处理器(线程数 1)this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));}});if (nettyClientConfig.isUseTLS()) {try {sslContext = TlsHelper.buildSslContext(true);log.info("SSL enabled for client");} catch (IOException e) {log.error("Failed to create SSLContext", e);} catch (CertificateException e) {log.error("Failed to create SSLContext", e);throw new RuntimeException("Failed to create SSLContext", e);}}
}
这里的逻辑跟 NettyRemotingServer 差不多,只是作为客户端不需要创建 Accept 事件 Reactor 组,也就是 bossGroup。
2.2 start 启动
创建出 BrokerOuterAPI 之后,会在 BrokerController#start 方法中启动。
这里面也是启动了 remotingClient,所以下面就来看下 NettyRemotingClient 的 start 方法。
2.2.1 NettyRemotingClient#start
/*** 客户端 Netty 服务启动*/
@Override
public void start() {// 创建默认的事件处理组, 专门用于处理一些执行前的编解码、连接空闲状态检测、网络连接管理等操作, 不管是接收到消息还是发送消息(入站或者出站处理器)// 都可以通过这个线程池去做处理, 这样就能避免 I/O 线程浪费资源在这些状态检测上, 这里默认创建的是 4 个线程大小的线程池this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(),new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {// 创建出来的线程是 NettyClientWorkerThread_0、NettyClientWorkerThread_1 ...return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());}});// I/O 事件处理器(线程数 1)Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)// 禁用 Nagle 算法, Nagle 算法是说当应用程序发送的数据量较小时,TCP 不会立即将这些数据发送出去,而是会等待一段时间,将后续的数据合并// 成一个较大的数据包后再发送。这样可以减少网络上的数据包数量,降低网络拥塞的可能性,提高传输效率.option(ChannelOption.TCP_NODELAY, true)// 用于 TCP 连接心跳检测, 检测 TCP 连接是否活跃, 这里可能是 Netty 自己已经存在更精细的 IdleStateHandler 处理器就行了.option(ChannelOption.SO_KEEPALIVE, false)// 连接超时时间.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()).handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();if (nettyClientConfig.isUseTLS()) {if (null != sslContext) {pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));log.info("Prepend SSL handler");} else {log.warn("Connections are insecure as SSLContext is null!");}}pipeline.addLast(defaultEventExecutorGroup,// 编码处理器new NettyEncoder(),// 解码处理器new NettyDecoder(),// 维护心跳连接的处理器, 指定的三个参数是值读空闲时间、写空闲时间、总空闲时间, 如果服务端一定时间内没有读写就会出发不同的事件,这三个参数对应的// 事件分别是: IdleStateEvent.READER_IDLE、IdleStateEvent.WRITER_IDLE、IdleStateEvent.ALL_IDLE, 总之这玩意就是用来检测有没有长时间不// 读写的, 这样可以判断一个连接是不是空闲连接new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),// 连接管理器, 主要是管理连接的活跃、非活跃、注册、取消注册 ... 事件new NettyConnectManageHandler(),// 客户端业务处理器new NettyClientHandler());}});// 设置 Netty 客户端的消息接收缓冲区, 默认是 0if (nettyClientConfig.getClientSocketSndBufSize() > 0) {log.info("client set SO_SNDBUF to {}", nettyClientConfig.getClientSocketSndBufSize());handler.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize());}// 设置 Netty 客户端的消息发送缓冲区, 默认也是 0if (nettyClientConfig.getClientSocketRcvBufSize() > 0) {log.info("client set SO_RCVBUF to {}", nettyClientConfig.getClientSocketRcvBufSize());handler.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());}// 写缓冲区的低水位和高水位if (nettyClientConfig.getWriteBufferLowWaterMark() > 0 && nettyClientConfig.getWriteBufferHighWaterMark() > 0) {log.info("client set netty WRITE_BUFFER_WATER_MARK to {},{}",nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark());handler.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark()));}// 定时任务, 初始化之后 3s 执行, 之后 1s 执行一次this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {// 扫描 responseTable, 将超时的 ResponseFuture 删掉, 然后执行回调逻辑NettyRemotingClient.this.scanResponseTable();} catch (Throwable e) {log.error("scanResponseTable exception", e);}}}, 1000 * 3, 1000);// 如果连接监听器设置了, 就启动 NettyEventExecutor 去处理 NettyEventif (this.channelEventListener != null) {// broker 启动的时候是不会设置连接监听器的, 生产者和消费者启动才会设置this.nettyEventExecutor.start();}
}
这里的方法跟 NettyRemotingServer 也是差不多的,首先是创建 defaultEventExecutorGroup 默认的事件处理组,专门用于处理一些执行前的编解码、连接空闲状态检测、网络连接管理等操作,不管是接收到消息还是发送消息(入站或者出站处理器) 都可以通过这个线程池去做处理,这样就能避免 I/O 线程浪费资源在这些状态检测上, 这里默认创建的是 4 个线程大小的线程池。
接下来就是创建 Bootstrap,因为客户端不涉及处理 Accept 连接事件,所以只需要设置从 Reactor 组就可以,不过这里面的 Reactor 线程组大小是 1,所以是单线程处理 IO 读写事件?很久没看了也不是很清楚。
接着往下看,下面 initChannel 方法中往 Channel 的 Pipeline 里面设置的处理器包括编码处理器,解码处理器,心跳连接处理器,连接管理器和客户端业务处理器。参看 NettyRemotingServer 的处理器。
可以看到的是跟 NettyRemotingServer 对比,入站处理器就少了一个 HandshakeHandler,因为不需要处理连接事件,同时最后的 NettyServerHandler 换成了 NettyClientHandler。出站处理器则是一样的。
最后就是启动定时任务,初始化之后 3s 执行, 之后 1s 执行一次,扫描 responseTable,将超时的 ResponseFuture 删掉,然后执行回调逻辑。
由于 BrokerOuterAPI 并没有设置 channelEventListener,所以就不会启动 nettyEventExecutor,broker 启动的时候是不会设置连接监听器的, 生产者和消费者启动才会设置,关于 nettyEventExecutor 可以看这篇文章:【RocketMQ NameServer】- NettyEventExecutor 处理 Netty 事件。
3. NettyRemotingServer
在第二小节我们说了,Broker 可以作为服务端跟生产者、消费者、过滤服务通信,也可以作为客户端向 NameServer 发送请求,作为服务端,Broker 会在 initialize 初始化的时候创建出 fastRemotingServer 和 remotingServer。
fastRemotingServer 和 remotingServer 都是作为 Netty 服务端通信用的,只是一个监听 10909,一个监听 10911,fastRemotingServer 所建立的连接通道也是 VIP 通道,VIP 意思就是性能比较高的,一般都是用于内部使用,比如超时同步加锁请求、获取配置等等,而 remotingServer 监听端口 10911,专门处理来自生产者和消费者的请求,比如消息发送、消息拉取,因为生产者和消费者的交互是很频繁的,所以自热而然请求的性能就会慢一点。
当然了,创建 NettyRemotingServer 和启动 NettyRemotingServer 的流程在 NameServer 启动的文章中已经有讲解过源码,在 NettyRemotingServer 启动的源码中,我们也知道了 NettyRemotingServer 启动的时候会启动 nettyEventExecutor 线程,专门从 eventQueue 获取不同的连接事件然后做不同的处理,关于连接事件 NettyEvent,后面我也写了一篇文章 【RocketMQ NameServer】- NettyEventExecutor 处理 Netty 事件 来讲述 NettyEventExecutor 是如何处理 Netty 事件的,NettyEvent 又是怎么来的。
当时写上面这篇文章的时候可以看到 NettyEventExecutor#run 方法实际上是调用了 BrokerHousekeepingService 去处理连接事件,而 broker 的 remotingServer 看源码也能看到是传入了 clientHousekeepingService 去处理生产者和消费者的连接事件的,所以这里我们就来看下 clientHousekeepingService 这个类的逻辑。
3.1 ClientHousekeepingService
其实看名字就能看出来了,NameServer 通过 BrokerHousekeepingService 是专门处理 Broker 的连接事件,而 Broker 通过 ClientHousekeepingService 是专门处理生产者、消费者、过滤服务(已废弃)的连接通道。
在 【RocketMQ NameServer】- NettyEventExecutor 处理 Netty 事件
这篇文章我们也知道了
- NettyEventExecutor 会调用 ClientHousekeepingService#onChannelIdle 处理连接空闲事件(IDLE)
- NettyEventExecutor 会调用 ClientHousekeepingService#onChannelClose 处理连接关闭事件(CLOSE)
- NettyEventExecutor 会调用 ClientHousekeepingService#onChannelConnect 处理生产者和消费者连接事件(CONNECT)
- NettyEventExecutor 会调用 ClientHousekeepingService#onChannelException 处理连接异常事件(EXCEPTION)
那下面来看下这几个方法,由于源码不多,就直接贴出来。
@Override
public void onChannelConnect(String remoteAddr, Channel channel) {}/*** 连接关闭* @param remoteAddr* @param channel*/
@Override
public void onChannelClose(String remoteAddr, Channel channel) {this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
}/*** 连接处理发生异常* @param remoteAddr* @param channel*/
@Override
public void onChannelException(String remoteAddr, Channel channel) {this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
}/*** 空闲连接处理* @param remoteAddr* @param channel*/
@Override
public void onChannelIdle(String remoteAddr, Channel channel) {this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
}
可以看到,onChannelConnect 方法是一个空实现,应该是因为生产者和消费者都会定时上报心跳给 broker,所以连接不需要额外处理。
然后其他三个类型的事件都是调用同一个方法 doChannelCloseEvent 去处理,只是生产者、消费者、过滤服务器的逻辑不同,下面就一个一个来看下这里面的处理逻辑。
3.2 ProducerManager#doChannelCloseEvent
/*** broker 关闭和生产者的连接* @param remoteAddr* @param channel*/
public synchronized void doChannelCloseEvent(final String remoteAddr, final Channel channel) {if (channel != null) {// 遍历所有生产者连接for (final Map.Entry<String, ConcurrentHashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable.entrySet()) {// 生产者组final String group = entry.getKey();// 生产者组下面的生产者连接final ConcurrentHashMap<Channel, ClientChannelInfo> clientChannelInfoTable =entry.getValue();// 删除这个连接final ClientChannelInfo clientChannelInfo =clientChannelInfoTable.remove(channel);if (clientChannelInfo != null) {// 删除成功之后, 将这个生产者的客户端 ID 也从 clientChannelTable 集合中删掉clientChannelTable.remove(clientChannelInfo.getClientId());log.info("NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}",clientChannelInfo.toString(), remoteAddr, group);}}}
}
生产者管理类 ProducerManager#groupChannelTable
集合会存储生产者组下面的所有生产者连接,clientChannelTable
存储了【客户端 ID -> 连接】映射关系。
/*** 生产者组 -> 生产者组下面的生产者连接*/
private final ConcurrentHashMap<String /* group name */, ConcurrentHashMap<Channel, ClientChannelInfo>> groupChannelTable =new ConcurrentHashMap<>();
/*** 客户端 ID -> 生产者连接*/
private final ConcurrentHashMap<String, Channel> clientChannelTable = new ConcurrentHashMap<>();
上面的 doChannelCloseEvent 方法就是在处理这两个集合,由于参数只有一个 channel,所以不知道是哪个生产者组下面的,就需要遍历所有生产者组,一个一个去判断删除。
3.3 ConsumerManager#doChannelCloseEvent
/*** 关闭消费者连接* @param remoteAddr* @param channel*/
public void doChannelCloseEvent(final String remoteAddr, final Channel channel) {// 遍历消费者组下面的消费者Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();while (it.hasNext()) {Entry<String, ConsumerGroupInfo> next = it.next();// 获取消费者组消息, 里面包括这个消费者组的订阅信息、连接信息ConsumerGroupInfo info = next.getValue();// 删除, 因为参数只有连接通道, 不确定是哪个消费者组下面的, 所以会遍历所有消费组一个一个去判断删除boolean removed = info.doChannelCloseEvent(remoteAddr, channel);if (removed) {// 删除成功了, 判断下这个消费者组下面还有没有消费者连接if (info.getChannelInfoTable().isEmpty()) {// 如果没有了, 把这个消费者组也删掉ConsumerGroupInfo remove = this.consumerTable.remove(next.getKey());if (remove != null) {// 如果删除成功了log.info("unregister consumer ok, no any connection, and remove consumer group, {}",next.getKey());// 处理消费者组 UNREGISTER 事件, 这里面不回去通知消费者重平衡, 因为这个消费者组下面的消费者都删掉了this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, next.getKey());}}// 通知其他所有消费者去重平衡, 但是由于上面 handle 之后没有返回, 下面这里就会有两种情况// 1.info.getChannelInfoTable().isEmpty() = true, 这里 getAllChannel 获取到的就是空集合, CHANGE 事件对于空集合就是不处理// 2.info.getChannelInfoTable().isEmpty() = false, 意思就是只删除了消费者组下面的一个消费者, 所以就通知其余消费者进行重平衡this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, next.getKey(), info.getAllChannel());}}
}/*** 关闭连接* @param remoteAddr* @param channel* @return*/
public boolean doChannelCloseEvent(final String remoteAddr, final Channel channel) {// 从 channelInfoTable 中删除消费者连接final ClientChannelInfo info = this.channelInfoTable.remove(channel);if (info != null) {// 删除成功log.warn("NETTY EVENT: remove not active channel[{}] from ConsumerGroupInfo groupChannelTable, consumer group: {}",info.toString(), groupName);return true;}// 删除失败, 就是没找到消费连接return false;
}
消费者组会稍微复杂一点,由于消费者的信息比较多(消费点位、topic 订阅消息),因此消费者的管理使用了 consumerTable
去管理消费者组下面的消费者信息。
private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable =new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);public class ConsumerGroupInfo {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);// 消费者组名private final String groupName;// 消费者组下面的消费者的订阅情况, 不过一个消费者组一般都只会订阅消费一个 topic 吧private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =new ConcurrentHashMap<String, SubscriptionData>();// 消费者组下面的连接信息private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =new ConcurrentHashMap<Channel, ClientChannelInfo>(16);// 消费者组的消费类型, PULL 或者 PUSHprivate volatile ConsumeType consumeType;// 消费模式, 广播还是集群private volatile MessageModel messageModel;// 从哪个位置开始消费private volatile ConsumeFromWhere consumeFromWhere;// 上一次上报心跳的事件private volatile long lastUpdateTimestamp = System.currentTimeMillis();...
}
因此这里面的处理逻辑就是遍历 consumerTable
集合,使用 doChannelCloseEvent
方法去判断这个消费者组下面的 channelInfoTable
集合是否包含要删除的 channel
,如果包含就删掉。删除成功之后如果这个消费者组已经没有消费者了,那么这个消费者组也可以删掉了。
不过要注意一下,消费者涉及到重平衡的逻辑,但是重平衡是针对消费者组下面的消费者对 topic 下面的队列去重平衡,如果消费者组都没了就没必要重平衡,因此可以看到如果说删除消费者组成功了,通过 handle 处理的是 UNREGISTER 事件。而如果没有删除消费者组,只是删除了消费者,那么通过 handle 处理的是 CHANGE 事件。
3.3.1 DefaultConsumerIdsChangeListener#handle
这个方法就是处理消费者变更的方法,只需要关注 CHANGE 和 UNREGISTER 就行。
@Override
public void handle(ConsumerGroupEvent event, String group, Object... args) {if (event == null) {return;}switch (event) {case CHANGE:// 消费者变更, 但是消费者组还在, 同时消费者组下面的其余消费者进行重平衡if (args == null || args.length < 1) {return;}// 消费者组下面的消费者连接List<Channel> channels = (List<Channel>) args[0];// 如果参数里面传入了连接且配置是设置了当消费者变更就通知其他消费者重平衡if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) {for (Channel chl : channels) {// 通知消费者重平衡this.brokerController.getBroker2Client().notifyConsumerIdsChanged(chl, group);}}break;case UNREGISTER:// 这里就是整个消费者组从 consumerTable 集合中删掉了, 由于消费者组下面没有消费者, 因此也不需要重平衡this.brokerController.getConsumerFilterManager().unRegister(group);break;case REGISTER:// 消费者注册if (args == null || args.length < 1) {return;}Collection<SubscriptionData> subscriptionDataList = (Collection<SubscriptionData>) args[0];this.brokerController.getConsumerFilterManager().register(group, subscriptionDataList);break;default:throw new RuntimeException("Unknown event " + event);}
}
3.3.2 ConsumerFilterManager#unRegister
UNREGISTER 就是整个消费者组从 consumerTable 集合中删掉了, 由于消费者组下面没有消费者, 因此也不需要重平衡,所以 unRegister
就是简单处理下消费者组的消费者过滤信息。
/*** 删除消费者组过滤信息* @param consumerGroup*/
public void unRegister(String consumerGroup) {// 如果不包括, 直接返回if (!this.groupFilterData.containsKey(consumerGroup)) {return;}// 获取消费者组的过滤信息ConsumerFilterData data = this.groupFilterData.get(consumerGroup);// 如果过滤信息过期了, 就是 dead > bornTimeif (data == null || data.isDead()) {return;}long now = System.currentTimeMillis();log.info("Unregister consumer filter: {}, deadTime: {}", data, now);// 设置删除时间是当前时间data.setDeadTime(now);
}
当然了这里也是简单看下里面的逻辑,等到后面讲消费者的时候也会讲消息过滤这块的内容。
3.3.3 Broker2Client#notifyConsumerIdsChanged
/*** 通知消费者重平衡* @param channel* @param consumerGroup*/
public void notifyConsumerIdsChanged(final Channel channel,final String consumerGroup) {if (null == consumerGroup) {log.error("notifyConsumerIdsChanged consumerGroup is null");return;}// 构建请求头NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader();requestHeader.setConsumerGroup(consumerGroup);// 请求类型是 NOTIFY_CONSUMER_IDS_CHANGEDRemotingCommand request =RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader);try {// 发送 OneWay 请求this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);} catch (Exception e) {log.error("notifyConsumerIdsChanged exception. group={}, error={}", consumerGroup, e.toString());}
}
这里就是直接发送单向请求,请求 Code 为 NOTIFY_CONSUMER_IDS_CHANGED,意思是消费者组消费者发生变化,发送请求进行重平衡。
消费者的重平衡我们后面讲消费者的时候再详细介绍,这里先不说,先知道 Broker 怎么处理的就行。
3.4 FilterServerManager#doChannelCloseEvent
过滤服务的 doChannelCloseEvent 就是简单讲过滤服务连接从 filterServerTable 中删掉。
/*** 删除过滤服务连接, 直接从 filterServerTable 集合中删除* @param remoteAddr* @param channel*/
public void doChannelCloseEvent(final String remoteAddr, final Channel channel) {FilterServerInfo old = this.filterServerTable.remove(channel);if (old != null) {log.warn("The Filter Server<{}> connection<{}> closed, remove it", old.getFilterServerAddr(),remoteAddr);}
}
4. 小结
这篇文章中我们主要是讲述了 broker 的 NettyRemotingClient 和 NettyRemotingServer,其中 NettyRemotingServer 重点是讲了 ClientHousekeepingServer 这个连接处理类,根据前面的探讨我们也知道了 broker 既可以作为服务端和生产者、消费者、消息过滤服务通信,也可以作为客户端和 NameServer 通信。
如有错误,欢迎指出!!!!