

【java】生产者消费者模式的实现 | iTimeTraveler
source link: https://itimetraveler.github.io/2017/11/10/%E3%80%90Java%E3%80%91%E7%94%9F%E4%BA%A7%E8%80%85%E6%B6%88%E8%B4%B9%E8%80%85%E6%A8%A1%E5%BC%8F%E7%9A%84%E5%AE%9E%E7%8E%B0/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

生产者消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一存储空间,生产者向空间里生产数据,而消费者取走数据。
阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
wait/notify方法
首先,我们搞清楚Thread.sleep()方法和Object.wait()、Object.notify()方法的区别。根据这篇文章java sleep和wait的区别的疑惑?
sleep()
是Thread类的方法;而wait()
,notify()
,notifyAll()
是Object类中定义的方法;尽管这两个方法都会影响线程的执行行为,但是本质上是有区别的。Thread.sleep()
不会导致锁行为的改变,如果当前线程是拥有锁的,那么Thread.sleep()
不会让线程释放锁。如果能够帮助你记忆的话,可以简单认为和锁相关的方法都定义在Object类中,因此调用Thread.sleep()
是不会影响锁的相关行为。Thread.sleep
和Object.wait
都会暂停当前的线程,对于CPU资源来说,不管是哪种方式暂停的线程,都表示它暂时不再需要CPU的执行时间。OS会将执行时间分配给其它线程。区别是调用wait后,需要别的线程执行notify/notifyAll才能够重新获得CPU执行时间。
线程状态图:
Thread.sleep()
让线程从 【running】 -> 【阻塞态】 时间结束/interrupt -> 【runnable】Object.wait()
让线程从 【running】 -> 【等待队列】notify -> 【锁池】 -> 【runnable】
实现生产者消费者模型
生产者消费者问题是研究多线程程序时绕不开的经典问题之一,它描述是有一块缓冲区作为仓库,生产者可以将产品放入仓库,消费者则可以从仓库中取走产品。在Java中一共有四种方法支持同步,其中前三个是同步方法,一个是管道方法。
(1)Object的wait() / notify()方法
(2)Lock和Condition的await() / signal()方法
(3)BlockingQueue阻塞队列方法
(4)PipedInputStream / PipedOutputStream
本文只介绍最常用的前三种,第四种暂不做讨论。源代码在这里:Java实现生产者消费者模型
1. 使用Object的wait() / notify()方法
wait()
/ nofity()
方法是基类Object的两个方法,也就意味着所有Java类都会拥有这两个方法,这样,我们就可以为任何对象实现同步机制。
wait()
:当缓冲区已满/空时,生产者/消费者线程停止自己的执行,放弃锁,使自己处于等待状态,让其他线程执行。notify()
:当生产者/消费者向缓冲区放入/取出一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态。
/**
* 生产者消费者模式:使用Object.wait() / notify()方法实现
*/
public class ProducerConsumer {
private static final int CAPACITY = 5;
public static void main(String args[]){
Queue<Integer> queue = new LinkedList<Integer>();
Thread producer1 = new Producer("P-1", queue, CAPACITY);
Thread producer2 = new Producer("P-2", queue, CAPACITY);
Thread consumer1 = new Consumer("C1", queue, CAPACITY);
Thread consumer2 = new Consumer("C2", queue, CAPACITY);
Thread consumer3 = new Consumer("C3", queue, CAPACITY);
producer1.start();
producer2.start();
consumer1.start();
consumer2.start();
consumer3.start();
}
/**
* 生产者
*/
public static class Producer extends Thread{
private Queue<Integer> queue;
String name;
int maxSize;
int i = 0;
public Producer(String name, Queue<Integer> queue, int maxSize){
super(name);
this.name = name;
this.queue = queue;
this.maxSize = maxSize;
}
@Override
public void run(){
while(true){
synchronized(queue){
while(queue.size() == maxSize){
try {
System.out .println("Queue is full, Producer[" + name + "] thread waiting for " + "consumer to take something from queue.");
queue.wait();
} catch (Exception ex) {
ex.printStackTrace();
}
}
System.out.println("[" + name + "] Producing value : +" + i);
queue.offer(i++);
queue.notifyAll();
try {
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
/**
* 消费者
*/
public static class Consumer extends Thread{
private Queue<Integer> queue;
String name;
int maxSize;
public Consumer(String name, Queue<Integer> queue, int maxSize){
super(name);
this.name = name;
this.queue = queue;
this.maxSize = maxSize;
}
@Override
public void run(){
while(true){
synchronized(queue){
while(queue.isEmpty()){
try {
System.out.println("Queue is empty, Consumer[" + name + "] thread is waiting for Producer");
queue.wait();
} catch (Exception ex) {
ex.printStackTrace();
}
}
int x = queue.poll();
System.out.println("[" + name + "] Consuming value : " + x);
queue.notifyAll();
try {
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
判断Queue大小为0或者大于等于queueSize时须使用 while (condition) {}
,不能使用 if(condition) {}
。其中 while(condition)
循环,它又被叫做“自旋锁”。自旋锁以及wait()
和notify()
方法在线程通信这篇文章中有更加详细的介绍。为防止该线程没有收到notify()
调用也从wait()
中返回(也称作虚假唤醒),这个线程会重新去检查condition条件以决定当前是否可以安全地继续执行还是需要重新保持等待,而不是认为线程被唤醒了就可以安全地继续执行了。
输出日志如下:
[P-1] Producing value : +0
[P-1] Producing value : +1
[P-1] Producing value : +2
[P-1] Producing value : +3
[P-1] Producing value : +4
Queue is full, Producer[P-1] thread waiting for consumer to take something from queue.
[C3] Consuming value : 0
[C3] Consuming value : 1
[C3] Consuming value : 2
[C3] Consuming value : 3
[C3] Consuming value : 4
Queue is empty, Consumer[C3] thread is waiting for Producer
Queue is empty, Consumer[C2] thread is waiting for Producer
Queue is empty, Consumer[C1] thread is waiting for Producer
[P-2] Producing value : +0
[C1] Consuming value : 0
Queue is empty, Consumer[C1] thread is waiting for Producer
Queue is empty, Consumer[C2] thread is waiting for Producer
Queue is empty, Consumer[C3] thread is waiting for Producer
[P-1] Producing value : +5
[P-1] Producing value : +6
[P-1] Producing value : +7
[P-1] Producing value : +8
[P-1] Producing value : +9
Queue is full, Producer[P-1] thread waiting for consumer to take something from queue.
[C3] Consuming value : 5
[C3] Consuming value : 6
[C3] Consuming value : 7
[C3] Consuming value : 8
[C3] Consuming value : 9
Queue is empty, Consumer[C3] thread is waiting for Producer
Queue is empty, Consumer[C2] thread is waiting for Producer
Queue is empty, Consumer[C1] thread is waiting for Producer
[P-2] Producing value : +1
[C1] Consuming value : 1
Queue is empty, Consumer[C1] thread is waiting for Producer
Queue is empty, Consumer[C2] thread is waiting for Producer
Queue is empty, Consumer[C3] thread is waiting for Producer
[P-1] Producing value : +10
[P-1] Producing value : +11
[P-1] Producing value : +12
[P-1] Producing value : +13
[P-1] Producing value : +14
Queue is full, Producer[P-1] thread waiting for consumer to take something from queue.
[C3] Consuming value : 10
[C3] Consuming value : 11
[C3] Consuming value : 12
[C3] Consuming value : 13
[C3] Consuming value : 14
Queue is empty, Consumer[C3] thread is waiting for Producer
Queue is empty, Consumer[C2] thread is waiting for Producer
Queue is empty, Consumer[C1] thread is waiting for Producer
[P-2] Producing value : +2
[P-2] Producing value : +3
[P-2] Producing value : +4
[P-2] Producing value : +5
[P-2] Producing value : +6
Queue is full, Producer[P-2] thread waiting for consumer to take something from queue.
[C1] Consuming value : 2
[C1] Consuming value : 3
[C1] Consuming value : 4
[C1] Consuming value : 5
[C1] Consuming value : 6
Queue is empty, Consumer[C1] thread is waiting for Producer
Queue is empty, Consumer[C2] thread is waiting for Producer
Queue is empty, Consumer[C3] thread is waiting for Producer
[P-1] Producing value : +15
[C3] Consuming value : 15
Queue is empty, Consumer[C3] thread is waiting for Producer
Queue is empty, Consumer[C2] thread is waiting for Producer
Queue is empty, Consumer[C1] thread is waiting for Producer
[P-2] Producing value : +7
[P-2] Producing value : +8
[P-2] Producing value : +9
2. 使用Lock和Condition的await() / signal()方法
在JDK5.0之后,Java提供了更加健壮的线程处理机制,包括同步、锁定、线程池等,它们可以实现更细粒度的线程控制。Condition接口的await()
和signal()
就是其中用来做同步的两种方法,它们的功能基本上和Object的wait()
/ nofity()
相同,完全可以取代它们,但是它们和新引入的锁定机制Lock
直接挂钩,具有更大的灵活性。通过在Lock
对象上调用newCondition()
方法,将条件变量和一个锁对象进行绑定,进而控制并发程序访问竞争资源的安全。下面来看代码:
/**
* 生产者消费者模式:使用Lock和Condition实现
* {@link java.util.concurrent.locks.Lock}
* {@link java.util.concurrent.locks.Condition}
*/
public class ProducerConsumerByLock {
private static final int CAPACITY = 5;
private static final Lock lock = new ReentrantLock();
private static final Condition fullCondition = lock.newCondition(); //队列满的条件
private static final Condition emptyCondition = lock.newCondition(); //队列空的条件
public static void main(String args[]){
Queue<Integer> queue = new LinkedList<Integer>();
Thread producer1 = new Producer("P-1", queue, CAPACITY);
Thread producer2 = new Producer("P-2", queue, CAPACITY);
Thread consumer1 = new Consumer("C1", queue, CAPACITY);
Thread consumer2 = new Consumer("C2", queue, CAPACITY);
Thread consumer3 = new Consumer("C3", queue, CAPACITY);
producer1.start();
producer2.start();
consumer1.start();
consumer2.start();
consumer3.start();
}
/**
* 生产者
*/
public static class Producer extends Thread{
private Queue<Integer> queue;
String name;
int maxSize;
int i = 0;
public Producer(String name, Queue<Integer> queue, int maxSize){
super(name);
this.name = name;
this.queue = queue;
this.maxSize = maxSize;
}
@Override
public void run(){
while(true){
//获得锁
lock.lock();
while(queue.size() == maxSize){
try {
System.out .println("Queue is full, Producer[" + name + "] thread waiting for " + "consumer to take something from queue.");
//条件不满足,生产阻塞
fullCondition.await();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
System.out.println("[" + name + "] Producing value : +" + i);
queue.offer(i++);
//唤醒其他所有生产者、消费者
fullCondition.signalAll();
emptyCondition.signalAll();
//释放锁
lock.unlock();
try {
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* 消费者
*/
public static class Consumer extends Thread{
private Queue<Integer> queue;
String name;
int maxSize;
public Consumer(String name, Queue<Integer> queue, int maxSize){
super(name);
this.name = name;
this.queue = queue;
this.maxSize = maxSize;
}
@Override
public void run(){
while(true){
//获得锁
lock.lock();
while(queue.isEmpty()){
try {
System.out.println("Queue is empty, Consumer[" + name + "] thread is waiting for Producer");
//条件不满足,消费阻塞
emptyCondition.await();
} catch (Exception ex) {
ex.printStackTrace();
}
}
int x = queue.poll();
System.out.println("[" + name + "] Consuming value : " + x);
//唤醒其他所有生产者、消费者
fullCondition.signalAll();
emptyCondition.signalAll();
//释放锁
lock.unlock();
try {
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
输入日志如下:
[P-1] Producing value : +0
[C1] Consuming value : 0
Queue is empty, Consumer[C3] thread is waiting for Producer
Queue is empty, Consumer[C2] thread is waiting for Producer
[P-2] Producing value : +0
[C3] Consuming value : 0
Queue is empty, Consumer[C2] thread is waiting for Producer
Queue is empty, Consumer[C1] thread is waiting for Producer
[P-2] Producing value : +1
[C2] Consuming value : 1
Queue is empty, Consumer[C1] thread is waiting for Producer
Queue is empty, Consumer[C3] thread is waiting for Producer
[P-1] Producing value : +1
[C1] Consuming value : 1
Queue is empty, Consumer[C3] thread is waiting for Producer
[P-1] Producing value : +2
[C3] Consuming value : 2
Queue is empty, Consumer[C2] thread is waiting for Producer
[P-2] Producing value : +2
[C2] Consuming value : 2
Queue is empty, Consumer[C1] thread is waiting for Producer
Queue is empty, Consumer[C2] thread is waiting for Producer
[P-1] Producing value : +3
[C1] Consuming value : 3
Queue is empty, Consumer[C2] thread is waiting for Producer
Queue is empty, Consumer[C1] thread is waiting for Producer
Queue is empty, Consumer[C3] thread is waiting for Producer
[P-2] Producing value : +3
[C2] Consuming value : 3
Queue is empty, Consumer[C1] thread is waiting for Producer
Queue is empty, Consumer[C3] thread is waiting for Producer
Queue is empty, Consumer[C2] thread is waiting for Producer
[P-1] Producing value : +4
[C1] Consuming value : 4
Queue is empty, Consumer[C3] thread is waiting for Producer
Queue is empty, Consumer[C2] thread is waiting for Producer
Queue is empty, Consumer[C1] thread is waiting for Producer
[P-2] Producing value : +4
[C3] Consuming value : 4
Queue is empty, Consumer[C2] thread is waiting for Producer
Queue is empty, Consumer[C1] thread is waiting for Producer
[P-2] Producing value : +5
[C2] Consuming value : 5
Queue is empty, Consumer[C1] thread is waiting for Producer
Queue is empty, Consumer[C2] thread is waiting for Producer
[P-1] Producing value : +5
[C1] Consuming value : 5
Queue is empty, Consumer[C2] thread is waiting for Producer
Queue is empty, Consumer[C3] thread is waiting for Producer
[P-2] Producing value : +6
[C2] Consuming value : 6
Queue is empty, Consumer[C3] thread is waiting for Producer
[P-1] Producing value : +6
[C3] Consuming value : 6
Queue is empty, Consumer[C3] thread is waiting for Producer
Queue is empty, Consumer[C1] thread is waiting for Producer
[P-2] Producing value : +7
[C3] Consuming value : 7
Queue is empty, Consumer[C1] thread is waiting for Producer
[P-1] Producing value : +7
[C1] Consuming value : 7
Queue is empty, Consumer[C2] thread is waiting for Producer
[P-2] Producing value : +8
[C2] Consuming value : 8
[P-1] Producing value : +8
[C1] Consuming value : 8
[P-2] Producing value : +9
[C3] Consuming value : 9
[P-2] Producing value : +10
[C2] Consuming value : 10
[P-1] Producing value : +9
[P-1] Producing value : +10
[C1] Consuming value : 9
[P-2] Producing value : +11
[C3] Consuming value : 10
[C2] Consuming value : 11
[P-2] Producing value : +12
[C1] Consuming value : 12
[P-1] Producing value : +11
[C3] Consuming value : 11
[P-2] Producing value : +13
[C2] Consuming value : 13
Queue is empty, Consumer[C2] thread is waiting for Producer
Queue is empty, Consumer[C3] thread is waiting for Producer
[P-1] Producing value : +12
[C2] Consuming value : 12
Queue is empty, Consumer[C3] thread is waiting for Producer
[P-1] Producing value : +13
[C3] Consuming value : 13
Queue is empty, Consumer[C1] thread is waiting for Producer
Queue is empty, Consumer[C3] thread is waiting for Producer
[P-2] Producing value : +14
[C1] Consuming value : 14
Queue is empty, Consumer[C3] thread is waiting for Producer
Queue is empty, Consumer[C1] thread is waiting for Producer
[P-1] Producing value : +14
[C3] Consuming value : 14
Queue is empty, Consumer[C1] thread is waiting for Producer
[P-1] Producing value : +15
[C1] Consuming value : 15
[P-2] Producing value : +15
[P-1] Producing value : +16
[C3] Consuming value : 15
[P-2] Producing value : +16
3. 使用BlockingQueue阻塞队列方法
JDK 1.5 以后新增的 java.util.concurrent
包新增了 BlockingQueue
接口。并提供了如下几种阻塞队列实现:
- java.util.concurrent.ArrayBlockingQueue
- java.util.concurrent.LinkedBlockingQueue
- java.util.concurrent.SynchronousQueue
- java.util.concurrent.PriorityBlockingQueue
实现生产者-消费者模型使用 ArrayBlockingQueue
或者 LinkedBlockingQueue
即可。
我们这里使用LinkedBlockingQueue
,它是一个已经在内部实现了同步的队列,实现方式采用的是我们第2种await()
/ signal()
方法。它可以在生成对象时指定容量大小。它用于阻塞操作的是put()和take()方法。
put()
方法:类似于我们上面的生产者线程,容量达到最大时,自动阻塞。take()
方法:类似于我们上面的消费者线程,容量为0时,自动阻塞。
我们可以跟进源码看一下LinkedBlockingQueue
类的put()
方法实现:
/** Main lock guarding all access */
final ReentrantLock lock = new ReentrantLock();
/** Condition for waiting takes */
private final Condition notEmpty = lock.newCondition();
/** Condition for waiting puts */
private final Condition notFull = lock.newCondition();
public void put(E e) throws InterruptedException {
putLast(e);
}
public void putLast(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
while (!linkLast(node))
notFull.await();
} finally {
lock.unlock();
}
}
看到这里证实了它的实现方式采用的是我们第2种await()
/ signal()
方法。下面我们就使用它实现吧。
/**
* 生产者消费者模式:使用{@link java.util.concurrent.BlockingQueue}实现
*/
public class ProducerConsumerByBQ{
private static final int CAPACITY = 5;
public static void main(String args[]){
LinkedBlockingDeque<Integer> blockingQueue = new LinkedBlockingDeque<Integer>(CAPACITY);
Thread producer1 = new Producer("P-1", blockingQueue, CAPACITY);
Thread producer2 = new Producer("P-2", blockingQueue, CAPACITY);
Thread consumer1 = new Consumer("C1", blockingQueue, CAPACITY);
Thread consumer2 = new Consumer("C2", blockingQueue, CAPACITY);
Thread consumer3 = new Consumer("C3", blockingQueue, CAPACITY);
producer1.start();
producer2.start();
consumer1.start();
consumer2.start();
consumer3.start();
}
/**
* 生产者
*/
public static class Producer extends Thread{
private LinkedBlockingDeque<Integer> blockingQueue;
String name;
int maxSize;
int i = 0;
public Producer(String name, LinkedBlockingDeque<Integer> queue, int maxSize){
super(name);
this.name = name;
this.blockingQueue = queue;
this.maxSize = maxSize;
}
@Override
public void run(){
while(true){
try {
blockingQueue.put(i);
System.out.println("[" + name + "] Producing value : +" + i);
i++;
//暂停最多1秒
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* 消费者
*/
public static class Consumer extends Thread{
private LinkedBlockingDeque<Integer> blockingQueue;
String name;
int maxSize;
public Consumer(String name, LinkedBlockingDeque<Integer> queue, int maxSize){
super(name);
this.name = name;
this.blockingQueue = queue;
this.maxSize = maxSize;
}
@Override
public void run(){
while(true){
try {
int x = blockingQueue.take();
System.out.println("[" + name + "] Consuming : " + x);
//暂停最多1秒
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
输出日志如下:
[P-2] Producing value : +0
[P-1] Producing value : +0
[C1] Consuming : 0
[C3] Consuming : 0
[P-2] Producing value : +1
[C2] Consuming : 1
[P-2] Producing value : +2
[C1] Consuming : 2
[P-1] Producing value : +1
[C2] Consuming : 1
[P-1] Producing value : +2
[C3] Consuming : 2
[P-1] Producing value : +3
[C2] Consuming : 3
[P-2] Producing value : +3
[C1] Consuming : 3
[P-1] Producing value : +4
[C2] Consuming : 4
[P-2] Producing value : +4
[C3] Consuming : 4
[P-2] Producing value : +5
[C1] Consuming : 5
[P-1] Producing value : +5
[C2] Consuming : 5
[P-1] Producing value : +6
[C1] Consuming : 6
[P-2] Producing value : +6
[C2] Consuming : 6
[P-2] Producing value : +7
[C2] Consuming : 7
[P-1] Producing value : +7
[C1] Consuming : 7
[P-2] Producing value : +8
[C3] Consuming : 8
[P-2] Producing value : +9
[C2] Consuming : 9
[P-1] Producing value : +8
[C2] Consuming : 8
[P-2] Producing value : +10
[C1] Consuming : 10
[P-1] Producing value : +9
[C3] Consuming : 9
[P-1] Producing value : +10
[C2] Consuming : 10
[P-2] Producing value : +11
[C1] Consuming : 11
[C3] Consuming : 12
[P-2] Producing value : +12
[P-2] Producing value : +13
[C2] Consuming : 13
[P-1] Producing value : +11
[C3] Consuming : 11
[P-1] Producing value : +12
[C3] Consuming : 12
[P-2] Producing value : +14
[C1] Consuming : 14
[P-1] Producing value : +13
[C2] Consuming : 13
[P-2] Producing value : +15
[C3] Consuming : 15
[P-2] Producing value : +16
[C1] Consuming : 16
[P-1] Producing value : +14
[C3] Consuming : 14
[P-2] Producing value : +17
[C2] Consuming : 17
Recommend
-
61
共享零售是要围绕家和生活,着力发展互联网能力,赋能线下运营能力
-
75
想要了解更多关于Java生产者消费者问题的演变吗?那就看看这篇文章吧,我们分别用旧方法和新方法来处理这个问题。
-
97
原文地址 前言 在计算机世界当中,数据在不断产生的同时,也在不停地被处理着。产生数据的一方被我们称作生产者,而另一方则被称为消费者。...
-
23
生产者消费者Java实现
-
8
SpringBoot整合RabbitMQ, 实现生产者与消费者的功能。以及这期间我踩的坑。自然,依赖是少不了的。除了spring-boot-starter-web依赖外。就这个是最主要的依赖了,其他的看着办就是了。我用的是gradle,用maven的看着弄也一样的。无非就是包+包名...
-
8
Java生产者消费者的三种实现 2018-04-20 分类:Java / 编程
-
8
Mr.Feng BlogNLP、深度学习、机器学习、Python、Go基于asyncio的异步模式生产者/消费者模式使用Python实现基于asyncio的生产者/消费者模式。 asyncio实现...
-
5
MQ 是什么? MQ(Message Queue)消息队列,是基础数据结构中“先进先出”的一种数据结构。 指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递——生产者产生消息并把消息放入队列,然后由消费者去处理。 消费者可以到指定队列...
-
5
1.lmax-Disruptor队列介绍 disruptor是英国著名的金融交易所lmax旗下技术团队开发的一款java实现的高性能内存队列框架 其发明disruptor的主要目的是为了改进传统的内存队列实现如jdk的ArrayBlocking...
-
4
java | 生产者/消费者 这里说一下,生产者-消费者和
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK