Netty 初始化

初始化

对于Client的初始化过程,本章主要会介绍怎样初始化ChannelPipeline以及与Server建立连接。
Server初始化过程比Client端复杂一些,它分为两部分,分为Acceptor的处理链初始化过程和Reactor链的初始化过程。绑定端口提供服务的过程。

Client

首先来看Client的初始化代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 配置事件组,即线程组,会在构造函数中创建线程,默认情况下他会创建 CPU数*2个 线程
EventLoopGroup group = new NioEventLoopGroup();
try {
// Client端启动辅助类
Bootstrap b = new Bootstrap();
// 设置Client端线程池,一般一个线程负责至少一个Channel
b.group(group)
// 设置 Client 的 SocketChannel,也可以选择 像 EpollSocketChannel这种!
.channel(NioSocketChannel.class)
//添加配置项
.option(ChannelOption.TCP_NODELAY, true)
// 设置 ChannelHandler,这里的 ChannelInitializer不是最终最终编制在 Pipeline上的ChannelHandler,而只是起一个初始化作用的Handler,后文中会细说
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
//在其中添加业务定制的Handler
p.addLast(new EchoClientHandler());
}
});

// 发起连接
ChannelFuture f = b.connect(HOST, PORT).sync();
// 阻塞等待,一直等到链接关闭
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}

Bootstrap类是Client端的启动辅助类。
整个过程中,分为两大步,第一步进行参数设置,设置Bootstrap的参数,第二步则是发起连接。
Bootstrap参数设置里,可以看到业务方配置了group(客户端事件线程池EventLoopGroup,后面单领出来讲解)、Channel的类型(比如对于NIOEpollChannel)和 ChannelHandler(这里的ChannelInitializer 是给业务放提供的入口,让业务将自定义的ChannelHandler编制进Pipeline)。

发起连接: 利用设置的参数进行初始化处理链,注册事件、开启事件线程、发起连接的过程:

1
2
3
4
5
6
7
8
9
Bootstrap.connect
--> Bootstrap.doResolveAndConnect
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
// 初始化Channel,并将Channel注册到事件线程中
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
// 解析地址 并创建连接
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
}

执行InitAndRegister方法主要是初始化Channel并与某个EventLoop绑定。执行doResolveAndConnect0方法用于解析地址并创建连接。下面就来分别详细解析这两个方法:

1
2
3
4
5
6
7
8
9
10
11
final ChannelFuture initAndRegister() {
Channel channel = null;
channel =
// 根据之前设置的Channel类型实例化一个Channel然后根绝配置的option设置参数
channelFactory.newChannel();
//参数设置,options
init(channel);
// 将channel和一个EventLoop绑定
ChannelFuture regFuture = config().group().register(channel);
return regFuture;
}

register方法最终会调用MultithreadEventLoopGroup.register方法,而其只是按轮询的方式选择一个SingleThreadEventLoop执行register方法。

1
2
3
4
5
6
7
8
9
10
11
12
public ChannelFuture MultithreadEventExecutorGroup.register(Channel channel) {
return next().register(channel);
}
public ChannelFuture register(Channel channel) {
//将Channel包装进ChannelPromise中,其主作用主要起一个回调的作用,比如当完成连接后会触发相应的监听器
return register(new DefaultChannelPromise(channel, this));
}
public ChannelFuture SingleThreadEventLoop.register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}

可以看到register方法最终会调用Channel里的UnsafeUnsafe是操作底层)的register方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
}
}

这里首先给Channel赋一个EventLoop,因为是启动线程不是该EventLoop,所以将register操作丢给该EventLoop执行。
SingleThreadEventLoop.execute方法里:
可以看到当前线程没启动的时候会开启该线程,并将任务丢进EventLoop的任务队列中,EventLoop线程执行时从任务队列中获取任务执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void execute(Runnable task) {
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
// 开启线程
startThread();
// 将注册任务加到任务队列中
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}

执行regsiter0方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private void register0(ChannelPromise promise) {
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
safeSetSuccess(promise);
// 触发 pipleline的编制,将业务自定义的handler加入到pipeline中
pipeline.fireChannelRegistered();
// 当channel重新注册的时候,已经连接
if (isActive()) {
if (firstRegistration) {
//触发 激活channel Active事件
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
//
beginRead();
}
}
}
protected void doRegister() throws Exception {
// 真正的注册,将channel注册到 EventLoop中的Selector上
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
}

完成初始化处理链后,EchoClient的处理链如下:

完成注册后,则开始创建连接(Bootstrap.doConnect),

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private static void Bootstrap.doConnect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
// 在register之后才会执行,排在register0任务之后
final Channel channel = connectPromise.channel();
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (localAddress == null) {
channel.connect(remoteAddress, connectPromise);
} else {
channel.connect(remoteAddress, localAddress, connectPromise);
}
connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
});
}

可以看到最终是会执行Channel.connect方法:

1
2
3
4
5
6
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return pipeline.connect(remoteAddress, promise);
}
public final ChannelFuture DefaultChannelPipeline.connect(SocketAddress remoteAddress, ChannelPromise promise) {
return tail.connect(remoteAddress, promise);
}

Channel.connect方法则开始执行处理链,可以看到其从TailContext往前执行,执行ChannelOutboundHandler.connect方法,最终会执行HeadContext.connect方法。而其调用Unsafe.connect方法进行连接。最终调用原生SocketChannel.connect方法进行连接,连接成功后设置Promise状态通知监听者并执行pipeline.fireChannelActive

Pipeline.fireChannelActive是由HeadContext往前传递的,其会触发EchoClientHandlerchannelActive方法,然后再会触发readIfAutoRead方法,将NioSocketChannel中的selectionKey设置为SelectionKey.OP_READ(对于ServerSocketChannel则设置为OP_ACCEPT),以监听网络读事件。

1
2
3
HeadContext->EchoClientHandler: channelActive
EchoClientHandler->EchoClientHandler:writeAndFlush(发送第一个消息)
HeadContext->HeadContext:readIfAutoRead(设置读事件)

到此Client初始化的过程就讲完了,下面我们将来讲解服务端内容。

Server

还是以Echo为例来讲解此过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// acceptor事件组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
//reactor事件组
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
//设置 ServerSocketChannel
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
// 在 Acceptor 的处理链上添加 Handler
.handler(new LoggingHandler(LogLevel.INFO))
// 设置 reactor的处理链
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new EchoServerHandler());
}
});
// 绑定接口提供服务
ChannelFuture f = b.bind(PORT).sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}

ServerBootstrapServer端的辅助启动类。用户使用的时候会配置 AcceptorReactor的事件线程组(boosGroup的线程个数和监听端口数相等)。设置ServerSocketChannel类型,设置参数Channel的参数option,然后在Acceptor的处理链上中添加Handler。最后childHandler配置Reactor处理链,再其上添加业务方定制的Handler

bind方法最终执行的是doBind方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private ChannelFuture doBind(final SocketAddress localAddress) {
//
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
ChannelPromise promise = channel.newPromise();
//绑定端口,开启服务
doBind0(regFuture, channel, localAddress, promise);
return promise;
}
final ChannelFuture initAndRegister() {
Channel channel = channelFactory.newChannel();
//调用 ServerBootStrap.init
init(channel);
// 将SeverBootStrap注册到线程池上,然后监听 OP_ACCEPT事件
ChannelFuture regFuture = config().group().register(channel);
return regFuture;
}

在讲Client的初始化过程中,解释了initAndRegister的作用,实例化SocektChannel,并将其与一个EventLoop进行绑定,并将其注册到EventLoopSelector上。这里,对于ServerSocketChannel会有所不同:init方法会调用ServerBootStrapinit方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void init(Channel channel) {
// ServerSocketChannel的pipeline
ChannelPipeline p = channel.pipeline();
//...
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
// 将业务配置的channel加到 pipeline中
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// ServerBootstrapAcceptor 在acceptor与reactor之间衔接
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}

其主要是在 ServerSocketChannel 的处理链上加上添加 ChannelHandler,包括业务方配置的 LoggingHandlerServerBootstrapAcceptorAcceptor,负责接收请求,然后把请求丢给Reactor线程)。

初始化完成后,boss线程的处理链:

服务端到此就完成了初始化过程。

您的支持是我创作源源不断的动力