AQS的共享模式应用于许多类,最主要的是这3个:CountDownLatch
, CyclicBarrer
与Semaphore
。
1. CountDownLatch
1.1. 使用示例
非常简单,假定一个线程等待N
个任务的完成:
- 初始化一个计数为
N
的CountDownLatch
N
个任务线程调用latch.countDown()
- 外部线程调用
latch.await()
等待所有任务完成
1.2. 数据结构
CountDownLatch
内部实际上就实现了自己的Sync
类(继承AQS),用于支持共享模式,实质上是一个共享锁。初始化需要初始化一个引用计数count
,实际上就是AQS的state
。
可通过
setState
/getState
设置或读取引用计数
CountDownLatch
主要的方法就2个:
countDown
:引用计数-1await
:阻塞至引用计数为0(且它响应中断)
1.3. await
实际上调用了sync.acquireSharedInterruptibly(1)
,而里面的代表如下:
1
2
3
4
5
6
7
8
9
10
11
12
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 0. 检查中断
if(Thread.interrupted()) {
throw new InterruptedException();
}
// 1. 获取共享状态(先尝试,后可能阻塞并再次获取)
// 小于0则表示尝试获取失败,可能需要阻塞
if(tryAcquireShared(arg) < 0) {
doAcquireSharedInterrptibly(arg);
}
}
里面关键2个方法:
tryAcquireShared
doAcquireSharedInterrptibly
1.3.1. tryAcquireShared
这里的实现就是:判断引用计数是否为0
- 是,则返回1,表明获取锁成功
- 否,则返回-1,表明获取锁失败,说明可能要进入阻塞状态并再次获取
1.3.2. doAcquireSharedInterruptibly
在CountDownLatch
下,当引用计数不为0,即大于0,说明await
线程是不能获取到锁的。
因此需要进入该方法,进入同步队列,阻塞并等待被激活,进而获取同步状态。
代码逻辑如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 1. 入队
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
// 2. 循环尝试获取锁
for(;;) {
final Node p = node.predecessor();
// 2.1 前驱为head时,尝试获取
if(p == head) {
int r = tryAcquireShared(arg);
if(r >= 0) {
// 2.2. 获取成功,则重新设置头,并向后传播
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
// 2.3. 前驱不为head,或获取锁失败,
// 则检查线程是否需要阻塞,阻塞并检查中断
// 前面的方法之前提及过:
// a) 若前驱状态不是SIGNAL,并置状态为SIGNAL,返回false
// b) 跳过CANCELLED前驱节点,返回false
// c) a,b下,下一次循环时,检查后返回true,从而能够阻塞线程
if(shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrpt()) {
throw new InterruptedException();
}
}
} catch(Throwable t) {
// 3. 异常则取消锁的获取,将本节点出队
// NOTE: 之前提及,
// 出队还需要shouldParkAfterFailedAcquire以及cancelAcquire本身的帮助
cancelAcquire(node);
throw t;
}
}
很多方法都已经在之前的文章中说明了,如下面这些,详见这里:
addWaiter
shouldParkAfterFailedAcquire
cancelAcquire
这里,当线程调用await
时,会查看引用计数(state
是否为0):
- 如果是0,
tryAcquireShared
就会成功,从而获得到锁 - 如果大于0,
tryAcquireShared
就会失败,进入同步队列,进行阻塞
所以这里只有1个方法需要说明,那就是线程获取锁成功时设置头节点的问题,即setHeadAndPropagate
。
1.3.3. setHeadAndPropagate
可以先看1.4.节再看本节
被唤醒后,进入该方法,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
// 1. 重新设置头节点
// 依旧只有1个线程被激活(多个需要激活的线程是传播激活的),因此线程安全
setHead(node);
// 2. 传播唤醒,这里propagate > 0
if(propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if(s == null || s.isShared()) {
// 本线程已被唤醒,进而传播,唤醒后继线程
// 详见1.4.2.
doReleaseShared();
}
}
}
1.4. countDown
本方法关键调用是sync.releaseShared(1)
,代码如下:
1
2
3
4
5
6
7
8
9
public final boolean releaseShared(int arg) {
// 1. 尝试更改引用计数,以释放锁状态
if (tryReleaseShared(arg)) {
// 2. 若成功,则真正释放锁,唤醒后继阻塞的线程
doReleaseShared();
return true;
}
return false;
}
1.4.1. tryReleaseShared
在CountDownLatch
中调用countDown
,会调用这个方法。
它仅仅对引用计数-1:
- 若引用计数变为0,则返回
true
,可以释放锁并唤醒后面的线程 - 若引用计数大于0,则返回
false
,锁状态依旧保持
1.4.2. doReleaseShared
tryReleaseShared
成功后,会调用该方法。
在CountDownLatch
中,引用计数为0,需要唤醒等待的await
线程。
代码逻辑如下,if
条件也有说明:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 假设h -> t3 -> t4
private void doReleaseShared() {
// 循环
for(;;) {
Node h = head;
if(h != null && h != tail) {
int ws = h.waitStatus;
// 1. 正常情况,头节点是SIGNAL(如头为h)
if(ws == Node.SIGNAL) {
// 1.1. CAS设置头节点的ws为0,正常情况是能成功的
// CAS失败可能的情况如:中继节点传播时(如t3传播t4)
// 后继线程可能会占用头,从而头改变了,ws可能变为0,那只能循环下一轮的检查
if(!h.compareAndSetWaitStatus(Node.SIGNAL, 0)) {
continue;
}
// 1.2. 唤醒后继节点,唤醒后的请看1.3.3.
// 可能在这个后,后面有多个线程被唤醒,头会被改变
// 如目前线程是h,那么它的头可能是t3,t4等
unparkSuccessor(h);
}
// 2. ws为0,即,CAS失败,即此时有新节点入队,造成ws变为-1
// 如队列从h -> t3 -> t4变为t4, t4的ws为0,此时新结点入队,ws变为-1
else if (ws == 0 && !h.compareAndSetWaitStatus(0, Node.PROPGATE)) {
continue;
}
}
// 3. 若head没变,说明头节点没被后面的线程占有,那么直接退出
// 若head变化,说明头节点被后面线程占有,而继续下一步循环,从而可以继续激活更后面的线程,以提高吞吐量
if(h == head) {
break;
}
}
}
2. CyclicBarrier
和CountDownLatch
类似,不过可以循环使用。
它基于Condition
实现。
它最重要的就是await
方法:
- 每调用一次,计数-1,并阻塞
- 当为0后,所有线程终止阻塞,并恢复计数
2.1. 数据结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class CyclicBarrier {
// 因为可重复使用,每轮使用当成一个周期
private static class Generation {
boolean broken = false;
}
// 栅栏本身
private final ReentrantLock lock = new ReentrantLock();
// 等待通过栅栏的条件
private final Condition trip = lock.newCondition();
// 参与总线程数
private final int parties;
// 越过栅栏前先执行的操作
private final Runnable barrierCommand;
// 处于哪个周期
private Generation generation = new Generation();
// 引用计数: 没到达的线程数
// 即: parties = count + 已到达的线程数
private int count;
}
2.2. 打破栅栏,开启下一轮周期
打破栅栏调用breakBarrier
,若不修复,则无法继续使用该栅栏(抛异常):
- 设置
generation.broken
为true
- 恢复引用计数
count
- 放行所有等待
trip
的线程
开启下一轮周期调用nextGeneration
,其它和breakBarrier
一样,除了:
- 生成新的
Generation
实例
2.3. await
实际上调用的是带超时版本的doAwait
,并能响应中断。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException, TimeoutException {
final ReentrantLock lock = this.lock;
// 1. 获取锁,因为要使用Condition
lock.lock();
try {
final Generation g = generation;
// 2. 若栅栏破了,抛异常
if(g.broken) {
throw new BrokenBarrierException();
}
// 3. 响应中断,需要打破栅栏,释放等待的线程,并抛异常
if(Thread.interrpted()) {
breakBarrier();
throw new InterruptedException();
}
// 4. 递减引用计数
int index = --count;
// 5. 若为0,则要释放所有等待线程
if(index == 0) {
boolean ranAction = false;
try {
// 5.1. 先执行命令
final Runnable command = barrierCommand;
if(command != null) {
command.run();
}
// 若命令出异常,这一步走不到
ranAction = true;
// 5.2. 唤醒线程,开启下一轮
nextGeneration();
return 0;
} finally {
if(!ranAction) {
// 5.3. 这里,指定命令出现异常,需要打破栅栏并释放线程,异常则通过命令抛出
breakBarrier();
}
}
}
// 6. 若引用计数大于0,则需要等待,进入循环
// 出循环: trip条件满足、中断、栅栏破坏、超时
for(;;) {
try {
// 6.1. 阻塞线程,这里阻塞会响应中断
if(!timed) {
// a) 无超时
trip.await();
} else if(nanos > 0L) {
// b) 超时
nanos = trip.awaitNanos(nanos);
}
} catch(InterruptedException ie) {
// 6.2. await时线程中断,处理错误
if(g == generation && !g.broken) {
// a) 处于同一周期,需要打破栅栏
breakBarrier();
throw ie;
} else {
// b) 新一代产生,即执行到这里,本轮的所有线程已经被放行了,则不需要抛出异常,只需记录中断信息
// 或栅栏被打破,不需要抛中断异常,而是抛栅栏破坏的异常
Thread.currentThread().interrupt();
}
}
// 7. 唤醒后还需检查栅栏是否完好
if(g.broken) {
throw new BrokenBarrierException();
}
// 8. 这种情况下,所有线程都已经要被放行了,最后一个线程会更新generation,并释放锁
// 而到这里,线程获得锁,可知generation必定不等于g,因而在这个条件下直接返回即可
// 不满足条件的情景:
// a) barrierCommand异常,栅栏被毁,线程被唤醒,到这一步,g == generation,只改变了g.broken
// b) 超时情景,即上述trip.waitNanos(nanos)不是被外部唤醒,而是自己唤醒。由于只有外部唤醒才能改变generation,因此自己唤醒的情况(即超时)下,g == generation。超时在第9步处理
if(g != generation) {
return index;
}
// 9. 超时,打破栅栏,抛出异常
if(timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
3. Semaphore
信号量大家都很熟悉,不再多描述,它内部也有自己实现的Sync
。
主要的PV操作有:
acquire
release
下面对关键方法分析,有些方法不会响应中断,有些方法会响应超时,但基本没多少代码的变化。
3.1. acquire
实际上调用sync.acquireSharedInterrptibly(permits)
。
1
2
3
4
5
6
7
8
9
10
11
12
public final void acquireSharedInterrptibly(int arg)
throws InterruptedException {
// 0. 检查中断
if(Thread.interrpted()) {
throw new InterruptedException();
}
// 1. 获取共享状态(先尝试,后可能阻塞并再次获取)
// 小于0则表示尝试获取失败,可能需要阻塞
if(tryAcquireShared(arg) < 0) {
doAcquireSharedInterrptibly(arg);
}
}
逻辑和CountDownLatch
一模一样,主要差别在于tryAcquireShared
(其它都是一样的,可参考CountDownLatch
)
3.1.1. tryAcquireShared
类似ReentrantLock
,有公平和非公平之分,代码差别比较小,主要是检查前驱节点。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
protected int tryAcquiredShared(int acquires) {
for(;;) {
// 0. 若公平,则需检查是否有前驱,若有,则返回-1,需要阻塞
if(hasQueuedPredecessors()) {
return -1;
}
// 1. 获取原有的引用计数
int available = getState();
// 2. 计算后面的引用计数
int remaining = available - acquires;
// 3. CAS设置引用计数
// 返回remaining,若remaining小于0,则需要阻塞(没获取到这个信号量)
// 注意这里remaining<0不会被CAS设置,后文会说
if(remaining < 0 ||
compareAndSetState(available, remaining)) {
return remaining;
}
}
}
注意这里remaining < 0
不会被CAS设置,后文会说。
3.2. release
实际上调用sync.releaseShared(arg)
。
1
2
3
4
5
6
7
8
9
10
public final boolean releaseShared(int arg) {
// 1. 尝试更改引用计数,以释放锁状态
if (tryReleaseShared(arg)) {
// 2. 一般而言上一步都会返回true
// 具体说明见下文所述
doReleaseShared();
return true;
}
return false;
}
实际上,变化的也还是tryReleaseShared(arg)
。
3.2.1. tryReleaseShared
代码也是很简单的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 1. 计算之后的引用计数
int current = getState();
int next = current + releases;
// 溢出
if (next < current) {
throw new Error("Maximum permit count exceeded");
}
// 2. 循环CAS设置引用计数
if (compareAndSetState(current, next)) {
return true;
}
}
}
可以发现这个方法会始终返回true
,从而激活后面头节点的线程,这里处理的方式比较巧妙。
假设初始为1,线程t
获取3,线程m
释放1。
对于线程t
:
- 获取3后,
remaining = -2
,从而阻塞,注意permit == 3
对于线程m
:
- 释放1后,
state
被CAS设置成2,并激活线程t
然后对于t
,如1.3.2.所示,被激活,并重新循环获取锁,发现remaining == -1
,再次失败,因而被阻塞了。
假如t
获取的是2,那么再次唤醒并获取锁时,remaining == 0
,获取成功,从而成为头节点,获取锁,解除阻塞状态。