22

生产者消费者Java实现

 3 years ago
source link: https://suiyia.github.io/2019/12/27/%E7%94%9F%E4%BA%A7%E8%80%85%E6%B6%88%E8%B4%B9%E8%80%85Java%E5%AE%9E%E7%8E%B0/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

生产者消费者Java实现

upload successful

生产者消费者问题,描述的是共享固定大小缓冲区的两个进程——生产者和消费者,在实际运行时会发生的问题。

即生产者不能一直生产而不消费,这会造成缓冲区数据堆积;而消费者也需要有数据,才能进行消费。

使用 Java 多线程相关语法,解决生产者与消费者问题,有助于在以后的工作学习中,解决类似多个线程协同处理共享资源的问题。

Java 提供了关键字 synchronize、及相关工具类实现多线程,这里将使用 3 种方式实现。

  1. 定义共享缓冲区,缓冲区能够添加元素和去除元素。缓冲区满时将不能添加元素,缓冲区为空时将不能去除元素

  2. 定义生产者,负责向缓冲区添加数据;定义消费者,负责向缓冲区取数据

使用 Synchronize 关键字实现

public class TestA {

// 定义共享资源区
class Resource{
private int currCount = 0;
private int maxCount = 3;
private Object object = new Object();

// 存数据之前先拿到对象锁
       public void put() throws Exception{
synchronized (object){
while (currCount >= maxCount){
try {
object.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
currCount++;
System.out.println(Thread.currentThread().getName() + " put:"+currCount);
object.notify();
}
}

public void take() throws Exception{
synchronized (object){
while (currCount == 0){
try {
object.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
currCount--;
System.out.println(Thread.currentThread().getName() + " take:"+currCount);
object.notify();
}
}
}

// 生产者,传入资源实例 存数据
class Producer implements Runnable{

private Resource resource;

public Producer(Resource resource){
this.resource = resource;
}

@Override
public void run() {
while (true){
try {
sleep(1000);
resource.put();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

// 消费者,传入资源实例 取数据
class Consumer implements Runnable{

private Resource resource;

public Consumer(Resource resource){
this.resource = resource;
}

@Override
public void run() {
while (true){
try {
sleep(1000);
resource.take();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

public static void main(String[] args) {
TestA testA = new TestA();
Resource resource = testA.new Resource();
// 定义 3 个生产者, 1 个消费者
new Thread(testA.new Producer(resource)).start();
new Thread(testA.new Producer(resource)).start();
new Thread(testA.new Producer(resource)).start();
new Thread(testA.new Consumer(resource)).start();
}
}

使用 Lock、Condition 实现

与前面 synchronize 实现相比,主要区别就是 lock 需要在 finally 代码块中主动释放锁。

public class TestB {

// 定义共享资源区
class Resource{
private int currCount = 0;
private int maxCount = 3;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();


public void put() throws Exception{
lock.lock();
try {
while (currCount >= maxCount){
condition.await();
}
currCount++;
System.out.println(Thread.currentThread().getName() + " put:"+currCount);
condition.signal();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}

public void take() throws Exception{
lock.lock();
try {
while (currCount == 0){
condition.await();
}
currCount--;
System.out.println(Thread.currentThread().getName() + " take:"+currCount);
condition.signal();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
}

...
   定义生产者、消费者、main 方法与上面一样,不再赘述。
}

使用 BlockingQueue 阻塞队列实现

阻塞队列无需额外的关键字或者方法进行控制,它的底层实现逻辑就是 Lock 方法实现,当数据不满足取或者存的条件时它就会阻塞等待。

public class TestC {

// 定义共享资源区
class Resource{
private BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(3);

public void put() throws Exception{
queue.put(1);
System.out.println(Thread.currentThread().getName()+" put:"+queue.size());
}

public void take() throws Exception{
queue.take();
System.out.println(Thread.currentThread().getName()+" take:"+queue.size());
}
}
...
   定义生产者、消费者、main 方法与上面一样,不再赘述。
}

任务同步,Java 提供很多实现方式。后续学习了其它工具类的用法再来更新这个例子。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK