源码阅读-Netty Future and Promise

Posted by keys961 on July 8, 2019

0. 回顾

之前讲了一个Netty服务端和客户端启动所需要的一些组件,以及相互交互时通信的通道Channel

Channel的创建,不仅仅需要创建socket句柄,将Java NIO的Channel包装,而且还需要:

  • 将其和EventGroup集成
  • 将其和ChannelPipeline集成
  • 错误处理

不过在此之前,首先查看下面2行代码:

1
2
ChannelFuture f = b.connect(HOST, PORT)/bind(PORT).sync();
f.channel().closeFuture().sync();

这里涉及到ChannelFuture的东西,涉及到Netty异步框架。

本文介绍Netty的FuturePromise,将上面2行代码打通。

1. Netty Future

Netty的Future继承了JDK的Future,JDK的Future有下面的功能:

  • 取消任务
  • 判断任务是否取消/完成
  • 阻塞(可带超时)获得结果

Netty的Future还添加了下述的功能:

  • 判断任务是否成功
  • 判断任务是否可取消
  • 获取异常信息
  • 配置监听回调
  • 阻塞等待任务结束:
    • sync:会抛失败的异常
    • await:不会抛失败的异常,且有超时功能
  • 不阻塞获取结果:getNow

1.1. ChannelFuture

Netty IO基本都是用这个接口,它继承Netty的Future,添加了下面的功能:

  • 获取关联的Channel
  • 判断是否为ChannelFuture<Void>,如果是,则不能阻塞获取值,或者添加监听回调

2. Netty Promise

Promise依旧继承于Netty的Future

它内部包含一个任务(常常是异步的),意思是“承诺内部的任务会被执行”。除了有Future的功能外,还提供:

  • 设置成功的结果
  • 设置失败的异常
  • 设置任务不可取消

前2步一旦设置,阻塞获取结果的方法就会立刻返回。

通常,异步任务执行处理结果有下面的方式:

  • 阻塞等待结果(await/sync),然后处理
  • 通过监听器回调

2.1. ChannelPromise

ChannelPromise同时继承了Netty的FuturePromise

它有2个接口的所有功能,但是没添加新的接口。

2.2. DefaultPromise

这个类是Promise类的实现,为了简便,它有下面几个字段:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
    // 执行结果
	private volatile Object result;
    // 任务运行的线程池
    private final EventExecutor executor;
    // 监听回调
    private Object listeners;
    // 等待的线程数
    private short waiters;
    // 是否正在唤醒等待线程,用于防止重复执行唤醒,不然会重复执行listeners的回调方法
    private boolean notifyingListeners;
    // ...
}

该类有设置成功和失败的方法:

  • set/trySuccess

  • set/tryFailure

    它们都会唤醒等待的线程(使用Object#wait & notifyAll方法),并执行回调,set方法会进行设置,不成功则抛异常(即任务已经执行完了/出错停下来了),而try不会。

同样syncawait的比较上文中也有说到:

  • sync:会抛失败的异常

  • await:不会抛失败的异常,且有超时功能

    这里是由于sync会调用rethrowIfFailed(),任务失败会重新抛出异常

2.3. DefaultChannelPromise

它继承了DefaultPromise,并没对其进行扩展;

它同时实现了 ChannelPromise, FlushCheckpoint接口:

  • 第一个接口:维护了一个channel实例,以实现该接口
  • 第二个接口:维护了一个checkpoint计数器,以实现该接口

3. 回顾启动的2行代码

1
2
ChannelFuture f = b.connect(HOST, PORT)/bind(PORT).sync(); // 1
f.channel().closeFuture().sync(); // 2

第一行:

  • 服务端/客户端若连接/绑定成功,则main线程从该方法返回
  • 若失败,则sync()会抛出异常,进入finally块,从而关闭连接

第二行:closeFuture()也会返回ChannelFuture,并调用sync(),当连接被关闭时,会返回:

  • 这里调用的是CloseFuture#setClose(),本质上调用的是DefaultChannelPromisetrySuccess(),所以会返回

4. 之后

现在提及了ChannelFuture & Promise,之前也提到创建Channel需要:

  • EventGroup
  • ChannelPipeline

后面会讲ChannelPipeline流水。