12

Java 常用四大线程池用法以及 ThreadPoolExecutor 详解

 3 years ago
source link: https://dong4j.github.io/views/java/2018/11091150.html#_4-scheduledthreadpool
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Java 常用四大线程池用法以及 ThreadPoolExecutor 详解


dong4j 11/9/2018 Concurrent

为什么用线程池

  • 1.创建/销毁线程伴随着系统开销, 过于频繁的创建/销毁线程, 会很大程度上影响处-理效率
  • 2.线程并发数量过多, 抢占系统资源从而导致阻塞
  • 3.对线程进行一些简单的管理

在 Java 中, 线程池的概念是 Executor 这个接口, 具体实现为 ThreadPoolExecutor 类, 学习 Java 中的线程池, 就可以直接学习他了对线程池的配置, 就是对ThreadPoolExecutor构造函数的参数的配置

# 构造函数:

//五个参数的构造函数
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue)

//六个参数的构造函数-1
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory)

//六个参数的构造函数-2
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler)

//七个参数的构造函数
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

下面来解释下各个参数:

  • int corePoolSize: 该线程池中 核心线程数最大值

核心线程: 线程池新建线程的时候, 如果当前线程总数小于corePoolSize, 则新建的是核心线程, 如果超过corePoolSize, 则新建的是非核心线程核心线程默认情况下会一直存活在线程池中, 即使这个核心线程啥也不干(闲置状态).

如果指定ThreadPoolExecutor的allowCoreThreadTimeOut这个属性为true, 那么核心线程如果不干活(闲置状态)的话, 超过一定时间(时长下面参数决定), 就会被销毁掉.

  • int maximumPoolSize:  该线程池中线程总数最大值

线程总数 = 核心线程数 + 非核心线程数.

  • long keepAliveTime: 该线程池中非核心线程闲置超时时长

一个非核心线程, 如果不干活(闲置状态)的时长超过这个参数所设定的时长, 就会被销毁掉, 如果设置allowCoreThreadTimeOut = true, 则会作用于核心线程.

  • TimeUnit unit: keepAliveTime的单位

TimeUnit是一个枚举类型, 其包括: NANOSECONDS : 1微毫秒 = 1微秒 / 1000 MICROSECONDS : 1微秒 = 1毫秒 / 1000 MILLISECONDS : 1毫秒 = 1秒 /1000 SECONDS : 秒 MINUTES : 分 HOURS : 小时 DAYS : 天

  • BlockingQueue workQueue: 该线程池中的任务队列: 维护着等待执行的Runnable对象

当所有的核心线程都在干活时, 新添加的任务会被添加到这个队列中等待处理, 如果队列满了, 则新建非核心线程执行任务.

常用的workQueue类型:

  • SynchronousQueue: 这个队列接收到任务的时候, 会直接提交给线程处理, 而不保留它, 如果所有线程都在工作怎么办?那就新建一个线程来处理这个任务!所以为了保证不出现的错误, 使用这个类型队列的时候, maximumPoolSize一般指定成Integer.MAX_VALUE, 即无限大
  • LinkedBlockingQueue: 这个队列接收到任务的时候, 如果当前线程数小于核心线程数, 则新建线程(核心线程)处理任务;如果当前线程数等于核心线程数, 则进入队列等待. 由于这个队列没有最大值限制, 即所有超过核心线程数的任务都将被添加到队列中, 这也就导致了maximumPoolSize的设定失效, 因为总线程数永远不会超过corePoolSize
  • ArrayBlockingQueue: 可以限定队列的长度, 接收到任务的时候, 如果没有达到corePoolSize的值, 则新建线程(核心线程)执行任务, 如果达到了, 则入队等候, 如果队列已满, 则新建线程(非核心线程)执行任务, 又如果总线程数到了maximumPoolSize, 并且队列也满了, 则发生错误
  • DelayQueue: 队列内元素必须实现Delayed接口, 这就意味着你传进去的任务必须先实现Delayed接口. 这个队列接收到任务时, 首先先入队, 只有达到了指定的延时时间, 才会执行任务
  • ThreadFactory threadFactory: 创建线程的方式, 这是一个接口, 你new他的时候需要实现他的Thread newThread(Runnable r)方法, 一般用不上.
  • RejectedExecutionHandler handler: 这玩意儿就是抛出异常专用的, 比如上面提到的两个错误发生了, 就会由这个handler抛出异常, 根本用不上.

# 向 ThreadPoolExecutor 添加任务

我们怎么知道new一个ThreadPoolExecutor, 大概知道各个参数是干嘛的, 可是我new完了, 怎么向线程池提交一个要执行的任务啊?

ThreadPoolExecutor.execute(Runnable command)

通过ThreadPoolExecutor.execute(Runnable command)方法即可向线程池内添加一个任务.

# ThreadPoolExecutor 的策略

这里给总结一下, 当一个任务被添加进线程池时, 执行策略:

  • 1.线程数量未达到corePoolSize, 则新建一个线程(核心线程)执行任务
  • 2.线程数量达到了corePools, 则将任务移入队列等待
  • 3.队列已满, 新建线程(非核心线程)执行任务
  • 4.队列已满, 总线程数又达到了maximumPoolSize, 就会由(RejectedExecutionHandler)抛出异常

# 常见四种线程池:

如果你不想自己写一个线程池, Java通过Executors提供了四种线程池, 这四种线程池都是直接或间接配置ThreadPoolExecutor的参数实现的.

# 1.可缓存线程池CachedThreadPool()

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

1
2
3
4
5
6

根据源码可以看出:

  1. 这种线程池内部没有核心线程, 线程的数量是有没限制的.
  2. 在创建任务时, 若有空闲的线程时则复用空闲的线程, 若没有则新建线程.
  3. 没有工作的线程(闲置状态)在超过了60S还不做事, 就会销毁.

创建方法:

ExecutorService mCachedThreadPool = Executors.newCachedThreadPool();

//开始下载
private void startDownload(final ProgressBar progressBar, final int i) {
        mCachedThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                int p = 0;
                progressBar.setMax(10);//每个下载任务10秒
                while (p < 10) {
                    p++;
                    progressBar.setProgress(p);
                    Bundle bundle = new Bundle();
                    Message message = new Message();
                    bundle.putInt("p", p);
                    //把当前线程的名字用handler让textview显示出来
                    bundle.putString("ThreadName", Thread.currentThread().getName());
                    message.what = i;
                    message.setData(bundle);
                    mHandler.sendMessage(message);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
    }

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

# 2.FixedThreadPool 定长线程池

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

1
2
3
4
5
6

根据源码可以看出:

  1. 该线程池的最大线程数等于核心线程数, 所以在默认情况下, 该线程池的线程不会因为闲置状态超时而被销毁.
  2. 如果当前线程数小于核心线程数, 并且也有闲置线程的时候提交了任务, 这时也不会去复用之前的闲置线程, 会创建新的线程去执行任务. 如果当前执行任务数大于了核心线程数, 大于的部分就会进入队列等待. 等着有闲置的线程来执行这个任务.

创建方法:

//nThreads => 最大线程数即maximumPoolSize
ExecutorService mFixedThreadPool= Executors.newFixedThreadPool(int nThreads);

//threadFactory => 创建线程的方法, 用得少
ExecutorService mFixedThreadPool= Executors.newFixedThreadPool(int nThreads, ThreadFactory threadFactory);

1
2
3
4
5
6
private void startDownload(final ProgressBar progressBar, final int i) {
        mFixedThreadPool.execute(new Runnable() {
            @Override
            public void run() {
               //....逻辑代码自己控制
            }
        });
    }

1
2
3
4
5
6
7
8
9

# 3.SingleThreadPool

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

1
2
3
4
5
6
7

根据源码可以看出:

  1. 有且仅有一个工作线程执行任务
  2. 所有任务按照指定顺序执行, 即遵循队列的入队出队规则

创建方法: ExecutorService mSingleThreadPool = Executors.newSingleThreadPool();

用法同上.

# 4.ScheduledThreadPool

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

//ScheduledThreadPoolExecutor():
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue());
}

1
2
3
4
5
6
7
8
9
10
11

根据源码可以看出: DEFAULT_KEEPALIVE_MILLIS就是默认10L, 这里就是10秒. 这个线程池有点像是吧CachedThreadPool和FixedThreadPool 结合了一下.

  1. 不仅设置了核心线程数, 最大线程数也是Integer.MAX_VALUE.
  2. 这个线程池是上述4个中为唯一个有延迟执行和周期执行任务的线程池.
//nThreads => 最大线程数即maximumPoolSize
ExecutorService mScheduledThreadPool = Executors.newScheduledThreadPool(int corePoolSize);

一般的执行任务方法和上面的都大同小异, 我们主要看看延时执行任务和周期执行任务的方法.

//表示在3秒之后开始执行我们的任务. 
mScheduledThreadPool.schedule(new Runnable() {
            @Override
            public void run() {
            //....
            }
        }, 3, TimeUnit.SECONDS);

1
2
3
4
5
6
7
8
//延迟3秒后执行任务, 从开始执行任务这个时候开始计时, 每7秒执行一次不管执行任务需要多长的时间.  
mScheduledThreadPool.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
             //....
            }
        },3, 7, TimeUnit.SECONDS);

1
2
3
4
5
6
7
8
/**延迟3秒后执行任务, 从任务完成时这个时候开始计时, 7秒后再执行, 
*再等完成后计时7秒再执行也就是说这里的循环执行任务的时间点是
*从上一个任务完成的时候. 
*/
mScheduledThreadPool.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
             //....
            }
        },3, 7, TimeUnit.SECONDS);

1
2
3
4
5
6
7
8
9
10
11

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK