Netty 客户端请求&响应

客户端请求发送与获取响应

Netty里,发送请求的过程可以分为两步:1. 将请求写到缓存队列中,2.将缓存队列中的请求进行 flush —– 调用原生的SocketChannel进行发送。

在客户端初始化时,讲到客户端的Channel会在Selector上注册OP_READ操作,当有服务端响应结果时,则触发OP_READ事件。

请求发送

首先来看发送请求的过程:SocketChannel 有两个方法用于发送请求,writewriteAndFlush,其中write用于将请求ByteBuf放入发送队列中,writeAndFlush则将请求发送到队列中,并把发送队列中的数据发送出去。这里以writeAndFlush来讲解整个过程
调用writeAndFlush方法最终会调用AbstractChannelHandlerContext.write方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}

其判断发起请求操作 channel 的对应的线程上下文中,不是则创建任务并将其扔进EventLoop的任务队列中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
invokeWrite0(msg, promise);
invokeFlush0();
} else {
writeAndFlush(msg, promise);
}
}
private void invokeWrite0(Object msg, ChannelPromise promise) {
((ChannelOutboundHandler) handler()).write(this, msg, promise);
}
private void invokeFlush0() {
((ChannelOutboundHandler) handler()).flush(this);
}

这里就可以看到invokeWrite0invokeFlush0两个过程:

  1. invokeWrite0调用链如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
HeadContext.write
-> AbstractChannel.unsafe.write
public final void write(Object msg, ChannelPromise promise) {
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
int size;
msg = filterOutboundMessage(msg);
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}

outboundBuffer.addMessage(msg, size, promise);
}
public void addMessage(Object msg, int size, ChannelPromise promise) {
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
flushedEntry = null;
tailEntry = entry;
} else {
Entry tail = tailEntry;
tail.next = entry;
tailEntry = entry;
}
if (unflushedEntry == null) {
unflushedEntry = entry;
}
}

最终可以看到,write的过程是将msg包装进Entry然后加到outboundBuffer中,并没有做真正的发送。

在看flush方法:

1
2
3
4
5
6
7
8
9
10
public final void flush() {
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
// 将之前加进去的数据设置为flushed,可以开始发送
outboundBuffer.addFlush();
// 将可以发送的消息进行发送
flush0();
}

flush方法先讲outboundBuffer之前add的消息标记为已发送,然后调用flush0进行真正的发送。

获取响应

Server 端接收到请求并处理后返回结果给Client端。Client端的NioEventLoop.Selector会告诉客户端数据已准备好可读触发unsafe.read方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public final void read() {
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
// ByteBuf 内存分配器(内存分配策略是Netty里值得深入理解的一块,后文中我们将单独领出来说)
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
do {
byteBuf = allocHandle.allocate(allocator);
// 从SocketChannel中读取数据到byteBuf中
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
// 触发数据读取事件,调用channelHanlder的channelRead方法
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
// 调用channelHandler的channelReadComplete方法
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
}

可以看到大体流程是,先调用allocHandle分配一个ByteBuf,然后从ScoketChannel中读取数据,触发pipeline.fireChannelRead(执行channelHandlerchannelRead方法)方法,读取完成后再执行pipeline.fireChannelReadComplete方法(执行channelHandlerchannelRead方法)。

您的支持是我创作源源不断的动力