1. 总体说明
AbstractQueuedSynchronizer
类很重要,是 ReentrantLock
、CountDownLatch
、Semaphore
、FutureTask
实现的基础。
本文说明AQS与独占锁(ReentrantLock
)的相关源码,源码基于JDK 11。
1.1. AQS内部数据结构
实际上AQS内部数据结构很简单,简单而言就是维护了一个等待队列。
1
2
3
4
5
6
7
8
9
10
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSyncronizer implements java.io.Serializable {
// 队列头,代表获取锁的线程/节点
private transient volatile Node head;
// 队列尾
private transient volatile Node tail;
// 同步状态:0代表没有被线程持有,>0代表被某个线程持有(代表重入次数)
private volatile int state;
// 继承自父类,代表持有独占锁的线程
private transient Thread exclusiveOwnerThread;
}
1.2. Node
节点
Node
为队列中的节点,代表等待锁的线程(除了head
外)。
锁有独占、共享这2种模式,因此Node
定义了这2个模式,分别如下:
1
2
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
另外还有线程的等待状态,定义如下:
1
2
3
4
5
6
7
8
9
// 线程取消抢占该锁,通常是超时或中断
static final int CANCELLED = 1;
// 对后继节点对应的线程进行唤醒
// 这种情况下,本节点线程释放/取消获取锁后,必须唤醒后继节点
static final int SIGNAL = -1;
// 线程正在一个Condition的队列中等待
static final int CONDITION = -2;
// 表示下一个acquireShared应该无条件传播
static final int PROPAGATE = -3;
整体的Node
定义如下:
1
2
3
4
5
6
7
8
9
10
11
static final class Node {
// 上述的值以及0
// 负数代表正在等待,大于0代表被取消,0是正常同步节点
volatile int waitStatus;
volatile Node prev;
volatile Node next;
// 对应线程
volatile Thread thread;
// 等待某个Condition的下一个Node,或者是Node.SHARE
Node nextWaiter;
}
所以整体结构就是一个双向链表的阻塞队列。
1.3. 独占可重入锁ReentrantLock
通常的使用方式是:
1
2
3
4
5
6
lock.lock();
try {
// Critical Section
} finally {
lock.unlock();
}
它内部使用Sync
类管理锁,分为公平与非公平,后者是默认的:
FairSync
:FIFO顺序获取锁NonfairSync
:先尝试获取锁,若失败则放入队列等待,此时和公平的没有区别
2. 加锁
加锁调用sync.acquire(1)
的操作,逻辑大致如下:
1
2
3
4
5
6
7
8
9
10
public final void aquire(int arg) {
// 1. 首先tryAcquire先尝试获取锁,若OK则不需入队
// 2. 尝试失败,调用addWaiter将独占Node插入队尾
// 3. 最后调用acquireQueued尝试去获取锁
// 4. 若acquireQueued也失败(返回true),则发生中断,当前线程会被中断
if(!tryAquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
selfInterrputed();
}
}
2.1. tryAcquire
——公平与非公平
尝试获取锁,并增加arg
引用计数,并返回是否成功的boolean
值。
这里公平锁与非公平锁有一些不同,以下为非公平锁为例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 1. 首先获取引用计数
int c = getState();
// 2. 若为0,则没有线程抢占,尝试抢占
if (c == 0) {
// 2.0. 对于公平锁,检查队列中是否有前驱节点存在,若没有则尝试抢占,否则必须等待以保证公平
if (/*!hasQueuedPredecessors() &&*/
// 2.1. CAS设置引用计数,成功则抢占成功,失败说明其它线程已经抢占该锁
compareAndSetState(0, acquires)) {
// 2.2. 设置独占owner
setExclusiveOwnerThread(current);
return true;
}
}
// 3. 若不为0,则当前是重入场景时,增加引用计数
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) {
throw new Error("Maximum lock count exceeded");
}
setState(nextc);
return true;
}
return false;
}
很容易看出,公平锁就是添加了队列前驱节点的检查,其它都一样,也就是说:
- 公平锁:检查队列是否有节点,有就乖乖入队,保证FIFO顺序
- 非公平锁:不检查队列是否有节点,首先CAS获取一次锁,若失败,则和公平锁一样
2.2. addWaiter
若tryAcquire
失败,则需要将其放入等待队列。addWaiter
就是这样的操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private Node addWaiter(Node mode) {
// 1. 创建节点
Node node = new Node(mode);
// 2. 进入循环,CAS添加节点到队列
for(;;) {
Node oldTail = tail;
// 2.1. 当尾巴不为空,即队列存在时
if(oldTail != null) {
// 2.1.1 连接新创建node前驱节点
node.setPrevRelaxed(oldTail);
// 2.1.2. CAS设置tail
if(compareAndSetTail(oldTail, node)) {
// 2.1.3. 连接oldTail的后驱节点
oldTail.next = node;
return node;
}
// NOTE:
// 若CAS设置成功,即可说明没有新的Node添加进来,
// 也即说明当前新创建的node.prev,即oldTail并没有改变,
// 我们可以直接设置oldTail.next为node
}
// 2.2. 否则初始化队列(只会在最初的时候发生),然后继续尝试CAS
else {
initializeSyncQueue();
}
}
}
而初始化队列非常简单粗暴,利用CAS,设置head
和tail
为同一个dummy node(即node.thread == null
)
- 这个dummy node代表已有其它线程占用该锁,因为走到这里,是因为之前获取锁(
tryAcquire
)失败,必然有其它线程的占用 - 任何线程占用该锁后,
node
会成为头部head
,它也会变成一个dummy node,所以initializeSyncQueue
方法也标识了:锁正在被其它线程占用
2.3. acquireQueued
最后是在队列中持续获取锁。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
final boolean acquireQueued(final Node node, int arg) {
boolean interrupted = false;
try {
// 1. 循环不停尝试获取锁
for(;;) {
// 1.1. 获取当前node的前驱节点
final Node p = node.predecessor();
// 1.2. 若前驱为head,即当前节点是除head外第一个节点,则尝试获取锁
if(p == head && tryAcquire(arg)) {
// 若成功,则设置当前node为队列头
setHead(node);
p.next = null;
return interrupted;
}
// 1.3. 若不是除head外第一节点,且尝试获取锁失败,则检查本线程是否需要阻塞
if(shouldParkAfterFailedAcquire(p, node)) {
// 若要阻塞,则阻塞线程(park),并检查线程是否中断
// 也会在此被唤醒,从而继续循环重试获取锁
interrupted |= parkAndCheckInterrputed();
}
}
} catch(Throwable t) {
// 2. 若获取锁时,抛出异常,如超时/中断,则取消获取锁
// 取消锁获取,线程不会被阻塞
cancelAcquire(node);
if(interrputed) {
selfInterrupt();
}
throw t;
}
}
上面就是不断循环获取锁,队列中只有head.next
才能获取锁
-
获取到,则设置自己的
node
为队列头(并删除原有的头)所以,线程获取到锁后,其对应的
node
必定是head
-
获取不到,通常则继续阻塞(可能产生重试),直到被唤醒。
上面有2个关键方法:
shouldParkAfterFailedAcquire
parkAndCheckInterrpted
另外还有处理异常的cancelAcquire
,也是很关键的。
2.3.1. shouldParkAfterFailedAcquire
当线程没成功获取锁,则判断它是否需要挂起。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 1. 前驱状态为SIGNAL,则前驱节点正常(一般而言需要这个状态,从而以唤醒后继节点)
// 因此直接挂起当前线程即可
if(ws == Node.SIGNAL) {
return true;
}
// 2. 状态>0,说明前驱线程取消了排队
// 因此需要跳过取消排队的节点,找到第一个正在排队的前驱节点
if(ws > 0) {
do {
node.prev = (pred = pred.prev);
} while(pred.waitStatus > 0);
pred.next = node;
}
// 3. 其它状态,即0,-3(不会是-2,原因以后会说)
// 这时候,设置前驱状态为SIGNAL(-1),以便于唤醒后继(即当前节点)
// 然后重试检查以阻塞当前线程
else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
2.3.2. parkAndCheckInterrpted
很简单,就是阻塞/挂起线程,并检查线程是否被中断。
1
2
3
4
5
private final boolean parkAndCheckInterrupt() {
// 这里挂起线程,底层调用Unsafe#park(isAbsolute, time)
LockSupport.park(this);
return Thread.interrupted();
}
2.3.3. cancelAcquire
出现异常时,线程取消锁的获取。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
private void cancelAcquire(Node node) {
// ...
// 1. 取消线程关联
node.thread = null;
// 2. 跳过CANCELLED的前驱节点
Node pred = node.prev;
while(pred.waitStatus > 0) {
node.prev = (pred = pred.prev);
}
Node predNext = pred.next;
// 3. 设置状态为CANCELLED
node.waitStatus = Node.CANCELLED;
// 4. 若当前节点是尾巴,且CAS设置尾巴为前驱节点成功(即并发下没有节点队列加入)
// 设置前驱节点的next为空
if(node == tail && compareAndSetTail(node, pred)) {
pred.compareAndSetNext(predNext, null);
} else {
// 5. 若node不是尾巴
int ws;
// 5.1. 若node不是head后继节点
// a) 检查并设置前驱节点的状态为SIGNAL
// b) 让node前驱节点的next指向node后继节点
if(pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL) ||
(ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL)) &&
pred.thread != null) {
Node next = node.next;
if(next != null && next.waitStatus <= 0) {
// NOTE: 这里没有设置next.prev,跳过CANCELLED
// 是因为这会被其它线程(即next)做:
// a) 当next获取锁,检查自己是否要挂起时,CANCELLED节点会被跳过
// b) 当next取消获取锁时,也会被跳过,即上面的2
// 注意,next是不可能已经获取到锁,因为next不可能是head.next,更不可能是head
pred.compareAndSetNext(predNext, next);
}
} else {
// 5.2. 否则node为head的后继节点
// 则直接唤醒自己的后继节点
unparkSuccessor(node);
}
node.next = node;
}
}
上述的4~6操作,node
就会被分离出队列,从而取消了锁获取。
而锁获取取消后,线程就不会被阻塞,所以取消获取锁后,通常需要抛出异常。
3. 解锁
解锁调用sync.release(1)
的操作,逻辑大致如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public final boolean release(int arg) {
// 1. 尝试释放锁
if(tryRelease(arg)) {
// 2. 若成功释放(引用计数为0),若自己的状态不为0(?)
// 则需要唤醒后继节点
// 因为后续节点加入后,会改变自己的waitStatus,比如2.3.1.
Node h = head;
if(h != null && h.waitStatus != 0) {
unparkSuccessor(h);
}
return true;
}
return false;
}
关键方法有:
tryRelease
unparkSuccessor
3.1. tryRelease
尝试释放锁,具体逻辑如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
protected final boolean tryRelease(int releases) {
// 1. 计算新的引用计数
int c = getState() - releases;
if(Thread.currentThread() != getExclusiveOwnerThread()) {
throw new IllegalMonitorStateException();
}
// 2. 若为0,则释放线程所有权
boolean free = false;
if(c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// 3. 设置新的引用计数
setState(c);
return free;
}
仅仅是一个减去引用计数而已。
3.2. unparkSuccessor
唤醒节点的后继者,这里的节点就是head
,也就是自己。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 1. 若状态小于0,修改为0
if(ws < 0) {
node.compareAndSetWaitStatus(ws, 0);
}
// 2. 寻找没取消的后继节点
// 2.1. 先找node的后继节点
Node s = node.next;
// 2.2. 若当前后继节点没被取消(CANCELLED)
// 则从尾巴向后扫描,找到最后一个没取消的后继节点
if(s == null || s.waitStatus > 0) {
s = null;
for(Node p = tail; p != node && p != null; p = p.prev) {
if(p.waitStatus <= 0) {
s = p;
}
}
}
// 3. 若找到没取消的后继节点,唤醒对应的线程
if(s != null) {
// 底层调用Unsafe#unpark(thread)
LockSupport.unpark(s.thread);
}
}
4. 总结
- AQS实际上是一个双向FIFO队列,在独占锁实现中:
head
代表获取锁的线程- 其余的节点代表等待的线程,只有第一个节点才能成功抢占锁
state
引用计数,可用于实现重入,计数使用CAS更改
-
锁使用一定得有
lock()
于unlock()
的匹配 -
大量使用CAS(如
tryAcquire
),低并发下,能一定程度上降低上锁的开销;而高并发下,似乎还是得入队 -
底层使用
Unsafe#park/unpark
,相比于synchronized
,jstack
中:synchronized
:BLOCKED
状态park
:WAITING/TIMED_WAITING
状态
但是一般情况下(如Linux),OS级别对应的是同一种状态
-
对于公平锁/非公平锁,后面还会提及
- 公平:获取时检查队列是否有节点,因此获取顺序是FIFO的
- 非公平:没有检查队列的过程,可能在某个线程
release
后,后继节点被唤醒并尝试获取锁前,这个间隙中,线程可以抢占并获取到锁;但是入队了后,队列中线程的获取顺序依旧是FIFO,即和公平锁一样