5

CountDownLatch、CyclicBarrier、Semaphore、Exchanger 的详细解析

 3 years ago
source link: https://segmentfault.com/a/1190000038324545
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Uf2uEzQ.jpg!mobile

本文主要介绍和对比我们常用的几种并发工具类,主要涉及 CountDownLatchCyclicBarrierSemaphoreExchanger 相关的内容,如果对多线程相关内容不熟悉,可以看笔者之前的一些文章:

  • 介绍 CountDownLatchCyclicBarrier 两者的使用与区别,他们都是等待多线程完成,是一种并发流程的控制手段,
  • 介绍 SemaphoreExchanger 的使用, semaphore 是信号量,可以用来控制允许的线程数,而 Exchanger 可以用来交换两个线程间的数据。

CountDownLatch

  • CountDownLatchJDK5 之后加入的一种并发流程控制工具,它在 java.util.concurrent 包下
  • CountDownLatch 允许一个或多个线程等待其他线程完成操作,这里需要注意,是可以是一个等待也可以是多个来等待
  • CountDownLatch 的构造函数如下,它接受一个 int 类型的参数作为计数器,即如果你想等待 N 个线程完成,那么这里就传入 N
public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
  • 其中有两个核心的方法 countDownawait ,其中 当我们调用 countDown 方法时相应的 N 的值减 1,而 await 方法则会阻塞当前线程,直到 N 的值变为零。
  • 说起来比较抽象,下面我们通过实际案例来说明。

多个线程等待一个线程

  • 在我们生活中最典型的案例就是体育中的跑步,假设现在我们要进行一场赛跑,那么所有的选手都需要等待裁判员的起跑命令,这时候,我们将其抽象化每个选手对应的是一个线程,而裁判员也是一个线程,那么就是多个选手的线程再等待裁判员线程的命令来执行
  • 我们通过 CountDownLatch 来实现这一案例,那么等待的个数 N 就是上面的裁判线程的个数,即为 1,
/**
     * @url i-code.onlien
     * 云栖简码
     */
    public static void main(String[] args) throws InterruptedException {
        //模拟跑步比赛,裁判说开始,所有选手开始跑,我们可以使用countDownlatch来实现

        //这里需要等待裁判说开始,所以时等着一个线程
        CountDownLatch countDownLatch = new CountDownLatch(1);

        new Thread(() ->{
            try {
                System.out.println(Thread.currentThread().getName() +"已准备");
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+"开始跑~~");

        },"选手1").start();
        new Thread(() ->{
            try {
                System.out.println(Thread.currentThread().getName() +"已准备");
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+"开始跑~~");

        },"选手2").start();

        TimeUnit.SECONDS.sleep(1);
        System.out.println("裁判:预备~~~");
        countDownLatch.countDown();
        System.out.println("裁判:跑~~~");
    }
  • 运行结果如下:

faaUfmI.png!mobile

在上述代码中,我们首先创建了一个计数为1 的 CountDownLatch 对象,这代表我们需要等待的线程数,之后再创建了两个线程,用来代表选手线程,同时在选手的线程中我们都调用了 await 方法,让线程进入阻塞状态,直到CountDownLatch的计数为零后再执行后面的内容,在主线程 main 方法中我们等待 1秒后执行 countDown 方法,这个方法就是减一,此时的 N 则为零了,那么选手线程则开始执行后面的内容,整体的输出如上图所示

一个/多个线程等待多个线程

  • 同样从我们生活中的场景来抽象,假设公司要组织出游,大巴车接送,当凑够五个人大巴车则发车出发,这里就是大巴车需要等待这五个人全部到齐才能继续执行,我们抽象之后用 CountDownLatch 来实现,那么的计数个数 N 则为5,因为要等待这五个,通过代码实现如下:
public static void main(String[] args) throws InterruptedException {
        /**
         * i-code.online
         * 云栖简码 
         */
        //等待的个数
        CountDownLatch countDownLatch = new CountDownLatch(5);

        for (int i = 0; i < 5; i++) {
            new Thread(()->{
                System.out.println(Thread.currentThread().getName() + "从住所出发...");
                try {
                    TimeUnit.SECONDS.sleep((long) (Math.random()*10));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + " 到达目的地-----");
                countDownLatch.countDown();
            },"人员-"+i).start();
        }

        System.out.println("大巴正在等待人员中.....");
        countDownLatch.await();
        System.out.println("-----所有人到齐,出发-----");
    }
  • 上面代码执行结果如下:

Yr2263.png!mobile

从上述代码中我们可以看到,定义了一个计数为5的 countDownLatch ,之后通过循环创建五个线程,模拟五个人员,当他们到达指定地点后执行 countDown 方法,对计数减一。主线程相当于是大巴车的线程,执行 await 方法进行阻塞,只有当 N 的值减到0后则执行后面的输出

CountDownLatch 主要方法介绍

  • 构造函数:
public CountDownLatch(int count) {  };

它的构造函数是传入一个参数,该参数 count 是需要倒数的数值。

  • await() :调用 await() 方法的线程开始等待,直到倒数结束,也就是 count 值为 0 的时候才会继续执行。
  • await(long timeout, TimeUnit unit)await() 有一个重载的方法,里面会传入超时参数,这个方法的作用和 await() 类似,但是这里可以设置超时时间,如果超时就不再等待了。
  • countDown() :把数值倒数 1 ,也就是将 count 值减 1 ,直到减为 0 时,之前等待的线程会被唤起。

上面的案例介绍了 CountDownLatch 的使用,但是 CountDownLatch 有个特点,那就是不能够重用,比如已经完成了倒数,那可不可以在下一次继续去重新倒数呢?是可以的,一旦倒数到0 则结束了,无法再次设置循环执行,但是我们实际需求中有很多场景中需要循环来处理,这时候我们可以使用 CyclicBarrier 来实现

CyclicBarrier

  • CyclicBarrierCountDownLatch 比较相似,当等待到一定数量的线程后开始执行某个任务
  • CyclicBarrier 的字面意思是可以循环使用的屏障,它的功能就是让一组线程到达一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开会,此时所有被屏障阻塞的线程都将继续执行。如下演示

i2uiIrj.png!mobile

  • 上图中可以看到,到线程到达屏障后阻塞,直到最后一个也到达后,则全部放行
  • 首先我们来看下它的构造函数,如下:
public CyclicBarrier(int parties) {
        this(parties, null);
    }

    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
  • CyclicBarrier(int parties) 构造函数提供了 int 类型的参数,代表的是需要拦截的线程数量,而每个线程通过调用 await 方法来告诉 CyclicBarrier 我到达屏障点了,然后阻塞
  • CyclicBarrier(int parties, Runnable barrierAction) 构造函数是为我们提供的一个高级方法,加了一个 barrierAction 的参数,这是一个 Runnable 类型的,也就是一个线程,它表示当所有线程到达屏障后,悠闲触发 barrierAction 线程执行,再执行各个线程之后的内容

案例

  • 假设你要和你女朋友约会,约定了一个时间地点,那么不管你们谁先到都会等待另一个到才会出发取约会~ 那么这时候我们通过 CyclicBarrier 的来实现,这里我们需要来拦截的线程就是两个。具体实现 如下:
/*
    CyclicBarrier 与countDownLatch 比较相似,也是等待线程完成,
    不过countDownLatch 是await等待其他的线程通过countDown的数量,达到一定数则执行,
    而 CyclicBarrier 则是直接看await的数量,达到一定数量直接全部执行,
     */
    public static void main(String[] args) {
        //好比情侣约会,不管谁先到都的等另一个,这里就是两个线程,
        CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

        new Thread(() ->{
            System.out.println("快速收拾,出门~~~");
            try {
                TimeUnit.MILLISECONDS.sleep(500);
                System.out.println("到了约会地点等待女朋友前来~~");
                cyclicBarrier.await();
                System.out.println("女朋友到来嗨皮出发~~约会");
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }

        },"男朋友").start();
        new Thread(() ->{
            System.out.println("慢慢收拾,出门~~~");
            try {
                TimeUnit.MILLISECONDS.sleep(5000);
                System.out.println("到了约会地点等待男朋友前来~~");
                cyclicBarrier.await();
                System.out.println("男朋友到来嗨皮出发~~约会");
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        },"女朋友").start();

    }
  • 代码执行结果如下:

mA7nMnF.png!mobile

上面代码,相对简单,创建一个拦截数为2的屏障,之后创建两个线程,调用await方法,只有当调用两次才会触发后面的流程。

  • 我们再写一个案例sh,使用含有 Runnable 参数的构造函数;和之前 CountDownLatch 的案例相似,公司组织出游,这时候肯定有很多大巴在等待接送,大巴不会等所有的 人都到才出发,而是每坐满一辆车就出发一辆,这种场景我们就可以使用 CyclicBarrier 来实现,实现如下:
/*
    CyclicBarrier是可重复使用到,也就是每当几个满足是不再等待执行,
    比如公司组织出游,安排了好多辆大把,每坐满一辆就发车,不再等待,类似这种场景,实现如下:
     */

    public static void main(String[] args) {
        //公司人数
        int peopleNum = 2000;
        //每二十五个人一辆车,凑够二十五则发车~
        CyclicBarrier cyclicBarrier = new CyclicBarrier(25,() ->{
            //达到25人出发
            System.out.println("------------25人数凑齐出发------------");
        });

        for (int j = 1; j <= peopleNum; j++) {
            new Thread(new PeopleTask("People-"+j,cyclicBarrier)).start();
        }

    }

    static class PeopleTask implements Runnable{

        private String name;
        private  CyclicBarrier cyclicBarrier;
        public PeopleTask(String name,CyclicBarrier cyclicBarrier){
            this.name = name;
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            System.out.println(name+"从家里出发,正在前往聚合地....");
            try {
                TimeUnit.MILLISECONDS.sleep(((int) Math.random()*1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(name+"到达集合地点,等待其他人..");
            try {
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }

        }
    }

CyclicBarrier 和 CountDownLatch 的异同

相同点:

  • 都能阻塞一个或一组线程,直到某个预设的条件达成发生,再统一出发

不同点:

  • 可重复性: CountDownLatch 的计数器只能使用一次,到到达0后就不能再次使用了,除非新建实例;而 CyclicBarrier 的计数器是可以复用循环的,所以 CyclicBarrier 可以用在更复杂的场景,可以随时调用 reset 方法来重制拦截数,如计算发生错误时可以直接充值计数器,让线程重新执行一次。
  • 作用对象: CyclicBarrier 要等固定数量的线程都到达了屏障位置才能继续执行,而 CountDownLatch 只需等待数字倒数到 0 ,也就是说 CountDownLatch 作用于事件,但 CyclicBarrier 作用于线程; CountDownLatch 是在调用了 countDown 方法之后把数字倒数减 1 ,而 CyclicBarrier 是在某线程开始等待后把计数减 1
  • 执行动作: CyclicBarrier 有执行动作 barrierAction ,而 CountDownLatch 没这个功能。

Semaphore

  • Semaphore (信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源,

Zvy6bm.png!mobile

acquire
release

案例

  • 如我们平时开发中典型的数据库操作,这是一个密集 IO 操作,我们可以启动很多线程但是数据库的连接池是有限制的,假设我们设置允许五个链接,如果我们开启太多线程直接操作则会出现异常,这时候我们可以通过信号量来控制,让一直最多只有五个线程来获取连接。代码如下:
/*
        Semaphore 是信号量, 可以用来控制线程的并发数,可以协调各个线程,以达到合理的使用公共资源
     */

    public static void main(String[] args) {
        //创建10个容量的线程池
        final ExecutorService service = Executors.newFixedThreadPool(100);
        //设置信号量的值5 ,也就是允许五个线程来执行
        Semaphore s = new Semaphore(5);
        for (int i = 0; i < 100; i++) {
            service.submit(() ->{
                try {
                    s.acquire();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                try {
                    System.out.println("数据库耗时操作"+Thread.currentThread().getName());
                    TimeUnit.MILLISECONDS.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "正在执行....");
                s.release();
            });
        }

    }

如上代码,创建了一个容量100的线程池,模拟我们程序中大量的线程,添加一百个任务,让线程池执行。创建了一个容量为5的信号量,在线程中我们调用 acquire 来获得信号量的许可,只有获得了才能只能下面的内容不然阻塞。当执行完后释放该许可,通过 release 方法,

  • 通过上面的演示,有没有觉得非常眼熟,对,就是和我们之前接触过的锁很相似,只是锁是只允许一个线程访问,那我们能不能将信号量的容量设置为1呢? 这当然是可以的,当我们设置为1时其实就和我们的锁的功能是一致的,如下代码:
private static int count = 0;
    /*
        Semaphore 中如果我们允许的的许可证数量为1 ,那么它的效果与锁相似。
     */
    public static void main(String[] args) throws InterruptedException {
        final ExecutorService service = Executors.newFixedThreadPool(10);

        Semaphore semaphore = new Semaphore(1);
        for (int i = 0; i < 10000; i++) {
            service.submit(() ->{
                try {
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "执行了");
                count ++;
                semaphore.release();
            });
        }
        service.shutdown();
        TimeUnit.SECONDS.sleep(5);
        System.out.println(count);

    }

其他主要方法介绍

  • public boolean tryAcquire()tryAcquire 和锁的 trylock 思维是一致的,是尝试获取许可证,相当于看看现在有没有空闲的许可证,如果有就获取,如果现在获取不到也没关系,不必陷入阻塞,可以去做别的事。
  • public boolean tryAcquire(long timeout, TimeUnit unit) :是一个重载的方法,它里面传入了超时时间。比如传入了 3 秒钟,则意味着最多等待 3 秒钟,如果等待期间获取到了许可证,则往下继续执行;如果超时时间到,依然获取不到许可证,它就认为获取失败,且返回 false。
  • int availablePermits() :返回此信号量中当前可用的许可证数
  • int getQueueLength() :返回正在等待许可证的线程数
  • boolean hasQueuedThreads() :判断是否有线程正在等待获取许可证
  • void reducePermits(int reduction) :减少 reduction 个许可证,是个 protected 方法
  • Collection<Thread> getQueuedThreads() :返回正在等待获取许可证的线程集合,是个 protected 方法

Exchanger

  • Exchanger (交换者)是一个用于线程间协作的工具类,它主要用于进行线程间数据的交换,它有一个同步点,当两个线程到达同步点时可以将各自的数据传给对方,如果一个线程先到达同步点则会等待另一个到达同步点,到达同步点后调用 exchange 方法可以传递自己的数据并且获得对方的数据。
  • 我们假设现在需要录入一些重要的账单信息,为了保证准备,让两个人分别录入,之后再进行对比后是否一致,防止错误繁盛。下面通过代码来演示:
public class ExchangerTest {

    /*
    Exchanger 交换, 用于线程间协作的工具类,可以交换线程间的数据,
    其提供一个同步点,当线程到达这个同步点后进行数据间的交互,遗传算法可以如此来实现,
    以及校对工作也可以如此来实现
     */

    public static void main(String[] args) {
        /*
        模拟 两个工作人员录入记录,为了防止错误,两者录的相同内容,程序仅从校对,看是否有错误不一致的
         */

        //开辟两个容量的线程池
        final ExecutorService service = Executors.newFixedThreadPool(2);

        Exchanger<InfoMsg> exchanger = new Exchanger<>();

        service.submit(() ->{
            //模拟数据 线程 A的
            InfoMsg infoMsg = new InfoMsg();
            infoMsg.content="这是线程A";
            infoMsg.id ="10001";
            infoMsg.desc = "1";
            infoMsg.message = "message";
            System.out.println("正在执行其他...");
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                final InfoMsg exchange = exchanger.exchange(infoMsg);
                System.out.println("线程A 交换数据====== 得到"+ exchange);
                if (!exchange.equals(infoMsg)){
                    System.out.println("数据不一致~~请稽核");
                    return;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        service.submit(() ->{
            //模拟数据 线程 B的
            InfoMsg infoMsg = new InfoMsg();
            infoMsg.content="这是线程B";
            infoMsg.id ="10001";
            infoMsg.desc = "1";
            infoMsg.message = "message";
            System.out.println("正在执行其他...");
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                final InfoMsg exchange = exchanger.exchange(infoMsg);
                System.out.println("线程B 交换数据====== 得到"+ exchange);
                if (!exchange.equals(infoMsg)){
                    System.out.println("数据不一致~~请稽核");
                    return;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        service.shutdown();
    }

    static class InfoMsg{
        String id;
        String name;
        String message;
        String content;
        String desc;

        @Override
        public String toString() {
            return "InfoMsg{" +
                    "id='" + id + '\'' +
                    ", name='" + name + '\'' +
                    ", message='" + message + '\'' +
                    ", content='" + content + '\'' +
                    ", desc='" + desc + '\'' +
                    '}';
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            InfoMsg infoMsg = (InfoMsg) o;
            return Objects.equals(id, infoMsg.id) &&
                    Objects.equals(name, infoMsg.name) &&
                    Objects.equals(message, infoMsg.message) &&
                    Objects.equals(content, infoMsg.content) &&
                    Objects.equals(desc, infoMsg.desc);
        }

        @Override
        public int hashCode() {
            return Objects.hash(id, name, message, content, desc);
        }
    }
}
  • 运行结果如下:

MjyU732.png!mobile

上面代码运行可以看到,当我们线程 A/B 到达同步点即调用 exchange 后进行数据的交换,拿到对方的数据再与自己的数据对比可以做到稽核 的效果

  • Exchanger 同样可以用于遗传算法中,选出两个对象进行交互两个的数据通过交叉规则得到两个混淆的结果。
  • Exchanger 中嗨提供了一个方法 public V exchange(V x, long timeout, TimeUnit unit) 主要是用来防止两个程序中一个一直没有执行 exchange 而导致另一个一直陷入等待状态,这是可以用这个方法,设置超时时间,超过这个时间则不再等待。

本文由AnonyStar 发布,可转载但需声明原文出处。

欢迎关注微信公账号 :云栖简码 获取更多优质文章

更多文章关注笔者博客 : 云栖简码 i-code.online


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK