10

(juc系列)Condition条件队列源码解析

 2 years ago
source link: http://huyan.couplecoders.tech/2021/11/04/java/juc/2021-11-04-(juc%E7%B3%BB%E5%88%97)Condition%E6%9D%A1%E4%BB%B6%E9%98%9F%E5%88%97%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

(juc系列)Condition条件队列源码解析

本文源码基于: JDK13

Condition

官方注释翻译

ConditionObject身上的方法wait,notify,nitofyAll等分解为不同的对象,通过将他们与任意的锁实现结合,实现了每个对象有多个等待集的效果. 锁代替了同步方法和语句,条件代替了对象对象监视器方法的使用.

Conditions(或者称为条件队列或者条件变量) 提供一个方法,让一个线程暂停wait,直到被其他线程通知,说某个等待条件可能为真. 因为共享状态的
访问在不同的线程中,因此必须保护它,与某种形式的锁相关联. 等待条件提供的关键属性是: 他原子性的释放关联的锁并挂起当前的线程,就像Object.wait().

Condition本质上是绑定到锁上的,要获取给定锁的条件实例,请调用locl.newCondition方法.

比如,假设我们有一个支持puttake的有界缓冲区. 如果对一个空的缓冲区进行take操作,线程将会阻塞,知道有元素可用. 如果对一个满的缓冲去进行put操作,线程会阻塞直到缓冲区有空间,也就是阻塞队列的语义.

我们希望在单独的等待集中保持生产者和消费者的线程,这样我们可以在缓冲区有空间或者缓冲区不为空时,只唤醒一部分线程. 这可以使用两个Condition实例来实现.

class BoundedBuffer<E> {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();

final Object[] items = new Object[100];
int putptr, takeptr, count;

public void put(E x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}

public E take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
E x = (E) items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}

如果你看过BlockingQueue相关的代码, 就会发现上面的代码兼职太眼熟了我的天.

Condition的实现类可以提供与Object的监控方法不同的行为和语义,比如保证通知的顺序,或者在执行通知时不需要持有锁. 如果实现提供了这样专门
的语义,那必须记录下来.

注意: Condition对象只是普通的对象,他们可以用做同步语句中的目标,并且可以调用他们自己的等待和通知方法,获取Condition实例的监视器所,或者使用他的监视器方法,与获取与Condition关联的锁之间没有什么关系.
为了避免混淆,建议永远不要这么高.

除非特别说明,否则传递任何null都会导致NPE。

  • await 等待
  • awaitUninterruptibly 不可中断的等待
  • awaitNanos 等待指定毫秒
  • await(time,unit) 等待指定时间
  • awaitUntil 等待知道deakline到来
  • signal 通知一个等待线程
  • signalAll 通知全部等待线程

AQS中的ConditionObject


public class ConditionObject implements Condition, java.io.Serializable {

朴实无华,一个条件.


private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;

等待的可能有多个线程,因此总是需要一个队列来保存等待者的,这里使用链表实现,保存了链表的首和尾.

Node节点保存了:


static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;

static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;

volatile int waitStatus;

volatile Node prev;

volatile Node next;

volatile Thread thread;

Node nextWaiter;

保存了等待的状态,以及前后节点,还有当前节点的线程.


public ConditionObject() { }

创建一个空的条件队列.

await 系列

既然实现了Condition接口,就按照接口的方法来看. 由于有多个关于时间控制的等待方法,为了避免冗余,我们只看一下await(time,unit)方法,比较有代表性.


public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
// 计算纳秒时间
long nanosTimeout = unit.toNanos(time);
// 中断返回
if (Thread.interrupted())
throw new InterruptedException();
// We don't check for nanosTimeout <= 0L here, to allow
// await(0, unit) as a way to "yield the lock".
// 计算结束时间
final long deadline = System.nanoTime() + nanosTimeout;
// 将当前节点添加到等待队列中
Node node = addConditionWaiter();

int savedState = fullyRelease(node);
boolean timedout = false;
int interruptMode = 0;
// 状态ok就自旋
while (!isOnSyncQueue(node)) {
// 时间到了,结束
if (nanosTimeout <= 0L) {
timedout = transferAfterCancelledWait(node);
break;
}
// 休眠给定时间
if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
LockSupport.parkNanos(this, nanosTimeout);
// 是否中断
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
// 状态OK?是否中断了
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 如果当前节点还有后续节点,解除一些取消掉的节点
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
// 返回结果
return !timedout;
}

其实这种方法很难讲,代码写的很复杂,但是核心其实就一个LockSupport.park(),完事. 根据需要将当前线程休眠指定时间.

signal 唤醒系列

以signal为例,因为唤醒单个会了,唤醒全部大不了for循环~.


public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}

private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}


final boolean transferForSignal(Node node) {
if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
return false;

Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

从队头找到第一个等待线程,验证当前状态之后,进行唤醒.

ReentrantLock 中的Condition

其实ReentrantLock是使用AQS实现的,为啥还要单独看呢?

因为在学习BlockingQueue时,对于两个条件分别控制生产者等待和消费者等待印象深刻,而ArrayListBlockingQueue是使用ReentrantLock实现的,因此单独看一下。

ReentrantLock中,初始化一个Condition,使用:


final ConditionObject newCondition() {
return new ConditionObject();
}

朴实无华,直接使用了AQS的条件队列.

Condition定义了一个接口,允许线程在它的实例上阻塞,互相唤醒.

我们已经有了Object提供了相关方法,为啥还需要Condition呢?

就是ArrayListBlockingQueue的情况了,Object对象,只允许所有的线程因为同样的原因阻塞.

而我们需要不同的线程群根据不同的条件阻塞,条件满足时,部分唤醒. 因此需要Condition

同时,有了Condition,我们还可以自定义很多逻辑,比如线程的唤醒顺序,或者添加更多自定义的hook方法等等,更加灵活.

AQSCondition为所有基于AQS实现的类,提供了默认的ConditionObject.

他内部使用链表来保存等待线程,使用CAS来保证更新的原子性. 因为在ArrayListBlockingQueue中,使用两个条件队列才能那么丝滑.

最后,欢迎关注我的个人公众号【 呼延十 】,会不定期更新很多后端工程师的学习笔记。
也欢迎直接公众号私信或者邮箱联系我,一定知无不言,言无不尽。

%E6%89%AB%E7%A0%81_%E6%90%9C%E7%B4%A2%E8%81%94%E5%90%88%E4%BC%A0%E6%92%AD%E6%A0%B7%E5%BC%8F-%E6%A0%87%E5%87%86%E8%89%B2%E7%89%88.png

以上皆为个人所思所得,如有错误欢迎评论区指正。

欢迎转载,烦请署名并保留原文链接。

联系邮箱:[email protected]

更多学习笔记见个人博客或关注微信公众号 <呼延十 >——>呼延十


Recommend

  • 33

    本文基于JDK-8u261源码分析 1 简介 因为CLH队列中的线程,什么线程获取到锁,什么线程进入队列排队...

  • 8
    • huyan.couplecoders.tech 3 years ago
    • Cache

    (juc系列)semaphore源码阅读

    (juc系列)semaphore源码阅读 - 呼延十的博客 | HuYan Blog本文源码基于: JDK13 为了巩固AQS. 看一下Semaphore的源码. 大部分都是直接翻译的官方代码注释,嘻嘻 一个计数的信号量. 概念上讲,信号量维护了一个许...

  • 13
    • huyan.couplecoders.tech 3 years ago
    • Cache

    (juc系列)cyclicbarrier源码阅读

    (juc系列)cyclicbarrier源码阅读 - 呼延十的博客 | HuYan Blog本文源码基于: JDK13 为了巩固AQS. 看一下CyclicBarrier的源码. 大部分都是直接翻译的官方代码注释,嘻嘻 一个允许一系列线程互相等待,到达一个公共屏障点的...

  • 11

    本文源码基于: JDK13 CopyOnWriteArrayList 官方注释翻译 ArrayList的一个线程安全的变体,所有可变的操作(比如add/set等)都使用底层数组的一个拷贝. 这经常是非常昂贵的,但是在便利操作远远大于更该操作的...

  • 12

    本文源码基于: JDK13 PriorityBlockingQueue 优先级阻塞队列 官方注释翻译 一个无界的阻塞队列,使用相同的排队规则PriorityQueue并且提供阻塞的操作. 因为这个队列逻辑上是误解的,尝试添加操作可能会失败,由于资环...

  • 7
    • huyan.couplecoders.tech 3 years ago
    • Cache

    (juc系列)延迟队列delayqueue

    本文源码基于: JDK13 DelayQueue 延迟队列 官方注释翻译 用于延迟元素的一个无界的阻塞队列实现. 延迟元素只有在他的延迟过期之后,才可以被获取. 队头的元素,是队列中过期最早的元素。如果没有元素过期,那么将没有队头...

  • 11
    • huyan.couplecoders.tech 3 years ago
    • Cache

    (juc系列)同步队列synchronousqueue

    本文源码基于: JDK13 SynchronousQueue 官方注释翻译 一个阻塞队列的实现,他的插入操作必须等待对应的移除操作. 反之亦然. 一个同步队列没有内部的容量限制. 不能执行peek()操作,因此只有当你...

  • 13
    • huyan.couplecoders.tech 3 years ago
    • Cache

    (juc系列)阻塞队列(blockingqueue)及其实现

    本文源码基于: JDK13 整体的类图: BlockingQueue 接口 官方注释翻译 什么是阻塞队列? 一个队列,它支持:...

  • 12

    本文源码基于: JDK13 PriorityBlockingQueue 优先级阻塞队列官方注释翻译一个无界的阻塞队列,使用相同的排队规则PriorityQueue并且提供阻塞的操作. 因为这个队...

  • 13
    • huyan.couplecoders.tech 2 years ago
    • Cache

    (juc系列)传输队列(TransferQueue)及其实现

    (juc系列)传输队列(TransferQueue)及其实现 本文源码基于: JDK13 TransferQueue 接口官方...

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK