发布于

AQS 源码解析

作者

AQS(AbstractQueuedSynchronizer)是由 Doug Lea 大神实现的一套基于模板模式(Template Pattern)的同步框架,其只在逻辑上实现了锁同步机制,具体效果如何取决于子类根据需要对预留的模板方法的实现。一个典型的例子是 ReentrantLock 内部持有了一个实现 AQS 部分预留方法的子类 Sync,该子类根据 ReentrantLock 的特性实现了公平锁和非公平锁的逻辑,ReentrantLock 在加锁和释放锁时会调用其内部持有的 Sync 类的相关方法。J.U.C 包中大部分软件层面实现的锁机制都是基于 AQS 实现的,代码质量很高,非常值得一看。

为了简明性,下面对于 AQS 源码的解析中并没有对 AQS 队列的 Node 节点定义、Node 节点组成的双向链表,state 的一些特殊状态等细枝末节进行分析,而是从抽象角度进行介绍,如果想要了解具体细节可以尝试自行阅读相关代码。

AQS 包含两个主要部分:一个由双向链表实现的阻塞队列,以及一个用来表示当前锁状态的全局状态信息。线程在尝试获取锁时首先会通过 CAS 操作来设置锁的状态信息,如果设置成功则将当前线程和 AQS 绑定,如果设置失败则会将当前线程加入到阻塞队列中。我们之前提到,AQS 是一个同步框架,只在上层规定了加锁和解锁的逻辑,具体实现依赖于子类,比如对于 tryAcquiretryRelease 等方法,其默认抛出 UnsupportedOperationException() 异常:

// AbstractQueuedSynchronizer.tryAcquire
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
// AbstractQueuedSynchronizer.tryRelease
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

基于此,下面我们对 AQS 源码的分析主要基于 ReentrantLock。首先看 ReentrantLock 是如何实现加锁逻辑,即 lock 方法的:

public ReentrantLock() {
    sync = new NonfairSync();
}

public void lock() {
    sync.lock();
}

可以看到 ReentrantLock 默认的加锁逻辑是非公平的,在加锁时会调用 Sync 类(AQS 的子类)的 lock 方法:

abstract static class Sync extends AbstractQueuedSynchronizer {
    // 省略了部分代码逻辑,只保留了 Sync 类的核心代码
    abstract void lock();

    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
}

static final class NonfairSync extends Sync {
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

static final class FairSync extends Sync {

    final void lock() {
        acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

可以看到首先 NonfairSync 类实现了 Sync 类的 lock 方法,会先通过 CAS 的方式尝试通过修改 AQS 的状态以获取锁,如果失败则执行,而 FairSync 类在加锁时候少了一次 CAS 尝试的过程,并没有发生插队,因而是公平的,这一点在文章后续内容中有所体现。

下面我们来看 acquire 方法的实现细节:

// AbstractQueuedSynchronizer.acquire
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

可以看到 acquire 方法(属于 AQS 定义的模板方法,只调用接口,具体细节依赖于子类,这里的继承链是 NonfairSync -> Sync -> AQS)比较复杂,一个 if 语句里面套了 4 个方法,我们一个一个来看,首先是 tryAcquire 方法,这里调用的是 NonfairSync 类的具体实现:

// NonfairSync.tryAcquire
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

接着看nonfairTryAcquire() 方法:

// Sync.nonfairTryAcquire
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState(); // 获取 AQS 当前状态,如果是 0 则表明目前没有被加锁,如果是 state >= 1 则表明已被加锁
    if (c == 0) {
        // 尝试通过 CAS 的方式竞争修改 AQS 状态以竞争锁
        if (compareAndSetState(0, acquires)) {
            // 竞争锁成功,设置持有 AQS 锁的线程为当前线程
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        // 如果当前线程想要再加一次锁,则需要将 state 添加一定的数值,这里是 1
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

可以看到,线程在尝试获取 ReentrantLock 时首先会检查当前 ReentrantLock 内部的 AQS 的状态,如果是 0 则表明目前没有被加锁,如果是 state >= 1 则表明已被加锁,此外由于要实现 ReentrantLock 可重入的特性,如果当前 AQS state > 0,则判断当前持有锁的线程是否是自己,如果是表明是对锁进行了重入,这里将 state 变为了 state + acquires,这里的 acquires 是 1(可以类比生产者消费者模型里面,生产者每生产一个消息都会将信号量 + 1,这里的 state 就承担的信号量的作用,当然 acquires 也可以是其他值,根据实现的锁逻辑可以设置不同的数值)。因此每次线程重复加锁时都会将 ReentrantLock 内部 AQS 的 state + 1。这里加锁成功会返回 true,失败会返回 false。

如果 tryAcquire 获取锁失败,会执行 addWaiter 方法将当前线程加入到 AQS 阻塞队列中:

// AbstractQueuedSynchronizer.addWaiter
private Node addWaiter(Node mode) {
    // 首先创建当前线程在 AQS 队列中对应的 Node 对象
    Node node = new Node(Thread.currentThread(), mode);

    // 获取 AQS 队列的尾节点
    Node pred = tail;
    // 如果当前队列不为空,则尝试将队列放到 AQS 队列的末尾
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 如果入队失败则继续尝试入队
    enq(node);
    return node;
}

// AbstractQueuedSynchronizer.enq
private Node enq(final Node node) {
    // 自旋,直到入队成功
    for (;;) {
        Node t = tail;
        // 如果当前队列为空,即未初始化,则尝试将当前节点设置 AQS 队列的头结点
        if (t == null) {
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 否则尝试将当前节点设为 AQS 队列的尾结点
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

可以看到这里首先会创建当前线程在 AQS 队列中对应的 Node 对象,然后尝试将当前队列加入到 AQS 队列,如果失败则加入到 enq 方法进行自旋尝试,直到入队成功,这里其实可以发现两个操作实际上是重复的,这里可能是为了性能考量,先以乐观的方式尝试入队,如果成功就直接返回,失败了的话就通过 enq 方法自旋入队。自旋入队的代码也很有意思,这里并没有任何锁的逻辑来处理多个线程同时入队的情况,而是每轮尝试入队前都先获取当前 AQS 队列最新的尾结点,如果通过 CAS 操作入队成功,则直接返回,入队失败则在下一轮入队时更新当前队列的尾结点,通过这种方式保证了只要线程入队成功,AQS 队列的指针状态是正确的,巧妙的避免了多线程并发修改队列尾结点可能导致的问题。

addWaiter 方法将当前线程入队后会返回当前线程在 AQS 队列中对应的 Node 节点,并将该 Node 节点作为参数传入 acquireQueued 方法:

// AbstractQueuedSynchronizer.acquireQueued
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
   		// 自旋
        for (;;) {
      		// 当前节点的前置节点
            final Node p = node.predecessor();
            // 如果当前节点的上一个节点是 AQS 的队头,且通过 tryAcquire 方法获取锁成功,
            // 则将当前节点设为 AQS 队列的头结点并返回
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 如果获取锁失败则会通过 park 方法阻塞当前线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

可以看到,入队后节点会进入一个自旋的 for 循环中并被 park 方法阻塞,至此完成了加锁的全部流程,如果线程被 unpark 方法唤醒后则会继续尝试获取锁,直到获取锁成功。这里有一点需要注意的是,unpark 方法在调用时需要指定唤醒的线程,理论上指定唤醒的线程应该是 AQS 队列的队头的节点,这里为何要多此一举,通过自旋的方式调用 tryAcquire 获取锁而不是直接默认头结点为获取到锁的节点?还记得我们上面提到的非公平锁的加锁逻辑吗,其加锁时会先 CAS 尝试获取锁,如果失败了才去按照公平锁的方式排队。那么,在 AQS 刚释放锁且在 AQS 队列排队的下一个节点被 unpark 唤醒尝试获取锁的间隔中有一个刚来的线程通过非公平的方式获取锁,那么此时竞争锁的对象就有两种,一种是队列中刚被唤醒的排队节点,一种是第一次尝试获取锁的节点,这里对于第一次尝试获取锁的节点而言,其先试着插了一次队,这个插队的动作实际上就实现了非公平锁的加锁逻辑。

下面我们来看 ReentrantLock 释放锁的方法实现 unlock

// ReentrantLock.unlock
public void unlock() {
    sync.release(1);
}

// Sync.release
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

// Sync.tryRelease
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    // 没有持有锁的线程调用了释放锁的逻辑,抛出异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 如果 AQS 队列状态为 0,表明锁已经释放成功
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

释放锁的逻辑相对而言就很简单了,持有锁的线程尝试修改 AQS 的状态 state = state - releases,对于重入锁而言,需要在锁重入的逻辑执行完成后调用一次 release 方法,否则 AQS 队列的状态无法减到 0,进而导致锁无法被正常释放。如果 tryRelease 成功,则会调用 unparkSuccessor 方法唤醒当前队列中最靠前的那个节点。

至此,我们通过跟踪 ReentrantLock 的加锁和解锁的方法调用链解析了 AQS 核心代码的工作逻辑。可以看到,AQS 代码的实现还是非常精妙的,在 AQS 框架的基础上可以轻松扩展实现各类加锁/解锁逻辑。这里其实有一个挺有意思的点可以思考一下:为何 AQS 不采用 Java 原本存在的线程阻塞和唤醒机制,比如 waitnotify/notifyAll 来阻塞和唤醒 AQS 中的节点,而是另外在 JVM 中实现了一套基于信号量的 parkunpark 机制?理由其实很简单,首先 waitnotify 的实现是基于 monitor 的,需要配合 synchronized 关键字来使用。其次,waitnotify 无法指定唤醒哪一个被阻塞线程,进而无法实现 AQS 的队列机制(unpark 则在调用时必须指定唤醒某个特定线程)。