33

BlockingQueue 与 Condition 原理解析

 5 years ago
source link: https://mp.weixin.qq.com/s/28se8ditCeEWe1fIPZmjjg?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

我在前段时间写了一篇关于AQS源码解析的文章 AbstractQueuedSynchronizer超详细原理解析  ,在文章里边我说  JUC 包中的大部分多线程相关的类都和  AQS 相关,今天我们就学习一下依赖于  AQS 来实现的阻塞队列  BlockingQueue 的实现原理。本文中的源码未加说明即来自于以  ArrayBlockingQueue

阻塞队列

相信大多数同学在学习线程池时会了解阻塞队列的概念,熟记各种类型的阻塞队列对线程池初始化的影响。当从阻塞队列获取元素但是队列为空时,当前线程会阻塞直到另一个线程向阻塞队列中添加一个元素;类似的,当向一个阻塞队列加入元素时,如果队列已经满了,当前线程也会阻塞直到另外一个线程从队列中读取一个元素。阻塞队列一般都是先进先出的,用来实现生产者和消费者模式。当发生上述两种情况时,阻塞队列有四种不同的处理方式,这四种方式分别为抛出异常,返回特殊值(null或在是false),阻塞当前线程直到执行结束,最后一种是只阻塞固定时间,到时后还无法执行成功就放弃操作。这些方法都总结在下边这种表中了。

RVvYFbr.jpg!web

我们就只分析 put 和  take 方法。

put和take函数

我们都知道,使用同步队列可以很轻松的实现生产者-消费者模式,其实,同步队列就是按照生产者-消费者的模式来实现的,我们可以将 put 函数看作生产者的操作,  take 是消费者的操作。

我们首先看一下 ArrayListBlock 的构造函数。它初始化了  put 和  take 函数中使用到的关键成员变量,分别是  ReentrantLock 和  Condition

ReentrantLock是 AQS 的子类,其  newCondition 函数返回的  Condition 接口实例是定义在AQS类内部的  ConditionObject 实现类。它可以直接调用  AQS 相关的函数。

NB7FVvM.jpg!web

put 函数会在队列末尾添加元素,如果队列已经满了,无法添加元素的话,就一直阻塞等待到可以加入为止。函数的源码如下所示。

我们会发现put函数使用了wait/notify的机制。与一般生产者-消费者的实现方式不同,同步队列使用 ReentrantLock 和  Condition 相结合的先获得锁,再等待的机制;而不是  Synchronized 和  Object.wait 的机制。这里的区别我们下一节再详细讲解。 看完了生产者相关的  put 函数,我们再来看一下消费者调用的  take 函数。  take 函数在队列为空时会被阻塞,一直到阻塞队列加入了新的元素。

await操作

我们发现 ArrayBlockingList 并没有使用  Object.wait ,而是使用的  Condition.await ,这是为什么呢?其中又有哪些原因呢?   Condition 对象可以提供和  Object 的  wait 和  notify 一样的行为,但是后者必须先获取  synchronized 这个内置的monitor锁,才能调用;而  Condition 则必须先获取  ReentrantLock 。这两种方式在阻塞等待时都会将相应的锁释放掉,但是  Condition 的等待可以中断,这是二者唯一的区别。

我们先来看一下 Condition 的  wait 函数,  wait 函数的流程大致如下图所示。

jQVzIzY.jpg!web

wait 函数主要有三个步骤。一是调用  addConditionWaiter 函数,在condition wait queue队列中添加一个节点,代表当前线程在等待一个消息。然后调用  fullyRelease 函数,将持有的锁释放掉,调用的是AQS的函数,不清楚的同学可以查看本篇开头的介绍的文章。最后一直调用  isOnSyncQueue 函数判断节点是否被转移到  sync queue 队列上,也就是AQS中等待获取锁的队列。如果没有,则进入阻塞状态,如果已经在队列上,则调用  acquireQueued 函数重新获取锁。

signal操作

signal 函数将  condition wait queue 队列中队首的线程节点转移等待获取锁的  sync queue 队列中。这样的话,  wait 函数中调用  isOnSyncQueue 函数就会返回true,导致  wait 函数进入最后一步重新获取锁的状态。

我们这里来详细解析一下 condition wait queue 和  sync queue 两个队列的设计原理。  condition wait queue 是等待消息的队列,因为阻塞队列为空而进入阻塞状态的  take 函数操作就是在等待阻塞队列不为空的消息。而  sync queue 队列则是等待获取锁的队列,take函数获得了消息,就可以运行了,但是它还必须等待获取锁之后才能真正进行运行状态。

signal 函数的示意图如下所示。

Bf6juyV.jpg!web

signal 函数其实就做了一件事情,就是不断尝试调用  transferForSignal 函数,将  condition wait queue 队首的一个节点转移到  sync queue 队列中,直到转移成功。因为一次转移成功,就代表这个消息被成功通知到了等待消息的节点。

后记

后边一篇文章主要讲解如何自己使用 AQS 来创建符合自己业务需求的锁,请大家继续关注我的文章啦.一起进步偶。

JZneumn.jpg!web


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK