0. Overview
之前,讲了Channel
以及ChannelPipeline
,而在初始化一个Channel
,还需要一个非常重要的组件,才能将应用启动:EventGroup
。
不过首先,需要说明Netty的线程模型,然后根据NioEventLoopGroup
切入进入分析源码。
1. Reactor线程模型
Reactor线程模型主要分为下面3种:
- 单线程模型:接收器(acceptor)和处理器(handler)在同一线程运行
- 多线程模型:接收器单独一个线程,接收请求;处理器在一个线程池内运行,处理移交的请求
- 主从多线程模型:接收器在一个线程池内运行,其它和多线程模型基本一样
在EchoServer
中,可以看到下面的代码:
1
2
3
4
5
6
7
// ...
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
//....
这就是典型的多线程模型,bossGroup
执行接收器,而workerGroup
执行处理器(线程数是CPU核心数的2倍)。
为何
bossGroup
线程数为1?
- 某个端口的
ServerSocketChannel
只会绑定到bossGroup
的一个线程,执行select
操作时也在一个线程中,因此多个线程会造成浪费- 若监听多个端口,那么
bossGroup
多线程还是需要的
而在EchoClient
中,可以看到下面的代码:
1
2
3
4
5
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
// ....
客户端不需要监听接收请求,只需处理响应,因此配置处理器的线程池即可。
2. NioEventLoopGroup
实例化
2.1. 参数说明
该类的初始化最后会调入下面的方法:
1
2
3
4
5
6
7
8
public NioEventLoopGroup(int nThreads,
Executor executor,
EventExecutorChooserFactory chooserFactory,
final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory,
final RejectedExecutionHandler rejectedExecutionHandler) {
super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler);
}
这里有几个参数:
nThreads
:线程池的线程数executor
:这个线程池不是NioEventLoopGroup
的线程池,而是给NioEventLoop
用的,后面会提及chooserFactory
:从线程池中选择线程的一个策略工厂selectorProvider
:Channel
的selector
提供者,通常是JDK自带的,可通过provider.openSelector()
返回内置的selector
,以select
感兴趣的事件selectStrategyFactory
:select
时的策略选择实现,在NioEventLoop
会用,后面会提及rejectedExecutionHandler
:拒绝策略,默认抛出异常,不过它是给NioEventLoop
用的,而不是NioEventLoopGroup
2.2. MultithreadEventExecutorGroup
中的实例化
初始化后,会走到父类,直到MultithreadEventExecutorGroup
类的构造方法,说明如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
protected MultithreadEventExecutorGroup(int nThreads,
Executor executor,
EventExecutorChooserFactory chooserFactory,
Object... args) {
// ...
if (executor == null) {
// 默认是创建这个线程池: ThreadPerTaskExecutor
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
// IMPORTANT: 这里维护了线程数组,即一个线程池,这里通常每个元素是NioEventLoop
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// IMPORTANT: 创建对应的执行线程,即NioEventLoop
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
// Terminate & shutdown...
}
}
}
// 创建选择器,默认实现是round-robin(这里自己见代码)
chooser = chooserFactory.newChooser(children);
// 配置监听器
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
// 当所有的线程都中止后,才设置
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
// Set read-only child group
// ...
}
这里的“线程”指
children
的EventExecutor
个体,它可能包含多个线程,只不过在NioEventLoop
中,它只有1个线程,所以混淆也是可以的。
关键方法在newChild()
中,这在NioEventLoopGroup
实现。
2.3. 创建NioEventLoop
newChild()
实现在NioEventLoopGroup
中,实际上就是创建NioEventLoop
,填充到children
数组。
而NioEventLoop
实际上就是单线程的线程池,可以从下面的继承关系看出:
1
NioEventLoop => SingleThreadEventLoop => SingleThreadEventExecutor => AbstractScheduledEventExecutor => AbstractEventExecutor => ...
我们从父类开始看。
首先在AbstractEventExecutor
,这里只是初始化了parent
,即对应的NioEventLoopGroup
。
然后在AbstractScheduledEventExecutor
,这里没有做任何事(里面的字段只是对任务优先级进行排序而已)。
接着在SingleThreadEventExecutor
,详细见下面代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
protected SingleThreadEventExecutor(EventExecutorGroup parent,
Executor executor,
boolean addTaskWakesUp,
int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
// 这里是false
this.addTaskWakesUp = addTaskWakesUp;
// 设置最大的等待任务,默认是16
this.maxPendingTasks = Math.max(16, maxPendingTasks);
// 设置给NioEventLoop的executor
// 默认时,参数为ThreadPerTaskExecutor的实例
this.executor = ThreadExecutorMap.apply(executor, this);
// 这个 queue 的默认容量是maxPendingTasks,默认是16
taskQueue = newTaskQueue(this.maxPendingTasks);
// 设置reject handler
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
接着在SingleThreadEventLoop
,代码实际上没做什么东西,只是初始化了一个不太重要的字段:
1
2
3
4
5
6
7
8
9
protected SingleThreadEventLoop(EventLoopGroup parent,
ThreadFactory threadFactory,
boolean addTaskWakesUp,
int maxPendingTasks,
RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, threadFactory, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
// 这个字段,所涉及的API是被标为Unstable的,这里就忽略
tailTasks = newTaskQueue(maxPendingTasks);
}
最后到NioEventLoop
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
NioEventLoop(NioEventLoopGroup parent,
Executor executor,
SelectorProvider selectorProvider,
SelectStrategy strategy,
RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
// 设置用于创建Selector的provider
provider = selectorProvider;
// 开启NIO的Selector,非常重要(不同平台不一样,如Linux会使用epoll)
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
// 设置select策略
selectStrategy = strategy;
}
至此,NioEventLoop
初始化完成。
可以看出,NioEventLoop
实际上除了设置了selector
、任务队列等信息之外,最重要的就是包装了一个ThreadPerTaskExecutor
线程池(默认下):
1
2
// In MultithreadEventExecutorGroup
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
该类本身,只是简单的通过threadFactory
(这里是DefaultThreadFactory
)创建线程。而创建的线程是FastThreadLocalThread
,它也只是对Thread
进行包装,可以取出内部的InternalThreadLocalMap
而已。
2.4. 线程的启动:为何NioEventLoop
只有1个线程?
答案在父类SingleThreadEventExecutor
的execute()
方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
// 添加任务到队列,之后NioEventLoop会从队列中执行里面的任务
addTask(task);
if (!inEventLoop) {
// 若上下文在线程池所在线程的外面,创建线程
// 这里使用CAS创建线程(设置state为ST_STARTED),所以只有1个线程被创建
// 若线程已经创建,则不会创建新的线程
// 具体的在doStartThread()启动,执行的逻辑是NioEventLoop#run
startThread();
if (isShutdown()) {
//... Handler rejection
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
这里关键在于startThread()
,它会创建一个线程(通过CAS创建,所以只会创建一个),然后就会执行NioEventLoop
的run()
方法,而这个方法是在无限循环中进行select
并处理事件的,后面会讲。
而具体的线程创建在doStartThread()
方法中实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private void doStartThread() {
assert thread == null;
// 启动线程,这里executor就是"经过包装“的ThreadPerTaskExecutor
executor.execute(new Runnable() {
@Override
public void run() {
// ...
try {
// 这里就是NioEventLoop的执行,是一个无限循环流程
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
// Handle cleanup after termination/failure....
}
}
});
}
3. EventLoop
与Channel
的关联:重回register
事件
3.1. 加入register
任务
这里依旧以客户端的connect
为例(服务端的bind
同理),它都会走到之前提及的initAndRegister()
方法,注意第3步:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 1. channelFactory通过反射创建Channel
channel = channelFactory.newChannel();
// 2. 涉及channelPipeline的配置初始化
init(channel);
} catch(Throwable t) {
// ...
}
// ...
// ***3. 将channel注册到EventGroup中***
ChannelFuture regFuture = config().group().register(channel);
// ...
return regFuture;
}
第3步会进入NioEventLoopGroup
的父类MultithreadEventLoopGroup
的register()
:
1
2
3
4
public ChannelFuture register(Channel channel) {
// next()利用轮转法挑选一个NioEventLoop/线程,任何register
return next().register(channel);
}
然后进入NioEventLoop
的父类SingleThreadEventLoop
的register()
:
1
2
3
4
5
6
7
8
9
10
11
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
// 实际会进入这里
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
// 这里调用Channel的unsafe,进行register
promise.channel().unsafe().register(this, promise);
return promise;
}
之后进入AbstractUnsafe
的register()
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// ...
AbstractChannel.this.eventLoop = eventLoop;
// 执行register
if (eventLoop.inEventLoop()) {
// 若当前线程是自己,直接执行,通常不会进入这里
register0(promise);
} else {
try {
// 调用NioEventLoop,将register0的任务推入队列,必要时创建线程(一般进入这里)
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
// ...
}
}
}
3.2. 执行register
任务
最后会执行核心的register0()
,NioEventLoop
将会轮询并从队列中获取并执行这个任务,之后会说明。
它将Channel
和Selector
相关联,并传播register
事件(这时候ChannelInitializer
会展开handler
)。
这里看下register0()
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private void register0(ChannelPromise promise) {
try {
// ...
boolean firstRegistration = neverRegistered;
// 1. JDK底层:将Channel注册到Selector上
doRegister();
neverRegistered = false;
registered = true;
// 2. 保证handlerAdded(...)事件在Promise通知前被调用
// 这里最重要的是ChannelInitializer:通过这步调用,它内部的handler才会被展开
// 即:在这步,内部的handler被展开成链
// 而其它的handlerAdded(...)在ChannelInitializer展开成链的时候进行调用
pipeline.invokeHandlerAddedIfNeeded();
// 3. 设置对应的Promise为success,以便于通知
safeSetSuccess(promise);
// 4. 传播register事件(见https://keys961.github.io/2019/07/12/源码阅读-Netty(3)/)
pipeline.fireChannelRegistered();
// 5. 若Channel已经打开(通常是不会active的)
// 不过当服务端accept连接,创建新的SocketChannel,注册时会进入这里,以便于Selector监听OP_READ
if (isActive()) {
if (firstRegistration) {
// 5.a. 若第一次执行register,还需要传播ChannelActive事件,通常进入这里
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// 5.b. 若被注册,且autoRead被设置,需要设置Channel的interestOp,让其监听OP_READ事件,通常不会进入这里
beginRead();
}
}
} catch (Throwable t) {
// Close fd
}
}
可见,register0()
做了下面几件事情:
-
将
Channel
和Selector
绑定 -
预先触发
handlerAdded()
(主要是ChannelInitializer
,用于展开内部的handler
)一般而言
p.addLast(handler)
会触发handlerAdded(...)
,但是只会在channel
被注册后才会被触发(即registered == true
)。对于
ChannelInitializer
的添加,此时registered == false
,所以不会被触发handlerAdded(...)
,只会在channel
被注册后手动显式调用才能触发(从而展开内部handler
为链) -
传播
register
事件 -
若
channel
本身已激活,需要传播channelActive
事件或者开始准备读取(前者一般也会调用到后者)
后2步的内容可参考这里的3.3.和4.小节。
而对于JDK的Channel
和Selector
的绑定,见下面的代码:
1
2
// Channel#register(selector, ops, attribute)
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
可见监听操作是0,即什么都不监听。可以推测,后面的操作会修改监听的事件。
register
后,才会触发后面的connect
/bind
。
4. 总结
本文主要讲述了EventLoopGroup
和EventLoop
的创建,说明了Netty是如何管理线程池的(主要是NioXX
的线程池)。
此外,通过回顾register
事件,将Channel
初始化和EventLoop
联系到了一起。
之后会详细说明NioEventLoop#run
,说明它的整个轮询工作流程。