AQS 概念
AbstractQueuedSynchronizer(AQS) 是 Java 并发包 (java.util.concurrent.locks
) 的核心基础框架,它的实现关键是先进先出 (FIFO) 等待队列和一个用volatile修饰的锁状态status。具体实现有 : ReentrantLock、Semaphore、CountDownLatch、ReentrantReadWriteLock 等常用的同步工具类。
AQS 的核心思想
AQS 的核心思想是 : 如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是用 CLH 队列锁 实现的,即将暂时获取不到锁的线程加入到队列中。
CLH 队列:Craig, Landin, and Hagersten (CLH) locks,是一种基于链表的可扩展、高性能、公平的自旋锁。AQS 中的队列是 CLH 变体的虚拟双向队列(FIFO)。
AQS 的核心架构
- AQS 使用一个 volatile int state 成员变量来表示同步状态
- 通过内置的 FIFO 队列 来完成资源获取线程的排队工作。
- Node中的thread变量用来存放进入AQS队列里面的线程,Node节点内部:
- prev记录当前节点的前驱节点
- next 记录当前节点的后继节点
- SHARED用来标记该线程是获取共享资源时被阻塞挂起后放入AQS队列的
- EXCLUSIVE用来标记线程是获取独占资源时被挂起后放入AQS队列的
- waitStatus 记录当前线程等待状态,可以为
- CANCELLED (线程被取消了)
- SIGNAL(线程需要被唤醒)
- CONDITION(线程在CONDITION条件队列里面等待)
- PROPAGATE(释放共享资源时需要要通知其他节点);
图中源码解释 :
static final int WAITING = 1; // 等待状态,必须为1
static final int CANCELLED = 0x80000000; // 取消状态,必须为负数(最高位为1)
static final int COND = 2; // 在条件等待中/** CLH 节点 */
abstract static class Node {volatile Node prev; // 前驱节点,最初通过 casTail 附加volatile Node next; // 当可发出信号时 visibly nonnullThread waiter; // 当入队时 visibly nonnullvolatile int status; // 由所有者写入,其他线程通过原子位操作读取// 用于 cleanQueue 的比较并交换前驱节点final boolean casPrev(Node c, Node v) {return U.weakCompareAndSetReference(this, PREV, c, v);}// 用于 cleanQueue 的比较并交换后继节点final boolean casNext(Node c, Node v) {return U.weakCompareAndSetReference(this, NEXT, c, v);}// 用于信号传递的获取并清除状态位final int getAndUnsetStatus(int v) {return U.getAndBitwiseAndInt(this, STATUS, ~v);}// 用于离队赋值的宽松设置前驱节点final void setPrevRelaxed(Node p) {U.putReference(this, PREV, p);}// 用于离队赋值的宽松设置状态final void setStatusRelaxed(int s) {U.putInt(this, STATUS, s);}// 用于减少不必要信号的状态清除final void clearStatus() {U.putIntOpaque(this, STATUS, 0);}// 内存偏移量常量private static final long STATUS = U.objectFieldOffset(Node.class, "status");private static final long NEXT = U.objectFieldOffset(Node.class, "next");private static final long PREV = U.objectFieldOffset(Node.class, "prev");
}// 按类型标记的具体节点类
static final class ExclusiveNode extends Node { } // 独占模式节点
static final class SharedNode extends Node { } // 共享模式节点// 条件节点,实现 ManagedBlocker 接口用于 ForkJoinPool
static final class ConditionNode extends Node implements ForkJoinPool.ManagedBlocker {ConditionNode nextWaiter; // 链接到下一个等待节点/*** 允许条件在 ForkJoinPools 中使用,而不会冒险耗尽固定池。* 这仅适用于非定时条件等待,不适用于定时版本。*/public final boolean isReleasable() {return status <= 1 || Thread.currentThread().isInterrupted();}public final boolean block() {while (!isReleasable()) LockSupport.park();return true;}
}// 等待队列头节点,延迟初始化
private transient volatile Node head;// 等待队列尾节点。初始化后,仅通过 casTail 修改
private transient volatile Node tail;// 同步状态
private volatile int state;/*** 返回同步状态的当前值。* @return 当前状态值*/
protected final int getState() {return state;
}
AQS 底层数据结构
state
state
是 AQS 的核心字段,表示共享资源的状态。不同的同步器对 state
的解释不同:
- ReentrantLock:
state
表示当前线程获取锁的可重入次数(如果发现可以可重入,则state+1; 执行完成则-1; 为0时可以释放) - Semaphore:
state
表示当前可用信号的个数 - CountDownLatch:
state
表示计数器当前的值
AQS 提供了一系列访问 state 的方法:
// 获取当前同步状态
protected final int getState() {return state;
}// 设置同步状态
protected final void setState(int newState) {state = newState;
}// 使用CAS原子性地设置同步状态
protected final boolean compareAndSetState(int expect, int update) {return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
同步队列:Node 节点
AQS 内部维护了一个 FIFO 双向队列,队列中的每个元素都是一个 Node
对象:
static final class Node {// 节点模式:共享模式static final Node SHARED = new Node();// 节点模式:独占模式static final Node EXCLUSIVE = null;// 等待状态:线程已取消static final int CANCELLED = 1;// 等待状态:后继节点的线程需要被唤醒static final int SIGNAL = -1;// 等待状态:节点在条件队列中等待static final int CONDITION = -2;// 等待状态:下一次acquireShared应无条件传播static final int PROPAGATE = -3;// 当前节点的等待状态volatile int waitStatus;// 前驱节点volatile Node prev;// 后继节点volatile Node next;// 节点关联的线程volatile Thread thread;// 指向下一个等待条件或共享模式的节点Node nextWaiter;// 判断节点是否在共享模式下等待final boolean isShared() {return nextWaiter == SHARED;}// 获取前驱节点final Node predecessor() throws NullPointerException {Node p = prev;if (p == null)throw new NullPointerException();elsereturn p;}// 构造方法Node() { // Used to establish initial head or SHARED marker}Node(Thread thread, Node mode) { // Used by addWaiterthis.nextWaiter = mode;this.thread = thread;}Node(Thread thread, int waitStatus) { // Used by Conditionthis.waitStatus = waitStatus;this.thread = thread;}
}
AQS 的整体结构
AQS 的整体结构如下图所示:
线程获取锁失败时的 AQS 队列变化
初始状态
当没有线程竞争锁时,AQS 的同步队列处于初始状态,head
和 tail
都为 null
:
// 初始状态
head = null;
tail = null;
第一个线程获取锁失败
当第一个线程尝试获取锁失败时,AQS 会初始化同步队列:
- 创建空节点(dummy node)作为头节点
- 将当前线程包装成 Node 节点添加到队列尾部
// addWaiter 方法:将当前线程包装成Node并加入队列
private Node addWaiter(Node mode) {// 将当前线程包装成Node节点Node node = new Node(Thread.currentThread(), mode);// 尝试快速入队Node pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}// 快速入队失败,使用enq方法自旋入队enq(node);return node;
}// enq方法:通过自旋CAS确保节点成功入队
private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // 队列未初始化// 创建空节点(dummy node)作为头节点if (compareAndSetHead(new Node()))tail = head; // 头尾都指向空节点} else {// 将新节点添加到队列尾部node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}
}
初始化后的队列结构:
head → [dummy node] ← tail↑新加入的节点
后续线程获取锁失败
当后续线程尝试获取锁失败时,它们会被添加到队列的尾部:
// acquireQueued方法:线程在队列中自旋获取资源或阻塞
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {// 获取前驱节点final Node p = node.predecessor();// 如果前驱是头节点,尝试获取资源if (p == head && tryAcquire(arg)) {// 获取成功,设置当前节点为头节点setHead(node);p.next = null; // help GC (方便垃圾回收)failed = false;return interrupted;}// 判断是否应该阻塞,并在需要时安全地阻塞if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}
}
多个线程竞争后的队列结构 :
线程被唤醒时的 AQS 队列变化
锁释放与线程唤醒
当持有锁的线程释放资源时,会唤醒队列中的下一个线程:
// release方法:释放独占模式下的资源
public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h); // 唤醒后继节点return true;}return false;
}// unparkSuccessor方法:唤醒指定节点的后继节点
private void unparkSuccessor(Node node) {int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0); // 清除状态// 查找下一个需要唤醒的节点Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;// 从尾部向前查找,找到队列中最前面且未取消的节点for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread); // 唤醒线程
}
4.2 线程被唤醒后的队列变化
当队列中的线程被唤醒后,它会尝试获取锁,如果获取成功,会将自己设置为新的头节点:
// setHead方法:将节点设置为头节点
private void setHead(Node node) {head = node;node.thread = null; // 清空线程引用node.prev = null;
}
原来的头节点(dummy node)会被垃圾回收,新头节点的 thread
字段被设置为 null
,表示它不再关联任何线程。
公平锁与非公平锁
AQS 支持两种锁获取模式:公平锁和非公平锁。
非公平锁(如 ReentrantLock
的默认实现):
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {// 不管队列中是否有等待线程,直接尝试获取锁if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}// 重入逻辑...
}
公平锁:
protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {// 只有队列中没有等待线程或当前线程是队列中的第一个时才获取锁if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}// 重入逻辑...
}
hasQueuedPredecessors()
方法检查队列中是否有比当前线程等待时间更长的线程,这是实现公平性的关键。
AQS 的设计模式
模板方法模式
AQS 使用了模板方法模式,定义了同步器的主要逻辑骨架,而将一些特定操作留给子类实现。以下是需要子类实现的方法:
方法名 | 描述 |
---|---|
boolean tryAcquire(int arg) | 尝试以独占方式获取资源 |
boolean tryRelease(int arg) | 尝试释放独占资源 |
int tryAcquireShared(int arg) | 尝试以共享方式获取资源 |
boolean tryReleaseShared(int arg) | 尝试释放共享资源 |
boolean isHeldExclusively() | 判断当前线程是否独占资源 |
以 ReentrantLock 为例的实现
ReentrantLock
中的同步器 Sync
继承自 AQS,并实现了相关方法:
abstract static class Sync extends AbstractQueuedSynchronizer {// 尝试获取锁abstract void lock();// 非公平尝试获取final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}// 尝试释放锁protected final boolean tryRelease(int releases) {int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;}// 判断当前线程是否独占资源protected final boolean isHeldExclusively() {return getExclusiveOwnerThread() == Thread.currentThread();}
}
优势
- 代码复用:AQS 提供了通用的同步框架,多个同步器可以复用相同的队列管理逻辑
- 职责分离:AQS 负责线程管理,子类专注于状态管理
装饰器模式
装饰器模式动态地给一个对象添加一些额外的职责,就增加功能来说,装饰器模式相比生成子类更为灵活。
在 AQS 中的实现
AQS 中的 Node
类可以看作是对 Thread
的装饰,它添加了同步所需的额外信息:
static final class Node {// 节点模式:共享或独占volatile Node nextWaiter;// 等待状态volatile int waitStatus;// 前驱和后继指针volatile Node prev;volatile Node next;// 被装饰的线程对象volatile Thread thread;// 构造方法 - 装饰线程Node(Thread thread, Node mode) { // Used by addWaiterthis.nextWaiter = mode;this.thread = thread;}
}
优势
- 功能扩展:在不修改
Thread
类的情况下为其添加同步功能 - 灵活性:可以根据需要为线程添加不同的同步属性
状态模式
状态模式允许一个对象在其内部状态改变时改变它的行为。
在 AQS 中的实现
AQS 中节点的 waitStatus
字段代表了不同的状态,每种状态对应不同的行为:
static final class Node {// 状态常量static final int CANCELLED = 1; // 取消状态static final int SIGNAL = -1; // 需要唤醒后继节点static final int CONDITION = -2; // 在条件队列中等待static final int PROPAGATE = -3; // 传播唤醒操作// 状态字段volatile int waitStatus;
}
根据状态的不同,AQS 采取不同的处理策略:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;if (ws == Node.SIGNAL) // 根据状态决定行为return true;if (ws > 0) { // 处理取消状态do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;
}
AQS 的使用
ReentrantLock 的实现
ReentrantLock
通过内部类 Sync
继承 AQS,实现了可重入的独占锁:
public class ReentrantLock implements Lock, java.io.Serializable {private final Sync sync;abstract static class Sync extends AbstractQueuedSynchronizer {}// 非公平锁实现static final class NonfairSync extends Sync {final void lock() {if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}}// 公平锁实现static final class FairSync extends Sync {final void lock() {acquire(1);}protected final boolean tryAcquire(int acquires) {// 公平获取逻辑}}
}
CountDownLatch 的实现
CountDownLatch
通过内部类 Sync
继承 AQS,实现了共享模式的同步器:
public class CountDownLatch {private static final class Sync extends AbstractQueuedSynchronizer {Sync(int count) {setState(count);}int getCount() {return getState();}protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}protected boolean tryReleaseShared(int releases) {for (;;) {int c = getState();if (c == 0)return false;int nextc = c - 1;if (compareAndSetState(c, nextc))return nextc == 0;}}}
}