源码阅读-Netty ChannelPipeline

Posted by keys961 on July 12, 2019

0. 回顾

之前讲了一个Netty服务端和客户端启动所需要的一些组件,以及相互交互时通信的通道Channel

Channel的创建,不仅仅需要创建socket句柄,将Java NIO的Channel包装,而且还需要:

  • 将其和EventGroup集成
  • 将其和ChannelPipeline集成
  • 错误处理

这里要讲的是ChannelPipeline

1. ChannelPipeline的大体说明

每个Channel内部会维护一个channelPipeline(即ChannelPipeline),而channelPipeline由一系列channelHandlerChannelHandler,实际上是被ChannelHandlerContext包装)组成一条双向链表

channelHandler用于处理任务逻辑,多个channelHandler构成职责链,共同完成一项任务。

2. Inbound & Outbound

Netty引入Inbound和Outbound概念,将ChannelHandler分成2类,它们作用在不同的地方:

  • ChannelInboundHandler:当read事件发生时调用
  • ChannelOutboundHandler:当write事件发生时调用

对应具体顺序是:

  • socket.read() -> inboundHandler -> ... -> inboundHandler_tail

  • req from channel/channelContext -> outbountHandler ... ->socket.write()

3. 再探ChannelPipeline的初始化

3.1. ChannelPipeline的实例化

ChannelPipeline的实例化在Channel实例化时进行,以客户端为例,它的调用链可以是:

  • connect() -> doResolveAndConnect() -> initAndRegister() -> channelFactory#newChannel() -> ... -> ctor in NioSocketChannel

ChannelPipeline的实例化在AbstractChannel的构造方法的newChannelPipeline()进行,它会实例化一个DefaultChannelPipeline实例

1
2
3
4
5
6
7
8
9
10
11
protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);
    // 初始化头尾ChannelHandlerContext
    // 而ChannelHandlerContext包装了ChannelHandler
    tail = new TailContext(this);
    head = new HeadContext(this);
    head.next = tail;
    tail.prev = head;
}

易知,初始化了头尾2个节点的双向链表,它们都是ChannelHandlerContext实例(它包装了ChannelHandler),其中:

  • 头实现了ChannelInboundHandlerChannelOutboundHandler
  • 尾仅实现了ChannelInboundHandler

ChannelHandlerContext初始化中,有一步很重要,即标记mask以标识包装的ChannelHandler支持哪些操作,也即可被跳过,加上了@Skip注解

所以若希望某个channelHandler不执行某些操作:

  • 空实现
  • 加上@Skip注解

3.2. 添加ChannelHandler

启动服务端/客户端时,会调用handler()方法添加ChannelHandler,可往里面添加ChannelInitializer实例(它也是ChannelInboundHandler实例),以添加多个ChannelHandler

而实际上,真正的添加,是在之前所述的initAndRegister()方法中的init()方法执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 客户端
void init(Channel channel) throws Exception {
    ChannelPipeline p = channel.pipeline();
    // handler()中指定的Handler,并添加
    p.addLast(config.handler());
    //...
}

// 服务端
void init(Channel channel) throws Exception {
    // ...
    ChannelPipeline p = channel.pipeline();
    // ...
    p.addLast(new ChannelInitializer<Channel>() {
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            // handler()中指定的Handler
            ChannelHandler handler = config.handler();
            if (handler != null) {
                // 添加Handler
                pipeline.addLast(handler);
            }
            ch.eventLoop().execute(new Runnable() {
                public void run() {
                    // 对于服务端,还得添加一个ServerBootstrapAcceptor
                    // 它是ChannelInboundHandler,用于处理新连接
                    // 注意到这里添加了childHandler等信息,
                    // 也说明了,只有连接建立后,childHandler才会被触发
                    pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
    		});
        }
    });
}

对于客户端,直接往最后添加ChannelHandler(可以是ChannelInitializer);而对于服务端,添加的是一个包装的ChannelInitializer。添加在pipeline.addLast()执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        // 1. 检查重复、重名,创建新的DefaultChannelHandlerContext
        checkMultiplicity(handler);
        newCtx = newContext(group, filterName(name, handler), handler);
        // 2. 添加刚创建的Context
        addLast0(newCtx);
        // ...
    }
    // 3. 触发Handler被添加的事件
    callHandlerAdded0(newCtx);
    return this;
}

主要看第2步,可知只是简单的链表添加:

1
2
3
4
5
6
7
private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;
}

3.3. ChannelInitializerChannelHandler添加

由3.2.知,若什么都不处理,Netty只做了延迟处理,链表中的项只是ChannelInitializer,而不是我们真正添加的ChannelHandler(ChannelHandlerContext),那么我们添加的ChannelHandler是如何加入到链表中的?

这里涉及到EventGroup的内容,而这部分内容之后会讲,大体的步骤是:

  • ChannelEventGroup注册

  • 注册时,需要触发Channel注册的事件,那么需要对pipeline传播注册消息,这里调用了Pipeline#fireChannelRegistered()方法

  • 调用该方法,会从head传播(传播具体后面描述)到整个链表项

  • ChannelInitializer对应的Context时,会触发handler.channelRegistered()方法,这时:

    • 它会调用自己的initChannel(),将我们添加的handler添加到链表中
    • 然后向后传播
    • 最后将自己从链表中移除
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    
    public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        // 1. 初始化我们定义的channelHandler
        // 这一步会对我们重写的方法进行包装,利用基于ConcurrentHashMap的Set,
        // 利用add是否成功,防止重复初始化
        if (initChannel(ctx)) {
            // 2. 重新传播register事件(从head)
            // 若我们添加的是仍然是ChannelInitializer
            // 那么很明显,会递归添加handler并传播register事件
            ctx.pipeline().fireChannelRegistered();
            // 3. 移除自己到链表外
            removeState(ctx);
         } else {
            // 4. 重复初始化,直接向后传播即可
            ctx.fireChannelRegistered();
        }
    }
    

该内容之后也会多次提及。

4. ChannelHandler的传播

这里盗一张图,说明不同的ChannelHandler是通过什么方法传播事件的:

19

LoggingHandler对于读写,读在channelRead()实现,而写在write()实现,所以:

  • I/O就绪后

4.1. InboundChannelHandler的事件传播

这里还是挑之前提及的fireChannelRegistered()。看代码:

1
2
3
4
public ChannelHandlerContext fireChannelRegistered() {
    invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
    return this;
}

注意第一行的findContextInbound,它会从当前节点开始,向后找符合的ChannelHandlerContext,然后调用对应的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private AbstractChannelHandlerContext findContextInbound(int mask) {
    // 向后寻找有对应操作的inbound handler context
    AbstractChannelHandlerContext ctx = this;
    do {
        // 向后找
        ctx = ctx.next;
    } while ((ctx.executionMask & mask) == 0);
    return ctx;
}

private void invokeChannelRegistered() {
    if (invokeHandler()) {
        try {
            // 调用对应的channelRegistered方法
            ((ChannelInboundHandler) handler()).channelRegistered(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRegistered();
    }
}

注意,向后找会检查mask,这在之前提及过,它代表这个channelHandler支持哪些操作(即可被跳过,加上了@Skip注解

4.2. OutboundChannelHandler的事件传播

和4.1.节类似,只不过,传播的时候,是沿链表相反方向进行的,证据就是下面的代码:

1
2
3
4
5
6
7
8
private AbstractChannelHandlerContext findContextOutbound(int mask) {
    AbstractChannelHandlerContext ctx = this;
    do {
        // 向前找
        ctx = ctx.prev;
    } while ((ctx.executionMask & mask) == 0);
    return ctx;
}

4.3. 总结

根据事件传播,以及Netty源码的注释,可以得到:

对于Inbound而言:

  • 都是通知事件(如Channel状态改变,I/O就绪,如socket.read()完成)
  • 由第一点可知,发起者必然是Unsafe,通知ChannelHandler,所以处理者是ChannelHandler
  • 传播由前向后(head -> tail
  • 需要调用context.fireIN_EVT()才能传播事件,否则传播中止

对于Outbound而言:

  • 都是请求事件(如Channel或者ChannelHandlerContext发出的请求)
  • 由第一点可知,发起者是Channel/ChannelHandlerContext,最终处理一般都会进入Unsafe,传播到最后一般也是Unsafe处理(见HeadContext,如写数据)
  • 传播由后向前(tail -> head
  • 需要调用context.OUT_EVT()才能传播事件,否则传播中止

假如需要将Inbound的内容传入Outbound

  • Inbound尾部,通过context.OUT_EVT()传播到Outbound

    例如将读取的数据写出,则:

    • Inbound尾部的channelRead(ctx, msg)中执行ctx.write(msg)
    • Outbound头部实现write(ctx, msg, promise),并执行cxt.write(ctx, msg)传播事件