48

并发编程之并发队列

 4 years ago
source link: https://www.tuicool.com/articles/qyumi2e
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

一、并发队列

在并发队列上JDK提供了两套实现,

一个是以ConcurrentLinkedQueue为代表的高性能队列非阻塞,

一个是以BlockingQueue接口为代表的阻塞队列,无论哪种都继承自Queue。

1、阻塞队列与非阻塞队

阻塞队列与普通队列的区别在于:

阻塞队列:

  • 当队列是空的时,从队列中获取元素的操作将会被阻塞,试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素;
  • 当队列是满时,往队列里添加元素的操作会被阻塞。试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来,如从队列中移除一个或者多个元素,或者完全清空队列.

2、ConcurrentLinkedQeque

ConcurrentLinkedQueue : 是一个适用于高并发场景下的队列,通过无锁的方式,实现

了高并发状态下的高性能,通常ConcurrentLinkedQueue性能好于BlockingQueue.它

是一个基于链接节点的无界线程安全队列。该队列的元素遵循先进先出的原则。头是最先

加入的,尾是最近加入的,该队列不允许null元素。

// 非阻塞式队列,无界队列
ConcurrentLinkedDeque q = new ConcurrentLinkedDeque();
    q.offer("张三");
    q.offer("李四");
    q.offer("王五");
    //从头获取元素,删除该元素
    System.out.println(q.poll());
    //从头获取元素,不刪除该元素
    System.out.println(q.peek());
    //获取总长度
    System.out.println(q.size());

3、BlockingQueue

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:

  • 在队列为空时,获取元素的线程会等待队列变为非空。
  • 当队列满时,存储元素的线程会等待队列可用。

在Java中,BlockingQueue的接口位于java.util.concurrent 包中(在Java5版本开始提供),由上面介绍的阻塞队列的特性可知,阻塞队列是线程安全的。

1)、ArrayBlockingQueue

ArrayBlockingQueue是一个有边界的阻塞队列,它的内部实现是一个数组。有边界的意思是它的容量是有限的,我们必须在其初始化的时候指定它的容量大小,容量大小一旦指定就不可改变。

ArrayBlockingQueue是以先进先出的方式存储数据,最新插入的对象是尾部,最新移出的对象是头部。下面

是一个初始化和使用ArrayBlockingQueue的例子:

<String> arrays = new ArrayBlockingQueue<String>(3);
    arrays.offer("张三");
     arrays.offer("李四");
    arrays.offer("王五");
    arrays.offer("666", 3, TimeUnit.SECONDS); // 队列满了,阻塞3秒后向下执行
    System.out.println(arrays.poll()); // 张三
    System.out.println(arrays.poll()); // 李四
    System.out.println(arrays.poll()); // 王五
    System.out.println(arrays.poll(3, TimeUnit.SECONDS)); //队列为空,阻塞3秒后结束

2)、LinkedBlockingQueue

LinkedBlockingQueue阻塞队列大小的配置是可选的,如果我们初始化时指定一个大小,它就是有边界的,如果不指定,它就是无边界的。说是无边界,其实是采用了默认大小为Integer.MAX_VALUE的容量 。它的内部实现是一个链表。

和ArrayBlockingQueue一样,LinkedBlockingQueue 也是以先进先出的方式存储数据,最新插入的对象是尾部,最新移出的对象是头部。下面是一个初始化和使LinkedBlockingQueue的例子:

LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(3);
linkedBlockingQueue.add("张三");
linkedBlockingQueue.add("李四");
linkedBlockingQueue.add("李四");
System.out.println(linkedBlockingQueue.size()); // 3

3)、PriorityBlockingQueue(有界,快满时自动扩容,看似无界)

PriorityBlockingQueue是一个没有边界的队列,它的排序规则和 java.util.PriorityQueue一样。需要注意,PriorityBlockingQueue中允许插入null对象。

所有插入PriorityBlockingQueue的对象必须实现 java.lang.Comparable接口,队列优先级的排序规则就

是按照我们对这个接口的实现来定义的。

另外,我们可以从PriorityBlockingQueue获得一个迭代器Iterator,但这个迭代器并不保证按照优先级顺序进行迭代。

4)、SynchronousQueue

SynchronousQueue队列内部仅允许容纳一个元素。当一个线程插入一个元素后会被阻塞,除非这个元素被另一个线程消费。

5)、使用BlockingQueue模拟生产者与消费者

class ProducerThread implements Runnable {
    private BlockingQueue<String> blockingQueue;
    private AtomicInteger count = new AtomicInteger();
    private volatile boolean FLAG = true;

    public ProducerThread(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + "生产者开始启动....");
        while (FLAG) {
            String data = count.incrementAndGet() + "";
            try {
                boolean offer = blockingQueue.offer(data, 2, TimeUnit.SECONDS);
                if (offer) {
                    System.out.println(Thread.currentThread().getName() + ",生产队列" + data + "成功..");
                } else {
                    System.out.println(Thread.currentThread().getName() + ",生产队列" + data + "失败..");
                }
                Thread.sleep(1000);
            } catch (Exception e) {

            }
        }
        System.out.println(Thread.currentThread().getName() + ",生产者线程停止...");
    }

    public void stop() {
        this.FLAG = false;
    }

}

class ConsumerThread implements Runnable {
    private volatile boolean FLAG = true;
    private BlockingQueue<String> blockingQueue;

    public ConsumerThread(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + "消费者开始启动....");
        while (FLAG) {
            try {
                String data = blockingQueue.poll(2, TimeUnit.SECONDS);
                if (data == null || data == "") {
                    FLAG = false;
                    System.out.println("消费者超过2秒时间未获取到消息.");
                    return;
                }
                System.out.println("消费者获取到队列信息成功,data:" + data);

            } catch (Exception e) {
                // TODO: handle exception
            }
        }
    }

}

public class Test0008 {

    public static void main(String[] args) {
        BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(3);
        ProducerThread producerThread = new ProducerThread(blockingQueue);
        ConsumerThread consumerThread = new ConsumerThread(blockingQueue);
        Thread t1 = new Thread(producerThread);
        Thread t2 = new Thread(consumerThread);
        t1.start();
        t2.start();
        //10秒后 停止线程..
        try {
            Thread.sleep(10*1000);
            producerThread.stop();
        } catch (Exception e) {
            // TODO: handle exception
        }
    }

}
  1. ArrayDeque, (数组双端队列) 
  2. PriorityQueue, (优先级队列) 
  3. ConcurrentLinkedQueue, (基于链表的并发队列) 
  4. DelayQueue, (延期阻塞队列)(阻塞队列实现了BlockingQueue接口) 
  5. ArrayBlockingQueue, 常用(基于数组的并发阻塞队列) 
  6. LinkedBlockingQueue, 常用(基于链表的FIFO阻塞队列) 
  7. LinkedBlockingDeque, (基于链表的FIFO双端阻塞队列) 
  8. PriorityBlockingQueue,常用 (带优先级的无界阻塞队列,) 
  9. SynchronousQueue常用 (并发同步阻塞队列)

本文由博客一文多发平台 OpenWrite 发布!


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK