源码阅读-AQS与共享锁

Posted by keys961 on June 18, 2019

AQS的共享模式应用于许多类,最主要的是这3个:CountDownLatch, CyclicBarrerSemaphore

1. CountDownLatch

1.1. 使用示例

非常简单,假定一个线程等待N个任务的完成:

  • 初始化一个计数为NCountDownLatch
  • N个任务线程调用latch.countDown()
  • 外部线程调用latch.await()等待所有任务完成

1.2. 数据结构

CountDownLatch内部实际上就实现了自己的Sync类(继承AQS),用于支持共享模式,实质上是一个共享锁。初始化需要初始化一个引用计数count,实际上就是AQS的state

可通过setState/getState设置或读取引用计数

CountDownLatch主要的方法就2个:

  • countDown:引用计数-1
  • await:阻塞至引用计数为0(且它响应中断)

1.3. await

实际上调用了sync.acquireSharedInterruptibly(1),而里面的代表如下:

1
2
3
4
5
6
7
8
9
10
11
12
public final void acquireSharedInterruptibly(int arg) 
				throws InterruptedException {
    // 0. 检查中断
    if(Thread.interrupted()) {
        throw new InterruptedException();
    }
    // 1. 获取共享状态(先尝试,后可能阻塞并再次获取)
    // 小于0则表示尝试获取失败,可能需要阻塞
    if(tryAcquireShared(arg) < 0) {
        doAcquireSharedInterrptibly(arg);
    }
}

里面关键2个方法:

  • tryAcquireShared
  • doAcquireSharedInterrptibly

1.3.1. tryAcquireShared

这里的实现就是:判断引用计数是否为0

  • 是,则返回1,表明获取锁成功
  • 否,则返回-1,表明获取锁失败,说明可能要进入阻塞状态并再次获取

1.3.2. doAcquireSharedInterruptibly

CountDownLatch下,当引用计数不为0,即大于0,说明await线程是不能获取到锁的。

因此需要进入该方法,进入同步队列,阻塞并等待被激活,进而获取同步状态。

代码逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // 1. 入队
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        // 2. 循环尝试获取锁
        for(;;) {
            final Node p = node.predecessor();
            // 2.1 前驱为head时,尝试获取
            if(p == head) {               
                int r = tryAcquireShared(arg);
                if(r >= 0) {
                    // 2.2. 获取成功,则重新设置头,并向后传播
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return;
                }
            }
            // 2.3. 前驱不为head,或获取锁失败,
            // 则检查线程是否需要阻塞,阻塞并检查中断
            // 前面的方法之前提及过:
            // a) 若前驱状态不是SIGNAL,并置状态为SIGNAL,返回false
            // b) 跳过CANCELLED前驱节点,返回false
            // c) a,b下,下一次循环时,检查后返回true,从而能够阻塞线程
            if(shouldParkAfterFailedAcquire(p, node) && 
               parkAndCheckInterrpt()) {
                throw new InterruptedException();
            }
        }
    } catch(Throwable t) {
        // 3. 异常则取消锁的获取,将本节点出队
        // NOTE: 之前提及,
        // 出队还需要shouldParkAfterFailedAcquire以及cancelAcquire本身的帮助
        cancelAcquire(node);
        throw t;
    }
}

很多方法都已经在之前的文章中说明了,如下面这些,详见这里

  • addWaiter
  • shouldParkAfterFailedAcquire
  • cancelAcquire

这里,当线程调用await时,会查看引用计数(state是否为0):

  • 如果是0,tryAcquireShared就会成功,从而获得到锁
  • 如果大于0,tryAcquireShared就会失败,进入同步队列,进行阻塞

所以这里只有1个方法需要说明,那就是线程获取锁成功时设置头节点的问题,setHeadAndPropagate

1.3.3. setHeadAndPropagate

可以先看1.4.节再看本节

被唤醒后,进入该方法,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head;
    // 1. 重新设置头节点
    // 依旧只有1个线程被激活(多个需要激活的线程是传播激活的),因此线程安全
    setHead(node);
    // 2. 传播唤醒,这里propagate > 0
    if(propagate > 0 || h == null || h.waitStatus < 0 ||
       (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if(s == null || s.isShared()) {
            // 本线程已被唤醒,进而传播,唤醒后继线程
            // 详见1.4.2.
            doReleaseShared();
        }
    }
}

1.4. countDown

本方法关键调用是sync.releaseShared(1),代码如下:

1
2
3
4
5
6
7
8
9
public final boolean releaseShared(int arg) {
    // 1. 尝试更改引用计数,以释放锁状态
    if (tryReleaseShared(arg)) {
        // 2. 若成功,则真正释放锁,唤醒后继阻塞的线程
        doReleaseShared();
        return true;
    }
    return false;
}

1.4.1. tryReleaseShared

CountDownLatch中调用countDown,会调用这个方法。

它仅仅对引用计数-1:

  • 若引用计数变为0,则返回true,可以释放锁并唤醒后面的线程
  • 若引用计数大于0,则返回false,锁状态依旧保持

1.4.2. doReleaseShared

tryReleaseShared成功后,会调用该方法。

CountDownLatch中,引用计数为0,需要唤醒等待的await线程。

代码逻辑如下,if条件也有说明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 假设h -> t3 -> t4
private void doReleaseShared() {
    // 循环
    for(;;) {
        Node h = head;
        if(h != null && h != tail) {
            int ws = h.waitStatus;
            // 1. 正常情况,头节点是SIGNAL(如头为h)
            if(ws == Node.SIGNAL) {
                // 1.1. CAS设置头节点的ws为0,正常情况是能成功的
                // CAS失败可能的情况如:中继节点传播时(如t3传播t4)
                // 后继线程可能会占用头,从而头改变了,ws可能变为0,那只能循环下一轮的检查
                if(!h.compareAndSetWaitStatus(Node.SIGNAL, 0)) {
                    continue;
                }
                // 1.2. 唤醒后继节点,唤醒后的请看1.3.3.
                // 可能在这个后,后面有多个线程被唤醒,头会被改变
                // 如目前线程是h,那么它的头可能是t3,t4等
                unparkSuccessor(h);
            }
            // 2. ws为0,即,CAS失败,即此时有新节点入队,造成ws变为-1
            // 如队列从h -> t3 -> t4变为t4, t4的ws为0,此时新结点入队,ws变为-1
            else if (ws == 0 && !h.compareAndSetWaitStatus(0, Node.PROPGATE)) {
                continue;
            }
        }
        // 3. 若head没变,说明头节点没被后面的线程占有,那么直接退出
        // 若head变化,说明头节点被后面线程占有,而继续下一步循环,从而可以继续激活更后面的线程,以提高吞吐量
        if(h == head) {
            break;
        }
    }
}

2. CyclicBarrier

CountDownLatch类似,不过可以循环使用。

它基于Condition实现。

它最重要的就是await方法:

  • 每调用一次,计数-1,并阻塞
  • 当为0后,所有线程终止阻塞,并恢复计数

2.1. 数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class CyclicBarrier {
    // 因为可重复使用,每轮使用当成一个周期
    private static class Generation {
        boolean broken = false;
    }
    // 栅栏本身
    private final ReentrantLock lock = new ReentrantLock();
    // 等待通过栅栏的条件
    private final Condition trip = lock.newCondition();
    // 参与总线程数
    private final int parties;
    // 越过栅栏前先执行的操作
    private final Runnable barrierCommand;
    // 处于哪个周期
    private Generation generation = new Generation();
    // 引用计数: 没到达的线程数
    // 即: parties = count + 已到达的线程数
    private int count;
}

2.2. 打破栅栏,开启下一轮周期

打破栅栏调用breakBarrier,若不修复,则无法继续使用该栅栏(抛异常):

  • 设置generation.brokentrue
  • 恢复引用计数count
  • 放行所有等待trip的线程

开启下一轮周期调用nextGeneration,其它和breakBarrier一样,除了:

  • 生成新的Generation实例

2.3. await

实际上调用的是带超时版本的doAwait,并能响应中断。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
private int dowait(boolean timed, long nanos) 
    throws InterruptedException, BrokenBarrierException, TimeoutException {
    final ReentrantLock lock = this.lock;
    // 1. 获取锁,因为要使用Condition
    lock.lock();
    try {
    	final Generation g = generation;
        // 2. 若栅栏破了,抛异常
        if(g.broken) {
            throw new BrokenBarrierException();
        }
        // 3. 响应中断,需要打破栅栏,释放等待的线程,并抛异常
        if(Thread.interrpted()) {
            breakBarrier();
            throw new InterruptedException();
        }
        // 4. 递减引用计数
        int index = --count;
        // 5. 若为0,则要释放所有等待线程
        if(index == 0) {
            boolean ranAction = false;
            try {
                // 5.1. 先执行命令
                final Runnable command = barrierCommand;
                if(command != null) {
                    command.run();
                }
                // 若命令出异常,这一步走不到
                ranAction = true;
                // 5.2. 唤醒线程,开启下一轮
                nextGeneration();
                return 0;
            } finally {                
                if(!ranAction) {
                   // 5.3. 这里,指定命令出现异常,需要打破栅栏并释放线程,异常则通过命令抛出
                    breakBarrier();
                }
            }
        }
        
        // 6. 若引用计数大于0,则需要等待,进入循环
        // 出循环: trip条件满足、中断、栅栏破坏、超时
        for(;;) {
            try {
                // 6.1. 阻塞线程,这里阻塞会响应中断
                if(!timed) {
                    // a) 无超时
                    trip.await();
                } else if(nanos > 0L) {
                    // b) 超时
                    nanos = trip.awaitNanos(nanos);
                }
            } catch(InterruptedException ie) {
                // 6.2. await时线程中断,处理错误
                if(g == generation && !g.broken) {
                    // a) 处于同一周期,需要打破栅栏
                    breakBarrier();
                    throw ie;
                } else {
                    // b) 新一代产生,即执行到这里,本轮的所有线程已经被放行了,则不需要抛出异常,只需记录中断信息
                    // 或栅栏被打破,不需要抛中断异常,而是抛栅栏破坏的异常
                    Thread.currentThread().interrupt();
                }
            }
            // 7. 唤醒后还需检查栅栏是否完好
            if(g.broken) {
                throw new BrokenBarrierException();
            }
            // 8. 这种情况下,所有线程都已经要被放行了,最后一个线程会更新generation,并释放锁
            // 而到这里,线程获得锁,可知generation必定不等于g,因而在这个条件下直接返回即可
            // 不满足条件的情景: 
            // a) barrierCommand异常,栅栏被毁,线程被唤醒,到这一步,g == generation,只改变了g.broken
            // b) 超时情景,即上述trip.waitNanos(nanos)不是被外部唤醒,而是自己唤醒。由于只有外部唤醒才能改变generation,因此自己唤醒的情况(即超时)下,g == generation。超时在第9步处理
            if(g != generation) {
            	return index;
            }
            // 9. 超时,打破栅栏,抛出异常
            if(timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

3. Semaphore

信号量大家都很熟悉,不再多描述,它内部也有自己实现的Sync

主要的PV操作有:

  • acquire
  • release

下面对关键方法分析,有些方法不会响应中断,有些方法会响应超时,但基本没多少代码的变化。

3.1. acquire

实际上调用sync.acquireSharedInterrptibly(permits)

1
2
3
4
5
6
7
8
9
10
11
12
public final void acquireSharedInterrptibly(int arg)
    throws InterruptedException {
    // 0. 检查中断
    if(Thread.interrpted()) {
        throw new InterruptedException();
    }
    // 1. 获取共享状态(先尝试,后可能阻塞并再次获取)
    // 小于0则表示尝试获取失败,可能需要阻塞 
    if(tryAcquireShared(arg) < 0) {
        doAcquireSharedInterrptibly(arg);
    }
}

逻辑和CountDownLatch一模一样,主要差别在于tryAcquireShared(其它都是一样的,可参考CountDownLatch

3.1.1. tryAcquireShared

类似ReentrantLock,有公平和非公平之分,代码差别比较小,主要是检查前驱节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
protected int tryAcquiredShared(int acquires) {
    for(;;) {
        // 0. 若公平,则需检查是否有前驱,若有,则返回-1,需要阻塞
        if(hasQueuedPredecessors()) {
			return -1;
        }
        // 1. 获取原有的引用计数
        int available = getState();
        // 2. 计算后面的引用计数
        int remaining = available - acquires;
        // 3. CAS设置引用计数
        // 返回remaining,若remaining小于0,则需要阻塞(没获取到这个信号量)
        // 注意这里remaining<0不会被CAS设置,后文会说
        if(remaining < 0 ||
           compareAndSetState(available, remaining)) {
            return remaining;
        }
    }
}

注意这里remaining < 0不会被CAS设置,后文会说。

3.2. release

实际上调用sync.releaseShared(arg)

1
2
3
4
5
6
7
8
9
10
public final boolean releaseShared(int arg) {
    // 1. 尝试更改引用计数,以释放锁状态
    if (tryReleaseShared(arg)) {
        // 2. 一般而言上一步都会返回true
        // 具体说明见下文所述
        doReleaseShared();
        return true;
    }
    return false;
}

实际上,变化的也还是tryReleaseShared(arg)

3.2.1. tryReleaseShared

代码也是很简单的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        // 1. 计算之后的引用计数
        int current = getState();
        int next = current + releases;
        // 溢出
        if (next < current) {
            throw new Error("Maximum permit count exceeded");
        }
        // 2. 循环CAS设置引用计数
        if (compareAndSetState(current, next)) {
            return true;
        }
    }
}

可以发现这个方法会始终返回true,从而激活后面头节点的线程,这里处理的方式比较巧妙。

假设初始为1,线程t获取3,线程m释放1。

对于线程t

  • 获取3后,remaining = -2,从而阻塞,注意permit == 3

对于线程m

  • 释放1后,state被CAS设置成2,并激活线程t

然后对于t,如1.3.2.所示,被激活,并重新循环获取锁,发现remaining == -1,再次失败,因而被阻塞了。

假如t获取的是2,那么再次唤醒并获取锁时,remaining == 0,获取成功,从而成为头节点,获取锁,解除阻塞状态。