0. 回顾
之前讲了一个Netty服务端和客户端启动所需要的一些组件,以及相互交互时通信的通道Channel。
而Channel的创建,不仅仅需要创建socket句柄,将Java NIO的Channel包装,而且还需要:
- 将其和
EventGroup集成 - 将其和
ChannelPipeline集成 - 错误处理
- …
不过在此之前,首先查看下面2行代码:
1
2
ChannelFuture f = b.connect(HOST, PORT)/bind(PORT).sync();
f.channel().closeFuture().sync();
这里涉及到ChannelFuture的东西,涉及到Netty异步框架。
本文介绍Netty的Future和Promise,将上面2行代码打通。
1. Netty Future
Netty的Future继承了JDK的Future,JDK的Future有下面的功能:
- 取消任务
- 判断任务是否取消/完成
- 阻塞(可带超时)获得结果
Netty的Future还添加了下述的功能:
- 判断任务是否成功
- 判断任务是否可取消
- 获取异常信息
- 配置监听回调
- 阻塞等待任务结束:
sync:会抛失败的异常await:不会抛失败的异常,且有超时功能
- 不阻塞获取结果:
getNow
1.1. ChannelFuture
Netty IO基本都是用这个接口,它继承Netty的Future,添加了下面的功能:
- 获取关联的
Channel - 判断是否为
ChannelFuture<Void>,如果是,则不能阻塞获取值,或者添加监听回调
2. Netty Promise
Promise依旧继承于Netty的Future。
它内部包含一个任务(常常是异步的),意思是“承诺内部的任务会被执行”。除了有Future的功能外,还提供:
- 设置成功的结果
- 设置失败的异常
- 设置任务不可取消
前2步一旦设置,阻塞获取结果的方法就会立刻返回。
通常,异步任务执行处理结果有下面的方式:
- 阻塞等待结果(
await/sync),然后处理 - 通过监听器回调
2.1. ChannelPromise
ChannelPromise同时继承了Netty的Future和Promise。
它有2个接口的所有功能,但是没添加新的接口。
2.2. DefaultPromise
这个类是Promise类的实现,为了简便,它有下面几个字段:
1
2
3
4
5
6
7
8
9
10
11
12
13
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
// 执行结果
private volatile Object result;
// 任务运行的线程池
private final EventExecutor executor;
// 监听回调
private Object listeners;
// 等待的线程数
private short waiters;
// 是否正在唤醒等待线程,用于防止重复执行唤醒,不然会重复执行listeners的回调方法
private boolean notifyingListeners;
// ...
}
该类有设置成功和失败的方法:
-
set/trySuccess -
set/tryFailure它们都会唤醒等待的线程(使用
Object#wait & notifyAll方法),并执行回调,set方法会进行设置,不成功则抛异常(即任务已经执行完了/出错停下来了),而try不会。
同样sync和await的比较上文中也有说到:
-
sync:会抛失败的异常 -
await:不会抛失败的异常,且有超时功能这里是由于
sync会调用rethrowIfFailed(),任务失败会重新抛出异常
2.3. DefaultChannelPromise
它继承了DefaultPromise,并没对其进行扩展;
它同时实现了 ChannelPromise, FlushCheckpoint接口:
- 第一个接口:维护了一个
channel实例,以实现该接口 - 第二个接口:维护了一个
checkpoint计数器,以实现该接口
3. 回顾启动的2行代码
1
2
ChannelFuture f = b.connect(HOST, PORT)/bind(PORT).sync(); // 1
f.channel().closeFuture().sync(); // 2
第一行:
- 服务端/客户端若连接/绑定成功,则
main线程从该方法返回 - 若失败,则
sync()会抛出异常,进入finally块,从而关闭连接
第二行:closeFuture()也会返回ChannelFuture,并调用sync(),当连接被关闭时,会返回:
- 这里调用的是
CloseFuture#setClose(),本质上调用的是DefaultChannelPromise的trySuccess(),所以会返回
4. 之后
现在提及了Channel和Future & Promise,之前也提到创建Channel需要:
EventGroupChannelPipeline
后面会讲ChannelPipeline流水。