Java 中队列同步器 AQS(AbstractQueuedSynchronizer)实现原理
source link: http://www.mghio.cn/post/4b00e13c.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
Java 中队列同步器 AQS(AbstractQueuedSynchronizer)实现原理
在 Java
中通过 锁
来控制多个线程对共享资源的访问,使用 Java
编程语言开发的朋友都知道,可以通过 synchronized
关键字来实现锁的功能,它可以隐式的获取锁,也就是说我们使用该关键字并不需要去关心锁的获取和释放过程,但是在提供方便的同时也意味着其灵活性的下降。例如,有这样的一个场景,先获取锁 A,然后再获取锁 B,当锁 B 获取到之后,释放锁 A 同时获取锁 C,当获取锁 C 后,再释放锁 B 同时获取锁 D,依次类推,像这种比较复杂的场景,使用 synchronized
关键字就比较难实现了。
在 Java SE 5
之后,新增加了 Lock
接口和一系列的实现类来提供和 synchronized
关键字一样的功能,它需要我们显示的进行锁的获取和释放,除此之外还提供了可响应中断的锁获取操作以及超时获取锁等同步特性。JDK
中提供的 Lock
接口实现类大部分都是聚合一个同步器 AQS 的子类来实现多线程的访问控制的,下面我们看看这个构建锁和其它同步组件的基础框架——队列同步器 AQS(AbstractQueuedSynchronizer)
。
AQS 基础数据结构
队列同步器 AQS
(下文简称为同步器)主要是依赖于内部的一个 FIFO(first-in-first-out)双向队列来对同步状态进行管理的,当线程获取同步状态失败时,同步器会将当前线程和当前等待状态等信息封装成一个内部定义的节点 Node
,然后将其加入队列,同时阻塞当前线程;当同步状态释放时,会将同步队列中首节点唤醒,让其再次尝试去获取同步状态。同步队列的基本结构如下:
队列节点 Node
同步队列使用同步器中的静态内部类 Node
用来保存获取同步状态的线程的引用、线程的等待状态、前驱节点和后继节点。
同步队列中 Node
节点的属性名称和具体含义如下表所示:
SHARED
常量
每个节点线程都有两种锁模式,分别为 SHARED
表示线程以共享的模式等待锁,EXCLUSIVE
表示线程以独占的方式等待锁。同时每个节点的等待状态 waitStatus
只能取以下表中的枚举值:
Condition
上,等待被其它线程唤醒
PROPAGATE
值为 -3,表示下一次共享同步状态获取会无限进行下去,只在 SHARED
情况下使用
0
值为 0,初始状态,初始化的默认值
同步状态 state
同步器内部使用了一个名为 state
的 int
类型的变量表示同步状态,同步器的主要使用方式是通过继承,子类通过继承并实现它的抽象方法来管理同步状态,同步器给我们提供了如下三个方法来对同步状态进行更改。
CAS
设置当前状态,该方法能够保证状态设置的原子性
在独享锁中同步状态 state
这个值通常是 0 或者 1(如果是重入锁的话 state
值就是重入的次数),在共享锁中 state
就是持有锁的数量。
独占式同步状态获取与释放
同步器中提供了 acquire(int arg)
方法来进行独占式同步状态的获取,获取到了同步状态也就是获取到了锁,该方法源码如下所示:
1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
方法首先会调用 tryAcquire
方法尝试去获取锁,查看方法的源码可以发现,同步器并未对该方法进行实现(只是抛出一个不支持操作异常 UnsupportedOperationException
),这个方法是需要后续同步组件的开发人员自己去实现的,如果方法返回 true
则表示当前线程成功获取到锁,调用 selfInterrupt()
中断当前线程(PS:这里留给大家一个问题:为什么获取了锁以后还要中断线程呢?
),方法结束返回,如果方法返回 false
则表示当前线程获取锁失败,也就是说有其它线程先前已经获取到了锁,此时就需要把当前线程以及等待状态等信息添加到同步队列中,下面来看看同步器在线程未获取到锁时具体是如何实现。
通过源码发现,当获取锁失败时,会执行判断条件与操作的后半部分 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
,首先指定锁模式为 Node.EXCLUSIVE
调用 addWaiter
方法,该方法源码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
通过方法参数指定的锁模式(共享锁 or 独占锁)和当前线程构造出一个 Node
节点,如果同步队列已经初始化,那么首先会进行一次从尾部加入队列的尝试,使用 compareAndSetTail
方法保证原子性,进入该方法源码可以发现是基于 sun.misc
包下提供的 Unsafe
类来实现的。如果首次尝试加入同步队列失败,会再次调用 enq
方法进行入队操作,继续跟进 enq
方法源码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
通过其源码可以发现和第一次尝试加入队列的代码类似,只是该方法里面加了同步队列初始化判断,使用 compareAndSetHead
方法保证设置头节点的原子性,同样它底层也是基于 Unsafe
类,然后外层套了一个 for (;;)
死循环,循环唯一的退出条件是从队尾入队成功,也就是说如果从该方法成功返回了就表示已经入队成功了,至此,addWaiter
执行完毕返回当前 Node
节点。然后以该节点作为 acquireQueued
方法的入参继续进行其它步骤,该方法如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
可以看到,该方法本质上也是通过一个死循环(自旋)去获取锁并且支持中断,在循环体外面定义两个标记变量,failed
标记是否成功获取到锁,interrupted
标记在等待的过程中是否被中断过。方法首先通过 predecessor
获取当前节点的前驱节点,当当前节点的前驱节点是 head
头节点时就调用 tryAcquire
尝试获取锁,也就是第二个节点则尝试获取锁,这里为什么要从第二个节点才尝试获取锁呢?是因为同步队列本质上是一个双向链表
,在双向链表中,第一个节点并不存储任何数据是虚节点,只是起到一个占位的作用,真正存储数据的节点是从第二个节点开始的。如果成功获取锁,也就是 tryAcquire
方法返回 true
后,将 head
指向当前节点并把之前找到的头节点 p
从队列中移除,修改是否成功获取到锁标记,结束方法返回中断标记。
如果当前节点的前驱节点 p
不是头节点或者前驱节点 p
是头节点但是获取锁操作失败,那么会调用 shouldParkAfterFailedAcquire
方法判断当前 node
节点是否需要被阻塞,这里的阻塞判断主要是为了防止长时间自旋给 CPU
带来非常大的执行开销,浪费资源。该方法源码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
方法参数为当前节点的前驱节点以及当前节点,主要是靠前驱节点来判断是否需要进行阻塞,首先获取到前驱节点的等待状态 ws
,如果节点状态 ws
为 SIGNAL
,表示前驱节点的线程已经准备完毕,等待资源释放,方法返回 true
表示可以阻塞,如果 ws > 0
,通过上文可以知道节点只有一个状态 CANCELLED(值为 1)
满足该条件,表示该节点线程获取锁的请求已经取消了,会通过一个 do-while
循环向前查找 CANCELLED
状态的节点并将其从同步队列中移除,否则进入 else
分支,使用 compareAndSetWaitStatus
原子操作将前驱节点的等待状态修改为 SIGNAL
,以上这两种情况都不需要进行阻塞方法返回 false
。
当经过判断后需要阻塞的话,也就是 compareAndSetWaitStatus
方法返回 true
时,会通过 parkAndCheckInterrupt
方法阻塞挂起当前线程,并返回当前线程的中断标识。方法如下:
1
2
3
4
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
线程阻塞是通过 LockSupport
这个工具类实现的,深入其源码可以发现它底层也是基于 Unsafe
类实现的。如果以上两个方法都返回 true
的话就更新中断标记。这里还有一个问题就是什么时候会将一个节点的等待状态 waitStatus
修改为 CANCELLED
节点线程获取锁的请求取消状态呢?细心的朋友可能已经发现了,在上文贴出的 acquireQueued
方法源码中的 finally
块中会根据 failed
标记来决定是否调用 cancelAcquire
方法,这个方法就是用来将节点状态修改为 CANCELLED
的,方法的具体实现留给大家去探索。至此 AQS
独占式同步状态获取锁的流程就完成了,下面通过一个流程图来看看整体流程:
下面再看看独占式锁释放的过程,同步器使用 release
方法来让我们进行独占式锁的释放,其方法源码如下:
1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
首先调用 tryRelease
方法尝试进行锁释放操作,继续跟进该方法发现同步器只是抛出了一个不支持操作异常 UnsupportedOperationException
,这里和上文独占锁获取中 tryAcquire
方法是一样的套路,需要开发者自己定义锁释放操作。
通过其 JavaDoc
可以得知,如果返回 false
,则表示释放锁失败,方法结束。该方法如果返回 true
,则表示当前线程释放锁成功,需要通知队列中等待获取锁的线程进行锁获取操作。首先获取头节点 head
,如果当前头节点不为 null
,并且其等待状态不是初始状态(0),则解除线程阻塞挂起状态,通过 unparkSuccessor
方法实现,该方法源码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
首先获取头节点的等待状态 ws
,如果状态值为负数(Node.SIGNAL or Node.PROPAGATE),则通过 CAS 操作将其改为初始状态(0),然后获取头节点的后继节点,如果后继节点为 null
或者后继节点状态为 CANCELLED
(获取锁请求已取消),就从队列尾部开始寻找第一个状态为非 CANCELLED
的节点,如果该节点不为空则使用 LockSupport
的 unpark
方法将其唤醒,该方法底层是通过 Unsafe
类的 unpark
实现的。这里需要从队尾查找非 CANCELLED
状态的节点的原因是,在之前的获取独占锁失败时的入队 addWaiter
方法实现中,该方法如下:
假设一个线程执行到了上图中的 ① 处,② 处还没有执行,此时另一个线程恰好执行了 unparkSuccessor
方法,那么就无法通过从前向后查找了,因为节点的后继指针 next
还没赋值呢,所以需要从后往前进行查找。至此,独占式锁释放操作就结束了,同样的,最后我们也通过一个流程图来看看整个锁释放的过程:
独占式可中断同步状态获取
同步器提供了 acquireInterruptibly
方法来进行可响应中断的获取锁操作,方法实现源码如下:
1
2
3
4
5
6
7
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
方法首先检查当前线程的中断状态,如果已中断,则直接抛出中断异常 InterruptedException
即响应中断,否则调用 tryAcquire
方法尝试获取锁,如果获取成功则方法结束返回,获取失败调用 doAcquireInterruptibly
方法,跟进该方法如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
仔细观察可以发现该方法实现源码和上文中 acquireQueued
方法的实现基本上类似,只是这里把入队操作 addWaiter
放到了方法里面了,还有一个区别就是当在循环体内判断需要进行中断时会直接抛出异常来响应中断,两个方法的对比如下:
其它步骤和独占式锁获取一致,流程图大体上和不响应中断的锁获取差不多,只是在最开始多了一步线程中断状态检查和循环是会抛出中断异常而已。
独占式超时获取同步状态
同步器提供了 tryAcquireNanos
方法可以超时获取同步状态(也就是锁
),该方法提供了之前 synchronized
关键字不支持的超时获取的特性,通过该方法我们可以在指定时间段 nanosTimeout
内获取锁,如果获取到锁则返回 true
,否则,返回 false
。方法源码如下:
1
2
3
4
5
6
7
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
首先会调用 tryAcquire
方法尝试获取一次锁,如果获取锁成功则立即返回,否则调用 doAcquireNanos
方法进入超时获取锁流程。通过上文可以得知,同步器的 acquireInterruptibly
方法在等待获取同步状态时,如果当前线程被中断了,会抛出中断异常 InterruptedException
并立刻返回。超时获取锁的流程其实是在响应中断的基础上增加了超时获取的特性,doAcquireNanos
方法的源码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
由以上方法实现源码可以看出,针对超时获取这里主要实现思路是:先使用当前时间加上参数传入的超时时间间隔 deadline
计算出超时的时间点,然后每次进行循环的时候使用超时时间点 deadline
减去当前时间得到剩余的时间 nanosTimeout
,如果剩余时间小于 0 则证明当前获取锁操作已经超时,方法结束返回 false
,反如果剩余时间大于 0。
可以看到在里面执行自旋的时候和上面独占式同步获取锁状态 acquireQueued
方法那里是一样的套路,即当当前节点的前驱节点为头节点时调用 tryAcquire
尝试获取锁,如果获取成功则返回。
除了超时时间计算那里不同外,还有个不同的地方就是在超时获取锁失败之后的操作,如果当前线程获取锁失败,则判断剩余超时时间 nanosTimeout
是否小于 0,如果小于 0 则表示已经超时方法立即返回,反之则会判断是否需要进行阻塞挂起当前线程,如果通过 shouldParkAfterFailedAcquire
方法判断需要挂起阻塞当前线程,还要进一步比较超时剩余时间 nanosTimeout
和 spinForTimeoutThreshold
的大小,如果小于等于 spinForTimeoutThreshold
值(1000 纳秒)的话,将不会使当前线程进行超时等待,而是再次进行自旋过程。
加后面这个判断的主要原因在于,在非常短(小于 1000 纳秒)的时间内的等待无法做到十分精确,如果这时还进行超时等待的话,反而会让我们指定 nanosTimeout
的超时从整体上给人感觉反而不太精确,因此,在剩余超时时间非常短的情况下,同步器会再次自旋进行超时获取锁的过程,独占式超时获取锁整个过程如下所示:
共享式同步状态获取与释放
共享锁
顾名思义就是可以多个线程共用一个锁,在同步器中使用 acquireShared
来获取共享锁(同步状态),方法源码如下:
1
2
3
4
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
首先通过 tryAcquireShared
尝试获取共享锁,该方法是一个模板方法在同步器中只是抛出一个不支持操作异常,需要开发人员自己去实现,同时方法的返回值有三种不同的类型分别代表三种不同的状态,其含义如下:
- 小于 0 表示当前线程获取锁失败
- 等于 0 表示当前线程获取锁成功,但是之后的线程在没有锁释放的情况下获取锁将失败,也就是说这个锁是共享模式下的最后一把锁了
- 大于 0 表示当前线程获取锁成功,并且还有剩余的锁可以获取
当方法 tryAcquireShared
返回值小于 0 时,也就是获取锁失败,将会执行方法 doAcquireShared
,继续跟进该方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
方法首先调用 addWaiter
方法封装当前线程和等待状态为共享模块的节点并将其添加到等待同步队列中,可以发现在共享模式下节点的 nextWaiter
属性是固定值 Node.SHARED
。然后循环获取当前节点的前驱节点,如果前驱节点是头节点的话就尝试获取共享锁,如果返回值大于等于 0 表示获取共享锁成功,则调用 setHeadAndPropagate
方法,更新头节点同时如果有可用资源,则向后传播,唤醒后继节点,接下来会检查一下中断标识,如果已经中断则中断当前线程,方法结束返回。如果返回值小于 0,则表示获取锁失败,需要挂起阻塞当前线程或者继续自旋获取共享锁。下面看看 setHeadAndPropagate
方法的具体实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
首先将当前获取到锁的节点设置为头节点,然后方法参数 propagate > 0
时表示之前 tryAcquireShared
方法的返回值大于 0,也就是说当前还有剩余的共享锁可以获取,则获取当前节点的后继节点并且后继节点是共享节点时唤醒节点去尝试获取锁,doReleaseShared
方法是同步器共享锁释放的主要逻辑。
同步器提供了 releaseShared
方法来进行共享锁的释放,方法源码如下所示:
1
2
3
4
5
6
7
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
首先调用 tryReleaseShared
方法尝试释放共享锁,方法返回 false
代表锁释放失败,方法结束返回 false
,否则就表示成功释放锁,然后执行 doReleaseShared
方法,进行唤醒后继节点并检查它是否可以向后传播等操作。继续跟进该方法如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
可以看到和独占式锁释放不同的是,在共享模式下,状态同步和释放可以同时执行,其原子性由 CAS
来保证,如果头节点改变了也会继续循环。每次共享节点在共享模式下唤醒时,头节点都会指向它,这样就可以保证可以获取到共享锁的所有后续节点都可以唤醒了。
如何自定义同步组件
在 JDK
中基于同步器实现的一些类绝大部分都是聚合了一个或多个继承了同步器的类,使用同步器提供的模板方法自定义内部同步状态的管理,然后通过这个内部类去实现同步状态管理
的功能,其实这从某种程度上来说使用了 模板模式
。比如 JDK
中可重入锁 ReentrantLock
、读写锁 ReentrantReadWriteLock
、信号量 Semaphore
以及同步工具类 CountDownLatch
等,其源码部分截图如下:
通过上文可以知道,我们基于同步器可以分别自定义独占锁同步组件和共享锁同步组件,下面以实现一个在同一个时刻最多只允许 3 个线程访问,其它线程的访问将被阻塞的同步工具 TripletsLock
为例,很显然这个工具是共享锁模式,主要思路就是去实现一个 JDk
中的 Lock
接口来提供面向使用者的方法,比如,调用 lock
方法获取锁,使用 unlock
来对锁进行释放等,在 TripletsLock
类内部有一个自定义同步器 Sync
继承自同步器 AQS,用来对线程的访问和同步状态进行控制,当线程调用 lock
方法获取锁时,自定义同步器 Sync
先计算出获取到锁后的同步状态,然后使用 Unsafe
类操作来保证同步状态更新的原子性,由于同一时刻只能 3 个线程访问,这里我们可以将同步状态 state
的初始值设置为 3,表示当前可用的同步资源数量,当有线程成功获取到锁时将同步状态 state
减 1,有线程成功释放锁时将同步状态加 1
,同步状态的取值范围为 0、1、2、3,同步状态为 0 时表示没有可用同步资源,这个时候如果有线程访问将被阻塞。下面来看看这个自定义同步组件的实现代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/**
* @author mghio
* @date: 2020-06-13
* @version: 1.0
* @description:
* @since JDK 1.8
*/
public class TripletsLock implements Lock {
private final Sync sync = new Sync(3);
private static final class Sync extends AbstractQueuedSynchronizer {
public Sync(int state) {
setState(state);
}
Condition newCondition() {
return new ConditionObject();
}
@Override
protected int tryAcquireShared(int reduceCount) {
for (; ;) {
int currentState = getState();
int newState = currentState - reduceCount;
if (newState < 0 || compareAndSetState(currentState, newState)) {
return newState;
}
}
}
@Override
protected boolean tryReleaseShared(int count) {
for (; ;) {
int currentState = getState();
int newState = currentState + count;
if (compareAndSetState(currentState, newState)) {
return true;
}
}
}
}
@Override
public void lock() {
sync.acquireShared(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquireShared(1) > 0;
}
@Override
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
@Override
public void unlock() {
sync.releaseShared(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
下面启动 20 个线程测试看看自定义同步同步工具类 TripletsLock
是否达到我们的预期。测试代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
* @author mghio
* @date: 2020-06-13
* @version: 1.0
* @description:
* @since JDK 1.8
*/
public class TripletsLockTest {
private final Lock lock = new TripletsLock();
private final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
@Test
public void testTripletsLock() {
// 启动 20 个线程
for (int i = 0; i < 20; i++) {
Thread worker = new Runner();
worker.setDaemon(true);
worker.start();
}
for (int i = 0; i < 20; i++) {
second(2);
System.out.println();
}
}
private class Runner extends Thread {
@Override
public void run() {
for (; ;) {
lock.lock();
try {
second(1);
System.out.println(dateFormat.format(new Date()) + " ----> " + Thread.currentThread().getName());
second(1);
} finally {
lock.unlock();
}
}
}
}
private static void second(long seconds) {
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
测试结果如下:
从以上测试结果可以发现,同一时刻只有三个线程可以获取到锁,符合预期,这里需要明确的是这个锁获取过程是非公平的。
本文主要是对同步器中的基础数据结构、独占式与共享式同步状态获取与释放过程做了简要分析,由于水平有限如有错误之处还请留言讨论。队列同步器 AbstractQueuedSynchronizer
是 JDK
中很多的一些多线程并发工具类的实现基础框架,对其深入学习理解有助于我们更好的去使用其特性和相关工具类。
Java并发编程的艺术
Java Synchronizer - AQS Learning
从 ReentrantLock 的实现看 AQS 的原理及应用
The java.util.concurrent Synchronizer Framework
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK