Netty 服务端启动

服务端启动代码

1
2
3
4
5
6
7
8
9
10
11
12
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
// ...
}
}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture channelFuture = bootstrap.bind(new InetSocketAddress(port)).sync();

启动过程分析

跟踪ServerBootstrap.bind()方法,追到AbstractBootstrap.doBind()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private ChannelFuture doBind(final SocketAddress localAddress) {
// 创建channel并且注册selector
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
// 将ServerSocketChannel绑定到本地的端口
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// ...
}
}

跟踪initAndRegister()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Channel channel = null;
try {
// 创建channel
channel = channelFactory.newChannel();
// 初始化channel
init(channel);
} catch (Throwable t) {
if (channel != null) {
channel.unsafe().closeForcibly();
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
// 注册channel到selector
ChannelFuture regFuture = config().group().register(channel);

这里先通过工厂创建了一个Channel(这里服务端指定类型NioServerSocketChannel),然后初始化这个Channel。最后把这个Channel注册到EventLoopGroup上(config().group()返回的是我们设置的Boss Group

创建channel

查看NioServerSocketChannel构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public NioServerSocketChannel() {
//调用下面这个方法
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
public NioServerSocketChannel(ServerSocketChannel channel) {
//注意,这里是初始化了一个对SelectionKey.OP_ACCEPT感兴趣的管道
//Boss的Reactor线程轮询到都是ACCEPT事件
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
private static ServerSocketChannel newSocket(SelectorProvider provider) {
return provider.openServerSocketChannel();
}

newSocket使用SelectorProvideropenServerSocketChannel打开服务器套接字通道。SelectorProvider主要工作是根据操作系统类型和版本选择合适的Provider:如果Linux内核版本>=2.6则,具体的SelectorProviderEPollSelectorProvider,否则为默认的PollSelectorProvider

继续追溯super方法到AbstractChannel,如下方法:

1
2
3
4
5
6
7
protected AbstractChannel(Channel parent) {
this.parent = parent;
// ChannelId是一个全局唯一的值
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}

这里主要做了2件事:

  • 创建NioMessageUnsafe实例(客户端创建NioByteUnsafe),该类为Channel提供了用于完成网络通讯相关的底层操作,如connect(),read(),register(),bind(),close()等;

  • 给该Channel创建DefaultChannelPipeline并初始化,这里用的是默认Pipeline

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise = new VoidChannelPromise(channel, true);
    // 创建tail
    tail = new TailContext(this);
    // 创建head
    head = new HeadContext(this);
    head.next = tail;
    tail.prev = head;
    }

    具体的ChannelPipeline执行流程看前篇文章

到这里整个Channel创建就结束了。

总结一下:

  1. 通过工厂创建一个Channel,这里的Channel由于是在服务端,使用的是NioServerSocketChannel
  2. 这个管道的构造方法中,会初始化一个原生的ServerSocketChannel,最后初始化pipelineunsafe
  3. 初始化pipeline的时候,又会初始化pipeline中的HeadTail

初始化channel

服务端init()实现方法在ServerBootstrap中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void init(Channel channel) throws Exception {
// 添加一个handle-ChannelInitializer
p.addLast(new ChannelInitializer<Channel>() {
@Override
// 这里还并未执行initChannel方法,该方法是在执行回调方法handlerAdded后调用的
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}

服务端的init()分为2个步骤:

  • bootstrap配置的属性设置到Channel上面
  • NioServerSocketChannelpipeline里,添加了一个ChannelHandle-ChannelInitializer,供注册后使用

初始化结束,此时NioServerSocketChannelpipeline链表结构为:

1
Head <--> ChannelInitializer <--> Tail

注册channel

1
ChannelFuture regFuture = config().group().register(channel);

主要将创建并初始化后的Channel注册到selector上面:

  • Channel注册到EventLoopselector上;
  • 触发Pipeline上面ChannelHandlerchannelRegistered

这里的config().group()结果是NioEventLoopGroup,父类是MultithreadEventLoopGroup

1
2
3
public ChannelFuture register(Channel channel) {
return next().register(channel);
}

这里的next()方法最后执行到chooser.next()chooser有两种:PowerOfTwoEventExecutorChooserGenericEventExecutorChooser。如果该EventLoopGroupEventLoop是2的倍数则选择PowerOfTwoEventExecutorChooser(2的倍数方便位操作);因为这里的GroupNioEventLoopGroup,所以next()返回的是NioEventLoop

继续看NioEventLoop.register(),具体方法实现在SingleThreadEventLoop中:

1
2
promise.channel().unsafe().register(this, promise);
return promise;

最终使用channel中的unsafe()注册,具体方法实现在AbstractUnsafe中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private void register0(ChannelPromise promise) {
try {
boolean firstRegistration = neverRegistered;// 是否为首次注册
// 实际的注册动作, 把Channel感兴趣的事件注册到Eventloop.selector上面
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();// 将注册之前加入的handler加入进来
safeSetSuccess(promise); // 注册成功,通知promise
pipeline.fireChannelRegistered();// pipeline通知触发注册成功
if (isActive()) { // 是否已经绑定 因为register和bind阶段是异步的
if (firstRegistration) {
pipeline.fireChannelActive(); // 首次注册,通知
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}

invokeHandlerAddedIfNeeded: 注册成功后,找到初始化阶段通过pipeline.addLast()加入的ChannelInitializer,执行其ChannelInitializerinitChannel方法(添加ChannelHandle-ServerBootstrapAcceptor),之后将ChannelInitializerpipeline中删除

注册结束,此时NioServerSocketChannelpipeline链表结构为:

1
Head <--> ServerBootstrapAcceptor <--> Tail

fireChannelRegistered沿着pipelineheadtail,调用ChannelHandler.channelRegistered方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public final ChannelPipeline fireChannelRegistered() {
AbstractChannelHandlerContext.invokeChannelRegistered(head);
return this;
}
private void invokeChannelRegistered() {
if (invokeHandler()) { // 状态是否正确
try {
((ChannelInboundHandler) handler()).channelRegistered(this); // 触发
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRegistered();// 状态不正确,通知下一个Handler
}
}

fireChannelActive 由于注册阶段和绑定bind阶段都是异步的,如果此时注册完成时bind阶段已经绑定了本地端口,会沿着pipelineheadtail,调用各个ChannelHandlerchannelActive方法

bind本地地址

NioServerSocketChannel内部ServerSocketChannel绑定到本地的端口上面,然后调用fireChannelActive通知pipeline里的ChannelHandle,执行其channelActive方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}

AbstractBootstrapdoBind0(),内部会调用pipeline中的bind方法,逻辑为从tail出发调用outboundChannelHandlerbind方法。当前pipeline链表结构中只有Head是继承了outbound接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
// 找出下一个outbound,这里找出来的是HeadContext
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null);
}
return promise;
}

Headbind方法由NioServerSocketChannel创建过程中生成的unsafe的实例NioMessageUnsafe来执行,该实例的bind方法继承于AbstractUnsafe.bind(),然后触发fireChannelActive

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// AbstractUnsafe.class
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
boolean wasActive = isActive();
try {
// 把channel bind到本地地址
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
// bind之后,这里的isActive会返回true
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Overridem
public void run() {
//在这里触发的是NioServerSocketChannel对应的pipeline的channelActive
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}

这里的doBind(localAddress)方法由创建的NioServerSocketChannel执行

1
2
3
4
5
6
7
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}

这里把java原生的ServerSocketChannel绑定到本地地址上

绑定到本地地址后会执行pipeline.fireChannelActive()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// DefaultChannelPipeline.class
public final ChannelPipeline fireChannelActive() {
AbstractChannelHandlerContext.invokeChannelActive(head);
return this;
}
static void invokeChannelActive(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelActive();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelActive();
}
});
}
}
private void invokeChannelActive() {
if (invokeHandler()) {
try {
// 这里handler()返回HeadContext
((ChannelInboundHandler) handler()).channelActive(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelActive();
}
}

这里的channelInactive会调用HeadContext类的channelInactive方法:

1
2
3
4
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
readIfIsAutoRead();
}

这里首先执行fireChannelActive方法,沿着pipeline传播给后续ChannelInboundHandler,执行hannelActive()方法;接着执行readIfIsAutoRead(),可以发现这个方法作用在pipeline上,从tailhead遍历ChannelOutboundHandler执行read()方法,我们这里只有headoutBoundread()方法最后进入AbstractNioChannel.doBeginRead()

1
2
3
4
5
6
7
8
9
10
11
12
protected void doBeginRead() throws Exception {
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
// interestOps是0,按位与结果肯定是0
selectionKey.interestOps(interestOps | readInterestOp);
}
}

可以发现在执行doRegister()注册这个Channelselector的时候,selectKey赋予了0

1
2
3
4
5
6
7
8
9
10
11
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// selectKey赋予了0
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
}
}
}

这里readInterestOp这个哪来的:在创建NioServerSocketChannel的时候

1
2
3
4
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

在这里重新注册了accept事件,也就是说从此开始,selector再轮询到新的连接接入事件,就可以交给这个NioServerSocketChannel来处理了

至此,Netty服务器段已经启动,ChannelChannelPipeline已经建立,EventLoop也在不断的select()查找准备好的I/O。

总结

  1. 通过工厂生成NioServerSocketChannel
  2. 设置NioServerSocketChannelaccept事件感兴趣。此时Channel已经绑定了unsafepipeline
  3. 初始化Channel,这里主要是设置Channelpipeline中的handlerattroption。这里的handler是特殊类型ChannelInitializer,等待注册后回调。
  4. 注册到EventLoopGroup里,最终是注册到某一个NioEventLoop上面
  5. 在注册时实际上的register0操作是异步的,register0的主要作用是把原生Channel注册到原生selector上。
  6. 执行doBind0也是异步只想,这里其实是和register0一起在执行的。doBind0是把channel注册到本地端口上
  7. 执行pipeline.read()方法,最终传递到AbstractNioChannel,重新向selector注册OP_ACCEPT事件

热评文章