0. Overview
之前说明了Netty的几大组件,包括:
ChannelChannelPipelineChannelHandler-
EventLoop/EventLoopGroup Future/Promise
本文就整理一下服务端bind/客户端connect的流程。
1. 客户端connect
这里回到Bootstrap#doResolveAndConnect(remoteAddr, localAddr)方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
// 之前完成了的
// 0. 配置中初始化了EventLoopGroup线程池
// 1. 初始化套接字和Channel、Pipeline
// 2. 将Handler添加到Channel的pipeline上
// 3. 选择一个EventLoop,注册Channel到Selector上
// 可知,一个连接绑定一个Channel,
// 而Channel被绑定到某个NioEventLoop上, 因为不同NioEventLoop的selector是互不相干的
// (如Linux上,SelectorProvider#openSelector()是创建一个新EpollSelectorImpl实例)
// 即在客户端下:一个连接的数据被一个线程处理
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.isDone()) {
if (!regFuture.isSuccess()) {
return regFuture;
}
// 看这里
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
} else {
// ....
}
}
上面解释中可知道:在客户端下,一个连接的事件只被一个固定的线程处理
这里主要看doResolveAndConnect0,这里省略篇幅,下面是调用链:
Bootstrap#doResolveAndConnect0Bootstrap#doConnectAbstractChannel#connectDefaultChannelPipeline#connectTailContext#connect:这是最后的一步,即到调用链的尾部进行connect
前面可知,connect事件是Outbound事件,因此tail是不作处理,只向前找OutboundHandler并传播该事件:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Override
public ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
// Validation ...
// 1. 找下一个OutboundHandler
final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
EventExecutor executor = next.executor();
// 2. 调用invokeConnect以传播事件
if (executor.inEventLoop()) {
next.invokeConnect(remoteAddress, localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeConnect(remoteAddress, localAddress, promise);
}
}, promise, null);
}
return promise;
}
最后会传入head(HeadContext)上,它调用了unsafe.connect(remoteAddress, localAddress, promise),看看内部是怎么做的:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public final void connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
// validation ...
boolean wasActive = isActive();
// doConnect就是进行连接操作
if (doConnect(remoteAddress, localAddress)) {
// 填充promise结果
fulfillConnectPromise(promise, wasActive);
} else {
connectPromise = promise;
requestedRemoteAddress = remoteAddress;
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
//利用当前绑定/注册的NioEventLoop的schedule功能处理timeout问题
connectTimeoutFuture = eventLoop().schedule(() -> {
// Handle timeout using schedule
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
// set cancel listener ...
} catch (Throwable t) {
// handle exception and close it
}
}
实际调用的是doConnect(remoteAddr, localAddr):
-
若连接不成功,会设置
selector关注OP_CONNECT事件,以便之后关注OP_READ | OP_WRITE | OP_ACCEPT剩下的,就由
NioEventLoop#run来处理了(方法中的processSelectedKeys()方法)- 若关注到了
OP_cONNECT,则后面会关注OP_READ | OP_WRITE | OP_ACCEPT - 利用
unsafe.finishConnect(),调用pipeline.fireChannelActive()以传播ChannelActive事件
- 若关注到了
-
若连接成功,进入
filfillConnectPromise(promise, wasActive),它会传播channelActive事件,并设置selector关注OP_READ可以看下文中服务端对于
channelActive事件的处理,它最后标记selector关注OP_ACCEPT而关注的事件是在创建
NioSocketChannel/NioServerSocketChannel时指定,它传入参数,用于初始化readInterestOps字段。而
channelActive事件传播后默认(可配置不触发,也可手动触发)会触发doBeginRead()方法,用readInterestOps字段设置selector关注的事件,以启动读取:- 对于服务端的
NioServerSocketChannel,事件是OP_ACCEPT - 对于客户端
NioSocketChannel,事件是OP_READ
- 对于服务端的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress != null) {
doBind0(localAddress);
}
boolean success = false;
try {
// 1. 这里进行底层连接
boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
if (!connected) {
// 2. 不连接成功,则让selector关注OP_CONNECT,以便成功后关注OP_READ, OP_WRITE, OP_ACCEPT
selectionKey().interestOps(SelectionKey.OP_CONNECT);
}
success = true;
return connected;
} finally {
// Close if fail...
}
}
2. 服务端bind
和客户端类似,调用链如下:
AbstractBootstrap#doBindAbstractBootstrap#doBind0AbstractChannel#bindDefaultChannelPipeline#bindTailContext#bind:这里进入调用链
和connect一样,bind也是Outbound事件,所以最终传入HeadContext#bind:
1
2
3
4
5
6
// In HeadContext
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
// 执行bind
unsafe.bind(localAddress, promise);
}
实质上也是用Unsafe进行bind,执行2步:
- 底层
socket进行地址bind & listen - 传播
ChannelActive事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop(); // 这里的EventLoop是parent/boss,它从parentGroup里选的
// validation...
// 并判断,当绑定的地址不是掩码地址,但Channel配置了广播,且在*nix系统上,警告非root下收不到广播包...
boolean wasActive = isActive();
try {
// 1. 真正bind的操作
doBind(localAddress);
} catch (Throwable t) {
// deal with error and close
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
// 2. 传播ChannelActive的Inbound事件
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
2.1. bind
首先是bind,进入NioServerSocketChannel#doBind,这里就是底层Channel的bind(实际就是下面socket的bind & listen):
1
2
3
4
5
6
7
8
9
10
protected void doBind(SocketAddress localAddress) throws Exception {
// 底层操作就是:
// Net.bind(fd, isa.getAddress(), isa.getPort());
// Net.listen(fd, backlog < 1 ? 50 : backlog)
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
2.2. 传播channelActive事件
然后就是传播channelActive事件,从head开始传播:
1
2
3
4
5
6
7
@Override
public void channelActive(ChannelHandlerContext ctx) {
// 传播事件
ctx.fireChannelActive();
// 开启读取
readIfIsAutoRead();
}
这里传播事件没什么好说的,最后也不会处理什么。但关键的是下面的readIfIsAutoRead():
1
2
3
4
5
6
private void readIfIsAutoRead() {
// 只在AUTO_READ打开的时候,才会读
if (channel.config().isAutoRead()) {
channel.read();
}
}
这里涉及一个AUTO_READ,默认情况下是打开的。当然你可以关闭它。但是关闭后需要用下面的方法打开,才能读取数据:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
channel.config().setAutoRead(true);
// 原因,在DefaultChannelConfig中,打开该设置后,channel会强制读取(关闭该设置同理)
public ChannelConfig setAutoRead(boolean autoRead) {
boolean oldAutoRead = AUTOREAD_UPDATER.getAndSet(this, autoRead ? 1 : 0) == 1;
if (autoRead && !oldAutoRead) {
// 打开后强制读取
channel.read();
} else if (!autoRead && oldAutoRead) {
// 关闭后清除读取状态,NioServerSocketChannel会清除读取的标志(OP_READ, OP_ACCEPT)
autoReadCleared();
}
return this;
}
回到readIfIsAutoRead(),下一步是channel.read(),它也是Outbound事件,从tail开始向head传播,最后到head,然后调用链如下:
-
HeadContext#read -
AbstractUnsafe#beginRead -
AbstractNioMessageChannel#doBeginRead注意这里是
AbstractNioMessageChannel,对应的是NioMessageUnsafe;而客户端对应的是
AbstractNioByteChannel,对应的是NioByteUnsafe -
AbstractNioChannel#doBeginRead
看最后一个方法,在这里,Selector的interestOps被设置:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// In AbstractNioChannel#doBeginRead
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
// 这里设置关注的事件,使得Channel可以关注读取事件(如OP_READ, OP_ACCEPT)
selectionKey.interestOps(interestOps | readInterestOp);
}
}
这个事件是什么?答案是OP_ACCEPT,这在NioServerSocketChannel初始化的时候指定的:
1
2
3
4
5
public NioServerSocketChannel(ServerSocketChannel channel) {
// 这里设置readInterestOp = OP_ACCEPT
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
关注了OP_ACCEPT后,之后的流程就会交给NioEventLoop#run处理:
- 读取数据
- 传播
channelRead事件
上面服务端bind时,挑选的eventLoop是从bossGroup取,因此bind是单线程的;而accept时,由于eventLoop并没有变,因此accept也是单线程的。(即一个channel下,这些操作最多只用了1个线程)
通常情况下,一条
pipeline上的handler处理都是由相同的eventLoop处理的,而channel和pipeline绑定,即:一个channel对应一条pipeline,对应一个eventLoop/线程,一条channel的所有任务都由相同线程处理。所以耗时的任务千万不要添加到
pipeline,除非使用异步处理。
那么workerGroup是怎么回事,它和bossGroup有什么关系,看下面第3节。
3. 服务端bossGroup和workerGroup
我们再回到EnchoServer的初始化,我们关注下面注释标记的内容:
1
2
3
4
5
6
7
8
9
b.group(bossGroup, workerGroup) // bossGroup & workerGroup
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO)) // handler & childHandler
.childHandler(ch -> {
ChannelPipeline p = ch.pipeline();
// ...
p.addLast(new EchoServerHandler());
});
3.1. NioServerSocketChannel绑定的EventLoop
ServerBootstrap继承了AbstractBootstrap,因而在ServerBootstrap中,可知:调用bootstrap.config().group()返回的是bossGroup。
1
2
3
4
5
6
7
// ServerBootstrap初始化
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
//...
this.childGroup = childGroup;
return this;
}
而服务端在初始化和注册Channel到EventLoop时调用的也是这行:
1
2
// In AbstractBootstrap#initAndRegister
config().group().register(channel)
所以NioServerSocketChannel是被绑到bossGroup上。
3.2. handler和childHandler
3.2.1. 构造handler链
在将handler绑到pipeline上,执行的是下面的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// In ServerBootstrap#init
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
// 添加handler
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 添加了ServerBootstrapAcceptor
pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
可知,初始化(register)后,channel(NioServerSocketChannel)绑定的pipeline,它上面的链是:head -> handler -> serverBootstrapAcceptor -> tail。很明显,这里多了个ServerBootstrapAcceptor。
在
EchoServer中,NioServerSocketChannel对应的这条链是:
head -> loggingHanler -> serverBootstrapAcceptor -> tail
3.2.2. ServerBootstrapAcceptor
ServerBootstrapAcceptor是一个InboundChannelHandler,它实际上主要重写了channelRead(ctx, msg)方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 1. 获取accept后创建的与客户端通信的SocketChannel
// 之后来说明这个channel怎么来的
final Channel child = (Channel) msg;
// 2. 给这个channel的pipeline添加childHandler,注意这个pipeline是新创建的
child.pipeline().addLast(childHandler);
// 3. 设置新创建channel的options
setChannelOptions(child, childOptions, logger);
// 4. 设置新创建channel的attribute
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
// 5. 将channel注册到childGroup的eventLoop中
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
可见,ServerBootstrapAcceptor在处理channelRead事件时:
- 获取新创建的
channel(SocketChannel),和里面新创建的pipeline(DefaultChannelPipeline) - 给
pipeline添加childHandler - 设置一些配置
- 将得到的
channel注册到childGroup上(注册到childGroup其中的一个eventLoop上)
注册的流程和之前提及的一样,会把ChannelInitializer内部的ChannelHandler展开,最后会产生一条新链:head -> childHandlers -> ... -> tail。
在
EchoServer中,新产生的SocketChannel对应的这条链是:
head -> sslHandler -> echoServerHandler -> tail
那么第1步的Channel(即child变量)怎么来的?
之前得知,当触发OP_ACCEPT时,调用unsafe.read(),因此答案在NioMessageUnsafe#read(注意不在NioByteUnsafe,因为它是给客户端用的):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// In NioMessageUnsafe#read
public void read() {
// ....
try {
try {
do {
// ...
// 1. 读取内容,这里doReadMessage是真正的读取
int localRead = doReadMessages(readBuf);
//...
} while (allocHandle.continueReading());
} catch (Throwable t) {
// ...
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// 2. 传播channelRead事件,和客户端NioByteUnsafe一样
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
// 3. 读取完毕,传播channelReadComplete事件,和客户端NioByteUnsafe一样
pipeline.fireChannelReadComplete();
// ...
} finally {
// ...
}
}
而这里doReadMessage(readBuf)方法被NioServerSocketChannel实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// In NioServerSocketChannel#doReadMessage
protected int doReadMessages(List<Object> buf) throws Exception {
// 1. accept连接,创建了新的SocketChannel供服务端和客户端通信
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
// 2. 把上面的channel添加到结果里
// 这里NioSocketChannel的pipeline是新创建的,readInterestOps = OP_READ
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
// ...
}
return 0;
}
现在,很明显,doReadMessage(readBuf)返回的就是服务端accept连接而新创建的SocketChannel了。
而上文可知NioSocketChannel初始化关注的事件(readInterestOps)是OP_READ。
注册的时候,当channel是active时还得手动触发channelActive事件/强制开始读取(channelActive事件的处理见前面的章节),默认下最终都会设置selector关注OP_READ事件。
由于注册在
childGroup上,之后可交给childGroup的NioEventLoop轮询并处理I/O事件了
总结,服务端接收客户端连接后的变化,可由下图表示:

注意
childGroup中,eventLoop,channel,pipeline和selector的关系:
- 1个
NioSocketChannel各自配有1个Pipeline- 1个
NioEventLoop各自配有1个Selector- 1个
NioEventLoop/Selector可以和多个NioSocketChannel/Pipeline绑定,进行select轮询- 1个
NioSocketChannel/Pipeline只能和1个NioEventLoop/Selector绑定
3.2.3. handler和childHandler的区别
总结上面的分析,handler和childHandler的区别是:
handler:在客户端和服务端的新连接建立时,handler处理这些新连接childHandler:连接建立后,客户端发送请求,childHandler用于处理这些请求
3.3. 当连接数量很多时
上文可知,当连接建立后,会新创建一个NioSocketChannel(数量不会超过BACKLOG值),然后它被注册到childGroup的NioEventLoop和对应的selector上。
因此,一个NioEventLoop/线程会和多个channel绑定,它里面的selector也会和很多个channel绑定。当连接数很多时,一个childGroup里的线程需要处理多个连接的请求。
即这里,
channel和eventLoop是多对一的关系
这样,当某些请求阻塞时,是非常容易造成其它连接上的请求(I/O任务)和队列中的非I/O任务超时,造成“饥饿”现象。
所以,在Netty上处理任务时,添加的任务一定要满足下面的原则:
-
添加的任务不能阻塞
-
若任务是阻塞,想方设法将其异步化
根据上面的源码分析:
- 读取请求时,所在线程必须是
childGroup中的 - 返回请求时,没有线程的要求(因为传播
Inbound和Outbound事件本身没有线程的要求)
所以可以把阻塞任务扔到线程池,并在线程池内将把结果输出
- 读取请求时,所在线程必须是