源码阅读-Netty EventLoop(2)

Posted by keys961 on August 10, 2019

0. Overview

之前说明了EventLoop的创建、管理,以及和Channel的关联。

本文说明NioEventLoop整个轮询的流程。

1. 前置:JDK的Selector

1.1. 创建

通常调用Selector.open()即可。

1.2. 注册ChannelSelector

之前说明EventLoopChannel的关系时,在register0()中的doRegister()进行注册。注册意思是:通过Selector监听Channel某些感兴趣的事件

一个Selector监听多个Channel,轮询Channel以获取感兴趣的事件

注册时,channel必须非阻塞(因此FileChannel不适用于Selector)。

调用的函数通常是channel.register(selector, ops, att),第二个参数是监听的操作,可通过|连接以监听多个操作:

  • OP_CONNECT
  • OP_ACCEPT
  • OP_READ
  • OP_WRITE

1.3. SelectionKey

Selector可通过selectedKeys()获取SelectionKey集合,每个元素表示了一个特定的通道对象和一个特定的选择器对象之间的注册关系。

它可返回事件的类型bit mask(readyOps()方法),表明其对应的Channel现在已就绪的操作,比如读、写、连接、接收等等。

1.4. select操作

有3种,返回的都是就绪的事件个数:

  • select():阻塞到至少有一个Channel在注册的事件上就绪
  • select(timeout):带超时的select,最多阻塞timeout毫秒
  • selectNow():非阻塞的select,立即返回

select不为0时,就可遍历selectedKeys()获取集合,遍历它即可(最后需要移除集合

此外,可调用wakeup(),让上一个阻塞的操作立即返回(若上一个不是阻塞,下一个阻塞操作会立即返回)。

1.5. 模板

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
ServerSocketChannel channel = ServerSocketChannel.open();
channel.socket().bind(new InetSocketAddress(HOST, PORT));
// 设置非阻塞
channel.configureBlocking(false);
// 注册Channel到Selector
Selector selector = Selector.open();
channel.register(selector, OPS);
// 轮询
for(;;) {
    int cnt = selector.select(TIMEOUT);
    if(cnt == 0) {
        continue;
    }
    Set<SelectionKey> keys = selector.selectedKeys();
    Iterator<SelectionKey> it = keys.iterator();
    while(it.hasNext()) {
        SelectionKey key = it.next();
        // Deal with key on READ/WRITE/CONNECT/ACCEPT
        // ...
        it.remove();
    }
}

2. NioEventLoop工作流程

它的工作流程就在NioEventLoop#run中。大体分析如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
protected void run() {
    for (;;) {
        try {
            try {
                // 1. I/O事件轮询
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.BUSY_WAIT:
                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false));     
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }                        
                default:
                }
            } catch (IOException e) {
            	// Handle exception and rebuilt selectors
                // ...
            }
			// 2. 执行I/O与非I/O任务
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    processSelectedKeys();
                } finally {                       
                    runAllTasks();
                }
            } else {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {                       
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            // Handle exceptions
        }
    // Handle loop shutting down...         
    }
}

可见上面步骤分为2步:

  • I/O事件轮询
  • I/O事件和非I/O事件的处理

上面有一些方法需要说明,下面会按照顺序进行说明。

2.1. I/O事件轮询

NioEventLoop本身就是一个无限循环的轮询+执行流程。

轮询/询问I/O事件主要是在这个switch-case代码块。

1
2
3
4
5
6
7
8
9
10
11
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
    case SelectStrategy.CONTINUE:
        continue;
    case SelectStrategy.BUSY_WAIT:
    case SelectStrategy.SELECT:
        select(wakenUp.getAndSet(false));     
        if (wakenUp.get()) {
            selector.wakeup();
        }                        
    default:
}

这里有2个主要内容需要说明:

  • 计算SelectStrategy
  • select操作

2.1.1. 计算SelectStrategy

SelectStrategy是一个整数,表示事件就绪的channel个数(因为它是selectNow()得到的),包含:

  • CONTINUE(-2)
  • SELECT(-1)
  • BUSY_WAIT(-3)
  • 其它

可以看到计算SelectStrategy包含2个参数:

  • selectSupplier:实际上就是调用一次selector.selectNow()(并当wakeUp字段为true时,唤醒selector
  • hasTask():队列中的是否有任务

计算的方式是以下的逻辑:

  • hasTask()返回true,则调用selectNow(),返回就绪事件个数
  • 否则返回SELECT,此时会阻塞进行select

2.1.2.select操作

代码中包含下面2个东西:

  • wakeUpAtomicBoolean字段,表明selector是否需要唤醒

  • select(oldWakeUp):执行select操作

这里主要看select(oldWakeUp)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;
    try {
        int selectCnt = 0;
        long currentTimeNanos = System.nanoTime();
        // 默认1000ms的大select周期
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
        for (;;) {
            // 大周期被拆成500ms,即每轮500ms(select 500ms)
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            // 若没时间且没select过,直接selectNow返回
            if (timeoutMillis <= 0) {
                if (selectCnt == 0) {
                    selector.selectNow();
                    selectCnt = 1;
                }
                break;
            }
            // 若队列有任务,也直接selectNow返回,保证队列的任务尽量快的执行
            if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                selector.selectNow();
                selectCnt = 1;
                break;
            }
			// 若没任务,则可以等待I/O,执行阻塞的select
            int selectedKeys = selector.select(timeoutMillis);
            selectCnt++;            
            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                // 有事件监听到/外部wakeup/队列有任务/周期任务需要执行,退出select
                // 以尽快执行需要执行的任务(I/O和非I/O任务)
                break;
            }            
            if (Thread.interrupted()) {
                // ... Log interrupt
                selectCnt = 1;
                break;
            }
            long time = System.nanoTime();
            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                selectCnt = 1;
            } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                    selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                // ***这里很重要***
                selector = selectRebuildSelector(selectCnt);
                selectCnt = 1;
                break;
            }
            currentTimeNanos = time;
        }
        // ...
    } catch (CancelledKeyException e) {
        // Handle cancel exception
    }
}

这里就是进行select操作,使用带超时的select,且默认下超时时间是0.5s。

另外注意标*处的代码,它解决了epoll空轮询过多的bug,可以见这里,Netty的解决方法是:

  • 利用计数selectCnt记录当前循环select的次数,并记录一次select的时间
  • 若时间大于timeoutMillis,说明肯定不是空轮询,只需重置标记selectCnt即可
  • 否则出现空轮询(因为另外的情况在第28行处处理了,循环会退出),selectCnt就有用处了,只需下面的逻辑:
    • 若次数selectCnt大于一个阈值,就重建一个新的selector,注册到原channel上,并销毁旧的selector(具体在rebuildSelector0()中)

2.2. 事件的处理

2.2.1. I/O事件的处理

调用的方法是:processSelectedKeys()

1
2
3
4
5
6
7
8
9
10
11
12
private void processSelectedKeys() {
    if (selectedKeys != null) {
        // 通常会进入这里
        // 里面selectedKeys类成员字段,
        // 会在NioEventLoop初始化时的openSelector(),
        // 通过Unsafe的进行赋值(从Selector实现类实例里拿)
        // 因此Selector监听到事件后,这个成员字段的key会自动填充
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

上面2个方法实际上没什么区别,最后都会进入这个方法:processSelectedKey(SelectionKey k, AbstractNioChannel ch)

第二个参数是由selectionKey.attachment()获得,而这个attachment在注册的时候加入的,具体的位置在AbstractNioChannel#doRegister()中:

1
2
// 第三个参数就是attachment。this就是Channel自己。
selectionKey = javaChannel().register(eventLoop().selector, 0, this);

I/O事件的处理主要分为3类:

  • OP_READ/OP_ACCEPT:直接就从unsafe中读取数据

    1
    2
    3
    4
    
    // 0也处理是因为JDK可能的bug导致一直在循环
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
         unsafe.read();
    }
    

    unsafe.read()实现主要步骤是:

    • 申请堆外内存空间
    • Channel中读取,填充内存
    • 传播ChannelRead事件,调用pipeline.fireChannelRead(buf)
  • OP_WRITE:直接从unsafe中强制刷新输出流

    1
    2
    3
    
    if ((readyOps & SelectionKey.OP_WRITE) != 0) {
        ch.unsafe().forceFlush();
    }
    
  • OP_CONNECT:代码如下

    1
    2
    3
    4
    5
    6
    
    if ((readyOps & SelectionKey.OP_CONNECT) != 0) {         
         int ops = k.interestOps();
         ops &= ~SelectionKey.OP_CONNECT;
         k.interestOps(ops);
         unsafe.finishConnect();
    }
    

    主要工作是:

    • 清除对OP_CONNECT的关注,取而代之关注OP_READ | OP_WRITE | OP_ACCEPT
    • 调用unsafe.finishConnect()通知连接已建立,它最后会调用pipeline.fireChannelActive()传播ChannelActive事件

2.2.2. 非I/O事件的处理

事件分为普通任务和周期任务:

  • 对于普通任务:调用execute(runnable)时,任务会被添加到taskQueue(LinkedBlockingQueue)中
  • 对于周期任务:调用schedule(runnable, delay, unit)时,任务会被添加到scheduledTaskQueue(PriorityQueue)中

这类事件会被放置到任务队列里,即taskQueue

而这类任务的执行,是通过runAllTasks()/runAllTasks(time)执行。

前者是执行所有的任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
protected boolean runAllTasks() {
    assert inEventLoop();
    boolean fetchedAll;
    boolean ranAtLeastOne = false;
    do {
        // 1. 取出所有的可以执行的周期任务,添加到taskQueue队列中
        // fetchedAll返回true时,添加完毕,返回false时,添加时队列已满,需要重试
        fetchedAll = fetchFromScheduledTaskQueue();
        // 2. 尝试运行队列中所有的任务
        if (runAllTasksFrom(taskQueue)) {
            // 若队列有任务就会进入这里
            ranAtLeastOne = true;
        }
    } while (!fetchedAll); // 要重试直到所有的周期任务都要被执行
    if (ranAtLeastOne) {
        // 记录时间
        lastExecutionTime = ScheduledFutureTask.nanoTime();
    }
    // tail task, unstable api, 忽略
    afterRunningAllTasks();
    return ranAtLeastOne;
}

后者限定了时间,限定的时间计算方式是:ioTime * (100 - ioRatio) / ioRatio

x / ioTime = (100 - ioRatio) / ioRatio $\Rightarrow$ x = ioTime * (100 - ioRatio) / ioRatio

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
protected boolean runAllTasks(long timeoutNanos) {
    // 1. 尝试从周期任务队列中将任务添加到taskQueue
    fetchFromScheduledTaskQueue();
    // 2. 从队列中获取任务
    Runnable task = pollTask();
    if (task == null) {
        afterRunningAllTasks();
        return false;
    }
    // 计算deadline
    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;
    for (;;) {
        // 3. 执行任务
        safeExecute(task);
        runTasks++;        
        if ((runTasks & 0x3F) == 0) {
            // 4. 检查超时
            // 只会每隔64个任务时才会检查timeout并更新
            // 因为nanoTime()比较耗时
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                // 超时退出
                break;
            }
        }
		// 5. 获取下一个任务 
        task = pollTask();
        if (task == null) {
            // 6. 没有任务,记录时间,退出
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }
    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}

3. 总结

NioEventLoop主要执行下面的任务:

  • 循环select轮询(或者Linux上的epoll)以监听I/O事件(OP_READ, OP_WRITE, OP_CONNECT, OP_ACCEPT
  • 处理I/O事件,必要时传播事件通知到pipeline
  • 处理队列中的其它任务(包括普通和周期任务)

由于需要处理I/O和非I/O任务,非I/O任务一定不能耗时过长