源码阅读-Netty EventLoop(1)

Posted by keys961 on August 8, 2019

0. Overview

之前,讲了Channel以及ChannelPipeline,而在初始化一个Channel,还需要一个非常重要的组件,才能将应用启动:EventGroup

不过首先,需要说明Netty的线程模型,然后根据NioEventLoopGroup切入进入分析源码。

1. Reactor线程模型

Reactor线程模型主要分为下面3种:

  • 单线程模型:接收器(acceptor)和处理器(handler)在同一线程运行
  • 多线程模型:接收器单独一个线程,接收请求;处理器在一个线程池内运行,处理移交的请求
  • 主从多线程模型:接收器在一个线程池内运行,其它和多线程模型基本一样

EchoServer中,可以看到下面的代码:

1
2
3
4
5
6
7
// ...
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)
    //....

这就是典型的多线程模型,bossGroup执行接收器,而workerGroup执行处理器(线程数是CPU核心数的2倍)。

为何bossGroup线程数为1?

  • 某个端口的ServerSocketChannel只会绑定到bossGroup的一个线程,执行select操作时也在一个线程中,因此多个线程会造成浪费
  • 若监听多个端口,那么bossGroup多线程还是需要的

而在EchoClient中,可以看到下面的代码:

1
2
3
4
5
EventLoopGroup group = new NioEventLoopGroup();
try {
    Bootstrap b = new Bootstrap();
    b.group(group)
    // ....

客户端不需要监听接收请求,只需处理响应,因此配置处理器的线程池即可。

2. NioEventLoopGroup实例化

2.1. 参数说明

该类的初始化最后会调入下面的方法:

1
2
3
4
5
6
7
8
public NioEventLoopGroup(int nThreads, 
                         Executor executor, 
                         EventExecutorChooserFactory chooserFactory,
                         final SelectorProvider selectorProvider,
                         final SelectStrategyFactory selectStrategyFactory,
                         final RejectedExecutionHandler rejectedExecutionHandler) {
    super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler);
}

这里有几个参数:

  • nThreads:线程池的线程数
  • executor:这个线程池不是NioEventLoopGroup的线程池,而是给NioEventLoop用的,后面会提及
  • chooserFactory:从线程池中选择线程的一个策略工厂
  • selectorProviderChannelselector提供者,通常是JDK自带的,可通过provider.openSelector()返回内置的selector,以select感兴趣的事件
  • selectStrategyFactoryselect时的策略选择实现,在NioEventLoop会用,后面会提及
  • rejectedExecutionHandler:拒绝策略,默认抛出异常,不过它是给NioEventLoop用的,而不是NioEventLoopGroup

2.2. MultithreadEventExecutorGroup中的实例化

初始化后,会走到父类,直到MultithreadEventExecutorGroup类的构造方法,说明如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
protected MultithreadEventExecutorGroup(int nThreads, 
                                        Executor executor,
                                        EventExecutorChooserFactory chooserFactory,
                                        Object... args) {
    // ...
    if (executor == null) {
        // 默认是创建这个线程池: ThreadPerTaskExecutor
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }
	// IMPORTANT: 这里维护了线程数组,即一个线程池,这里通常每个元素是NioEventLoop
    children = new EventExecutor[nThreads];
    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            // IMPORTANT: 创建对应的执行线程,即NioEventLoop
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
               // Terminate & shutdown...
            }
        }
    }
	// 创建选择器,默认实现是round-robin(这里自己见代码)
    chooser = chooserFactory.newChooser(children);
    // 配置监听器
    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            // 当所有的线程都中止后,才设置
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };
    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }
    // Set read-only child group
    // ...
}

这里的“线程”指childrenEventExecutor个体,它可能包含多个线程,只不过在NioEventLoop中,它只有1个线程,所以混淆也是可以的。

关键方法在newChild()中,这在NioEventLoopGroup实现。

2.3. 创建NioEventLoop

newChild()实现在NioEventLoopGroup中,实际上就是创建NioEventLoop,填充到children数组。

NioEventLoop实际上就是单线程的线程池,可以从下面的继承关系看出:

1
NioEventLoop => SingleThreadEventLoop => SingleThreadEventExecutor => AbstractScheduledEventExecutor => AbstractEventExecutor => ...

我们从父类开始看。

首先在AbstractEventExecutor,这里只是初始化了parent,即对应的NioEventLoopGroup

然后在AbstractScheduledEventExecutor,这里没有做任何事(里面的字段只是对任务优先级进行排序而已)。

接着在SingleThreadEventExecutor,详细见下面代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
protected SingleThreadEventExecutor(EventExecutorGroup parent, 
                                    Executor executor, 
                                    boolean addTaskWakesUp, 
                                    int maxPendingTasks,
                                    RejectedExecutionHandler rejectedHandler) {
    super(parent);
    // 这里是false
    this.addTaskWakesUp = addTaskWakesUp;
    // 设置最大的等待任务,默认是16
    this.maxPendingTasks = Math.max(16, maxPendingTasks);
    // 设置给NioEventLoop的executor
    // 默认时,参数为ThreadPerTaskExecutor的实例
    this.executor = ThreadExecutorMap.apply(executor, this);
    // 这个 queue 的默认容量是maxPendingTasks,默认是16
    taskQueue = newTaskQueue(this.maxPendingTasks);
    // 设置reject handler
    rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}

接着在SingleThreadEventLoop,代码实际上没做什么东西,只是初始化了一个不太重要的字段:

1
2
3
4
5
6
7
8
9
protected SingleThreadEventLoop(EventLoopGroup parent, 
                                ThreadFactory threadFactory, 
                                boolean addTaskWakesUp, 
                                int maxPendingTasks, 
                                RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, threadFactory, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
    // 这个字段,所涉及的API是被标为Unstable的,这里就忽略
    tailTasks = newTaskQueue(maxPendingTasks);
}

最后到NioEventLoop

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
NioEventLoop(NioEventLoopGroup parent,
             Executor executor,
             SelectorProvider selectorProvider,
             SelectStrategy strategy,
             RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    if (strategy == null) {
        throw new NullPointerException("selectStrategy");
    }
    // 设置用于创建Selector的provider
    provider = selectorProvider;
    // 开启NIO的Selector,非常重要(不同平台不一样,如Linux会使用epoll)
    final SelectorTuple selectorTuple = openSelector();
    selector = selectorTuple.selector;
    unwrappedSelector = selectorTuple.unwrappedSelector;
    // 设置select策略
    selectStrategy = strategy;
}

至此,NioEventLoop初始化完成。

可以看出,NioEventLoop实际上除了设置了selector、任务队列等信息之外,最重要的就是包装了一个ThreadPerTaskExecutor线程池(默认下):

1
2
// In MultithreadEventExecutorGroup
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());

该类本身,只是简单的通过threadFactory(这里是DefaultThreadFactory)创建线程。而创建的线程是FastThreadLocalThread,它也只是对Thread进行包装,可以取出内部的InternalThreadLocalMap而已。

2.4. 线程的启动:为何NioEventLoop只有1个线程?

答案在父类SingleThreadEventExecutorexecute()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }

    boolean inEventLoop = inEventLoop();
    // 添加任务到队列,之后NioEventLoop会从队列中执行里面的任务
    addTask(task);
    if (!inEventLoop) {
        // 若上下文在线程池所在线程的外面,创建线程
        // 这里使用CAS创建线程(设置state为ST_STARTED),所以只有1个线程被创建
        // 若线程已经创建,则不会创建新的线程
        // 具体的在doStartThread()启动,执行的逻辑是NioEventLoop#run
        startThread();
        if (isShutdown()) {
            //... Handler rejection
        }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

这里关键在于startThread(),它会创建一个线程(通过CAS创建,所以只会创建一个),然后就会执行NioEventLooprun()方法,而这个方法是在无限循环中进行select并处理事件的,后面会讲。

而具体的线程创建在doStartThread()方法中实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private void doStartThread() {
    assert thread == null;
    // 启动线程,这里executor就是"经过包装“的ThreadPerTaskExecutor
    executor.execute(new Runnable() {
        @Override
        public void run() {
            // ...
            try {
                // 这里就是NioEventLoop的执行,是一个无限循环流程
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
            	// Handle cleanup after termination/failure....
            }
        }
    });
}

3. EventLoopChannel的关联:重回register事件

3.1. 加入register任务

这里依旧以客户端的connect为例(服务端的bind同理),它都会走到之前提及的initAndRegister()方法,注意第3步:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
final ChannelFuture initAndRegister() {
	Channel channel = null;
    try {
        // 1. channelFactory通过反射创建Channel
        channel = channelFactory.newChannel();
        // 2. 涉及channelPipeline的配置初始化
        init(channel);
    } catch(Throwable t) {
        // ...
    }
    // ...
    // ***3. 将channel注册到EventGroup中***
    ChannelFuture regFuture = config().group().register(channel);
    // ...
    return regFuture;
}

第3步会进入NioEventLoopGroup的父类MultithreadEventLoopGroupregister()

1
2
3
4
public ChannelFuture register(Channel channel) {
    // next()利用轮转法挑选一个NioEventLoop/线程,任何register
    return next().register(channel);
}

然后进入NioEventLoop的父类SingleThreadEventLoopregister()

1
2
3
4
5
6
7
8
9
10
11
public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}

// 实际会进入这里
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    // 这里调用Channel的unsafe,进行register
    promise.channel().unsafe().register(this, promise);
    return promise;
}

之后进入AbstractUnsaferegister()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
   	// ...
    AbstractChannel.this.eventLoop = eventLoop;
    
    // 执行register
    if (eventLoop.inEventLoop()) {
        // 若当前线程是自己,直接执行,通常不会进入这里
        register0(promise);
    } else {
        try {
            // 调用NioEventLoop,将register0的任务推入队列,必要时创建线程(一般进入这里)
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            // ...
        }
    }
}

3.2. 执行register任务

最后会执行核心的register0()NioEventLoop将会轮询并从队列中获取并执行这个任务,之后会说明。

它将ChannelSelector相关联,并传播register事件(这时候ChannelInitializer会展开handler)。

这里看下register0()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private void register0(ChannelPromise promise) {
    try {
        // ...
        boolean firstRegistration = neverRegistered;
        // 1. JDK底层:将Channel注册到Selector上
        doRegister();
        neverRegistered = false;
        registered = true;
		// 2. 保证handlerAdded(...)事件在Promise通知前被调用
        // 这里最重要的是ChannelInitializer:通过这步调用,它内部的handler才会被展开
        // 即:在这步,内部的handler被展开成链
        // 而其它的handlerAdded(...)在ChannelInitializer展开成链的时候进行调用
        pipeline.invokeHandlerAddedIfNeeded();
		// 3. 设置对应的Promise为success,以便于通知
        safeSetSuccess(promise);
        // 4. 传播register事件(见https://keys961.github.io/2019/07/12/源码阅读-Netty(3)/)
        pipeline.fireChannelRegistered();
        // 5. 若Channel已经打开(通常是不会active的)
        // 不过当服务端accept连接,创建新的SocketChannel,注册时会进入这里,以便于Selector监听OP_READ
        if (isActive()) {
            if (firstRegistration) {
                // 5.a. 若第一次执行register,还需要传播ChannelActive事件,通常进入这里
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // 5.b. 若被注册,且autoRead被设置,需要设置Channel的interestOp,让其监听OP_READ事件,通常不会进入这里
                beginRead();
            }
        }
    } catch (Throwable t) {
        // Close fd
    }
}

可见,register0()做了下面几件事情:

  • ChannelSelector绑定

  • 预先触发handlerAdded()(主要是ChannelInitializer,用于展开内部的handler

    一般而言p.addLast(handler)会触发handlerAdded(...),但是只会在channel被注册后才会被触发(即registered == true)。

    对于ChannelInitializer的添加,此时registered == false,所以不会被触发handlerAdded(...),只会在channel被注册后手动显式调用才能触发(从而展开内部handler为链)

  • 传播register事件

  • channel本身已激活,需要传播channelActive事件或者开始准备读取(前者一般也会调用到后者)

后2步的内容可参考这里的3.3.和4.小节。

而对于JDK的ChannelSelector的绑定,见下面的代码:

1
2
// Channel#register(selector, ops, attribute)
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);

可见监听操作是0,即什么都不监听。可以推测,后面的操作会修改监听的事件。

register后,才会触发后面的connect/bind

4. 总结

本文主要讲述了EventLoopGroupEventLoop的创建,说明了Netty是如何管理线程池的(主要是NioXX的线程池)。

此外,通过回顾register事件,将Channel初始化和EventLoop联系到了一起。

之后会详细说明NioEventLoop#run,说明它的整个轮询工作流程。