0. Overview
之前说明了Netty的几大组件,包括:
Channel
ChannelPipeline
ChannelHandler
-
EventLoop
/EventLoopGroup
Future
/Promise
本文就整理一下服务端bind
/客户端connect
的流程。
1. 客户端connect
这里回到Bootstrap#doResolveAndConnect(remoteAddr, localAddr)
方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
// 之前完成了的
// 0. 配置中初始化了EventLoopGroup线程池
// 1. 初始化套接字和Channel、Pipeline
// 2. 将Handler添加到Channel的pipeline上
// 3. 选择一个EventLoop,注册Channel到Selector上
// 可知,一个连接绑定一个Channel,
// 而Channel被绑定到某个NioEventLoop上, 因为不同NioEventLoop的selector是互不相干的
// (如Linux上,SelectorProvider#openSelector()是创建一个新EpollSelectorImpl实例)
// 即在客户端下:一个连接的数据被一个线程处理
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.isDone()) {
if (!regFuture.isSuccess()) {
return regFuture;
}
// 看这里
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
} else {
// ....
}
}
上面解释中可知道:在客户端下,一个连接的事件只被一个固定的线程处理
这里主要看doResolveAndConnect0
,这里省略篇幅,下面是调用链:
Bootstrap#doResolveAndConnect0
Bootstrap#doConnect
AbstractChannel#connect
DefaultChannelPipeline#connect
TailContext#connect
:这是最后的一步,即到调用链的尾部进行connect
前面可知,connect
事件是Outbound
事件,因此tail
是不作处理,只向前找OutboundHandler
并传播该事件:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Override
public ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
// Validation ...
// 1. 找下一个OutboundHandler
final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
EventExecutor executor = next.executor();
// 2. 调用invokeConnect以传播事件
if (executor.inEventLoop()) {
next.invokeConnect(remoteAddress, localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeConnect(remoteAddress, localAddress, promise);
}
}, promise, null);
}
return promise;
}
最后会传入head
(HeadContext
)上,它调用了unsafe.connect(remoteAddress, localAddress, promise)
,看看内部是怎么做的:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public final void connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
// validation ...
boolean wasActive = isActive();
// doConnect就是进行连接操作
if (doConnect(remoteAddress, localAddress)) {
// 填充promise结果
fulfillConnectPromise(promise, wasActive);
} else {
connectPromise = promise;
requestedRemoteAddress = remoteAddress;
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
//利用当前绑定/注册的NioEventLoop的schedule功能处理timeout问题
connectTimeoutFuture = eventLoop().schedule(() -> {
// Handle timeout using schedule
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
// set cancel listener ...
} catch (Throwable t) {
// handle exception and close it
}
}
实际调用的是doConnect(remoteAddr, localAddr)
:
-
若连接不成功,会设置
selector
关注OP_CONNECT
事件,以便之后关注OP_READ | OP_WRITE | OP_ACCEPT
剩下的,就由
NioEventLoop#run
来处理了(方法中的processSelectedKeys()
方法)- 若关注到了
OP_cONNECT
,则后面会关注OP_READ | OP_WRITE | OP_ACCEPT
- 利用
unsafe.finishConnect()
,调用pipeline.fireChannelActive()
以传播ChannelActive
事件
- 若关注到了
-
若连接成功,进入
filfillConnectPromise(promise, wasActive)
,它会传播channelActive
事件,并设置selector
关注OP_READ
可以看下文中服务端对于
channelActive
事件的处理,它最后标记selector
关注OP_ACCEPT
而关注的事件是在创建
NioSocketChannel
/NioServerSocketChannel
时指定,它传入参数,用于初始化readInterestOps
字段。而
channelActive
事件传播后默认(可配置不触发,也可手动触发)会触发doBeginRead()
方法,用readInterestOps
字段设置selector
关注的事件,以启动读取:- 对于服务端的
NioServerSocketChannel
,事件是OP_ACCEPT
- 对于客户端
NioSocketChannel
,事件是OP_READ
- 对于服务端的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress != null) {
doBind0(localAddress);
}
boolean success = false;
try {
// 1. 这里进行底层连接
boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
if (!connected) {
// 2. 不连接成功,则让selector关注OP_CONNECT,以便成功后关注OP_READ, OP_WRITE, OP_ACCEPT
selectionKey().interestOps(SelectionKey.OP_CONNECT);
}
success = true;
return connected;
} finally {
// Close if fail...
}
}
2. 服务端bind
和客户端类似,调用链如下:
AbstractBootstrap#doBind
AbstractBootstrap#doBind0
AbstractChannel#bind
DefaultChannelPipeline#bind
TailContext#bind
:这里进入调用链
和connect
一样,bind
也是Outbound
事件,所以最终传入HeadContext#bind
:
1
2
3
4
5
6
// In HeadContext
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
// 执行bind
unsafe.bind(localAddress, promise);
}
实质上也是用Unsafe
进行bind
,执行2步:
- 底层
socket
进行地址bind & listen
- 传播
ChannelActive
事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop(); // 这里的EventLoop是parent/boss,它从parentGroup里选的
// validation...
// 并判断,当绑定的地址不是掩码地址,但Channel配置了广播,且在*nix系统上,警告非root下收不到广播包...
boolean wasActive = isActive();
try {
// 1. 真正bind的操作
doBind(localAddress);
} catch (Throwable t) {
// deal with error and close
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
// 2. 传播ChannelActive的Inbound事件
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
2.1. bind
首先是bind
,进入NioServerSocketChannel#doBind
,这里就是底层Channel
的bind
(实际就是下面socket
的bind & listen
):
1
2
3
4
5
6
7
8
9
10
protected void doBind(SocketAddress localAddress) throws Exception {
// 底层操作就是:
// Net.bind(fd, isa.getAddress(), isa.getPort());
// Net.listen(fd, backlog < 1 ? 50 : backlog)
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
2.2. 传播channelActive
事件
然后就是传播channelActive
事件,从head
开始传播:
1
2
3
4
5
6
7
@Override
public void channelActive(ChannelHandlerContext ctx) {
// 传播事件
ctx.fireChannelActive();
// 开启读取
readIfIsAutoRead();
}
这里传播事件没什么好说的,最后也不会处理什么。但关键的是下面的readIfIsAutoRead()
:
1
2
3
4
5
6
private void readIfIsAutoRead() {
// 只在AUTO_READ打开的时候,才会读
if (channel.config().isAutoRead()) {
channel.read();
}
}
这里涉及一个AUTO_READ
,默认情况下是打开的。当然你可以关闭它。但是关闭后需要用下面的方法打开,才能读取数据:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
channel.config().setAutoRead(true);
// 原因,在DefaultChannelConfig中,打开该设置后,channel会强制读取(关闭该设置同理)
public ChannelConfig setAutoRead(boolean autoRead) {
boolean oldAutoRead = AUTOREAD_UPDATER.getAndSet(this, autoRead ? 1 : 0) == 1;
if (autoRead && !oldAutoRead) {
// 打开后强制读取
channel.read();
} else if (!autoRead && oldAutoRead) {
// 关闭后清除读取状态,NioServerSocketChannel会清除读取的标志(OP_READ, OP_ACCEPT)
autoReadCleared();
}
return this;
}
回到readIfIsAutoRead()
,下一步是channel.read()
,它也是Outbound
事件,从tail
开始向head
传播,最后到head
,然后调用链如下:
-
HeadContext#read
-
AbstractUnsafe#beginRead
-
AbstractNioMessageChannel#doBeginRead
注意这里是
AbstractNioMessageChannel
,对应的是NioMessageUnsafe
;而客户端对应的是
AbstractNioByteChannel
,对应的是NioByteUnsafe
-
AbstractNioChannel#doBeginRead
看最后一个方法,在这里,Selector
的interestOps
被设置:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// In AbstractNioChannel#doBeginRead
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
// 这里设置关注的事件,使得Channel可以关注读取事件(如OP_READ, OP_ACCEPT)
selectionKey.interestOps(interestOps | readInterestOp);
}
}
这个事件是什么?答案是OP_ACCEPT
,这在NioServerSocketChannel
初始化的时候指定的:
1
2
3
4
5
public NioServerSocketChannel(ServerSocketChannel channel) {
// 这里设置readInterestOp = OP_ACCEPT
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
关注了OP_ACCEPT
后,之后的流程就会交给NioEventLoop#run
处理:
- 读取数据
- 传播
channelRead
事件
上面服务端bind
时,挑选的eventLoop
是从bossGroup
取,因此bind
是单线程的;而accept
时,由于eventLoop
并没有变,因此accept
也是单线程的。(即一个channel
下,这些操作最多只用了1个线程)
通常情况下,一条
pipeline
上的handler
处理都是由相同的eventLoop
处理的,而channel
和pipeline
绑定,即:一个channel
对应一条pipeline
,对应一个eventLoop
/线程,一条channel的所有任务都由相同线程处理。所以耗时的任务千万不要添加到
pipeline
,除非使用异步处理。
那么workerGroup
是怎么回事,它和bossGroup
有什么关系,看下面第3节。
3. 服务端bossGroup
和workerGroup
我们再回到EnchoServer
的初始化,我们关注下面注释标记的内容:
1
2
3
4
5
6
7
8
9
b.group(bossGroup, workerGroup) // bossGroup & workerGroup
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO)) // handler & childHandler
.childHandler(ch -> {
ChannelPipeline p = ch.pipeline();
// ...
p.addLast(new EchoServerHandler());
});
3.1. NioServerSocketChannel
绑定的EventLoop
ServerBootstrap
继承了AbstractBootstrap
,因而在ServerBootstrap
中,可知:调用bootstrap.config().group()
返回的是bossGroup
。
1
2
3
4
5
6
7
// ServerBootstrap初始化
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
//...
this.childGroup = childGroup;
return this;
}
而服务端在初始化和注册Channel
到EventLoop
时调用的也是这行:
1
2
// In AbstractBootstrap#initAndRegister
config().group().register(channel)
所以NioServerSocketChannel
是被绑到bossGroup
上。
3.2. handler
和childHandler
3.2.1. 构造handler
链
在将handler
绑到pipeline
上,执行的是下面的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// In ServerBootstrap#init
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
// 添加handler
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 添加了ServerBootstrapAcceptor
pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
可知,初始化(register
)后,channel
(NioServerSocketChannel
)绑定的pipeline
,它上面的链是:head -> handler -> serverBootstrapAcceptor -> tail
。很明显,这里多了个ServerBootstrapAcceptor
。
在
EchoServer
中,NioServerSocketChannel
对应的这条链是:
head -> loggingHanler -> serverBootstrapAcceptor -> tail
3.2.2. ServerBootstrapAcceptor
ServerBootstrapAcceptor
是一个InboundChannelHandler
,它实际上主要重写了channelRead(ctx, msg)
方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 1. 获取accept后创建的与客户端通信的SocketChannel
// 之后来说明这个channel怎么来的
final Channel child = (Channel) msg;
// 2. 给这个channel的pipeline添加childHandler,注意这个pipeline是新创建的
child.pipeline().addLast(childHandler);
// 3. 设置新创建channel的options
setChannelOptions(child, childOptions, logger);
// 4. 设置新创建channel的attribute
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
// 5. 将channel注册到childGroup的eventLoop中
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
可见,ServerBootstrapAcceptor
在处理channelRead
事件时:
- 获取新创建的
channel
(SocketChannel
),和里面新创建的pipeline
(DefaultChannelPipeline
) - 给
pipeline
添加childHandler
- 设置一些配置
- 将得到的
channel
注册到childGroup
上(注册到childGroup
其中的一个eventLoop
上)
注册的流程和之前提及的一样,会把ChannelInitializer
内部的ChannelHandler
展开,最后会产生一条新链:head -> childHandlers -> ... -> tail
。
在
EchoServer
中,新产生的SocketChannel
对应的这条链是:
head -> sslHandler -> echoServerHandler -> tail
那么第1步的Channel
(即child
变量)怎么来的?
之前得知,当触发OP_ACCEPT
时,调用unsafe.read()
,因此答案在NioMessageUnsafe#read
(注意不在NioByteUnsafe
,因为它是给客户端用的):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// In NioMessageUnsafe#read
public void read() {
// ....
try {
try {
do {
// ...
// 1. 读取内容,这里doReadMessage是真正的读取
int localRead = doReadMessages(readBuf);
//...
} while (allocHandle.continueReading());
} catch (Throwable t) {
// ...
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// 2. 传播channelRead事件,和客户端NioByteUnsafe一样
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
// 3. 读取完毕,传播channelReadComplete事件,和客户端NioByteUnsafe一样
pipeline.fireChannelReadComplete();
// ...
} finally {
// ...
}
}
而这里doReadMessage(readBuf)
方法被NioServerSocketChannel
实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// In NioServerSocketChannel#doReadMessage
protected int doReadMessages(List<Object> buf) throws Exception {
// 1. accept连接,创建了新的SocketChannel供服务端和客户端通信
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
// 2. 把上面的channel添加到结果里
// 这里NioSocketChannel的pipeline是新创建的,readInterestOps = OP_READ
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
// ...
}
return 0;
}
现在,很明显,doReadMessage(readBuf)
返回的就是服务端accept
连接而新创建的SocketChannel
了。
而上文可知NioSocketChannel
初始化关注的事件(readInterestOps
)是OP_READ
。
注册的时候,当channel
是active
时还得手动触发channelActive
事件/强制开始读取(channelActive
事件的处理见前面的章节),默认下最终都会设置selector
关注OP_READ
事件。
由于注册在
childGroup
上,之后可交给childGroup
的NioEventLoop
轮询并处理I/O事件了
总结,服务端接收客户端连接后的变化,可由下图表示:
注意
childGroup
中,eventLoop
,channel
,pipeline
和selector
的关系:
- 1个
NioSocketChannel
各自配有1个Pipeline
- 1个
NioEventLoop
各自配有1个Selector
- 1个
NioEventLoop
/Selector
可以和多个NioSocketChannel
/Pipeline
绑定,进行select
轮询- 1个
NioSocketChannel
/Pipeline
只能和1个NioEventLoop
/Selector
绑定
3.2.3. handler
和childHandler
的区别
总结上面的分析,handler
和childHandler
的区别是:
handler
:在客户端和服务端的新连接建立时,handler
处理这些新连接childHandler
:连接建立后,客户端发送请求,childHandler
用于处理这些请求
3.3. 当连接数量很多时
上文可知,当连接建立后,会新创建一个NioSocketChannel
(数量不会超过BACKLOG
值),然后它被注册到childGroup
的NioEventLoop
和对应的selector
上。
因此,一个NioEventLoop
/线程会和多个channel
绑定,它里面的selector
也会和很多个channel
绑定。当连接数很多时,一个childGroup
里的线程需要处理多个连接的请求。
即这里,
channel
和eventLoop
是多对一的关系
这样,当某些请求阻塞时,是非常容易造成其它连接上的请求(I/O任务)和队列中的非I/O任务超时,造成“饥饿”现象。
所以,在Netty上处理任务时,添加的任务一定要满足下面的原则:
-
添加的任务不能阻塞
-
若任务是阻塞,想方设法将其异步化
根据上面的源码分析:
- 读取请求时,所在线程必须是
childGroup
中的 - 返回请求时,没有线程的要求(因为传播
Inbound
和Outbound
事件本身没有线程的要求)
所以可以把阻塞任务扔到线程池,并在线程池内将把结果输出
- 读取请求时,所在线程必须是