0. 回顾
之前讲了一个Netty服务端和客户端启动所需要的一些组件,以及相互交互时通信的通道Channel
。
而Channel
的创建,不仅仅需要创建socket
句柄,将Java NIO的Channel
包装,而且还需要:
- 将其和
EventGroup
集成 - 将其和
ChannelPipeline
集成 - 错误处理
- …
这里要讲的是ChannelPipeline
。
1. ChannelPipeline
的大体说明
每个Channel
内部会维护一个channelPipeline
(即ChannelPipeline
),而channelPipeline
由一系列channelHandler
(即ChannelHandler
,实际上是被ChannelHandlerContext
包装)组成一条双向链表。
而channelHandler
用于处理任务逻辑,多个channelHandler
构成职责链,共同完成一项任务。
2. Inbound & Outbound
Netty引入Inbound和Outbound概念,将ChannelHandler
分成2类,它们作用在不同的地方:
ChannelInboundHandler
:当read事件发生时调用ChannelOutboundHandler
:当write事件发生时调用
对应具体顺序是:
-
socket.read() -> inboundHandler -> ... -> inboundHandler_tail
-
req from channel/channelContext -> outbountHandler ... ->socket.write()
3. 再探ChannelPipeline
的初始化
3.1. ChannelPipeline
的实例化
ChannelPipeline
的实例化在Channel
实例化时进行,以客户端为例,它的调用链可以是:
connect() -> doResolveAndConnect() -> initAndRegister() -> channelFactory#newChannel() -> ... -> ctor in NioSocketChannel
而ChannelPipeline
的实例化在AbstractChannel
的构造方法的newChannelPipeline()
进行,它会实例化一个DefaultChannelPipeline
实例
1
2
3
4
5
6
7
8
9
10
11
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
// 初始化头尾ChannelHandlerContext
// 而ChannelHandlerContext包装了ChannelHandler
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
易知,初始化了头尾2个节点的双向链表,它们都是ChannelHandlerContext
实例(它包装了ChannelHandler
),其中:
- 头实现了
ChannelInboundHandler
和ChannelOutboundHandler
- 尾仅实现了
ChannelInboundHandler
而ChannelHandlerContext
初始化中,有一步很重要,即标记mask
以标识包装的ChannelHandler
支持哪些操作,也即可被跳过,加上了@Skip
注解。
所以若希望某个
channelHandler
不执行某些操作:
- 空实现
- 加上
@Skip
注解
3.2. 添加ChannelHandler
启动服务端/客户端时,会调用handler()
方法添加ChannelHandler
,可往里面添加ChannelInitializer
实例(它也是ChannelInboundHandler
实例),以添加多个ChannelHandler
。
而实际上,真正的添加,是在之前所述的initAndRegister()
方法中的init()
方法执行:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 客户端
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
// handler()中指定的Handler,并添加
p.addLast(config.handler());
//...
}
// 服务端
void init(Channel channel) throws Exception {
// ...
ChannelPipeline p = channel.pipeline();
// ...
p.addLast(new ChannelInitializer<Channel>() {
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
// handler()中指定的Handler
ChannelHandler handler = config.handler();
if (handler != null) {
// 添加Handler
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
public void run() {
// 对于服务端,还得添加一个ServerBootstrapAcceptor
// 它是ChannelInboundHandler,用于处理新连接
// 注意到这里添加了childHandler等信息,
// 也说明了,只有连接建立后,childHandler才会被触发
pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
对于客户端,直接往最后添加ChannelHandler
(可以是ChannelInitializer
);而对于服务端,添加的是一个包装的ChannelInitializer
。添加在pipeline.addLast()
执行:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
// 1. 检查重复、重名,创建新的DefaultChannelHandlerContext
checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);
// 2. 添加刚创建的Context
addLast0(newCtx);
// ...
}
// 3. 触发Handler被添加的事件
callHandlerAdded0(newCtx);
return this;
}
主要看第2步,可知只是简单的链表添加:
1
2
3
4
5
6
7
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
3.3. ChannelInitializer
的ChannelHandler
添加
由3.2.知,若什么都不处理,Netty只做了延迟处理,链表中的项只是ChannelInitializer
,而不是我们真正添加的ChannelHandler
(ChannelHandlerContext
),那么我们添加的ChannelHandler
是如何加入到链表中的?
这里涉及到EventGroup
的内容,而这部分内容之后会讲,大体的步骤是:
-
Channel
向EventGroup
注册 -
注册时,需要触发
Channel
注册的事件,那么需要对pipeline
传播注册消息,这里调用了Pipeline#fireChannelRegistered()
方法 -
调用该方法,会从
head
传播(传播具体后面描述)到整个链表项 -
到
ChannelInitializer
对应的Context
时,会触发handler.channelRegistered()
方法,这时:- 它会调用自己的
initChannel()
,将我们添加的handler
添加到链表中 - 然后向后传播
- 最后将自己从链表中移除
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception { // 1. 初始化我们定义的channelHandler // 这一步会对我们重写的方法进行包装,利用基于ConcurrentHashMap的Set, // 利用add是否成功,防止重复初始化 if (initChannel(ctx)) { // 2. 重新传播register事件(从head) // 若我们添加的是仍然是ChannelInitializer // 那么很明显,会递归添加handler并传播register事件 ctx.pipeline().fireChannelRegistered(); // 3. 移除自己到链表外 removeState(ctx); } else { // 4. 重复初始化,直接向后传播即可 ctx.fireChannelRegistered(); } }
- 它会调用自己的
该内容之后也会多次提及。
4. ChannelHandler
的传播
这里盗一张图,说明不同的ChannelHandler
是通过什么方法传播事件的:
LoggingHandler
对于读写,读在channelRead()
实现,而写在write()
实现,所以:
- I/O就绪后
4.1. InboundChannelHandler
的事件传播
这里还是挑之前提及的fireChannelRegistered()
。看代码:
1
2
3
4
public ChannelHandlerContext fireChannelRegistered() {
invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
return this;
}
注意第一行的findContextInbound
,它会从当前节点开始,向后找符合的ChannelHandlerContext
,然后调用对应的:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private AbstractChannelHandlerContext findContextInbound(int mask) {
// 向后寻找有对应操作的inbound handler context
AbstractChannelHandlerContext ctx = this;
do {
// 向后找
ctx = ctx.next;
} while ((ctx.executionMask & mask) == 0);
return ctx;
}
private void invokeChannelRegistered() {
if (invokeHandler()) {
try {
// 调用对应的channelRegistered方法
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRegistered();
}
}
注意,向后找会检查mask
,这在之前提及过,它代表这个channelHandler
支持哪些操作(即可被跳过,加上了@Skip
注解)
4.2. OutboundChannelHandler
的事件传播
和4.1.节类似,只不过,传播的时候,是沿链表相反方向进行的,证据就是下面的代码:
1
2
3
4
5
6
7
8
private AbstractChannelHandlerContext findContextOutbound(int mask) {
AbstractChannelHandlerContext ctx = this;
do {
// 向前找
ctx = ctx.prev;
} while ((ctx.executionMask & mask) == 0);
return ctx;
}
4.3. 总结
根据事件传播,以及Netty源码的注释,可以得到:
对于Inbound
而言:
- 都是通知事件(如
Channel
状态改变,I/O就绪,如socket.read()
完成) - 由第一点可知,发起者必然是
Unsafe
,通知ChannelHandler
,所以处理者是ChannelHandler
- 传播由前向后(
head -> tail
) - 需要调用
context.fireIN_EVT()
才能传播事件,否则传播中止
对于Outbound
而言:
- 都是请求事件(如
Channel
或者ChannelHandlerContext
发出的请求) - 由第一点可知,发起者是
Channel
/ChannelHandlerContext
,最终处理一般都会进入Unsafe
,传播到最后一般也是Unsafe
处理(见HeadContext
,如写数据) - 传播由后向前(
tail -> head
) - 需要调用
context.OUT_EVT()
才能传播事件,否则传播中止
假如需要将Inbound
的内容传入Outbound
:
-
在
Inbound
尾部,通过context.OUT_EVT()
传播到Outbound
上例如将读取的数据写出,则:
- 在
Inbound
尾部的channelRead(ctx, msg)
中执行ctx.write(msg)
- 在
Outbound
头部实现write(ctx, msg, promise)
,并执行cxt.write(ctx, msg)
传播事件
- 在