源码阅读-AQS与独占锁

Posted by keys961 on June 13, 2019

1. 总体说明

AbstractQueuedSynchronizer类很重要,是 ReentrantLockCountDownLatchSemaphoreFutureTask实现的基础。

本文说明AQS与独占锁(ReentrantLock)的相关源码,源码基于JDK 11。

1.1. AQS内部数据结构

实际上AQS内部数据结构很简单,简单而言就是维护了一个等待队列

1
2
3
4
5
6
7
8
9
10
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSyncronizer implements java.io.Serializable {
    // 队列头,代表获取锁的线程/节点
    private transient volatile Node head;
    // 队列尾
    private transient volatile Node tail;
    // 同步状态:0代表没有被线程持有,>0代表被某个线程持有(代表重入次数)
    private volatile int state;
    // 继承自父类,代表持有独占锁的线程
    private transient Thread exclusiveOwnerThread;
}

1.2. Node节点

Node为队列中的节点,代表等待锁的线程(除了head外)。

锁有独占、共享这2种模式,因此Node定义了这2个模式,分别如下:

1
2
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;

另外还有线程的等待状态,定义如下:

1
2
3
4
5
6
7
8
9
// 线程取消抢占该锁,通常是超时或中断
static final int CANCELLED = 1;
// 对后继节点对应的线程进行唤醒
// 这种情况下,本节点线程释放/取消获取锁后,必须唤醒后继节点
static final int SIGNAL = -1;
// 线程正在一个Condition的队列中等待
static final int CONDITION = -2;
// 表示下一个acquireShared应该无条件传播
static final int PROPAGATE = -3;

整体的Node定义如下:

1
2
3
4
5
6
7
8
9
10
11
static final class Node {
    // 上述的值以及0
    // 负数代表正在等待,大于0代表被取消,0是正常同步节点
    volatile int waitStatus;
    volatile Node prev;
    volatile Node next;
    // 对应线程
    volatile Thread thread;
    // 等待某个Condition的下一个Node,或者是Node.SHARE
    Node nextWaiter; 
}

所以整体结构就是一个双向链表的阻塞队列。

1.3. 独占可重入锁ReentrantLock

通常的使用方式是:

1
2
3
4
5
6
lock.lock();
try {
    // Critical Section
} finally {
    lock.unlock();
}

它内部使用Sync类管理锁,分为公平与非公平,后者是默认的:

  • FairSync:FIFO顺序获取锁
  • NonfairSync:先尝试获取锁,若失败则放入队列等待,此时和公平的没有区别

2. 加锁

加锁调用sync.acquire(1)的操作,逻辑大致如下:

1
2
3
4
5
6
7
8
9
10
public final void aquire(int arg) {
    // 1. 首先tryAcquire先尝试获取锁,若OK则不需入队
    // 2. 尝试失败,调用addWaiter将独占Node插入队尾
    // 3. 最后调用acquireQueued尝试去获取锁
    // 4. 若acquireQueued也失败(返回true),则发生中断,当前线程会被中断
    if(!tryAquire(arg) &&
       acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
        selfInterrputed();
    }
}

2.1. tryAcquire——公平与非公平

尝试获取锁,并增加arg引用计数,并返回是否成功的boolean值。

这里公平锁与非公平锁有一些不同,以下为非公平锁为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    // 1. 首先获取引用计数
    int c = getState();
    // 2. 若为0,则没有线程抢占,尝试抢占
	if (c == 0) {
        // 2.0. 对于公平锁,检查队列中是否有前驱节点存在,若没有则尝试抢占,否则必须等待以保证公平
        if (/*!hasQueuedPredecessors() &&*/
            // 2.1. CAS设置引用计数,成功则抢占成功,失败说明其它线程已经抢占该锁
            compareAndSetState(0, acquires)) {
            // 2.2. 设置独占owner
            setExclusiveOwnerThread(current);
            return true;
        }
    } 
    // 3. 若不为0,则当前是重入场景时,增加引用计数
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) {
            throw new Error("Maximum lock count exceeded");
        }
        setState(nextc);
        return true;
    }
   return false;
}

很容易看出,公平锁就是添加了队列前驱节点的检查,其它都一样,也就是说:

  • 公平锁:检查队列是否有节点,有就乖乖入队,保证FIFO顺序
  • 非公平锁:不检查队列是否有节点,首先CAS获取一次锁,若失败,则和公平锁一样

2.2. addWaiter

tryAcquire失败,则需要将其放入等待队列。addWaiter就是这样的操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private Node addWaiter(Node mode) {
    // 1. 创建节点
    Node node = new Node(mode);
    // 2. 进入循环,CAS添加节点到队列
    for(;;) {
        Node oldTail = tail;
        // 2.1. 当尾巴不为空,即队列存在时
        if(oldTail != null) {
            // 2.1.1 连接新创建node前驱节点
            node.setPrevRelaxed(oldTail); 
            // 2.1.2. CAS设置tail
            if(compareAndSetTail(oldTail, node)) {
                // 2.1.3. 连接oldTail的后驱节点
                oldTail.next = node;
                return node;
            }
            // NOTE: 
            // 若CAS设置成功,即可说明没有新的Node添加进来,
            // 也即说明当前新创建的node.prev,即oldTail并没有改变,
            // 我们可以直接设置oldTail.next为node
        }
        // 2.2. 否则初始化队列(只会在最初的时候发生),然后继续尝试CAS
        else {            
            initializeSyncQueue();
        }
    }
}

而初始化队列非常简单粗暴,利用CAS,设置headtail为同一个dummy node(即node.thread == null

  • 这个dummy node代表已有其它线程占用该锁,因为走到这里,是因为之前获取锁(tryAcquire)失败,必然有其它线程的占用
  • 任何线程占用该锁后,node会成为头部head,它也会变成一个dummy node,所以initializeSyncQueue方法也标识了:锁正在被其它线程占用

2.3. acquireQueued

最后是在队列中持续获取锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
final boolean acquireQueued(final Node node, int arg) {
    boolean interrupted = false;
    try {
        // 1. 循环不停尝试获取锁
        for(;;) {
            // 1.1. 获取当前node的前驱节点
            final Node p = node.predecessor();
            // 1.2. 若前驱为head,即当前节点是除head外第一个节点,则尝试获取锁
            if(p == head && tryAcquire(arg)) {
                // 若成功,则设置当前node为队列头
                setHead(node);
                p.next = null;
                return interrupted;
            }
            // 1.3. 若不是除head外第一节点,且尝试获取锁失败,则检查本线程是否需要阻塞
            if(shouldParkAfterFailedAcquire(p, node)) {
                // 若要阻塞,则阻塞线程(park),并检查线程是否中断
                // 也会在此被唤醒,从而继续循环重试获取锁
                interrupted |= parkAndCheckInterrputed();
            }
        }
    } catch(Throwable t) {
        // 2. 若获取锁时,抛出异常,如超时/中断,则取消获取锁
        // 取消锁获取,线程不会被阻塞
        cancelAcquire(node);
        if(interrputed) {
            selfInterrupt();
        }
        throw t;
    }
}

上面就是不断循环获取锁,队列中只有head.next才能获取锁

  • 获取到,则设置自己的node为队列头(并删除原有的头)

    所以,线程获取到锁后,其对应的node必定是head

  • 获取不到,通常则继续阻塞(可能产生重试),直到被唤醒。

上面有2个关键方法:

  • shouldParkAfterFailedAcquire
  • parkAndCheckInterrpted

另外还有处理异常的cancelAcquire,也是很关键的。

2.3.1. shouldParkAfterFailedAcquire

当线程没成功获取锁,则判断它是否需要挂起。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    // 1. 前驱状态为SIGNAL,则前驱节点正常(一般而言需要这个状态,从而以唤醒后继节点)
    // 因此直接挂起当前线程即可
    if(ws == Node.SIGNAL) {
    	return true;
    }
    // 2. 状态>0,说明前驱线程取消了排队
    // 因此需要跳过取消排队的节点,找到第一个正在排队的前驱节点
    if(ws > 0) {
        do {
            node.prev = (pred = pred.prev);
        } while(pred.waitStatus > 0);
        pred.next = node;
    } 
    // 3. 其它状态,即0,-3(不会是-2,原因以后会说)
    // 这时候,设置前驱状态为SIGNAL(-1),以便于唤醒后继(即当前节点)
    // 然后重试检查以阻塞当前线程
    else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

2.3.2. parkAndCheckInterrpted

很简单,就是阻塞/挂起线程,并检查线程是否被中断。

1
2
3
4
5
private final boolean parkAndCheckInterrupt() {
     // 这里挂起线程,底层调用Unsafe#park(isAbsolute, time)
    LockSupport.park(this);
    return Thread.interrupted();
}

2.3.3. cancelAcquire

出现异常时,线程取消锁的获取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
private void cancelAcquire(Node node) {
    // ...
    // 1. 取消线程关联
    node.thread = null;
    // 2. 跳过CANCELLED的前驱节点
    Node pred = node.prev;
    while(pred.waitStatus > 0) {
        node.prev = (pred = pred.prev);
    }
    Node predNext = pred.next;
    // 3. 设置状态为CANCELLED
    node.waitStatus = Node.CANCELLED;
    // 4. 若当前节点是尾巴,且CAS设置尾巴为前驱节点成功(即并发下没有节点队列加入)
    // 设置前驱节点的next为空
    if(node == tail && compareAndSetTail(node, pred)) {
        pred.compareAndSetNext(predNext, null);
    } else {
        // 5. 若node不是尾巴
        int ws;
        // 5.1. 若node不是head后继节点
        // a) 检查并设置前驱节点的状态为SIGNAL
        // b) 让node前驱节点的next指向node后继节点
        if(pred != head &&
          ((ws = pred.waitStatus) == Node.SIGNAL) ||
           (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL)) && 
           pred.thread != null) {
            Node next = node.next;
            if(next != null && next.waitStatus <= 0) {
                // NOTE: 这里没有设置next.prev,跳过CANCELLED
                // 是因为这会被其它线程(即next)做:
                // a) 当next获取锁,检查自己是否要挂起时,CANCELLED节点会被跳过
                // b) 当next取消获取锁时,也会被跳过,即上面的2
                // 注意,next是不可能已经获取到锁,因为next不可能是head.next,更不可能是head
                pred.compareAndSetNext(predNext, next);
            }
        } else {
            // 5.2. 否则node为head的后继节点
            // 则直接唤醒自己的后继节点
            unparkSuccessor(node);
        }
        node.next = node;
    }
}

上述的4~6操作,node就会被分离出队列,从而取消了锁获取。

而锁获取取消后,线程就不会被阻塞,所以取消获取锁后,通常需要抛出异常

3. 解锁

解锁调用sync.release(1)的操作,逻辑大致如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public final boolean release(int arg) {
    // 1. 尝试释放锁
    if(tryRelease(arg)) {
        // 2. 若成功释放(引用计数为0),若自己的状态不为0(?)
        // 则需要唤醒后继节点
        // 因为后续节点加入后,会改变自己的waitStatus,比如2.3.1.
        Node h = head;
        if(h != null && h.waitStatus != 0) {
            unparkSuccessor(h);
        }
        return true;
    }
    return false;
}

关键方法有:

  • tryRelease
  • unparkSuccessor

3.1. tryRelease

尝试释放锁,具体逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
protected final boolean tryRelease(int releases) {
    // 1. 计算新的引用计数
    int c = getState() - releases;
    if(Thread.currentThread() != getExclusiveOwnerThread()) {
        throw new IllegalMonitorStateException();
    }
    // 2. 若为0,则释放线程所有权
    boolean free = false;
    if(c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    // 3. 设置新的引用计数
    setState(c);
    return free;
}

仅仅是一个减去引用计数而已。

3.2. unparkSuccessor

唤醒节点的后继者,这里的节点就是head,也就是自己。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    // 1. 若状态小于0,修改为0
    if(ws < 0) {
        node.compareAndSetWaitStatus(ws, 0);
    }
    // 2. 寻找没取消的后继节点
    // 2.1. 先找node的后继节点
    Node s = node.next;
    // 2.2. 若当前后继节点没被取消(CANCELLED)
    // 则从尾巴向后扫描,找到最后一个没取消的后继节点
    if(s == null || s.waitStatus > 0) {
        s = null;
        for(Node p = tail; p != node && p != null; p = p.prev) {
            if(p.waitStatus <= 0) {
                s = p;
            }
        }
    }
    // 3. 若找到没取消的后继节点,唤醒对应的线程
    if(s != null) {
        // 底层调用Unsafe#unpark(thread)
        LockSupport.unpark(s.thread);
    }
}

4. 总结

  1. AQS实际上是一个双向FIFO队列,在独占锁实现中:
    • head代表获取锁的线程
    • 其余的节点代表等待的线程,只有第一个节点才能成功抢占锁
    • state引用计数,可用于实现重入,计数使用CAS更改
  2. 锁使用一定得有lock()unlock()的匹配

  3. 大量使用CAS(如tryAcquire),低并发下,能一定程度上降低上锁的开销;而高并发下,似乎还是得入队

  4. 底层使用Unsafe#park/unpark,相比于synchronizedjstack中:

    • synchronizedBLOCKED状态
    • parkWAITING/TIMED_WAITING状态

    但是一般情况下(如Linux),OS级别对应的是同一种状态

  5. 对于公平锁/非公平锁,后面还会提及

    • 公平:获取时检查队列是否有节点,因此获取顺序是FIFO的
    • 非公平:没有检查队列的过程,可能在某个线程release后,后继节点被唤醒并尝试获取锁前,这个间隙中,线程可以抢占并获取到锁;但是入队了后,队列中线程的获取顺序依旧是FIFO,即和公平锁一样