3

Java 多线程(七):线程池 - Grey Zeng

 1 year ago
source link: https://www.cnblogs.com/greyzeng/p/16685196.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Java 多线程(七):线程池

作者:Grey

原文地址:

博客园:Java 多线程(七):线程池

CSDN:Java 多线程(七):线程池

工作原理#

线程池内部是通过队列结合线程实现的,当我们利用线程池执行任务时:

  1. 如果此时线程池中的线程数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。

  2. 如果此时线程池中的线程数量等于corePoolSize,但是缓冲队列workQueue未满,那么任务被放入缓冲队列。

  3. 如果此时线程池中的线程数量大于等于corePoolSize,缓冲队列workQueue已满,并且线程池中的线程数量小于maximumPoolSize,建新的线程来处理被添加的任务。

  4. 如果此时线裎池中的线数量大于corePoolSize,缓存冲队列workQueue已满, 并且线程池中的数量等于maximumPoolSize,那么过handler所指定的策略来处理此任务。

  5. 当线程池中的线程数量大于corePoolSize时,如果某线程空闲时间超过keepAliveTime, 线将被终止。这样,线程池可以动态的调整池中的线程数。

相关配置#

corePoolSize:核心线程数

maximumPoolSize:最大线程数 【包括核心线程数】

keepAliveTime:生存时间【线程长时间不干活了,归还给操作系统,核心线程不用归还,可以指定是否参与归还过程】

生存时间单位

任务队列:等待队列,如果不指定,最大值是Integer.MAX_VALUE【各种各样的BlockingQueue

线程工厂【默认设置优先级是普通优先级,非守护线程】,最好自定义线程名称,方便回溯

拒绝策略,包括以下四种:

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。

ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。

ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务

ThreadPoolExecutor.CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务

执行流程:先占满核心线程-> 再占满任务队列-> 再占满(最大线程数-核心线程数)-> 最后执行拒绝策略
一般自定义拒绝策略:将相关信息保存到redis,kafka,日志,MySQL记录 实现RejectedExecutionHandler并重写rejectedExecution方法

自定义拒绝策略代码示例:

package git.snippets.juc;

import java.util.concurrent.*;

/**
 * 自定义拒绝策略
 */
public class MyRejectedHandler {
    public static void main(String[] args) {
        ExecutorService service = new ThreadPoolExecutor(4, 4,
                0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(6),
                Executors.defaultThreadFactory(),
                new MyHandler());
    }

    static class MyHandler implements RejectedExecutionHandler {

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            //log("r rejected")
            //save r kafka mysql redis
            //try 3 times
            if (executor.getQueue().size() < 10000) {
                //try put again();
            }
        }
    }
}

SingleThreadPool#

  • 保证线程按顺序执行

  • 为什么要有单线程的线程池?这个主要是用来做任务队列和线程生命周期管理

  • 使用LinkedBlockingQueue作为任务队列,上界为:Integer.MAX_VALUE(2147483647) 约等于无界。

示例代码见:

package git.snippets.juc;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static java.util.concurrent.TimeUnit.SECONDS;

public class SingleThreadPoolUsage {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService service = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 10; i++) {
            final int j = i;
            service.submit(() -> System.out.println("current thread " + Thread.currentThread() + "  " + j));
        }
        service.shutdown();
        service.awaitTermination(60, SECONDS);
    }
}

CachedThreadPool#

  • corePoolSize:0

  • maxiumPoolSize:Integer.MAX_VALUE(2147483647)

  • keepAliveTime 60秒

  • 使用SynchronousQueue作为任务队列 必须马上执行

使用示例:

package git.snippets.juc;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CachedThreadPoolUsage {
    public static void main(String[] args) throws InterruptedException {
        System.out.println("cached thread pool usage...");
        ExecutorService service = Executors.newCachedThreadPool();
        System.out.println(service);
        for (int i = 0; i < 2; i++) {
            service.execute(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
            });
        }
        System.out.println(service);
        TimeUnit.SECONDS.sleep(80);
        System.out.println(service);
    }
}

FixedThreadPool#

  • 最大线程数等于核心线程数

  • 使用LinkedBlockingQueue作为任务队列,上界为:Integer.MAX_VALUE(2147483647)

使用示例见:

package git.snippets.juc;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * 多线程和单线程计算某个范围内的所有素数
 */
public class FixedThreadPoolUsage {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        long start = System.currentTimeMillis();
        getPrime(1, 200000);
        long end = System.currentTimeMillis();
        System.out.println("use single thread...cost: " + (end - start));

        final int cpuCoreNum = 4;

        ExecutorService service = Executors.newFixedThreadPool(cpuCoreNum);

        MyTask t1 = new MyTask(1, 80000); //1-5 5-10 10-15 15-20
        MyTask t2 = new MyTask(80001, 130000);
        MyTask t3 = new MyTask(130001, 170000);
        MyTask t4 = new MyTask(170001, 200000);

        Future<List<Integer>> f1 = service.submit(t1);
        Future<List<Integer>> f2 = service.submit(t2);
        Future<List<Integer>> f3 = service.submit(t3);
        Future<List<Integer>> f4 = service.submit(t4);
        System.out.println();
        start = System.currentTimeMillis();
        f1.get();
        f2.get();
        f3.get();
        f4.get();
        end = System.currentTimeMillis();
        System.out.println("use fixed thread pool...cost: " + (end - start));
        service.shutdown();
        service.awaitTermination(1, TimeUnit.MINUTES);
    }

    static boolean isPrime(int num) {
        for (int i = 2; i <= num / 2; i++) {
            if (num % i == 0) {
                return false;
            }
        }
        return true;
    }

    static List<Integer> getPrime(int start, int end) {
        List<Integer> results = new ArrayList<>();
        for (int i = start; i <= end; i++) {
            if (isPrime(i)) results.add(i);
        }

        return results;
    }

    static class MyTask implements Callable<List<Integer>> {
        int startPos, endPos;

        MyTask(int s, int e) {
            this.startPos = s;
            this.endPos = e;
        }

        @Override
        public List<Integer> call() {
            List<Integer> r = getPrime(startPos, endPos);
            return r;
        }

    }
}

代码说明:本实例演示了多线程和单线程计算某个范围内的所有素数。输出结果如下

use single thread...cost: 1733

use fixed thread pool...cost: 505

ScheduledThreadPool#

使用DelayWorkQueue,包括了如下两个主要方法

scheduleAtFixedRate()

当前任务执行时间小于间隔时间,每次到点即执行;

当前任务执行时间大于等于间隔时间,任务执行后立即执行下一次任务。相当于连续执行了。

scheduleWithFixedDelay()

每当上次任务执行完毕后,间隔一段时间执行。不管当前任务执行时间大于、等于还是小于间隔时间,执行效果都是一样的。

使用示例:

package git.snippets.juc;

import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import static java.util.concurrent.TimeUnit.SECONDS;

public class ScheduleThreadPoolUsage {
    static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);


    public static void main(String[] args) {
        test1();
        test2();
        test3();
    }

    /**
     * 任务执行时间(8s)小于间隔时间(10s)
     */
    public static void test1() {
        scheduler.scheduleAtFixedRate(() -> {
            System.out.println("Start: scheduleAtFixedRate:    " + new Date());
            try {
                Thread.sleep(8000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("End  : scheduleAtFixedRate:    " + new Date());
        }, 0, 10, SECONDS);
    }

    /**
     * 任务执行时间(12s)大于间隔时间(10s)
     */
    public static void test2() {
        scheduler.scheduleAtFixedRate(() -> {
            System.out.println("Start: scheduleAtFixedRate:    " + new Date());
            try {
                Thread.sleep(12000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("End  : scheduleAtFixedRate:    " + new Date());
        }, 0, 10, SECONDS);
    }

    /**
     * 任务执行时间(8s)小于间隔时间(10s)
     */
    public static void test3() {
        scheduler.scheduleWithFixedDelay(() -> {
            System.out.println("Start: scheduleWithFixedDelay: " + new Date());
            try {
                Thread.sleep(12000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("End  : scheduleWithFixedDelay: " + new Date());
        }, 0, 10, SECONDS);
    }
}

ForkJoinPool#

Java SE 1.7 以后新增的线程池,包括以下两个核心类

第一个是:RecursiveAction

它是一种没有任何返回值的任务。只是做一些工作,比如写数据到磁盘,然后就退出了。 一个RecursiveAction可以把自己的工作分割成更小的几块, 这样它们可以由独立的线程或者 CPU 执行。
我们可以通过继承来实现一个RecursiveAction

第二个是:RecursiveTask

它是一种会返回结果的任务。可以将自己的工作分割为若干更小任务,并将这些子任务的执行合并到一个集体结果。 可以有几个水平的分割和合并。

使用示例:

package git.snippets.juc;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;
import java.util.stream.LongStream;

/**
 * @author <a href="mailto:[email protected]">Grey</a>
 * @date 2021/4/25
 * @since 1.7
 */
public class ForkJoinPoolUsage implements Calculator {
    private ForkJoinPool pool;

    public ForkJoinPoolUsage() {
        // 也可以使用公用的 ForkJoinPool:
        // pool = ForkJoinPool.commonPool()
        pool = new ForkJoinPool();
    }

    public static void useRecursiveAction() throws InterruptedException {
        // 创建包含Runtime.getRuntime().availableProcessors()返回值作为个数的并行线程的ForkJoinPool
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        // 提交可分解的PrintTask任务
        forkJoinPool.submit(new MyRecursiveAction(0, 1000));

        while (!forkJoinPool.isTerminated()) {
            forkJoinPool.awaitTermination(2, TimeUnit.SECONDS);
        }
        // 关闭线程池
        forkJoinPool.shutdown();
    }

    public static void useRecursiveTask() {
        long[] numbers = LongStream.rangeClosed(1, 1000).toArray();
        Calculator calculator = new ForkJoinPoolUsage();
        System.out.println(calculator.sumUp(numbers)); // 打印结果500500
    }

    public static void main(String[] args) throws InterruptedException {
        useRecursiveTask();
        useRecursiveAction();
    }

    @Override
    public long sumUp(long[] numbers) {
        return pool.invoke(new SumTask(numbers, 0, numbers.length - 1));
    }

    private static class MyRecursiveAction extends RecursiveAction {

        /**
         * 每个"小任务"最多只打印20个数
         */
        private static final int MAX = 20;

        private int start;
        private int end;

        public MyRecursiveAction(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected void compute() {
            //当end-start的值小于MAX时,开始打印
            if ((end - start) < MAX) {
                for (int i = start; i < end; i++) {
                    System.out.println(Thread.currentThread().getName() + "-i的值" + i);
                }
            } else {
                // 将大任务分解成两个小任务
                int middle = (start + end) / 2;
                MyRecursiveAction left = new MyRecursiveAction(start, middle);
                MyRecursiveAction right = new MyRecursiveAction(middle, end);
                left.fork();
                right.fork();
            }
        }


    }

    private static class SumTask extends RecursiveTask<Long> {
        private long[] numbers;
        private int from;
        private int to;

        public SumTask(long[] numbers, int from, int to) {
            this.numbers = numbers;
            this.from = from;
            this.to = to;
        }


        @Override
        protected Long compute() {

            // 当需要计算的数字小于6时,直接计算结果
            if (to - from < 6) {
                long total = 0;
                for (int i = from; i <= to; i++) {
                    total += numbers[i];
                }
                return total;
                // 否则,把任务一分为二,递归计算
            } else {
                int middle = (from + to) / 2;
                SumTask taskLeft = new SumTask(numbers, from, middle);
                SumTask taskRight = new SumTask(numbers, middle + 1, to);
                taskLeft.fork();
                taskRight.fork();
                return taskLeft.join() + taskRight.join();
            }
        }
    }
}

interface Calculator {
    long sumUp(long[] numbers);
}

此外,Java 的流式 API 底层也是 ForkJoinPool 实现的。

更多内容参考如下两篇文章

ForkJoinPool 的使用以及原理

聊聊并发(八)——Fork/Join 框架介绍

WorkStealingPool#

每个线程都有单独的队列,每个线程队列执行完毕后,就会去其他的线程队列里面拿过来执行, 底层是ForkJoinPool

  • Java SE 1.8 新增

  • 会自动启动 CPU 核数个线程去执行任务

使用示例:

/**
 *
 */
package git.snippets.juc;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @since 1.8
 */
public class WorkStealingPoolUsage {
    public static void main(String[] args) throws IOException {
        int core = Runtime.getRuntime().availableProcessors();
        //  会自动启动cpu核数个线程去执行任务 ,其中第一个是1s执行完毕,其余都是2s执行完毕,
        //  有一个任务会进行等待,当第一个执行完毕后,会再次偷取最后一个任务执行
        ExecutorService service = Executors.newWorkStealingPool();
        service.execute(new R(1000));
        for (int i = 0; i < core; i++) {
            service.execute(new R(2000));
        }
        //由于产生的是精灵线程(守护线程、后台线程),主线程不阻塞的话,看不到输出
        System.in.read();
    }

    static class R implements Runnable {

        int time;

        R(int t) {
            this.time = t;
        }

        @Override
        public void run() {
            try {
                TimeUnit.MILLISECONDS.sleep(time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println(time + " " + Thread.currentThread().getName());
        }
    }
}

CompletableFuture#

  • Java SE 1.8 新增

  • anyOf()可以实现“任意个 CompletableFuture 只要一个成功”,allOf()可以实现“所有 CompletableFuture 都必须成功”,这些组合操作可以实现非常复杂的异步流程控制。

使用示例:

package git.snippets.juc;

import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * 假设你能够提供一个服务
 * 这个服务查询各大电商网站同一类产品的价格并汇总展示
 */
public class CompletableFutureUsage {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        way1();
        way2();
    }

    public static void way1() {
        long start = System.currentTimeMillis();
        System.out.println("p1 " + priceOfJD());
        System.out.println("p2 " + priceOfTB());
        System.out.println("p3 " + priceOfTM());
        long end = System.currentTimeMillis();
        System.out.println("串行执行,耗时(ms):" + (end - start));
    }

    public static void way2() throws ExecutionException, InterruptedException {
        long start = System.currentTimeMillis();
        CompletableFuture<Double> p1 = CompletableFuture.supplyAsync(() -> priceOfJD());
        CompletableFuture<Double> p2 = CompletableFuture.supplyAsync(() -> priceOfTB());
        CompletableFuture<Double> p3 = CompletableFuture.supplyAsync(() -> priceOfTM());
        CompletableFuture.allOf(p1, p2, p3).join();
        System.out.println("p1 " + p1.get());
        System.out.println("p2 " + p2.get());
        System.out.println("p3 " + p3.get());
        long end = System.currentTimeMillis();
        System.out.println("使用CompletableFuture并行执行,耗时(ms): " + (end - start));
    }

    private static double priceOfTM() {
        delay();
        return 1.00;
    }

    private static double priceOfTB() {
        delay();
        return 2.00;
    }

    private static double priceOfJD() {
        delay();
        return 3.00;
    }

    private static void delay() {
        int time = new Random().nextInt(500);
        try {
            TimeUnit.MILLISECONDS.sleep(time);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

证明原子操作类比synchronized更高效#

示例代码如下

package git.snippets.juc;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 证明原子操作类比synchronized更高效
 *
 * @author <a href="mailto:[email protected]">Grey</a>
 * @date 2021/4/26
 */
public class AtomVSSync {
    public static void main(String[] args) {
        test1();
    }

    AtomicInteger atomicCount = new AtomicInteger(0);
    int count = 0;
    final static int TIMES = 80000000;

    void m() {
        for (int i = 0; i < TIMES; i++) {
            atomicCount.incrementAndGet(); //原子操作
        }
    }

    void m2() {
        for (int i = 0; i < TIMES; i++) {
            synchronized (this) {
                count++;
            }
        }
    }


    public static void test1() {
        AtomVSSync t1 = new AtomVSSync();
        AtomVSSync t2 = new AtomVSSync();
        long time1 = time(t1::m);
        System.out.println("使用原子类得到的结果是:" + t1.atomicCount);
        long time2 = time(t2::m2);
        System.out.println("使用synchronized得到的结果是:" + t2.count);

        System.out.println("使用原子类花费的时间是:" + time1);
        System.out.println("使用 synchronized 花费的时间是 :" + time2);
    }

    private static long time(Runnable runnable) {
        List<Thread> threads = new ArrayList<>();
        long startTime = System.currentTimeMillis();
        for (int i = 0; i < 10; i++) {
            threads.add(new Thread(runnable, "thread-" + i));
        }
        threads.forEach(Thread::start);
        threads.forEach(o -> {
            try {
                o.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        long endTime = System.currentTimeMillis();
        return endTime - startTime;
    }
}

Java SE 11 下运行上述代码,代码输出结果如下:

使用原子类得到的结果是:800000000
使用synchronized得到的结果是:800000000
使用原子类花费的时间是:12111
使用 synchronized 花费的时间是 :16471

AtomXXX类可以保证可见性吗?#

package git.snippets.juc;

import java.util.concurrent.atomic.AtomicBoolean;

/**
 * AtomXXX类可以保证可见性吗?请写一个程序来证明
 *
 * @author <a href="mailto:[email protected]">Grey</a>
 * @date 2021/4/26
 */
public class AtomVisible {
    public static void main(String[] args) {
        test2();
    }

    AtomicBoolean running = new AtomicBoolean(true);

    void m3() {
        System.out.println("m1 start");
        while (running.get()) {  //死循环。只有running=false时,才能执行后面的语句

        }
        System.out.println("m2 end");
    }

    public static void test2() {
        AtomVisible t = new AtomVisible();
        new Thread(t::m3, "t1").start();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        t.running.getAndSet(false);

    }
}
m1 start
m2 end

写一个程序证明AtomXXX类的多个方法并不构成原子性#

package git.snippets.juc;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 写一个程序证明AtomXXX类的多个方法并不构成原子性
 *
 * @author <a href="mailto:[email protected]">Grey</a>
 * @date 2021/4/26
 */
public class MultiAtomMethod {
    public static void main(String[] args) {
        test3();
    }

    AtomicInteger count = new AtomicInteger(0);

    void m4() {
        for (int i = 0; i < 10000; i++) {
            if (count.get() < 999 && count.get() >= 0) { //如果未加锁,之间还会有其他线程插进来
                count.incrementAndGet();
            }
        }
    }

    public static void test3() {
        MultiAtomMethod t = new MultiAtomMethod();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            threads.add(new Thread(t::m4, "thread" + i));
        }
        threads.forEach(Thread::start);
        threads.forEach((o) -> {
            try {
                //join()方法阻塞调用此方法的线程,直到线程t完成,此线程再继续。通常用于在main()主线程内,等待其它线程完成再结束main()主线程。
                o.join(); //相当于在main线程中同步o线程,o执行完了,main线程才有执行的机会
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        System.out.println(t.count);
    }
}

说明#

本文涉及到的所有代码和图例

图例

代码

更多内容见:Java 多线程

参考资料#

工作线程数究竟要设置为多少 | 架构师之路

实战Java高并发程序设计(第2版)

深入浅出Java多线程

多线程与高并发-马士兵

Java并发编程实战

理解ScheduledExecutorService中scheduleAtFixedRate和scheduleWithFixedDelay的区别

ForkJoinPool 的使用以及原理

聊聊并发(八)——Fork/Join 框架介绍

使用CompletableFuture

图解Java多线程设计模式


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK