0. Overview
之前说明了EventLoop
的创建、管理,以及和Channel
的关联。
本文说明NioEventLoop
整个轮询的流程。
1. 前置:JDK的Selector
1.1. 创建
通常调用Selector.open()
即可。
1.2. 注册Channel
到Selector
之前说明EventLoop
和Channel
的关系时,在register0()
中的doRegister()
进行注册。注册意思是:通过Selector
监听Channel
某些感兴趣的事件。
一个
Selector
监听多个Channel
,轮询Channel
以获取感兴趣的事件
注册时,channel
必须非阻塞(因此FileChannel
不适用于Selector
)。
调用的函数通常是channel.register(selector, ops, att)
,第二个参数是监听的操作,可通过|
连接以监听多个操作:
OP_CONNECT
OP_ACCEPT
OP_READ
OP_WRITE
1.3. SelectionKey
Selector
可通过selectedKeys()
获取SelectionKey
集合,每个元素表示了一个特定的通道对象和一个特定的选择器对象之间的注册关系。
它可返回事件的类型bit mask(readyOps()
方法),表明其对应的Channel
现在已就绪的操作,比如读、写、连接、接收等等。
1.4. select
操作
有3种,返回的都是就绪的事件个数:
select()
:阻塞到至少有一个Channel
在注册的事件上就绪select(timeout)
:带超时的select
,最多阻塞timeout
毫秒selectNow()
:非阻塞的select
,立即返回
select
不为0时,就可遍历selectedKeys()
获取集合,遍历它即可(最后需要移除集合)
此外,可调用wakeup()
,让上一个阻塞的操作立即返回(若上一个不是阻塞,下一个阻塞操作会立即返回)。
1.5. 模板
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
ServerSocketChannel channel = ServerSocketChannel.open();
channel.socket().bind(new InetSocketAddress(HOST, PORT));
// 设置非阻塞
channel.configureBlocking(false);
// 注册Channel到Selector
Selector selector = Selector.open();
channel.register(selector, OPS);
// 轮询
for(;;) {
int cnt = selector.select(TIMEOUT);
if(cnt == 0) {
continue;
}
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
while(it.hasNext()) {
SelectionKey key = it.next();
// Deal with key on READ/WRITE/CONNECT/ACCEPT
// ...
it.remove();
}
}
2. NioEventLoop
工作流程
它的工作流程就在NioEventLoop#run
中。大体分析如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
protected void run() {
for (;;) {
try {
try {
// 1. I/O事件轮询
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
}
} catch (IOException e) {
// Handle exception and rebuilt selectors
// ...
}
// 2. 执行I/O与非I/O任务
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
// Handle exceptions
}
// Handle loop shutting down...
}
}
可见上面步骤分为2步:
- I/O事件轮询
- I/O事件和非I/O事件的处理
上面有一些方法需要说明,下面会按照顺序进行说明。
2.1. I/O事件轮询
NioEventLoop
本身就是一个无限循环的轮询+执行流程。
轮询/询问I/O事件主要是在这个switch-case
代码块。
1
2
3
4
5
6
7
8
9
10
11
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
}
这里有2个主要内容需要说明:
- 计算
SelectStrategy
select
操作
2.1.1. 计算SelectStrategy
SelectStrategy
是一个整数,表示事件就绪的channel
个数(因为它是selectNow()
得到的),包含:
CONTINUE(-2)
SELECT(-1)
BUSY_WAIT(-3)
- 其它
可以看到计算SelectStrategy
包含2个参数:
selectSupplier
:实际上就是调用一次selector.selectNow()
(并当wakeUp
字段为true
时,唤醒selector
)hasTask()
:队列中的是否有任务
计算的方式是以下的逻辑:
- 若
hasTask()
返回true
,则调用selectNow()
,返回就绪事件个数 - 否则返回
SELECT
,此时会阻塞进行select
2.1.2.select
操作
代码中包含下面2个东西:
-
wakeUp
:AtomicBoolean
字段,表明selector
是否需要唤醒 -
select(oldWakeUp)
:执行select
操作
这里主要看select(oldWakeUp)
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
// 默认1000ms的大select周期
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
// 大周期被拆成500ms,即每轮500ms(select 500ms)
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
// 若没时间且没select过,直接selectNow返回
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
// 若队列有任务,也直接selectNow返回,保证队列的任务尽量快的执行
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
// 若没任务,则可以等待I/O,执行阻塞的select
int selectedKeys = selector.select(timeoutMillis);
selectCnt++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// 有事件监听到/外部wakeup/队列有任务/周期任务需要执行,退出select
// 以尽快执行需要执行的任务(I/O和非I/O任务)
break;
}
if (Thread.interrupted()) {
// ... Log interrupt
selectCnt = 1;
break;
}
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// ***这里很重要***
selector = selectRebuildSelector(selectCnt);
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
// ...
} catch (CancelledKeyException e) {
// Handle cancel exception
}
}
这里就是进行select
操作,使用带超时的select
,且默认下超时时间是0.5s。
另外注意标*处的代码,它解决了epoll空轮询过多的bug,可以见这里,Netty的解决方法是:
- 利用计数
selectCnt
记录当前循环select
的次数,并记录一次select
的时间 - 若时间大于
timeoutMillis
,说明肯定不是空轮询,只需重置标记selectCnt
即可 - 否则出现空轮询(因为另外的情况在第28行处处理了,循环会退出),
selectCnt
就有用处了,只需下面的逻辑:- 若次数
selectCnt
大于一个阈值,就重建一个新的selector
,注册到原channel
上,并销毁旧的selector
(具体在rebuildSelector0()
中)
- 若次数
2.2. 事件的处理
2.2.1. I/O事件的处理
调用的方法是:processSelectedKeys()
。
1
2
3
4
5
6
7
8
9
10
11
12
private void processSelectedKeys() {
if (selectedKeys != null) {
// 通常会进入这里
// 里面selectedKeys类成员字段,
// 会在NioEventLoop初始化时的openSelector(),
// 通过Unsafe的进行赋值(从Selector实现类实例里拿)
// 因此Selector监听到事件后,这个成员字段的key会自动填充
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
上面2个方法实际上没什么区别,最后都会进入这个方法:processSelectedKey(SelectionKey k, AbstractNioChannel ch)
。
第二个参数是由selectionKey.attachment()
获得,而这个attachment
在注册的时候加入的,具体的位置在AbstractNioChannel#doRegister()
中:
1
2
// 第三个参数就是attachment。this就是Channel自己。
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
I/O事件的处理主要分为3类:
-
OP_READ
/OP_ACCEPT
:直接就从unsafe
中读取数据1 2 3 4
// 0也处理是因为JDK可能的bug导致一直在循环 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); }
而
unsafe.read()
实现主要步骤是:- 申请堆外内存空间
- 从
Channel
中读取,填充内存 - 传播
ChannelRead
事件,调用pipeline.fireChannelRead(buf)
-
OP_WRITE
:直接从unsafe
中强制刷新输出流1 2 3
if ((readyOps & SelectionKey.OP_WRITE) != 0) { ch.unsafe().forceFlush(); }
-
OP_CONNECT
:代码如下1 2 3 4 5 6
if ((readyOps & SelectionKey.OP_CONNECT) != 0) { int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); }
主要工作是:
- 清除对
OP_CONNECT
的关注,取而代之关注OP_READ | OP_WRITE | OP_ACCEPT
- 调用
unsafe.finishConnect()
通知连接已建立,它最后会调用pipeline.fireChannelActive()
传播ChannelActive
事件
- 清除对
2.2.2. 非I/O事件的处理
事件分为普通任务和周期任务:
- 对于普通任务:调用
execute(runnable)
时,任务会被添加到taskQueue
(LinkedBlockingQueue
)中 - 对于周期任务:调用
schedule(runnable, delay, unit)
时,任务会被添加到scheduledTaskQueue
(PriorityQueue
)中
这类事件会被放置到任务队列里,即taskQueue
。
而这类任务的执行,是通过runAllTasks()
/runAllTasks(time)
执行。
前者是执行所有的任务:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
protected boolean runAllTasks() {
assert inEventLoop();
boolean fetchedAll;
boolean ranAtLeastOne = false;
do {
// 1. 取出所有的可以执行的周期任务,添加到taskQueue队列中
// fetchedAll返回true时,添加完毕,返回false时,添加时队列已满,需要重试
fetchedAll = fetchFromScheduledTaskQueue();
// 2. 尝试运行队列中所有的任务
if (runAllTasksFrom(taskQueue)) {
// 若队列有任务就会进入这里
ranAtLeastOne = true;
}
} while (!fetchedAll); // 要重试直到所有的周期任务都要被执行
if (ranAtLeastOne) {
// 记录时间
lastExecutionTime = ScheduledFutureTask.nanoTime();
}
// tail task, unstable api, 忽略
afterRunningAllTasks();
return ranAtLeastOne;
}
后者限定了时间,限定的时间计算方式是:ioTime * (100 - ioRatio) / ioRatio
:
x / ioTime = (100 - ioRatio) / ioRatio
$\Rightarrow$x = ioTime * (100 - ioRatio) / ioRatio
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
protected boolean runAllTasks(long timeoutNanos) {
// 1. 尝试从周期任务队列中将任务添加到taskQueue
fetchFromScheduledTaskQueue();
// 2. 从队列中获取任务
Runnable task = pollTask();
if (task == null) {
afterRunningAllTasks();
return false;
}
// 计算deadline
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
// 3. 执行任务
safeExecute(task);
runTasks++;
if ((runTasks & 0x3F) == 0) {
// 4. 检查超时
// 只会每隔64个任务时才会检查timeout并更新
// 因为nanoTime()比较耗时
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
// 超时退出
break;
}
}
// 5. 获取下一个任务
task = pollTask();
if (task == null) {
// 6. 没有任务,记录时间,退出
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
3. 总结
NioEventLoop
主要执行下面的任务:
- 循环
select
轮询(或者Linux上的epoll
)以监听I/O事件(OP_READ
,OP_WRITE
,OP_CONNECT
,OP_ACCEPT
) - 处理I/O事件,必要时传播事件通知到
pipeline
- 处理队列中的其它任务(包括普通和周期任务)
由于需要处理I/O和非I/O任务,非I/O任务一定不能耗时过长