源码阅读-AQS与Condition

Posted by keys961 on June 14, 2019

1. Condition使用场景

以生产者-消费者为例,必须注意:

  • ConditionLock中创建
  • 使用Condition必须获取到Lock(因为从Condition中阻塞要释放Lock
  • Conditionawaitsignal必须成对
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
Lock lock = new ReentrantLock();
Condition notFull = lock.newCondition();
Condition notEmpty = lock.newCondition();
Buffer buffer = new Buffer(CAPACITY);

// Producer
void put(Object x) {
    lock.lock();
    try {
        while(buffer.isFull()) {
            notFull.await(); // 等待notFull
        }
        buffer.put(x); // 生产
        notEmpty.signal(); // 发通知激活notEmpty
    } finally {
        lock.unlock();
    }
}

// Consumer
Object take(Object x) {
    lock.lock();
    try {
        while(buffer.isEmpty()) {
            notEmpty.await(); // 等待notEmpty
        }
        Object x = buffer.take(); // 消费
        notFull.signal(); // 发通知激活notFull
        return x;
    } finally {
        lock.unlock();
    }
}

2. Condition数据结构

这里参考AQS定义的ConditionObject类:

1
2
3
4
5
6
public class ConditionObject implements Condition, java.io.Serializable {
    //...
    // 队列头和队列尾
    private transient Node firstWaiter;
    private transient Node lastWaiter;
}

Node中的字段nextWaiter,在Condition使用时,线程一个单向链表:

1
2
3
4
final static class Node {
    //...
    Node nextWaiter; // Condition下形成的单向链表
}

因此,同样引入了“队列”,为了区分AQS中的同步队列,这里定义为:条件队列

这里引用这里的图,说明队列示意图:

queue

3. Condition工作流程

Condition工作大致流程为

  1. condition.await()后,将自己的Node加入条件队列,释放锁,然后阻塞
  2. condition.signal()后,将条件队列头Node添加到同步队列尾,等待获取锁(只有获取到锁后,才能从await中返回)

3.1. await

代码流程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public final void await() throws InterruptedException {
    // 0. 因为要响应中断,因此要检查中断状态
    if(Thread.interrupted()) {
        throw new InterruptedException();
    }
    // 1. 将node加入条件队列(*)
    Node node = addConditionWaiter();
    // 2. 完全释放锁,获取之前的引用计数(*)
    int savedState = fullyRelease(node);
    int interrupteMode = 0;
    // 3. 能进入循环,进入时肯定获取锁,不在同步队列(*)
    // 而再次进入循环判断后,肯定是被signal激活了(会被加入同步队列)
    // 若进入了同步队列,跳出循环;之后会说,它不会不在同步队列
    while(!isOnSyncQueue(node)) {
        // 3.1. 阻塞线程
        LockSupport.park(this);
        // 3.2. 出来时检查中断状态,若中断(即不为0),要跳出循环
        interrputMode = checkInterruptWhileWaiting(node);
        if(interruptMode != 0) {
            break;
        }
    }
    // 4. 激活进入阻塞队列后,尝试获取锁,再次阻塞(*)
    if(acquireQueued(node, saveState) && interruptMode != THROW_IE) {
        // 4.1. 当线程发生中断时,要重新设置中断mode
        interruptMode = REINTERRUPT;
    }
    // 5. 条件队列中该节点有后继,将CANCELLED的节点从队列中移除(*)
    if(node.nextWaiter != null) {
        unlinkCancelledWaiters();
    }
    // 6. 处理中断(*)
    // THROW_IE则抛出异常,REINTERRUPT则设置线程中断位
    if(interruptMode != 0) {
        reportInterruptAfterWait(interruptMode);
    }
}

下面对上面6步进行说明。

3.1.1. 将Node加入条件队列

addConditionWaiter中实现。

之前提及,当前线程是获取到锁的,因此同步队列的头Node是一个dummy node,因此需要创建一个新Node,并设置waitStatusCONDITION

此外,由于只有1个线程获取到锁,因此该方法是线程安全的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private Node addConditionWaiter() {
    if(!isHeldExclusively()) {
        throw new IllegalMonitorStateException();
    }
    Node t = lastWaiter;
    // 1. 当队列尾部被CANCELLED,扫描整个条件队列,清除取消的节点
    if(t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // 2. 创建当前线程的Node
    Node node = new Node(Node.CONDITION);
    // 3. 插入队尾
    if(t == null) {
        firstWaiter = node;
    } else {
        t.nextWaiter = node;
    }
    lastWaiter = node;
    return node;
}

而上面有一个清除CANCELLED节点的方法:unlinkCancelledWaiters。它是纯链表的操作,会扫描整个条件队列,若节点状态不是Node.CONDITION(即是被取消的节点),则需要将改节点移出队列

这里的取消指:不需要获取Condition的同步状态

3.1.2. 完全释放锁

fullyRelease中实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final int fullyRelease(Node node) {
    try {
        // 1. 获取当前引用计数
        int savedState = getState();
        // 2. 释放当前线程持有的锁
        if(release(savedState)) {
            return savedState;
        }
        throw new IllegalMonitorStateException();
    } catch(Throwable t) {
        // 异常,则取消锁的获取,标记并抛出异常
        node.waitStatus = Node.CANCELLED;
        throw t;
    }
}

假如线程没持有锁,却调用了Condition#await,那么进入fullyRelease后的第2步会失败,因而直接抛出异常。

所以使用condition时,一定要先获取锁!

3.1.3. 阻塞,激活后在同步队列获取锁

即下面这个循环:

1
2
3
4
5
6
7
8
9
10
// 3.0. 若不再同步队列中
while(!isOnSyncQueue(node)) {
    // 3.1. 阻塞线程
    LockSupport.park(this);
    // 3.2. 出来时检查中断状态,若中断(即不为0),要跳出循环
    interrputMode = checkInterruptWhileWaiting(node);
    if(interruptMode != 0) {
        break;
    }
}

首先是isOnSyncQueue方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 初始时,node.waitStatus == Node.CONDITION
final boolean isOnSyncQueue(Node node) {
    // 移到同步队列后,node.waitStatus会为0
    // 此外node.prev为空时,肯定不在队列中(只有header才行)
    // 1. 遇到上述2情形时,返回false
    if(node.waitStatus == Node.CONDITION || node.prev == null) {
        return false;
    }
    // 2. 若node有后继节点,肯定在同步队列中,返回true
    if(node.next != null) {
        return true;
    }
    // 3. 从尾巴扫描,查看是否在同步队列中
    return findNodeFromTail(node);
}

线程阻塞,被Condition#signal激活后,需要检查中断状态,这是checkInterruptWhileWaiting

而其中有一个非常重要的方法transferAfterCancelledWait判断中断是在signal前还是之后,并保证node进入同步队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 1. 若signal之前中断,返回THROW_IE
// 2. 若signal之后中断,返回REINTERRUPT
// 3. 无中断,返回0
private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
}

// 此时,线程已被中断,判断中断是在signal前还是后
// 此外,该方法可以保证node一定进入了同步队列
final boolean transferAfterCancelledWait(Node node) {
    // 1. CAS设置waitStatus状态为0,即恢复
    // 若成功,中断肯定在signal之前发生,因为signal会将waitStatus置为0
    if(node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
        // 1.1. 将节点放入同步队列,即中断后,依旧回到同步队列
        enq(node);
        return true;
    }
    // 2. CAS失败,中断在signal之后完成
    // 自旋等待signal将其推入同步队列
    while(!isOnSyncQueue(node)) {
        Thread.yield();
    }
    return false;
}

从上可知,线程发生了中断,节点依旧在同步队列中

3.1.4. 再次尝试获取锁

代码片段为:

1
2
3
4
5
// 4. 激活进入阻塞队列后,尝试获取锁,再次阻塞(*)
if(acquireQueued(node, saveState) && interruptMode != THROW_IE) {
    // 4.1. 当线程发生中断时,要重新设置中断mode
    interruptMode = REINTERRUPT;
}

3.1.3.节的中断检查,以及Condition#signal,会将节点推入同步队列,因此直接调用acquireQueue(node, saveState)来获取锁即可。

acquireQueue返回true,表示线程是否被中断,所以进入这个if块需要重置interruptMode,用于之后的处理。

3.1.5. 清理CANCELLED节点

代码片段为:

1
2
3
if(node.nextWaiter != null) {
    unlinkCancelledWaiters();
}

为什么判断条件是node.nextWaiter != null

  • 正常情况,节点会转移到同步队列,同时脱离条件队列,即node.nextWaiter == null
  • 中断在signal之后,同样signal将其转移到同步队列,同时脱离条件队列,即node.nextWaiter == null
  • 中断在signal之前时,enq操作不会断链,而它的waitStatus会置为0(transferAfterCancelledWait的CAS),需要脱离条件队列,因此要被清理

3.1.6. 处理中断状态

最后在await被唤醒后,处理中断状态,使用interruptMode

  • THROW_IE: await返回后,要抛出中断异常
  • 0: 正常,无中断发生
  • REINTERRUPT: await返回后,需要重新设置中断

处理在reportInterrptAfterWait实现,逻辑很简单,这里略过。

JVM线程的中断只是一个标识位

参照Condition的做法,当一个阻塞方法需要处理中断时:

  • 开头检查标志位
  • 唤醒后检查标志位

若被标识,可选择抛出中断异常

3.2. signal

它用于:

  • 唤醒等待Condition上的线程
  • 将其节点移出条件队列
  • 将其节点移入同步队列队尾

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
public final void signal() {
    // 同样要获得锁
    if(!isHeldExclusively()) {
        throw new IllegalMonitorStateException();
    }
    // 1. 获取队列头
    Node first = firstWaiter;
    // 2. 唤醒头节点
    if(first != null) {
        doSignal(first);
    }
}

这里关键在于doSignal

3.2.1. doSignal

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
private void doSignal(Node first) {
    do {
        // 1. 若只有一个节点,先置尾巴为null,因为头节点要移出队列
        if((firstWaiter = first.nextWaiter) == null) {
            lastWaiter = null;
        }
        // 2. 由于first头节点要出队,因此先移出队列
        first.nextWaiter = null;
    } while(!transferForSignal(first) && 
           first = firstWaiter) != null);
    // 3. 转移并唤醒first头节点,失败就重试
}

可见唤醒入同步队列的过程在transferForSignal中,而本方法实现了出条件队列的功能。

3.2.2. transferForSignal

实现:唤醒线程入同步队列(转移)

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
final boolean transferForSignal(Node node) {
    // 1. CAS更新waitStatus
    // 若失败,则节点已被取消(即不在条件队列中,不需被转移),直接返回转移下一个节点即可
    if (!node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
        return false;
    }
    // 2. 入队尾,返回其前驱节点
    Node p = enq(node);
    int ws = p.waitStatus;
    // 3. 假如前驱节点取消获取锁,或者CAS设置前驱节点状态为SIGNAL失败
    if(ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) {
        // 唤醒node的线程
        LockSupport.unpark(node.thread);
    }
    return true;
}

一个问题是第3步的条件:ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)

  • 正常情况下,ws <= 0,CAS也会成功,node线程依旧被阻塞,但是已经推入同步队列队尾。这时,只需等待前驱节点唤醒(unpark)即可,在acquireQueue的第6行直接获得锁,从而继续执行;

    实际上,正常情况下,也可以唤醒,这不影响正确性。但是没有必要。

  • 异常情况下,ws > 0(前驱被取消),唤醒node线程以再次同步,会进入acquireQueue第11行,跳过前驱的取消节点,并可能再次阻塞;

    这里取消指:不获取锁,而不是获取Condition

  • 异常情况下,CAS设置失败,说明同步队列有新线程加入,因此也需要唤醒node线程以再次同步,也会进入acquireQueue第11行,再次阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
final boolean acquireQueued(final Node node, int arg) {
    boolean interrupted = false;
    try {
        for(;;) {
            final Node p = node.predecessor();
            if(p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null;
                return interrupted;
            }
            if(shouldParkAfterFailedAcquire(p, node)) {
                interrupted |= parkAndCheckInterrputed();
            }
        }
    } catch(Throwable t) {
        cancelAcquire(node);
        if(interrputed) {
            selfInterrupt();
        }
        throw t;
    }
}

3.3. 设置超时的场景

Condition开放了超时的机制,最后会落入await(long time, TimeUnit unit)方法。

实际上,这个方法和标准的方法没有太大的差别,只是增加了时间超时的判断:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public final boolean await(long time, TimeUnit unit) throws InterrptedException {
    long nanoTimeout = unit.toNanos(time);
    // 0. 检查中断
    // ...
    final long deadline = System.nanoTime() + nanoTimeout;
    // 1. 入条件队列
    // 2. 释放锁
    // 3. 阻塞,激活后在同步队列获取锁
    while(!isOnSyncQueue(node)) {
        // a) 超时,就取消获取Condition
        if(nanosTimeout <= 0L) {
            // 之前所述,这里保证node进入同步队列
            // true: 说明signal没发生
            // false: 说明signal已发生,也就是没超时
            timeout = transferAfterCancelledWait(node);
            break;
        }
        // b) 当时间超过1ms,直接阻塞,否则就自旋,以提高性能
        if(nanosTimeout >= SPIN_FOR_TIMEOUT_THRESHOLD) {
            LockSupport.parkNanos(this, nanoTimeout);
        }
        nanaoTimeout = deadline - System.nanoTime();
    }
    // 4. 队列中重新获取锁,设置中断mode
    // 5. 处理取消节点
    // 6. 处理中断
    return timeout;
}

3.4. 不检测中断的场景

Condition开放了不检查中断的机制,最后会落入awaitUninterruptibly()方法。

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public final void awaitUninterruptibly() {
    // 1. 入条件队列
    Node node = addConditionWaiter();
    // 2. 释放锁
    int savedState = fullyRelease(node);
    boolean interrupted = false;
    // 3. 阻塞,等待被激活并进入同步队列
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if (Thread.interrupted()) {
            interrupted = true;
        }
    }
    // 4. 同步队列中获取锁
    if (acquireQueued(node, savedState) || interrupted) {
        selfInterrupt();
    }
}

实际上,和原有的await相比,去掉了中断检查以及中断处理的代码,其它基本一样。

4. 总结

对于Condition,有如下总结:

  • Condition实际上维护了一个单向链表,即条件队列,线程在这个队列中等待
  • 所有Condition操作都需要预先获取锁,否则会抛异常
  • 对于await,主要是将线程Node从同步队列头转移到条件队列尾,并阻塞
  • 对于signal,主要是将线程Node从条件队列头转移到同步队列尾,并等待前驱节点的激活并获取到锁,从await中离开

Condition有很多中断标志的处理,简而言之,就是:

  • 开头检查标志位
  • 阻塞被唤醒后,还需要检查标志位
  • 最后处理标志位