引子
在前面的文章中,我们基于 Netty 构建了一套单体架构的即时通讯服务。虽然单体架构在开发初期简单高效,但随着用户量的增长和业务规模的扩大,其局限性逐渐显现。当面对高并发场景时,单体 Netty 服务很容易触及性能天花板,导致消息推送延迟、连接频繁断开等问题。
然而,如果只是简单地复制多个 Netty 实例进行水平扩展:
这种方案会带来新的问题:客户端连接分散在不同服务器上,消息无法跨服务器传递。比如用户 A 和用户 B 连接到不同的 Netty 实例,他们之间的消息将无法送达。
三种构建 Netty 集群方案
没有什么问题是加一层解决不了的,下面为大家介绍三种常见的 Netty 集群构建方案。
方案一:基于 Nginx 的负载均衡架构
通过 Nginx 作为负载均衡器,将用户请求分发到不同的 Netty 服务器上。每个用户固定连接到某一台服务器,保证了点对点通信的简单性。
这个方案架构简单,部署方便,Nginx 也是成熟稳定的中间件。但它无法实现跨服务器消息传递,用户只能与同服务器的其他用户通信,无法实现真正的集群化。
方案二:基于注册中心的服务发现架构
引入 Nacos 作为服务注册中心,Netty 服务器启动时向 Nacos 注册。客户端通过 Gateway 获取可用服务器列表,实现动态服务发现。
这个方案支持服务动态上下线,负载均衡策略灵活,服务监控管理方便。但它需要引入阿里系的相关中间件,如果原架构中未使用,突然引入大量中间件,让架构复杂度增加,而且阿里系在Java这块儿通常都是引一个,你得用一套它的方案才能用好。
方案三:基于消息路由的分布式架构
通过 ZooKeeper 记录用户连接信息,Controller 服务查询用户位置并路由消息到正确的 Netty 实例,实现跨服务器通信。
这个方案完美解决跨服务器消息传递,而且支持无限水平扩展,而且只需要引入一个中间件 ZooKeeper,只需要把消息通过相应的 Controller 转发下即可。
通过对三种方案优劣的综合考量,我们最终选择方案三。它真正实现了 Netty 服务的集群化,解决了跨服务器消息传递这一核心问题,而且架构也相对简单,是构建 Netty 集群的最佳选择。
实现消息广播
在选定了基于消息路由的分布式架构后,我们面临的下一个挑战是如何实现高效的消息广播。当用户发送消息时,不仅需要推送给特定的接收者,还可能需要广播给群组成员或者进行系统通知。这就需要一个可靠的消息分发机制。
在前面的文章中,我们已经引入了 RabbitMQ 来处理离线消息的持久化存储。既然已有成熟的消息中间件,我们可以充分利用其发布/订阅模式来实现广播功能。
ZooKeeper 环境搭建
既然我们选择了方案三(基于消息路由的分布式架构),那么在项目改造前,首先需要搭建 ZooKeeper 环境。ZooKeeper 将作为我们的分布式协调服务,记录用户的连接信息,帮助 Controller 服务准确定位用户所在的 Netty 实例。
在我们的即时通讯架构中,ZooKeeper 将承担以下职责:
- 存储用户与 Netty 服务器的映射关系
- 监控 Netty 服务器的健康状态
- 提供服务发现功能
安装 ZooKeeper 的方式有很多,大家也可以选择自己喜欢的方式,我这里就只演示下如何通过 docker 安装。
1. 拉取镜像
首先拉取 ZooKeeper 镜像,大家在安装时先去 docker hub 上查看最新版本,我写这篇文章时最新版本是 3.9.3:
docker pull zookeeper:3.9.3
2.创建挂载目录
为了数据持久化和方便配置管理,我们需要创建本地挂载目录:
3. 启动容器
使用以下命令启动 ZooKeeper 容器,大家在使用时需要注意下操作系统和挂载目录的路径,根据自己的实际情况修改:
docker run --name zookeeper \
-p 2181:2181 \
--restart always \
-v D:\devolop\zookeeper\data:/data \
-v D:\devolop\zookeeper\conf:/conf \
-v D:\devolop\zookeeper\logs:/datalog \
-d zookeeper:3.9.3
4.补充配置文件
容器启动后,查看 conf
目录,会发现自动生成了 zoo.cfg
配置文件:
但是还缺少日志配置文件 logback.xml
。我们需要从 Apache ZooKeeper 官网下载对应版本的安装包来获取完整的配置文件。
访问 https://zookeeper.apache.org/releases.html,下载 3.9.3 版本的二进制包,解压后将 conf/logback.xml
文件复制到本地挂载的 conf
目录中:
5.验证安装
接着重启容器后,进入容器并检查 ZooKeeper 状态:
/apache-zookeeper-3.9.3-bin/bin/zkServer.sh status
如果看到以下输出,说明 ZooKeeper 已成功启动:
Netty 集群改造实战
搭建好 ZooKeeper 环境后,我们开始对原有的单体 Netty 服务进行集群化改造。改造的核心思路是:
-
让每个 Netty 实例启动时自动注册到 ZooKeeper
-
实现基于最少连接数的负载均衡策略
-
通过 RabbitMQ 实现跨服务器的消息广播
-
处理用户上下线时的在线人数同步
Spring Boot 集成 ZooKeeper
1. 引入依赖
首先在项目中引入 Apache Curator 依赖,它是 ZooKeeper 的客户端,提供了更友好的 API:
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.9.3</version>
</dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.8.0</version>
</dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>5.8.0</version>
</dependency>
2. 配置 ZooKeeper 连接
在 application.yml
中添加 ZooKeeper 配置:
# zookeeper配置
zookeeper:curator:host: 127.0.0.1:2181connectionTimeoutMs: 30000sessionTimeoutMs: 3000sleepMsBetweenRetry: 2000maxRetries: 3namespace: wechat
3.创建 Curator 配置类
这里添加了一个监听器对 Redis 残留端口的处理,如果不加处理,一直累加下去肯定会加到上限的。
@Slf4j
@Component
@Data
@ConfigurationProperties(prefix = "zookeeper.curator")
public class CuratorConfig {private String host;private Integer connectionTimeout;private Integer sessionTimeout;private Integer sleepMsBetweenRetry;private Integer maxRetries;private String namespace;@Autowiredprivate RedisOperator redisOperator;public static final String PATH = "/server-list";@Bean("curatorClient")public CuratorFramework curatorClient() {RetryPolicy retryPolicy = new ExponentialBackoffRetry(sleepMsBetweenRetry,maxRetries);CuratorFramework client = CuratorFrameworkFactory.builder().connectString(host).connectionTimeoutMs(connectionTimeout).sessionTimeoutMs(sessionTimeout).retryPolicy(retryPolicy).namespace(namespace).build();client.start();// 添加节点监听器addWatcher(PATH, client);return client;}/*** 注册节点的事件监听*/public void addWatcher(String path, CuratorFramework client) {CuratorCache curatorCache = CuratorCache.build(client, path);curatorCache.listenable().addListener(((type, oldData, data) -> {switch (type.name()) {case "NODE_DELETED":log.info("节点删除事件");// 清理 Redis 中的端口缓存NettyServerNode oldNode = JsonUtils.jsonToPojo(new String(oldData.getData()), NettyServerNode.class);String oldPort = oldNode.getPort() + "";redisOperator.hdel("netty_port", oldPort);break;default:break;}}));}
}
改造 Netty 服务启动流程
1. 动态端口分配
为了支持在同一台服务器上启动多个 Netty 实例,我们需要实现动态端口分配。通过 Redis 记录已使用的端口,每次启动时自动分配一个新端口:
public class ChatServer {public static final Integer nettyDefaultPort = 875;/*** 动态获取端口号*/public static Integer selectPort(Integer port) {String portKey = "netty_port";Jedis jedis = JedisPoolUtils.getJedis();Map<String, String> portMap = jedis.hgetAll(portKey);List<Integer> portList = portMap.entrySet().stream().map(entry -> Integer.valueOf(entry.getKey())).collect(Collectors.toList());Integer nettyPort = null;if (portList == null || portList.isEmpty()) {jedis.hset(portKey, port+"", "0");nettyPort = port;} else {// 获取最大端口号并加10Optional<Integer> maxInteger = portList.stream().max(Integer::compareTo);Integer maxPort = maxInteger.get().intValue();Integer currentPort = maxPort + 10;jedis.hset(portKey, currentPort+"", "0");nettyPort = currentPort;}return nettyPort;}
}
2. 向 ZooKeeper 注册服务
创建服务注册工具类:
public class ZookeeperRegister {/*** 注册 Netty 服务到 ZooKeeper*/public static void registerNettyServer(String nodeName, String ip, Integer port) throws Exception {CuratorFramework zkClient = CuratorConfig.getClient();String path = "/" + nodeName;Stat stat = zkClient.checkExists().forPath(path);// 创建持久节点if (stat == null) {zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path);}// 创建临时顺序节点,存储服务器信息NettyServerNode serverNode = new NettyServerNode();serverNode.setIp(ip);serverNode.setPort(port);serverNode.setOnlineCounts(0);String nodeJson = JsonUtils.objectToJson(serverNode);zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path + "/im-", nodeJson.getBytes());}/*** 获取本机IP地址*/public static String getLocalIp() throws UnknownHostException {InetAddress address = InetAddress.getLocalHost();return address.getHostAddress();}/*** 处理在线人数(加锁保证数据一致性)*/public static void dealOnlineCounts(NettyServerNode serverNode,Integer counts) throws Exception {CuratorFramework zkClient = CuratorConfig.getClient();// 使用分布式读写锁InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(zkClient, "rw-lock");readWriteLock.writeLock().acquire();try {String path = "/server-list";List<String> list = zkClient.getChildren().forPath(path);for (String node : list) {String nodeValue = new String(zkClient.getData().forPath(path + "/" + node));NettyServerNode pendingNode = JsonUtils.jsonToPojo(nodeValue, NettyServerNode.class);if (pendingNode.getIp().equals(serverNode.getIp()) && pendingNode.getPort().intValue() == serverNode.getPort().intValue()) {pendingNode.setOnlineCounts(pendingNode.getOnlineCounts() + counts);String nodeJson = JsonUtils.objectToJson(pendingNode);zkClient.setData().forPath(path + "/" + node, nodeJson.getBytes());}}} finally {readWriteLock.writeLock().release();}}public static void incrementOnlineCounts(NettyServerNode serverNode) throws Exception {dealOnlineCounts(serverNode, 1);}public static void decrementOnlineCounts(NettyServerNode serverNode) throws Exception {dealOnlineCounts(serverNode, -1);}
}
3.改造 Netty 启动类
public class ChatServer {public static void main(String[] args) throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();// 动态分配端口Integer nettyPort = selectPort(nettyDefaultPort);// 注册到 ZooKeeperZookeeperRegister.registerNettyServer("Netty-Server-List",ZookeeperRegister.getLocalIp(), nettyPort);// 启动 RabbitMQ 监听器String queueName = "queue_" + ZookeeperRegister.getLocalIp() + "_" + nettyPort;RabbitMQConnectUtils mqConnectUtils = new RabbitMQConnectUtils();mqConnectUtils.listen("fanout_exchange", queueName);try {ServerBootstrap server = new ServerBootstrap();server.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new WSServerInitializer());ChannelFuture channelFuture = server.bind(nettyPort).sync();log.info("Netty 服务启动成功,端口:{}", nettyPort);channelFuture.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}
实现客户端负载均衡
当客户端请求连接时,我们需要从 ZooKeeper 中获取所有可用的 Netty 服务器,并选择连接数最少的服务器:
@RestController
public class NettyController {@Resource(name = "curatorClient")private CuratorFramework zkClient;@PostMapping("getNettyOnlineInfo")public GraceJSONResult getNettyOnlineInfo() throws Exception {// 从 ZooKeeper 获取所有 Netty 服务器节点String path = "/server-list";List<String> list = zkClient.getChildren().forPath(path);List<NettyServerNode> serverNodeList = new ArrayList<>();for (String node : list) {String nodeValue = new String(zkClient.getData().forPath(path + "/" + node));NettyServerNode serverNode = JsonUtils.jsonToPojo(nodeValue, NettyServerNode.class);serverNodeList.add(serverNode);}// 选择连接数最少的服务器Optional<NettyServerNode> minNodeOptional = serverNodeList.stream().min(Comparator.comparing(NettyServerNode::getOnlineCounts));NettyServerNode minNode = minNodeOptional.get();return GraceJSONResult.ok(minNode);}
}
跨服务器消息广播
1.RabbitMQ 连接工具类
public class RabbitMQConnectUtils {private final List<Connection> connections = new ArrayList<>();private final int maxConnection = 20;// RabbitMQ 连接配置private final String host = "127.0.0.1";private final int port = 5672;private final String username = "guest";private final String password = "guest";private final String virtualHost = "/";private ConnectionFactory factory;/*** 初始化连接工厂*/private void initFactory() {if (factory == null) {factory = new ConnectionFactory();factory.setHost(host);factory.setPort(port);factory.setUsername(username);factory.setPassword(password);factory.setVirtualHost(virtualHost);}}/*** 监听消息队列*/public void listen(String exchangeName, String queueName) throws Exception {Connection connection = getConnection();Channel channel = connection.createChannel();// 声明交换机(fanout 模式用于广播)channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, null);// 声明队列channel.queueDeclare(queueName, true, false, false, null);// 绑定队列到交换机channel.queueBind(queueName, exchangeName, "");// 创建消费者Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {String msg = new String(body);String exchange = envelope.getExchange();if (exchange.equalsIgnoreCase(exchangeName)) {// 处理接收到的消息DataContent dataContent = JsonUtils.jsonToPojo(msg, DataContent.class);String senderId = dataContent.getChatMsg().getSenderId();String receiverId = dataContent.getChatMsg().getReceiverId();// 发送给接收者List<Channel> receiverChannels = UserChannelSession.getMultiChannels(receiverId);UserChannelSession.sendToTarget(receiverChannels, dataContent);// 同步到发送者的其他设备String currentChannelId = dataContent.getExtend();List<Channel> senderChannels = UserChannelSession.getMyOtherChannels(senderId, currentChannelId);UserChannelSession.sendToTarget(senderChannels, dataContent);}}};// 开始消费channel.basicConsume(queueName, true, consumer);}
}
2.消息发布工具类
public class MessagePublisher {// 定义交换机名称public static final String EXCHANGE = "pitayafruits_exchange";public static final String FANOUT_EXCHANGE = "fanout_exchange";// 定义路由键public static final String ROUTING_KEY_SEND = "pitayafruits.wechat.send";/*** 发送消息到数据库保存*/public static void sendMsgToSave(ChatMsg msg) throws Exception {RabbitMQConnectUtils connectUtils = new RabbitMQConnectUtils();connectUtils.sendMsg(JsonUtils.objectToJson(msg),EXCHANGE,ROUTING_KEY_SEND);}/*** 广播消息到所有 Netty 服务器*/public static void sendMsgToNettyServers(String msg) throws Exception {RabbitMQConnectUtils connectUtils = new RabbitMQConnectUtils();connectUtils.sendMsg(msg, FANOUT_EXCHANGE, "");}
}
3. 改造消息处理器
在 ChatHandler
中处理用户连接、消息发送和断线:
public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {public static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);@Overrideprotected void channelRead0(ChannelHandlerContext ctx,TextWebSocketFrame msg) throws Exception {String content = msg.text();DataContent dataContent = JsonUtils.jsonToPojo(content, DataContent.class);ChatMsg chatMsg = dataContent.getChatMsg();Integer msgType = chatMsg.getMsgType();Channel currentChannel = ctx.channel();String currentChannelId = currentChannel.id().asLongText();String senderId = chatMsg.getSenderId();if (msgType == MsgTypeEnum.CONNECT_INIT.type) {// 用户初次连接UserChannelSession.putMultiChannels(senderId, currentChannel);UserChannelSession.putUserChannelIdRelation(currentChannelId, senderId);// 更新在线人数NettyServerNode minNode = dataContent.getServerNode();ZookeeperRegister.incrementOnlineCounts(minNode);// 保存用户与服务器的映射关系到 RedisJedis jedis = JedisPoolUtils.getJedis();jedis.set(senderId, JsonUtils.objectToJson(minNode));} else if (msgType == MsgTypeEnum.WORDS.type || msgType == MsgTypeEnum.IMAGE.type|| msgType == MsgTypeEnum.VIDEO.type|| msgType == MsgTypeEnum.VOICE.type) {// 生成消息IDSnowflake snowflake = new Snowflake(new IdWorkerConfigBean());chatMsg.setMsgId(snowflake.nextId());// 设置服务器时间chatMsg.setChatTime(LocalDateTime.now());dataContent.setChatMsg(chatMsg);dataContent.setExtend(currentChannelId);// 广播消息到所有 Netty 服务器MessagePublisher.sendMsgToNettyServers(JsonUtils.objectToJson(dataContent));// 保存消息到数据库MessagePublisher.sendMsgToSave(chatMsg);}}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {Channel currentChannel = ctx.channel();String userId = UserChannelSession.getUserIdByChannelId(currentChannel.id().asLongText());// 移除用户会话UserChannelSession.removeUserChannels(userId, currentChannel.id().asLongText());clients.remove(currentChannel);// 更新在线人数Jedis jedis = JedisPoolUtils.getJedis();NettyServerNode serverNode = JsonUtils.jsonToPojo(jedis.get(userId), NettyServerNode.class);ZookeeperRegister.decrementOnlineCounts(serverNode);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {// 异常处理逻辑与 handlerRemoved 相同handlerRemoved(ctx);}
}
小结
通过本文的实践,我们成功将单体 Netty 服务改造成了高可用的分布式集群架构。整个方案利用了 ZooKeeper 的分布式协调能力实现服务注册与发现,通过 RabbitMQ 的广播模式解决了跨服务器消息传递的难题,并使用 Redis 实现了动态端口分配。这套架构已经能够满足大部分即时通讯场景的需求。当然,在超大规模场景下,还可以进一步优化,比如引入更智能的负载均衡策略、实现跨地域部署等。希望本文的实践经验能为大家在构建高性能即时通讯服务时提供参考。