源码阅读-Netty Connect & Bind 流程

Posted by keys961 on August 16, 2019

0. Overview

之前说明了Netty的几大组件,包括:

  • Channel
  • ChannelPipeline
  • ChannelHandler
  • EventLoop/EventLoopGroup

  • Future/Promise

本文就整理一下服务端bind/客户端connect的流程。

1. 客户端connect

这里回到Bootstrap#doResolveAndConnect(remoteAddr, localAddr)方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
    // 之前完成了的
    // 0. 配置中初始化了EventLoopGroup线程池
    // 1. 初始化套接字和Channel、Pipeline
    // 2. 将Handler添加到Channel的pipeline上
    // 3. 选择一个EventLoop,注册Channel到Selector上
    // 可知,一个连接绑定一个Channel,
    // 而Channel被绑定到某个NioEventLoop上, 因为不同NioEventLoop的selector是互不相干的
    // (如Linux上,SelectorProvider#openSelector()是创建一个新EpollSelectorImpl实例)
    // 即在客户端下:一个连接的数据被一个线程处理
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();

    if (regFuture.isDone()) {
        if (!regFuture.isSuccess()) {
            return regFuture;
        }
        // 看这里
        return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
    } else {
        // ....
    }
}

上面解释中可知道:在客户端下,一个连接的事件只被一个固定的线程处理

这里主要看doResolveAndConnect0,这里省略篇幅,下面是调用链:

  • Bootstrap#doResolveAndConnect0
  • Bootstrap#doConnect
  • AbstractChannel#connect
  • DefaultChannelPipeline#connect
  • TailContext#connect:这是最后的一步,即到调用链的尾部进行connect

前面可知,connect事件是Outbound事件,因此tail是不作处理,只向前找OutboundHandler并传播该事件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Override
public ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
        // Validation ...
		
    // 1. 找下一个OutboundHandler
    final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
    EventExecutor executor = next.executor();
    // 2. 调用invokeConnect以传播事件
    if (executor.inEventLoop()) {
        next.invokeConnect(remoteAddress, localAddress, promise);
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeConnect(remoteAddress, localAddress, promise);
            }
        }, promise, null);
    }
    return promise;
}

最后会传入head(HeadContext)上,它调用了unsafe.connect(remoteAddress, localAddress, promise),看看内部是怎么做的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public final void connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    // validation ...

    boolean wasActive = isActive();
    // doConnect就是进行连接操作
    if (doConnect(remoteAddress, localAddress)) {
        // 填充promise结果
        fulfillConnectPromise(promise, wasActive);
    } else {
        connectPromise = promise;
        requestedRemoteAddress = remoteAddress;
        int connectTimeoutMillis = config().getConnectTimeoutMillis();
        if (connectTimeoutMillis > 0) {
            //利用当前绑定/注册的NioEventLoop的schedule功能处理timeout问题
            connectTimeoutFuture = eventLoop().schedule(() -> {
                // Handle timeout using schedule 
            }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
        }
		// set cancel listener ...        
    } catch (Throwable t) {
        // handle exception and close it
    }
}

实际调用的是doConnect(remoteAddr, localAddr)

  • 若连接不成功,会设置selector关注OP_CONNECT事件,以便之后关注OP_READ | OP_WRITE | OP_ACCEPT

    剩下的,就由NioEventLoop#run来处理了(方法中的processSelectedKeys()方法)

    • 若关注到了OP_cONNECT,则后面会关注OP_READ | OP_WRITE | OP_ACCEPT
    • 利用unsafe.finishConnect(),调用pipeline.fireChannelActive()以传播ChannelActive事件
  • 若连接成功,进入filfillConnectPromise(promise, wasActive),它会传播channelActive事件,并设置selector关注OP_READ

    可以看下文中服务端对于channelActive事件的处理,它最后标记selector关注OP_ACCEPT

    而关注的事件是在创建NioSocketChannel/NioServerSocketChannel时指定,它传入参数,用于初始化readInterestOps字段。

    channelActive事件传播后默认(可配置不触发,也可手动触发)会触发doBeginRead()方法,用readInterestOps字段设置selector关注的事件,以启动读取:

    • 对于服务端的NioServerSocketChannel,事件是OP_ACCEPT
    • 对于客户端NioSocketChannel,事件是OP_READ
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
    if (localAddress != null) {
        doBind0(localAddress);
    }
    boolean success = false;
    try {
        // 1. 这里进行底层连接
        boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
        if (!connected) {
            // 2. 不连接成功,则让selector关注OP_CONNECT,以便成功后关注OP_READ, OP_WRITE, OP_ACCEPT
            selectionKey().interestOps(SelectionKey.OP_CONNECT);
        }
        success = true;
        return connected;
    } finally {
        // Close if fail...
    }
}

2. 服务端bind

和客户端类似,调用链如下:

  • AbstractBootstrap#doBind
  • AbstractBootstrap#doBind0
  • AbstractChannel#bind
  • DefaultChannelPipeline#bind
  • TailContext#bind:这里进入调用链

connect一样,bind也是Outbound事件,所以最终传入HeadContext#bind

1
2
3
4
5
6
// In HeadContext
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
    // 执行bind
    unsafe.bind(localAddress, promise);
}

实质上也是用Unsafe进行bind,执行2步:

  • 底层socket进行地址bind & listen
  • 传播ChannelActive事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    assertEventLoop(); // 这里的EventLoop是parent/boss,它从parentGroup里选的
    // validation...
    // 并判断,当绑定的地址不是掩码地址,但Channel配置了广播,且在*nix系统上,警告非root下收不到广播包...
    boolean wasActive = isActive();
    try {
        // 1. 真正bind的操作
        doBind(localAddress);
    } catch (Throwable t) {
        // deal with error and close
        return;
    }

    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                // 2. 传播ChannelActive的Inbound事件
                pipeline.fireChannelActive();
            }
        });
    }
    safeSetSuccess(promise);
}

2.1. bind

首先是bind,进入NioServerSocketChannel#doBind,这里就是底层Channelbind(实际就是下面socketbind & listen):

1
2
3
4
5
6
7
8
9
10
protected void doBind(SocketAddress localAddress) throws Exception {
    // 底层操作就是:
    // Net.bind(fd, isa.getAddress(), isa.getPort());
    // Net.listen(fd, backlog < 1 ? 50 : backlog)
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

2.2. 传播channelActive事件

然后就是传播channelActive事件,从head开始传播:

1
2
3
4
5
6
7
@Override
public void channelActive(ChannelHandlerContext ctx) {
    // 传播事件
    ctx.fireChannelActive();
    // 开启读取
    readIfIsAutoRead();
}

这里传播事件没什么好说的,最后也不会处理什么。但关键的是下面的readIfIsAutoRead()

1
2
3
4
5
6
private void readIfIsAutoRead() {
    // 只在AUTO_READ打开的时候,才会读
    if (channel.config().isAutoRead()) {
        channel.read();
    }
}

这里涉及一个AUTO_READ,默认情况下是打开的。当然你可以关闭它。但是关闭后需要用下面的方法打开,才能读取数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
channel.config().setAutoRead(true);

// 原因,在DefaultChannelConfig中,打开该设置后,channel会强制读取(关闭该设置同理)
public ChannelConfig setAutoRead(boolean autoRead) {
    boolean oldAutoRead = AUTOREAD_UPDATER.getAndSet(this, autoRead ? 1 : 0) == 1;
    if (autoRead && !oldAutoRead) {
        // 打开后强制读取
        channel.read();
    } else if (!autoRead && oldAutoRead) {
        // 关闭后清除读取状态,NioServerSocketChannel会清除读取的标志(OP_READ, OP_ACCEPT)
        autoReadCleared();
    }
    return this;
}

回到readIfIsAutoRead(),下一步是channel.read(),它也是Outbound事件,从tail开始向head传播,最后到head,然后调用链如下:

  • HeadContext#read

  • AbstractUnsafe#beginRead

  • AbstractNioMessageChannel#doBeginRead

    注意这里是AbstractNioMessageChannel,对应的是NioMessageUnsafe

    而客户端对应的是AbstractNioByteChannel,对应的是NioByteUnsafe

  • AbstractNioChannel#doBeginRead

看最后一个方法,在这里,SelectorinterestOps被设置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// In AbstractNioChannel#doBeginRead
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }
    readPending = true;
    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        // 这里设置关注的事件,使得Channel可以关注读取事件(如OP_READ, OP_ACCEPT)
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

这个事件是什么?答案是OP_ACCEPT,这在NioServerSocketChannel初始化的时候指定的:

1
2
3
4
5
public NioServerSocketChannel(ServerSocketChannel channel) {
    // 这里设置readInterestOp = OP_ACCEPT
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

关注了OP_ACCEPT后,之后的流程就会交给NioEventLoop#run处理:

  • 读取数据
  • 传播channelRead事件

上面服务端bind时,挑选的eventLoop是从bossGroup取,因此bind是单线程的;而accept时,由于eventLoop并没有变,因此accept也是单线程的。(即一个channel下,这些操作最多只用了1个线程)

通常情况下,一条pipeline上的handler处理都是由相同的eventLoop处理的,而channelpipeline绑定,即:一个channel对应一条pipeline,对应一个eventLoop/线程,一条channel的所有任务都由相同线程处理。

所以耗时的任务千万不要添加到pipeline,除非使用异步处理。

那么workerGroup是怎么回事,它和bossGroup有什么关系,看下面第3节。

3. 服务端bossGroupworkerGroup

我们再回到EnchoServer的初始化,我们关注下面注释标记的内容:

1
2
3
4
5
6
7
8
9
b.group(bossGroup, workerGroup) // bossGroup & workerGroup
 .channel(NioServerSocketChannel.class)
 .option(ChannelOption.SO_BACKLOG, 100)
 .handler(new LoggingHandler(LogLevel.INFO)) // handler & childHandler
 .childHandler(ch -> {
     ChannelPipeline p = ch.pipeline();
     // ...
     p.addLast(new EchoServerHandler());
});                 

3.1. NioServerSocketChannel绑定的EventLoop

ServerBootstrap继承了AbstractBootstrap,因而在ServerBootstrap中,可知:调用bootstrap.config().group()返回的是bossGroup

1
2
3
4
5
6
7
// ServerBootstrap初始化
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    super.group(parentGroup);
    //...
    this.childGroup = childGroup;
    return this;
}

而服务端在初始化和注册ChannelEventLoop时调用的也是这行:

1
2
// In AbstractBootstrap#initAndRegister
config().group().register(channel)

所以NioServerSocketChannel是被绑到bossGroup上。

3.2. handlerchildHandler

3.2.1. 构造handler

在将handler绑到pipeline上,执行的是下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// In ServerBootstrap#init
p.addLast(new ChannelInitializer<Channel>() {
    @Override
    public void initChannel(final Channel ch) throws Exception {
        final ChannelPipeline pipeline = ch.pipeline();
        ChannelHandler handler = config.handler();
        // 添加handler
        if (handler != null) {
            pipeline.addLast(handler);
        }
        ch.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                // 添加了ServerBootstrapAcceptor
                pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
            }
        });
    }
});

可知,初始化(register)后,channelNioServerSocketChannel)绑定的pipeline,它上面的链是:head -> handler -> serverBootstrapAcceptor -> tail。很明显,这里多了个ServerBootstrapAcceptor

EchoServer中,NioServerSocketChannel对应的这条链是:

head -> loggingHanler -> serverBootstrapAcceptor -> tail

3.2.2. ServerBootstrapAcceptor

ServerBootstrapAcceptor是一个InboundChannelHandler,它实际上主要重写了channelRead(ctx, msg)方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    // 1. 获取accept后创建的与客户端通信的SocketChannel
    // 之后来说明这个channel怎么来的
    final Channel child = (Channel) msg;
    // 2. 给这个channel的pipeline添加childHandler,注意这个pipeline是新创建的
    child.pipeline().addLast(childHandler);
    // 3. 设置新创建channel的options
    setChannelOptions(child, childOptions, logger);
	// 4. 设置新创建channel的attribute
    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }

    try {
        // 5. 将channel注册到childGroup的eventLoop中
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

可见,ServerBootstrapAcceptor在处理channelRead事件时:

  • 获取新创建的channel(SocketChannel),和里面新创建的pipeline(DefaultChannelPipeline)
  • pipeline添加childHandler
  • 设置一些配置
  • 将得到的channel注册到childGroup上(注册到childGroup其中的一个eventLoop上)

注册的流程和之前提及的一样,会把ChannelInitializer内部的ChannelHandler展开,最后会产生一条新链:head -> childHandlers -> ... -> tail

EchoServer中,新产生的SocketChannel对应的这条链是:

head -> sslHandler -> echoServerHandler -> tail

那么第1步的Channel(即child变量)怎么来的?

之前得知,当触发OP_ACCEPT时,调用unsafe.read(),因此答案在NioMessageUnsafe#read(注意不在NioByteUnsafe,因为它是给客户端用的):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// In NioMessageUnsafe#read
public void read() {
    // ....
    try {
        try {
            do {
                // ...
                // 1. 读取内容,这里doReadMessage是真正的读取
                int localRead = doReadMessages(readBuf);
                //...
            } while (allocHandle.continueReading());
        } catch (Throwable t) {
            // ...
        }

        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            readPending = false;
            // 2. 传播channelRead事件,和客户端NioByteUnsafe一样
            pipeline.fireChannelRead(readBuf.get(i));
        }
        readBuf.clear();
        allocHandle.readComplete();
        // 3. 读取完毕,传播channelReadComplete事件,和客户端NioByteUnsafe一样
        pipeline.fireChannelReadComplete();
        // ...
    } finally {
        // ...
    }
}

而这里doReadMessage(readBuf)方法被NioServerSocketChannel实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// In NioServerSocketChannel#doReadMessage
protected int doReadMessages(List<Object> buf) throws Exception {
    // 1. accept连接,创建了新的SocketChannel供服务端和客户端通信
    SocketChannel ch = SocketUtils.accept(javaChannel());
    try {
        if (ch != null) {
            // 2. 把上面的channel添加到结果里
            // 这里NioSocketChannel的pipeline是新创建的,readInterestOps = OP_READ
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        // ...
    }
    return 0;
}

现在,很明显,doReadMessage(readBuf)返回的就是服务端accept连接而新创建的SocketChannel了。

而上文可知NioSocketChannel初始化关注的事件(readInterestOps)是OP_READ

注册的时候,当channelactive时还得手动触发channelActive事件/强制开始读取(channelActive事件的处理见前面的章节),默认下最终都会设置selector关注OP_READ事件。

由于注册在childGroup上,之后可交给childGroupNioEventLoop轮询并处理I/O事件了

总结,服务端接收客户端连接后的变化,可由下图表示

HB8ZXceTjU.jpg

注意childGroup中,eventLoop,channel,pipelineselector的关系:

  • 1个NioSocketChannel各自配有1个Pipeline
  • 1个NioEventLoop各自配有1个Selector
  • 1个NioEventLoop/Selector可以和多个NioSocketChannel/Pipeline绑定,进行select轮询
  • 1个NioSocketChannel/Pipeline只能和1个NioEventLoop/Selector绑定

3.2.3. handlerchildHandler的区别

总结上面的分析,handlerchildHandler的区别是:

  • handler:在客户端和服务端的新连接建立时,handler处理这些新连接
  • childHandler:连接建立后,客户端发送请求,childHandler用于处理这些请求

3.3. 当连接数量很多时

上文可知,当连接建立后,会新创建一个NioSocketChannel(数量不会超过BACKLOG值),然后它被注册到childGroupNioEventLoop和对应的selector上。

因此,一个NioEventLoop/线程会和多个channel绑定,它里面的selector也会和很多个channel绑定当连接数很多时,一个childGroup里的线程需要处理多个连接的请求。

即这里,channeleventLoop是多对一的关系

这样,当某些请求阻塞时,是非常容易造成其它连接上的请求(I/O任务)和队列中的非I/O任务超时,造成“饥饿”现象。

所以,在Netty上处理任务时,添加的任务一定要满足下面的原则:

  1. 添加的任务不能阻塞

  2. 若任务是阻塞,想方设法将其异步化

    根据上面的源码分析:

    • 读取请求时,所在线程必须是childGroup中的
    • 返回请求时,没有线程的要求(因为传播InboundOutbound事件本身没有线程的要求)

    所以可以把阻塞任务扔到线程池,并在线程池内将把结果输出