文章目录
- 一、Netty服务器端启动细节分析
- 1.1 实现一个简单的http服务器
- 1.2 服务器端启动细节分析
- 1.3 创建与初始化 NioServerSocketChannel
- 1.3.1 **通过反射工厂创建 Channel**:
- 1.3.2 **初始化 Channel**
- 1.4 注册到 Boss EventLoopGroup
- 1.4.1 **异步提交注册任务**
- 1.4.2 **EventLoop 执行注册**
- 1.4.3 触发 Handler 添加事件
- 1.4.4 触发 Channel 注册成功事件
- 1.5 绑定端口与激活
- 1.5.1 执行绑定操作
- 1.5.2 标记 Channel 为激活状态
- 1.5.3 触发 Channel 激活事件
- 1.5.4 自动注册ACCEPT事件
- 1.6 接收连接 - ServerBootstrapAcceptor 的工作
- 1.7 总结与精要分析
推荐阅读:
【01】Netty从0到1系列之I/O模型
【02】Netty从0到1系列之NIO
【03】Netty从0到1系列之Selector
【04】Netty从0到1系列之Channel
【05】Netty从0到1系列之Buffer(上)
【06】Netty从0到1系列之Buffer(下)
【07】Netty从0到1系列之零拷贝技术
【08】Netty从0到1系列之整体架构、入门程序
【09】Netty从0到1系列之EventLoop
【10】Netty从0到1系列之EventLoopGroup
【11】Netty从0到1系列之Future
【12】Netty从0到1系列之Promise
【13】Netty从0到1系列之Netty Channel
【14】Netty从0到1系列之ChannelFuture
【15】Netty从0到1系列之CloseFuture
【16】Netty从0到1系列之Netty Handler
【17】Netty从0到1系列之Netty Pipeline【上】
【18】Netty从0到1系列之Netty Pipeline【下】
【19】Netty从0到1系列之Netty ByteBuf【上】
【20】Netty从0到1系列之Netty ByteBuf【中】
【21】Netty从0到1系列之Netty ByteBuf【下】
【22】Netty从0到1系列之Netty 逻辑架构【上】
【23】Netty从0到1系列之Netty 逻辑架构【下】
一、Netty服务器端启动细节分析
1.1 实现一个简单的http服务器
目标
- 完成
http
服务器,请求-响应的过程.
作为开发者来说,只要照葫芦画瓢即可轻松上手。大多数场景下,你只需要实现与业务逻辑相关的一系列 ChannelHandler
,再加上 Netty 已经预置了 HTTP 相关的编解码器就可以快速完成服务端框架的搭建。所以,我们只需要两个类就可以完成一个最简单的 HTTP 服务器,它们分别为服务器启动类和业务逻辑处理类
,结合完整的代码实现我将对它们分别进行讲解。
- 服务器端代码
package cn.tcmeta.httpserver;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpContentCompressor;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;/*** @author: laoren* @description: 实现一个简单的http服务器* @version: 1.0.0*/
public class MyHttpSimpleServer {public void doStart(int port) throws InterruptedException {NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);NioEventLoopGroup workerGroup = new NioEventLoopGroup(2);try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();// 编码、解码处理器pipeline.addLast("codec", new HttpServerCodec());// 压缩处理器pipeline.addLast("compressor", new HttpContentCompressor());// 合并处理器pipeline.addLast("aggregator", new HttpObjectAggregator(1024 * 1024));// 自定义处理器pipeline.addLast("handler", new MyHttpServerHandler());}}).childOption(ChannelOption.SO_KEEPALIVE, true);ChannelFuture future = bootstrap.bind(port).sync();System.out.println("✅ MyHttpSimpleServer 启动,端口: " + port);future.channel().closeFuture().sync();}finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}public static void main(String[] args) {try {new MyHttpSimpleServer().doStart(8080);} catch (InterruptedException e) {throw new RuntimeException(e);}}}
- 自定义处理器代码
package cn.tcmeta.httpserver;import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;/*** @author: laoren* @description: 自定义处理类* @version: 1.0.0*/
@Slf4j
public class MyHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {String content =String.format("Receive http request, uri: %s, method: %s, content: %s%n", request.uri(), request.method(), request.content().toString(CharsetUtil.UTF_8));log.info("uri: {}, request method: {}, content: {}", request.uri(), request.method(), request.content());// 处理响应事件FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.OK,Unpooled.wrappedBuffer(content.getBytes()));ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);}
}
- 测试示例
-
客户端测试
-
服务器端日志
✅ MyHttpSimpleServer 启动,端口: 8080
15:47:34 [INFO ] [nioEventLoopGroup-3-1] c.t.h.MyHttpServerHandler - uri: /bad-request, request method: GET, content: UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 0, cap: 0)Process finished with exit code 130
1.2 服务器端启动细节分析
- 阶段一: 创建与初始化 NioServerSocketChannel
- 阶段二: 注册到 Boss EventLoopGroup
- 阶段三: 绑定端口与激活
- 阶段四: 接收连接 - ServerBootstrapAcceptor 的工作
1.3 创建与初始化 NioServerSocketChannel
当调用 ServerBootstrap.bind(port)
时,Netty 并不会立即进行网络调用。第一步是创建服务端的“大门”。
1.3.1 通过反射工厂创建 Channel:
ServerBootstrap
通过其内部的ChannelFactory
(通常是ReflectiveChannelFactory
)来创建一个新的NioServerSocketChannel
实例。
相关核心代码:
- io/netty/bootstrap/AbstractBootstrap.java
final ChannelFuture initAndRegister() {Channel channel = null;try {// 1. 通过反射创建NioServerSocketChannel对象channel = channelFactory.newChannel();init(channel);} catch (Throwable t) {}
- io/netty/channel/ReflectiveChannelFactory.java
@Override
public T newChannel() {try {return constructor.newInstance();} catch (Throwable t) {throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);}
}
- 底层结节实现
在 NioServerSocketChannel
的构造函数中,Netty 会做几件关键事:
- SocketChannel.open(): 打开一个新的 Java NIO
ServerSocketChannel
。 .configureBlocking(false)
: 立即将其设置为非阻塞模式。这是异步编程的基础。- 创建该 Channel 的核心组件:
ChannelId
(唯一标识)、Unsafe
(负责底层读写操作)、以及最重要的DefaultChannelPipeline
。
核心代码:
protected AbstractChannel(Channel parent) {this.parent = parent;id = newId();unsafe = newUnsafe();pipeline = newChannelPipeline();
}
rotected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent);this.ch = ch;this.readInterestOp = readInterestOp;try {ch.configureBlocking(false); // 设置为非阻塞模式} catch (IOException e) {try {ch.close();} catch (IOException e2) {logger.warn("Failed to close a partially initialized socket.", e2);}throw new ChannelException("Failed to enter non-blocking mode.", e);}
}
1.3.2 初始化 Channel
- 创建完成后,
ServerBootstrap
会调用init(Channel channel)
方法来初始化这个新 Channel。- 设置 ChannelOptions 和 Attributes: 将
ServerBootstrap.option()
和ServerBootstrap.attr()
中设置的参数应用到该 Channel 的配置中。- 设置 Pipeline 的“大门保安”:
- 这是最关键的一步。Netty 会通过
pipeline.addLast()
方法将一个特殊的ChannelHandler
——ServerBootstrapAcceptor
——添加到 Channel 的 Pipeline 中。ServerBootstrapAcceptor
的职责: 它本身是一个ChannelInboundHandler
。它的唯一使命就是在“有连接到来”这个入站事件被触发时,将新接入的客户端连接(SocketChannel
)正式接手过来。我们会在阶段三详细讲解它。
- 核心代码
final ChannelFuture initAndRegister() {Channel channel = null;try {channel = channelFactory.newChannel();init(channel); // 初始化Channel} catch (Throwable t) {}
@Override
void init(Channel channel) {// 设置ChannelOptions setChannelOptions(channel, newOptionsArray(), logger);// 设置AttributessetAttributes(channel, newAttributesArray());// 获取绑定的PipelineChannelPipeline p = channel.pipeline();final EventLoopGroup currentChildGroup = childGroup;final ChannelHandler currentChildHandler = childHandler;final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);final Collection<ChannelInitializerExtension> extensions = getInitializerExtensions();p.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(final Channel ch) {final ChannelPipeline pipeline = ch.pipeline();ChannelHandler handler = config.handler();if (handler != null) {pipeline.addLast(handler);}ch.eventLoop().execute(new Runnable() {@Overridepublic void run() {// 将ServerBootstrapAcceptor添加到Pipeline中pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs,extensions));}});}});if (!extensions.isEmpty() && channel instanceof ServerChannel) {ServerChannel serverChannel = (ServerChannel) channel;for (ChannelInitializerExtension extension : extensions) {try {extension.postInitializeServerListenerChannel(serverChannel);} catch (Exception e) {logger.warn("Exception thrown from postInitializeServerListenerChannel", e);}}}
}
至此,一个功能完备、配置齐全的 NioServerSocketChannel
就准备好了,但它还没有被赋予“生命”(即注册到 EventLoop)。
1.4 注册到 Boss EventLoopGroup
这是 Netty 异步设计的核心体现。注册操作本身是异步的
1.4.1 异步提交注册任务
ServerBootstrap
将AbstractUnsafe.register0()
操作封装成一个任务,提交给BossGroup
中的一个EventLoop
。- 主线程(调用
bind()
的线程)在此刻立即返回一个ChannelFuture
,它可以用来监听注册(和后续绑定)是否成功。主线程不会被阻塞
注册不会阻塞主线程,直接返回一个ChannelFuture
ChannelFuture regFuture = config().group().register(channel);if (regFuture.cause() != null) {if (channel.isRegistered()) {channel.close();} else {channel.unsafe().closeForcibly();}}
-
bossGroup中的NioEventLoop
-
AbstractUnsafe.register0()注册的EventLoop
1.4.2 EventLoop 执行注册
- 被选中的
EventLoop
线程会在其运行周期内的某个时间点执行这个注册任务。- 真正的注册: 在
doRegister()
方法中,会调用 Java NIO 的ServerSocketChannel.register(eventLoop.selector, 0, this)
。
- 关键点 1: 将 JDK 的
ServerSocketChannel
注册到EventLoop
持有的Selector
上。- 关键点 2: 兴趣操作为
0
,表示暂时不监听任何事件。这是因为此时 Channel 还未激活(未绑定端口),监听OP_ACCEPT
没有意义。- 关键点 3: 附件(Attachment) 设置为 Netty 自己的
NioServerSocketChannel
对象。这样,当Selector
返回事件时,Netty 可以直接从SelectionKey
中取出附件,就知道是哪个 Netty Channel 发生了事件。
- io/netty/channel/AbstractChannel#register0
private void register0(ChannelPromise promise) {try {// check if the channel is still open as it could be closed in the mean time when the register// call was outside of the eventLoopif (!promise.setUncancellable() || !ensureOpen(promise)) {return;}boolean firstRegistration = neverRegistered;doRegister(); // ✅✅✅✅✅ 核心注册方法neverRegistered = false;registered = true;// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the// user may already fire events through the pipeline in the ChannelFutureListener.pipeline.invokeHandlerAddedIfNeeded();safeSetSuccess(promise);pipeline.fireChannelRegistered();// Only fire a channelActive if the channel has never been registered. This prevents firing// multiple channel actives if the channel is deregistered and re-registered.if (isActive()) {if (firstRegistration) {pipeline.fireChannelActive();} else if (config().isAutoRead()) {// This channel was registered before and autoRead() is set. This means we need to begin read// again so that we process inbound data.//// See https://github.com/netty/netty/issues/4805beginRead();}}} catch (Throwable t) {// Close the channel directly to avoid FD leak.closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}
}
- io/netty/channel/nio/AbstractNioChannel#doRegister()
@Overrideprotected void doRegister() throws Exception {boolean selected = false;for (;;) {try {// 参数1: Selector// 参数2: 0: 表示暂时不监听任何事件。这是因为此时 Channel 还未激活(未绑定端口),监听 OP_ACCEPT 没有意义。// 参数3: this: 附件(Attachment) 设置为 Netty 自己的 NioServerSocketChannel 对象。这样,当 Selector 返回事件时,Netty 可以直接从 SelectionKey 中取出附件,就知道是哪个 Netty Channel 发生了事件selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);return;} catch (CancelledKeyException e) {if (!selected) {// Force the Selector to select now as the "canceled" SelectionKey may still be// cached and not removed because no Select.select(..) operation was called yet.eventLoop().selectNow();selected = true;} else {// We forced a select operation on the selector before but the SelectionKey is still cached// for whatever reason. JDK bug ?throw e;}}}}
- register签名
// 参数1: Selector
// 参数2: 监听的事件
// 参数3: 附加数据
public abstract SelectionKey register(Selector sel, int ops, Object att)throws ClosedChannelException;
1.4.3 触发 Handler 添加事件
- 注册成功后,
EventLoop
线程会回调ChannelHandler.handlerAdded(ctx)
方法。对于ServerBootstrapAcceptor
来说,此时它正式“上岗”。
1.4.4 触发 Channel 注册成功事件
- 最后,
EventLoop
线程会通过Pipeline
从Head
开始向后传播channelRegistered
事件。此时,所有用户添加到服务端 Channel 的ChannelHandler
都会感知到注册成功。
1.5 绑定端口与激活
注册成功后,才开始真正的绑定。
1.5.1 执行绑定操作
- 同样在
EventLoop
线程中,调用doBind()
方法。 - 其内部最终会调用 JDK 的
ServerSocketChannel.bind(socketAddress, backlog)
方法,真正地将套接字绑定到指定的端口。
- 注册成功之后,进行绑定操作,调用
doBind
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {Throwable cause = future.cause();if (cause != null) {// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an// IllegalStateException once we try to access the EventLoop of the Channel.promise.setFailure(cause);} else {// Registration was successful, so set the correct executor to use.// See https://github.com/netty/netty/issues/2586promise.registered();// 调用它doBind0(regFuture, channel, localAddress, promise);}}
});
1.5.2 标记 Channel 为激活状态
- 绑定成功后,调用
setActive(true)
将 Channel 标记为激活状态。
1.5.3 触发 Channel 激活事件
- 通过
Pipeline
从Head
开始向后传播channelActive
事件
1.5.4 自动注册ACCEPT事件
- 这是一个精妙的设计!
channelActive
事件最终会传播到Pipeline
的HeadContext
。 HeadContext
的channelActive
方法中会调用readIfIsAutoRead()
。- 这个方法调用会层层向下,最终到达底层的
AbstractNioChannel
,其doBeginRead()
方法会被调用。 - 最终操作:
doBeginRead()
会修改之前注册到Selector
上的SelectionKey
的兴趣操作:selectionKey.interestOps(interestOps | OP_ACCEPT)
。 - 至此,Netty 服务端才开始正式监听
OP_ACCEPT
事件,准备接收客户端的连接。
1.6 接收连接 - ServerBootstrapAcceptor 的工作
当客户端发起连接,BossGroup
的 EventLoop
轮询到 OP_ACCEPT
事件时,真正的“魔法”开始了。
-
创建子 Channel:
EventLoop
线程会调用NioServerSocketChannel
的read()
方法,其内部通过ServerSocketChannel.accept()
创建一个 JDK 的SocketChannel
,并同样被包装成 Netty 的NioSocketChannel
。 -
“Acceptor” 接手: 这个新创建的
NioSocketChannel
会作为入站数据,在Pipeline
中传播。它首先到达Head
,然后流经ServerBootstrapAcceptor
。 -
配置并注册子 Channel:
ServerBootstrapAcceptor
的channelRead
方法被触发,它负责:- 将
ServerBootstrap.childOptions()
和childAttrs()
设置到子 Channel 上。 - 将
ServerBootstrap.childHandler()
中设置的ChannelInitializer
添加到子 Channel 的 Pipeline 中。
- 将
-
移交完成: 自此,这个客户端连接生命周期的所有后续 I/O 事件(读、写、断开)都将由
WorkerGroup
中的那个EventLoop
全权负责。BossGroup
的职责圆满完成。
1.7 总结与精要分析
阶段 | 核心操作 | 执行线程 | 关键细节 |
---|---|---|---|
创建初始化 | 创建 NioServerSocketChannel | 主线程 | 配置非阻塞,创建 Pipeline,添加 ServerBootstrapAcceptor |
注册 | 将 Channel 注册到 Selector | BossGroup 线程 | 兴趣操作为 0 ,附件为 Netty Channel,异步操作 |
绑定 | 调用 JDK bind | BossGroup 线程 | 绑定端口,触发 channelActive 事件 |
激活监听 | 注册 OP_ACCEPT 事件 | BossGroup 线程 | 由 HeadContext 在 channelActive 事件中自动完成 |
接收连接 | ServerBootstrapAcceptor 工作 | BossGroup 线程 | 接收连接,配置子 Channel,将其转交给 WorkerGroup |
设计理念与精妙之处:
- 职责分离 (Separation of Concerns):
BossGroup
只管接收连接,WorkerGroup
只管处理连接,架构清晰,易于理解和扩展。 - 完全异步: 从注册到绑定,所有耗时操作都封装成任务提交给
EventLoop
,保证启动过程不阻塞主线程。 - 惰性监听: 先注册(
interestOps=0
),后绑定,再设置OP_ACCEPT
。这是一个严谨的状态管理流程,避免了在未准备好时接收到事件。 - 统一的抽象: 无论是服务端 Channel 还是客户端 Channel,都通过
ChannelPipeline
和EventLoop
进行管理,提供了极其一致和灵活的编程模型。 - 高效的连接接收:
ServerBootstrapAcceptor
内置于框架中,将接收连接和后续处理完美衔接,整个过程都在EventLoop
线程内完成,无比高效。
Netty 的启动过程是其 Reactor 线程模型的完美体现,每一个步骤都经过精心设计,以确保在高并发场景下能够实现最高的性能和可靠性