13

(juc系列)传输队列(TransferQueue)及其实现

 2 years ago
source link: http://huyan.couplecoders.tech/2021/11/04/java/juc/2021-11-04-(juc%E7%B3%BB%E5%88%97)%E4%BC%A0%E8%BE%93%E9%98%9F%E5%88%97(TransferQueue)%E5%8F%8A%E5%85%B6%E5%AE%9E%E7%8E%B0/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

(juc系列)传输队列(TransferQueue)及其实现

本文源码基于: JDK13

TransferQueue 接口

官方注释翻译

一个支持让生产者阻塞等待消费者获取元素的阻塞队列. 可能用在消息传递系统中, 生产者有时候需要等待消费者调用take或者poll来获取元素,另外一些时候,入队元素可以不用等待消费者获取.

非阻塞的和超时阻塞的版本也是提供了的,使用tryTransfer.

一个TransferQueue也可以查询当前的消费者数量,这与peek是一个相反的操作.

像其他的阻塞队列一样,TransferQueue也可以是有界的。这种情况下,一个尝试传输的操作可能会首先阻塞等待可用的空间,然后阻塞等待对应的消费者. 注意,在一个容量为0的传输队列中,puttransfer
操作实际上都是同步的.

  • tryTransfer 尝试传输
  • transfer 传输
  • tryTransfer 尝试传输
  • hasWaitingConsumer 是否有等待的消费者
  • getWaitingConsumerCount 等待的消费者的数量

LinkedTransferQueue 链表实现的一个传输队列

官方注释翻译

一个无界的TransferQueue,基于链表实现. 这个队列严格按照FIFO的顺序排序元素.

队头元素是在队里时间最长的生产者,队尾元素是在队列里时间最短的生产者。

注意,和大多数集合类不一样的是, size方法并不是一个常量时间的方法. 由于队列的异步特定,确定当前元素的数量需要遍历一遍元素,如果在遍历期间有修改的动作,那么拿到的集合可能是不准确的.

批量操作,比如add,remove,examine等,不保证原子性,比如,一个foreach的便利操作,可能会值添加了部分元素.

这个类和他的迭代器,实现了CollectionIterator的全部可选方法.


public class LinkedTransferQueue<E> extends AbstractQueue<E>
implements TransferQueue<E>, java.io.Serializable {

一个基本的队列,还是一个传输队列.

内部节点 Node

传输队列,首先要看一下内部的抽象节点.

static final class Node {
final boolean isData; // false if this is a request node
volatile Object item; // initially non-null if isData; CASed to match
volatile Node next;
volatile Thread waiter; // null when not waiting for a match

保存了节点的属性(生产者/消费者), 节点的元素,下一个节点的指针,等待的线程.

LinkedTransferQueue 属性


transient volatile Node head;

private transient volatile Node tail;

private transient volatile int sweepVotes;

保存了链表的头结点和尾节点,链表的常见结构.


public LinkedTransferQueue() {
head = tail = new Node();
}

public LinkedTransferQueue(Collection<? extends E> c) {
Node h = null, t = null;
for (E e : c) {
Node newNode = new Node(Objects.requireNonNull(e));
if (h == null)
h = t = newNode;
else
t.appendRelaxed(t = newNode);
}
if (h == null)
h = t = new Node();
head = h;
tail = t;
}

提供了两个构造方法,分别创建一个空的传输队列,和将给定集合的所有元素添加到队列中.

入队方法 生产者


public void put(E e) {
xfer(e, true, ASYNC, 0);
}

public boolean offer(E e, long timeout, TimeUnit unit) {
xfer(e, true, ASYNC, 0);
return true;
}

public boolean offer(E e) {
xfer(e, true, ASYNC, 0);
return true;
}

public boolean add(E e) {
xfer(e, true, ASYNC, 0);
return true;
}

出队方法 消费者


public E take() throws InterruptedException {
E e = xfer(null, false, SYNC, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E e = xfer(null, false, TIMED, unit.toNanos(timeout));
if (e != null || !Thread.interrupted())
return e;
throw new InterruptedException();
}

public E poll() {
return xfer(null, false, NOW, 0);
}

可以看到,基本上也都一致,调用的xfer方法,这就是核心了.

尝试传输方法


public boolean tryTransfer(E e) {
return xfer(e, true, NOW, 0) == null;
}

public void transfer(E e) throws InterruptedException {
if (xfer(e, true, SYNC, 0) != null) {
Thread.interrupted(); // failure possible only due to interrupt
throw new InterruptedException();
}
}

public boolean tryTransfer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
return true;
if (!Thread.interrupted())
return false;
throw new InterruptedException();
}

也是全部调用了xfer方法,这个方法就是这个类的核心了.

是核心,但是不想看了.

思路是: 维护量一个队列,队列中的元素有两种状态,生产者或者消费者.

每一个请求到来之后,从队列头部开始匹配,如果成功,就返回. 失败就匹配下一个,如果匹配到队列末尾还没有匹配成功,则将其添加到队列末尾,进行阻塞等待.

请求到来之后,如果队首的元素类型和当前的都不一致,那就不用匹配了,直接开始阻塞等待即可.

详细的代码放在这里,啥时候有耐心看懂再看.


private E xfer(E e, boolean haveData, int how, long nanos) {
// 生产者元素为空,不接受
if (haveData && (e == null))
throw new NullPointerException();

// 外层自旋
restart: for (Node s = null, t = null, h = null;;) {

// 从头结点开始匹配
for (Node p = (t != (t = tail) && t.isData == haveData) ? t
: (h = head);; ) {
final Node q; final Object item;
// 如果头结点和属性和给定的不一致
if (p.isData != haveData
&& haveData == ((item = p.item) == null)) {
if (h == null) h = head;
// 尝试匹配
if (p.tryMatch(item, e)) {
if (h != p) skipDeadNodesNearHead(h, p);
// 返回匹配结果
return (E) item;
}
}
if ((q = p.next) == null) {
if (how == NOW) return e;
if (s == null) s = new Node(e);
if (!p.casNext(null, s)) continue;
if (p != t) casTail(t, s);
if (how == ASYNC) return e;
return awaitMatch(s, p, e, (how == TIMED), nanos);
}
if (p == (p = q)) continue restart;
}
}
}
  1. LinkedTransferQueue可以看作LinkedBlockingQueue、SynchronousQueue(公平模式)、ConcurrentLinkedQueue三者的集合体;
  2. 不管是取元素还是放元素都会入队;
  3. 先尝试跟头节点比较,如果二者模式不一样,就匹配它们,组成CP,然后返回对方的值;
  4. 如果二者模式一样,就入队,并自旋或阻塞等待被唤醒;
  5. LinkedTransferQueue全程都没有使用synchronized、重入锁等比较重的锁,基本是通过 自旋+CAS 实现;
  6. 对于入队之后,先自旋一定次数后再调用LockSupport.park()或LockSupport.parkNanos阻塞;

最后,欢迎关注我的个人公众号【 呼延十 】,会不定期更新很多后端工程师的学习笔记。
也欢迎直接公众号私信或者邮箱联系我,一定知无不言,言无不尽。

%E6%89%AB%E7%A0%81_%E6%90%9C%E7%B4%A2%E8%81%94%E5%90%88%E4%BC%A0%E6%92%AD%E6%A0%B7%E5%BC%8F-%E6%A0%87%E5%87%86%E8%89%B2%E7%89%88.png

以上皆为个人所思所得,如有错误欢迎评论区指正。

欢迎转载,烦请署名并保留原文链接。

联系邮箱:[email protected]

更多学习笔记见个人博客或关注微信公众号 <呼延十 >——>呼延十


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK