1. Condition
使用场景
以生产者-消费者为例,必须注意:
Condition
从Lock
中创建- 使用
Condition
必须获取到Lock
(因为从Condition
中阻塞要释放Lock
) Condition
的await
和signal
必须成对
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
Lock lock = new ReentrantLock();
Condition notFull = lock.newCondition();
Condition notEmpty = lock.newCondition();
Buffer buffer = new Buffer(CAPACITY);
// Producer
void put(Object x) {
lock.lock();
try {
while(buffer.isFull()) {
notFull.await(); // 等待notFull
}
buffer.put(x); // 生产
notEmpty.signal(); // 发通知激活notEmpty
} finally {
lock.unlock();
}
}
// Consumer
Object take(Object x) {
lock.lock();
try {
while(buffer.isEmpty()) {
notEmpty.await(); // 等待notEmpty
}
Object x = buffer.take(); // 消费
notFull.signal(); // 发通知激活notFull
return x;
} finally {
lock.unlock();
}
}
2. Condition
数据结构
这里参考AQS定义的ConditionObject
类:
1
2
3
4
5
6
public class ConditionObject implements Condition, java.io.Serializable {
//...
// 队列头和队列尾
private transient Node firstWaiter;
private transient Node lastWaiter;
}
而Node
中的字段nextWaiter
,在Condition
使用时,线程一个单向链表:
1
2
3
4
final static class Node {
//...
Node nextWaiter; // Condition下形成的单向链表
}
因此,同样引入了“队列”,为了区分AQS中的同步队列,这里定义为:条件队列
这里引用这里的图,说明队列示意图:
3. Condition
工作流程
Condition
工作大致流程为:
condition.await()
后,将自己的Node
加入条件队列,释放锁,然后阻塞condition.signal()
后,将条件队列头Node
添加到同步队列尾,等待获取锁(只有获取到锁后,才能从await
中返回)
3.1. await
代码流程如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public final void await() throws InterruptedException {
// 0. 因为要响应中断,因此要检查中断状态
if(Thread.interrupted()) {
throw new InterruptedException();
}
// 1. 将node加入条件队列(*)
Node node = addConditionWaiter();
// 2. 完全释放锁,获取之前的引用计数(*)
int savedState = fullyRelease(node);
int interrupteMode = 0;
// 3. 能进入循环,进入时肯定获取锁,不在同步队列(*)
// 而再次进入循环判断后,肯定是被signal激活了(会被加入同步队列)
// 若进入了同步队列,跳出循环;之后会说,它不会不在同步队列
while(!isOnSyncQueue(node)) {
// 3.1. 阻塞线程
LockSupport.park(this);
// 3.2. 出来时检查中断状态,若中断(即不为0),要跳出循环
interrputMode = checkInterruptWhileWaiting(node);
if(interruptMode != 0) {
break;
}
}
// 4. 激活进入阻塞队列后,尝试获取锁,再次阻塞(*)
if(acquireQueued(node, saveState) && interruptMode != THROW_IE) {
// 4.1. 当线程发生中断时,要重新设置中断mode
interruptMode = REINTERRUPT;
}
// 5. 条件队列中该节点有后继,将CANCELLED的节点从队列中移除(*)
if(node.nextWaiter != null) {
unlinkCancelledWaiters();
}
// 6. 处理中断(*)
// THROW_IE则抛出异常,REINTERRUPT则设置线程中断位
if(interruptMode != 0) {
reportInterruptAfterWait(interruptMode);
}
}
下面对上面6步进行说明。
3.1.1. 将Node
加入条件队列
在addConditionWaiter
中实现。
之前提及,当前线程是获取到锁的,因此同步队列的头Node
是一个dummy node,因此需要创建一个新Node
,并设置waitStatus
为CONDITION
。
此外,由于只有1个线程获取到锁,因此该方法是线程安全的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private Node addConditionWaiter() {
if(!isHeldExclusively()) {
throw new IllegalMonitorStateException();
}
Node t = lastWaiter;
// 1. 当队列尾部被CANCELLED,扫描整个条件队列,清除取消的节点
if(t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 2. 创建当前线程的Node
Node node = new Node(Node.CONDITION);
// 3. 插入队尾
if(t == null) {
firstWaiter = node;
} else {
t.nextWaiter = node;
}
lastWaiter = node;
return node;
}
而上面有一个清除CANCELLED
节点的方法:unlinkCancelledWaiters
。它是纯链表的操作,会扫描整个条件队列,若节点状态不是Node.CONDITION
(即是被取消的节点),则需要将改节点移出队列。
这里的取消指:不需要获取
Condition
的同步状态
3.1.2. 完全释放锁
在fullyRelease
中实现。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final int fullyRelease(Node node) {
try {
// 1. 获取当前引用计数
int savedState = getState();
// 2. 释放当前线程持有的锁
if(release(savedState)) {
return savedState;
}
throw new IllegalMonitorStateException();
} catch(Throwable t) {
// 异常,则取消锁的获取,标记并抛出异常
node.waitStatus = Node.CANCELLED;
throw t;
}
}
假如线程没持有锁,却调用了Condition#await
,那么进入fullyRelease
后的第2步会失败,因而直接抛出异常。
所以使用condition
时,一定要先获取锁!
3.1.3. 阻塞,激活后在同步队列获取锁
即下面这个循环:
1
2
3
4
5
6
7
8
9
10
// 3.0. 若不再同步队列中
while(!isOnSyncQueue(node)) {
// 3.1. 阻塞线程
LockSupport.park(this);
// 3.2. 出来时检查中断状态,若中断(即不为0),要跳出循环
interrputMode = checkInterruptWhileWaiting(node);
if(interruptMode != 0) {
break;
}
}
首先是isOnSyncQueue
方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 初始时,node.waitStatus == Node.CONDITION
final boolean isOnSyncQueue(Node node) {
// 移到同步队列后,node.waitStatus会为0
// 此外node.prev为空时,肯定不在队列中(只有header才行)
// 1. 遇到上述2情形时,返回false
if(node.waitStatus == Node.CONDITION || node.prev == null) {
return false;
}
// 2. 若node有后继节点,肯定在同步队列中,返回true
if(node.next != null) {
return true;
}
// 3. 从尾巴扫描,查看是否在同步队列中
return findNodeFromTail(node);
}
线程阻塞,被Condition#signal
激活后,需要检查中断状态,这是checkInterruptWhileWaiting
。
而其中有一个非常重要的方法transferAfterCancelledWait
,判断中断是在signal
前还是之后,并保证node
进入同步队列:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 1. 若signal之前中断,返回THROW_IE
// 2. 若signal之后中断,返回REINTERRUPT
// 3. 无中断,返回0
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
}
// 此时,线程已被中断,判断中断是在signal前还是后
// 此外,该方法可以保证node一定进入了同步队列
final boolean transferAfterCancelledWait(Node node) {
// 1. CAS设置waitStatus状态为0,即恢复
// 若成功,中断肯定在signal之前发生,因为signal会将waitStatus置为0
if(node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
// 1.1. 将节点放入同步队列,即中断后,依旧回到同步队列
enq(node);
return true;
}
// 2. CAS失败,中断在signal之后完成
// 自旋等待signal将其推入同步队列
while(!isOnSyncQueue(node)) {
Thread.yield();
}
return false;
}
从上可知,线程发生了中断,节点依旧在同步队列中。
3.1.4. 再次尝试获取锁
代码片段为:
1
2
3
4
5
// 4. 激活进入阻塞队列后,尝试获取锁,再次阻塞(*)
if(acquireQueued(node, saveState) && interruptMode != THROW_IE) {
// 4.1. 当线程发生中断时,要重新设置中断mode
interruptMode = REINTERRUPT;
}
3.1.3.节的中断检查,以及Condition#signal
,会将节点推入同步队列,因此直接调用acquireQueue(node, saveState)
来获取锁即可。
而acquireQueue
返回true
,表示线程是否被中断,所以进入这个if
块需要重置interruptMode
,用于之后的处理。
3.1.5. 清理CANCELLED
节点
代码片段为:
1
2
3
if(node.nextWaiter != null) {
unlinkCancelledWaiters();
}
为什么判断条件是node.nextWaiter != null
?
- 正常情况,节点会转移到同步队列,同时脱离条件队列,即
node.nextWaiter == null
- 中断在
signal
之后,同样signal
将其转移到同步队列,同时脱离条件队列,即node.nextWaiter == null
- 中断在
signal
之前时,enq
操作不会断链,而它的waitStatus
会置为0(transferAfterCancelledWait
的CAS),需要脱离条件队列,因此要被清理
3.1.6. 处理中断状态
最后在await
被唤醒后,处理中断状态,使用interruptMode
:
THROW_IE
:await
返回后,要抛出中断异常0
: 正常,无中断发生REINTERRUPT
:await
返回后,需要重新设置中断
处理在reportInterrptAfterWait
实现,逻辑很简单,这里略过。
JVM线程的中断只是一个标识位
参照
Condition
的做法,当一个阻塞方法需要处理中断时:
- 开头检查标志位
- 唤醒后检查标志位
若被标识,可选择抛出中断异常
3.2. signal
它用于:
- 唤醒等待
Condition
上的线程 - 将其节点移出条件队列
- 将其节点移入同步队列队尾
代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
public final void signal() {
// 同样要获得锁
if(!isHeldExclusively()) {
throw new IllegalMonitorStateException();
}
// 1. 获取队列头
Node first = firstWaiter;
// 2. 唤醒头节点
if(first != null) {
doSignal(first);
}
}
这里关键在于doSignal
3.2.1. doSignal
代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
private void doSignal(Node first) {
do {
// 1. 若只有一个节点,先置尾巴为null,因为头节点要移出队列
if((firstWaiter = first.nextWaiter) == null) {
lastWaiter = null;
}
// 2. 由于first头节点要出队,因此先移出队列
first.nextWaiter = null;
} while(!transferForSignal(first) &&
(first = firstWaiter) != null);
// 3. 转移并唤醒first头节点,失败就重试
}
可见唤醒、入同步队列的过程在transferForSignal
中,而本方法实现了出条件队列的功能。
3.2.2. transferForSignal
实现:唤醒线程、入同步队列(转移)。
代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
final boolean transferForSignal(Node node) {
// 1. CAS更新waitStatus
// 若失败,则节点已被取消(即不在条件队列中,不需被转移),直接返回转移下一个节点即可
if (!node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
return false;
}
// 2. 入队尾,返回其前驱节点
Node p = enq(node);
int ws = p.waitStatus;
// 3. 假如前驱节点取消获取锁,或者CAS设置前驱节点状态为SIGNAL失败
if(ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) {
// 唤醒node的线程
LockSupport.unpark(node.thread);
}
return true;
}
一个问题是第3步的条件:ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)
:
-
正常情况下,
ws <= 0
,CAS也会成功,node
线程依旧被阻塞,但是已经推入同步队列队尾。这时,只需等待前驱节点唤醒(unpark
)即可,在acquireQueue
的第6行直接获得锁,从而继续执行;实际上,正常情况下,也可以唤醒,这不影响正确性。但是没有必要。
-
异常情况下,
ws > 0
(前驱被取消),唤醒node
线程以再次同步,会进入acquireQueue
第11行,跳过前驱的取消节点,并可能再次阻塞;这里取消指:不获取锁,而不是获取
Condition
-
异常情况下,CAS设置失败,说明同步队列有新线程加入,因此也需要唤醒
node
线程以再次同步,也会进入acquireQueue
第11行,再次阻塞。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
final boolean acquireQueued(final Node node, int arg) {
boolean interrupted = false;
try {
for(;;) {
final Node p = node.predecessor();
if(p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
return interrupted;
}
if(shouldParkAfterFailedAcquire(p, node)) {
interrupted |= parkAndCheckInterrputed();
}
}
} catch(Throwable t) {
cancelAcquire(node);
if(interrputed) {
selfInterrupt();
}
throw t;
}
}
3.3. 设置超时的场景
Condition
开放了超时的机制,最后会落入await(long time, TimeUnit unit)
方法。
实际上,这个方法和标准的方法没有太大的差别,只是增加了时间超时的判断:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public final boolean await(long time, TimeUnit unit) throws InterrptedException {
long nanoTimeout = unit.toNanos(time);
// 0. 检查中断
// ...
final long deadline = System.nanoTime() + nanoTimeout;
// 1. 入条件队列
// 2. 释放锁
// 3. 阻塞,激活后在同步队列获取锁
while(!isOnSyncQueue(node)) {
// a) 超时,就取消获取Condition
if(nanosTimeout <= 0L) {
// 之前所述,这里保证node进入同步队列
// true: 说明signal没发生
// false: 说明signal已发生,也就是没超时
timeout = transferAfterCancelledWait(node);
break;
}
// b) 当时间超过1ms,直接阻塞,否则就自旋,以提高性能
if(nanosTimeout >= SPIN_FOR_TIMEOUT_THRESHOLD) {
LockSupport.parkNanos(this, nanoTimeout);
}
nanaoTimeout = deadline - System.nanoTime();
}
// 4. 队列中重新获取锁,设置中断mode
// 5. 处理取消节点
// 6. 处理中断
return timeout;
}
3.4. 不检测中断的场景
Condition
开放了不检查中断的机制,最后会落入awaitUninterruptibly()
方法。
代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public final void awaitUninterruptibly() {
// 1. 入条件队列
Node node = addConditionWaiter();
// 2. 释放锁
int savedState = fullyRelease(node);
boolean interrupted = false;
// 3. 阻塞,等待被激活并进入同步队列
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted()) {
interrupted = true;
}
}
// 4. 同步队列中获取锁
if (acquireQueued(node, savedState) || interrupted) {
selfInterrupt();
}
}
实际上,和原有的await
相比,去掉了中断检查以及中断处理的代码,其它基本一样。
4. 总结
对于Condition
,有如下总结:
Condition
实际上维护了一个单向链表,即条件队列,线程在这个队列中等待- 所有
Condition
操作都需要预先获取锁,否则会抛异常 - 对于
await
,主要是将线程Node
从同步队列头转移到条件队列尾,并阻塞 - 对于
signal
,主要是将线程Node
从条件队列头转移到同步队列尾,并等待前驱节点的激活并获取到锁,从await
中离开
而Condition
有很多中断标志的处理,简而言之,就是:
- 开头检查标志位
- 阻塞被唤醒后,还需要检查标志位
- 最后处理标志位