

(juc系列)并发集合之concurrentlinkeddeque源码
source link: http://huyan.couplecoders.tech/java/juc/concurrentlinkeddeque/2021/11/05/(juc%E7%B3%BB%E5%88%97)%E5%B9%B6%E5%8F%91%E9%9B%86%E5%90%88%E4%B9%8BConcurrentLinkedDeque%E6%BA%90%E7%A0%81/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

本文源码基于: JDK13
ConcurrentLinkedDeque
官方注释翻译
一个无界的,并发的双端队列,使用链表实现. 多线程间的并发写入,移除,访问操作,可以保证安全.当有很多线程共享一个公共集合时,ConcurrentLinkedDeque
是一个不错的选择. 像其他的并发集合一样,这个类不接受null元素.
迭代器是弱一致的.
需要注意的是,和其他大多数集合不同,size
方法不是常量时间的操作. 因为队列的异步特性,决定了计数当前的元素需要遍历所有元素,因此如果有别的线程正在更改,size
方法可能返回不准确的数字.
批量操作不保证原子性,比如addAll
等. 当foreach
和addAll
一起运行时,可能foreach
只能观察到部分的元素.
这个类和他的迭代器实现了Queue
和Iterator
的所有可选方法.
public class ConcurrentLinkedDeque<E>
extends AbstractCollection<E>
implements Deque<E>, java.io.Serializable {
一个双端队列.
内部链表节点
static final class Node<E> {
volatile Node<E> prev;
volatile E item;
volatile Node<E> next;
}
前后结点的指针,以及当前节点的元素.
private transient volatile Node<E> head;
private transient volatile Node<E> tail;
保存了头尾节点
public ConcurrentLinkedDeque() {
head = tail = new Node<E>();
}
public ConcurrentLinkedDeque(Collection<? extends E> c) {
// Copy c into a private chain of Nodes
Node<E> h = null, t = null;
for (E e : c) {
Node<E> newNode = newNode(Objects.requireNonNull(e));
if (h == null)
h = t = newNode;
else {
NEXT.set(t, newNode);
PREV.set(newNode, t);
t = newNode;
}
}
initHeadTail(h, t);
}
两个构造方法,一个构造空的队列,一个将给定集合初始化到队列中.
public void addFirst(E e) {
linkFirst(e);
}
public void addLast(E e) {
linkLast(e);
}
public boolean offerFirst(E e) {
linkFirst(e);
return true;
}
public boolean offerLast(E e) {
linkLast(e);
return true;
}
支持队头和队尾的添加操作,具体调用的是linkFirst
和linkLast
.
- linkFirst
private void linkFirst(E e) {
// 创建当前节点
final Node<E> newNode = newNode(Objects.requireNonNull(e));
restartFromHead:
for (;;)
for (Node<E> h = head, p = h, q;;) {
// 如果节点的前置节点不为空,更新p节点
if ((q = p.prev) != null &&
(q = (p = q).prev) != null)
// Check for head updates every other hop.
// If p == q, we are sure to follow head instead.
p = (h != (h = head)) ? h : q;
// p节点出队了重新从头开始
else if (p.next == p) // PREV_TERMINATOR
continue restartFromHead;
else {
// p is first node
// 将当前节点设置为第一个.
NEXT.set(newNode, p); // CAS piggyback
// cas 更新相关属性, 原有头结点的前置属性,以及新的头结点等.
if (PREV.compareAndSet(p, null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this deque,
// and for newNode to become "live".
if (p != h) // hop two nodes at a time; failure is OK
HEAD.weakCompareAndSet(this, h, newNode);
return;
}
// Lost CAS race to another thread; re-read prev
}
}
}
将当前节点,设置为第一个节点,采用CAS+自旋实现,当发现已有头结点出队后,重新找头结点.
- linkLast
链接为节点,和头结点思路一致.
private void linkLast(E e) {
final Node<E> newNode = newNode(Objects.requireNonNull(e));
restartFromTail:
for (;;)
for (Node<E> t = tail, p = t, q;;) {
if ((q = p.next) != null &&
(q = (p = q).next) != null)
// Check for tail updates every other hop.
// If p == q, we are sure to follow tail instead.
p = (t != (t = tail)) ? t : q;
else if (p.prev == p) // NEXT_TERMINATOR
continue restartFromTail;
else {
// p is last node
PREV.set(newNode, p); // CAS piggyback
if (NEXT.compareAndSet(p, null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this deque,
// and for newNode to become "live".
if (p != t) // hop two nodes at a time; failure is OK
TAIL.weakCompareAndSet(this, t, newNode);
return;
}
// Lost CAS race to another thread; re-read next
}
}
}
public E pollFirst() {
restart: for (;;) {
for (Node<E> first = first(), p = first;;) {
// 队头节点, cas更改属性
final E item;
if ((item = p.item) != null) {
// recheck for linearizability
if (first.prev != null) continue restart;
if (ITEM.compareAndSet(p, item, null)) {
unlink(p);
return item;
}
}
// 已出队,重新开始
if (p == (p = p.next)) continue restart;
// p为空,队列为空,返回空
if (p == null) {
if (first.prev != null) continue restart;
return null;
}
}
}
}
public E pollLast() {
restart: for (;;) {
for (Node<E> last = last(), p = last;;) {
final E item;
if ((item = p.item) != null) {
// recheck for linearizability
if (last.next != null) continue restart;
if (ITEM.compareAndSet(p, item, null)) {
unlink(p);
return item;
}
}
if (p == (p = p.prev)) continue restart;
if (p == null) {
if (last.next != null) continue restart;
return null;
}
}
}
}
与入队对应的,将队首或者队尾进行弹出. 思路一致.
普通队列的操作
双端队列可以向普通队列一样,提供入队出队操作,此时他是一个FIFO
的队列,也就是入队添加到队尾,出队从队头获取元素.
和ConcurrentLinkedQueue
思路一致,使用CAS+自旋实现. 只是提供了双端队列相关的方法.
最后,欢迎关注我的个人公众号【 呼延十 】,会不定期更新很多后端工程师的学习笔记。
也欢迎直接公众号私信或者邮箱联系我,一定知无不言,言无不尽。
以上皆为个人所思所得,如有错误欢迎评论区指正。
欢迎转载,烦请署名并保留原文链接。
联系邮箱:[email protected]
更多学习笔记见个人博客或关注微信公众号 <呼延十 >——>呼延十
Recommend
-
11
ConcurrentLinkedDeque offer() method in Java with Examples Related Articles ...
-
4
(juc系列)countdownlatch源码阅读 - 呼延十的博客 | HuYan Blog本文源码基于: JDK13 为了巩固AQS. 看一下CountDownLatch的源码. 大部分都是直接翻译的官方代码注释,嘻嘻 一个同步器, 允许一个或者多个线程等待, 知道其他...
-
6
(juc系列)semaphore源码阅读 - 呼延十的博客 | HuYan Blog本文源码基于: JDK13 为了巩固AQS. 看一下Semaphore的源码. 大部分都是直接翻译的官方代码注释,嘻嘻 一个计数的信号量. 概念上讲,信号量维护了一个许...
-
10
(juc系列)cyclicbarrier源码阅读 - 呼延十的博客 | HuYan Blog本文源码基于: JDK13 为了巩固AQS. 看一下CyclicBarrier的源码. 大部分都是直接翻译的官方代码注释,嘻嘻 一个允许一系列线程互相等待,到达一个公共屏障点的...
-
6
本文源码基于: JDK13 JUC是Java提供的一个并发工具包,提供了很多并发工具. 本文主要将AQS. java.util.concurrent.locks.AbstractQueuedSynchronizer. 是一个基类,也可以理解为一个框架. 它提供了对于同步状态的控制...
-
2
本文源码基于: JDK13 上一篇文章讲了AQS的基本原理,其中两个关键的操作: 获取/释放. 依赖于子类的实现,本文就借着学习ReentrantLock的同时,继续巩固一下AQS. ReentrantLock支持公平锁以及非公平锁,实现源于内部不同的AQS子类同步...
-
9
JUC系列提供的又一个线程池,采用分治思想,及工作窃取策略,能获得更高的并发性能. 通过将大任务,切割成小任务并发执行,由每一个任务等待所有子任务的返回. 大概可以理解为递归的思路. 比如要计算1~100的累加和. 那么任务:
-
4
(juc系列)exchanger源码阅读 - 呼延十的博客 | HuYan Blog本文源码基于: JDK13 一个用于让线程之间配对和交换元素的同步点. 每个线程拿出一个元素,匹配另外一个伙伴线程, 互相交换. Exchanger可以看做是一个双向的
-
10
(juc系列)threadpoolexecutor源码学习 - 呼延十的博客 | HuYan Blog本文源码基于: JDK13 其实早在19年,就简单的写过ThreadPoolExecutor. 但是只涉及到了其中两个参数,理解也不深刻,今天重新看一下代码。 这个类是Java中常...
-
9
本文源码基于: JDK13 ConcurrentLinkedQueue 官方注释翻译 一个用链表实现的无界的线程安全的队列. 这个队列提供FIFO的元素顺序. 当多个线程需要共享一个集合的访问时, ConcurrentLinkedQueue...
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK