Netty NioEventLoop

NioEventLoop

在初始化过程中,ClientServer用的 EventLoopGroupNioEventLoopGroup,而其内部是NioEventLoop数组。
NioEventLoop并不是一个纯粹的I/O线程,还负责处理两类任务:

  • 系统Task: 当I/O线程和用户线程同时操作网路资源时,为了防止并发操作导致的锁竞争,将用户线程的操作封装成 Task放入消息队列中,由I/O线程负责执行,这样就实现了局部无锁化;
  • 定时任务: 通过schedule方法可以执行定时任务。
    继续来看其run方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
protected void run() {
for (;;) {
// 唤醒 Selector
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
}
cancelledKeys = 0;
needsToSelectAgain = false;
// io任务比例,默认 50
final int ioRatio = this.ioRatio;
// 100时,则先执行io任务,再执行task
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
runAllTasks();
}
} else {
//记录IO时间
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
//反推任务能执行的时间长度
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
}
}

可以看到里面有个ioRatio,表示IO任务的比例。
当为100%时,先进行IO操作再执行任务;否则先进行IO操作,然后根据 IO操作的时间 来计算任务可以执行的时间长度。默认为50%
再细看processSelectedKeys

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
selectedKeys.keys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
}
//...
}
}
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
try {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}

它遍历每个SelectionKey,查看有哪些已经准备好的操作,根据准备好的事件进行相应的操作。

runAllTasks方法从 taskQueue 中获取所有的任务进行执行。对于taskQueue维护着的业务方提交的任务。在执行其execute方法的时候即把任务放进taskQueue中。

您的支持是我创作源源不断的动力