Netty 服务接收请求&响应

服务端创建连接并响应请求

客户端与服务端通讯必须先创建连接。
在前文介绍服务端初始化的时候说道初始化的时候会向boss线程注册一个OP_ACCEPT事件。客户端请求链接时即会触发此事件。当boss线程池中的NioEventLoop在循环的时候检测到此事件时,会触发NioMessageUnsafe.read方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public void read() {
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
do {
// NioServerSocketChannel.doReadMessage
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// 触发 ChannelHandler的 channelRead方法,经过LoggingHandler最后到ServerBootstrapAcceptor
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
}
protected int doReadMessages(List<Object> buf) throws Exception {
// 从ServerSocketChannel.accept一个SocketChannel
SocketChannel ch = SocketUtils.accept(javaChannel());
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
return 0;
}

Client端的不同之处即在于doReadMessages方法。此处调用的是NioServerSocketChannel.doReadMessage
其调用的是原生ServerSocketChannel.accept方法来获取一个SocektChannel然后封装在NioSocetChannel中,并将其传递到ServerBootstrapAcceptor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
// 将初始化时设置的childHandler 加入到pipeline 中
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
// 往reactor线程池中注入,即跟某个`NioEventLoop`绑定。
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}

首先,将初始化时配置的childHandler加到Channelpipeline中,然后将Channel注册到某个 Reactor线程 上,然后开始监听OP_READ读事件。
此后与客户端的通讯工作即会在Reactor线程中进行,其过程与Client中一致,因此这里不再复述。

您的支持是我创作源源不断的动力