10

深度分析:Java并发编程之线程池技术,看完面试这个再也不慌了!

 3 years ago
source link: https://segmentfault.com/a/1190000022949387
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

线程池的好处

Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池,相对于单线程串行处理(Serial Processing)和为每一个任务分配一个新线程(One Task One New Thread)的做法能够带来3个好处。

  1. 降低资源消耗 。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  2. 提高响应速度 。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  3. 提高线程的可管理性 。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。但是,要做到合理利用线程池,必须对其实现原理了如指掌。

线程池的实现原理

下面所有的介绍都是基于JDK 1.8源码。

架构设计

Java中的线程池核心实现类是ThreadPoolExecutor。这个类的设计是继承了AbstractExecutorService抽象类和实现了ExecutorService,Executor两个接口,关系大致如下图所示:

rQJjE3B.png!web

下面将从顶向下逐个介绍这个4个接口与类。

Executor

顶层接口Executor提供了一种将 任务提交每个任务的执行机制 (包括线程使用的细节以及线程调度等)解耦分开的方法。使用Executor可以避免显式的创建线程。例如,对于一系列的任务,你可能会使用下列这种方式来代替 new Thread(new(RunnableTask())).start() 的方式:

Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());

Executor接口提供了一个接口方法,用来在未来的某段时间执行指定的任务。指定的任务

  1. 可能由一个新创建的线程执行;
  2. 可能由一个线程池中空闲的线程执行;
  3. 也可能由方法的调用线程执行。

这些可能执行方式都取决于Executor接口实现类的设计或实现方式。

public interface Executor {
    void execute(Runnable command);
}

Serial Processing

事实上,Executor接口并没有严格的要求线程的执行需要异步进行。 最简单的接口实现方法是,将所有的任务以调用方法的线程执行。

class DirectExecutor implements Executor {
   public void execute(Runnable r) {
     r.run();
   }
}

这种实际上就是上面提到的Serial Processing的方式。假设,我们现在以这种方式去实现一个响应请求的服务器应用。那么,这种实现方式虽然在理论上是正确的。

  1. 但是其性能却非常差,因为它每次只能响应处理一个请求。如果有大量请求则只能串行响应。
  2. 同时,如果服务器响应逻辑里面有文件I/O或者数据库操作,服务器需要等待这些操作完成才能继续执行。这个时候如果阻塞的时间过长,服务器资源利用率就很低。这样,在等待过程中,服务器CPU将处于空闲状态。

综上,这种Serial Processing的方式方式就会有 无法快速响应问题低吞吐率 问题。

One Task One New Thread

不过,更典型的实现方式是,任务由一些其他的线程执行而不是方法调用的线程执行。例如,下面的Executor的实现方法是对于每一个任务都新建一个线程去执行。

class ThreadPerTaskExecutor implements Executor {
   public void execute(Runnable r) {
     new Thread(r).start();
   }
}

这种方式实际上就是上面提到的One Task One New Thread的方式,这种无限创建线程的方法也有很多问题。

  1. 线程生命周期的开销非常高 。如果有大量任务需要执行,那么就需要创建大量线程。这样就会造成线程生命周期的创建和销毁的开销非常大。
  2. 资源消耗 。活跃的线程会消耗系统资源,尤其是内存。如果,已经有足够多的线程使所有的CPU保持忙碌状态,那么在创建更多的线程反而会降低性能。最简单的例子是,一个4核的CPU机器,对于100个任务创建100个线程去执行。
  3. 稳定性 。可创建线程的数量上存在一个限制。这个限制受JVM启动参数,栈大小以及底层操作系统对线程的限制等因素。超过了这个限制,就可能抛出OutOfMemoryError异常。

ExecutorService

ExecutorService接口是继承自Executor接口,并增加了一些接口方法。接口也可以继承?以前没注意,现在学习到了。这里介绍下 接口继承的语义

  1. 接口Executor有execute(Runnable)方法,接口ExecutorService继承Executor,不用复写Executor的方法。只需要,写自己的方法(业务)即可。
  2. 当一个类ThreadPoolExecutor要实现ExecutorService接口的时候,需要实现ExecutorService和Executor两个接口的方法。

ExecutorService大致新增了2类接口方法:

  1. ExecutorService的关闭方法。对于线程池实现,这些方法的具体实现在ThreadPoolExecutor里面。
  2. 扩充异步执行任务的方法。对于线程池实现,用的这类方法都是AbstractExecutorService抽象类里面实现的模板方法。

I3u2Ezy.png!web

AbstractExecutorService

抽象类AbstractExecutorService提供了ExecutorService接口类中各种submit异步执行方法的实现,这些方法与Executor.execute(Runnable)相比,它们都是有返回值的。同时,这些方法的实现的最终都是调用ThreadPoolExecutor类中实现的execute(Runnable)方法。

尽管说submit方法能提供线程执行的返回值,但只有实现了Callable才会有返回值,而实现Runnable的返回值是null。

public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

除此之外,这个抽象类中还有ExecutorService接口类中invokeAny和invokeAll方法的实现。这里就只是简单介绍下这2个种方法的语义。

invokeAny

  1. invokeAny() 接收一个包含 Callable 对象的集合 作为参数。调用该方法不会返回 Future 对象,而是返回集合中 某一个Callable对象的运行结果
  2. 这个方法没法保证调用之后返回的结果是哪一个Callable,只知道它是这些 Callable 中一个执行结束的Callable 对象。

invokeAll

  1. invokeAll接受一个包含 Callable 对象的集合 作为参数。调用该方法会返回一个Future 对象的列表,对应输入的Callable 对象的集合的运行结果。
  2. 这里 提交的任务容器列表和返回的Future列表存在顺序对应的关系

ThreadPoolExecutor

execute(Runnable)方法

线程池是如何执行输入的任务,这个整个线程池实现的核心逻辑,我们从这个方法开始学习。其代码如下所示:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

可以发现,当提交一个新任务到线程池时,线程池的处理流程如下:

  1. 判断线程池中工作的线程数是否小于核心线程数(corePoolSize)。如果是,则新建一个新的工作线程来执行任务(需要获取全局锁)。否则,进入下个流程。
  2. 判断线程池的工作队列(BlockingQeue)是否已满。如果未满,将新加的任务存储在工作队列中。否则,进入下个流程。
  3. 判断线程池中工作的线程数是否小于最大线程数(maximumPoolSize)。如果小于,则新建一个工作线程来执行任务(需要获取全局锁)。
  4. 如果大于或者等于,则交给饱和策略处理这个任务。

新提交任务处理流程图

以流程图来说明的话,线程池处理一个新提交的任务的流程如下图所示:

qYVRRrm.png!web

ThreadPoolExecutor执行示意图

bimmm2N.png!web

从上面的内容,我们可以发现线程池对于一个新任务有4种处理的可能,分别对应于上面处理流程的4个步骤。

ThreadPoolExecutor采取上述步骤的总体设计思路,是为了在执行execute()方法时,尽可能地 避免获取全局锁 (那将会是一个严重的可伸缩瓶颈)。在ThreadPoolExecutor 完成预热之后(当前运行的线程数大于等于corePoolSize ),几乎所有的execute()方法调用都是执行步骤2,而步骤2不需要获取全局锁。

工作线程

从上面execute(Runnable)的代码我们可以发现,线程池创建线程时,会将线程封装成工作线程Worker,Worker在执行完任务后,还会循环获取工作队列里的任务来执行。

ThreadPoolExecutor中线程执行任务的示意图如下所示:

MBVN7fu.png!web

线程池中的线程执行任务分两种情况:

  1. 在execute()方法中创建一个线程时,会让这个线程执行当前任务。
  2. 这个线程执行完上图中1的任务后,会反复从BlockingQueue获取任务来执行。

ThreadPoolExecutor的ctl变量

ctl 是一个 AtomicInteger 的类,保存的 int 变量的更新都是原子操作,保证线程安全。它的 前面3位用来表示线程池状态,后面29位用来表示工程线程数量

ThreadPoolExecutor的状态

线程池的状态有5种:

  1. Running :线程池处在Running的状态时, 能够接收新任务,以及对已添加的任务进行处理 。线程池的初始化状态是RUNNING。换句话说, 线程池被一旦被创建,就处于Running状态

    ,并且线程池中的任务数为0。

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

  2. Shutdown : 线程池处在SHUTDOWN状态时, 不接收新任务,但能处理已添加(正在运行的以及在BlockingQueue)的任务 。调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。
  3. Stop : 线程池处在STOP状态时, 不接收新任务,不处理已添加的任务,并且会中断正在运行的任务 。 调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。
  4. Tidying : 当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为Tidying状态。当线程池变为Tidying状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为Tidying时,进行相应的处理;可以通过重载terminated()函数来实现。
  5. Terminated : 线程池彻底终止,就变成Terminated状态。 线程池处在Tidying状态时,执行完terminated()之后,就会由 Tidying -> Terminated。

QN7rQfM.png!web

线程池的使用

线程池的创建

我们可以通过ThreadPoolExecutor的构造函数来创建一个线程池。

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
  1. corePoolSize(线程池的核心线程数):线程池要保持的线程数目,即使是他们是空闲也不会停止。 当提交一个任务到线程池时, 线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建 。如果调用了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有基本线程。
  2. maximumPoolSize(线程池的最大线程数): 线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是,如果使用了 无界的任务队列这个参数就没什么效果
  3. keepAliveTime(线程活动保持时间): 当线程池中的线程数大于corePoolSize时, keepAliveTime为多余的空闲线程等待新任务的最长保持存活的时间 。所以,如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率。
  4. unit(线程活动保持时间的单位) : 可选的单位有天(DAYS)、小时(HOURS)、分钟(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和纳秒(NANOSECONDS,千分之一微秒)。
  5. runnableTaskQueue(任务队列):用于 保存等待执行的任务的阻塞队列 。可以选择以下几个阻塞队列。
  • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原则对元素进行排序。
  • LinkedBlockingQueue:一个基于链表结构的无界阻塞队列,此队列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
  • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于Linked-BlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
  • PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
  1. ThreadFactory: 用于设置创建线程的工厂 ,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。
  2. RejectedExecutionHandler(饱和策略):当ThreadPoolExecutor已经关闭或ThreadPoolExecutor已经饱和 时(达到了最大线程池大小且工作队列已满),execute()方法将要调用的Handler,那么必须采取 一种策略处理提交的新任务 。这个策略默认情况下是AbortPolicy。Java线程池框架提供了以下4种策略:

    • AbortPolicy:直接抛出异常
    • CallerRunsPolicy:只用调用者所在线程来运行任务
    • DiscardOldestPolicy:丢弃队列里最老的一个任务,并执行当前任务
    • DiscardPolicy:不处理,丢弃掉

常用ThreadPoolExecutor

通过Executor框架的工具类Executors,可以创建以下3种类型的ThreadPoolExecutor。通过源码可以发现这3种线程池的本质都是不同输入参数配置的ThreadPoolExecutor。

FixedThreadPool

FixedThreadPool被称为 可重用固定线程数的线程池 。下面是FixedThreadPool的源代码实现。

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

注意到,

  1. FixedThreadPool的corePoolSize和maximumPoolSize都被设置为创建时的同一个指定的参数nThreads。
  2. 任务阻塞队列使用的是无界队列new LinkedBlockingQueue()。
  3. keepAliveTime设置为0。
  4. ThreadFactory和RejectedExecutionHandler皆使用的默认值。

FixedThreadPool的execute()方法的运行示意图如下所示:

maEvQji.png!web

其运行说明:

  1. 如果当前运行的线程数少于corePoolSize,则创建新线程来执行任务。
  2. 在线程池完成预热之后(当前运行的线程数等于corePoolSize),将任务加入LinkedBlockingQueue。
  3. 线程执行完1中的任务后,会在循环中反复从LinkedBlockingQueue获取任务来执行。

FixedThreadPool使用无界队列LinkedBlockingQueue作为线程池的工作队列(队列的容量为Integer.MAX_VALUE)对线程池会带来如下影响:

  1. 当线程池中的线程数达到corePoolSize后,新任务将在无界队列中等待。由于无界队列永远不会满,因此线程池中的线程数不会超过corePoolSize。
  2. 由于1,使用无界队列时maximumPoolSize将是一个无效参数。
  3. 由于1和2,使用无界队列时keepAliveTime将是一个无效参数。不会有超过corePoolSize的线程数目。
  4. 由于使用无界队列。运行中的FixedThreadPool(未执行方法shutdown()或shutdownNow())不会拒绝任务(不会调用RejectedExecutionHandler.rejectedExecution方法)。

SingleThreadExecutor

SingleThreadExecutor是使用单个worker线程的Executor。SingleThreadExecutor与FixedThreadPool类似,只是它的corePoolSize和maximumPoolSize被设置为1。下面是SingleThreadExecutor的源代码实现。

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

CachedThreadPool

CachedThreadPool是一个会根据需要创建新线程的线程池。下面是创建CachedThread-Pool的源代码。

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

注意到:

  1. CachedThreadPool的corePoolSize被设置为0,即corePool为空;maximumPoolSize被设置为 Integer.MAX_VALUE,即maximumPool是无界的。
  2. keepAliveTime设置为60L,意味着CachedThreadPool中的空闲线程等待新任务的最长时间为60秒,空闲线程超过60秒后将会被终止。
  3. CachedThreadPool使用没有容量的SynchronousQueue作为线程池的工作队列,但CachedThreadPool的maximumPool是无界的。这意味着,如果主线程提交任务的速度高于maximumPool中线程处理任务的速度时,CachedThreadPool会不断创建新线程。极端情况下,CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源。

CacheThreadPool的execute()方法的执行过程如下图所示:

V3EFRzN.png!web

其执行过程的说明如下:

  1. 首先执行SynchronousQueue.offer(Runnable task)。如果当前maximumPool中有空闲线程正在执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那么主线程执行offer操作与空闲线程执行的poll操作配对成功,主线程把任务交给空闲线程执行;否则执行下面的步骤2。
  2. 当初始maximumPool为空,或者maximumPool中当前没有空闲线程时,将没有线程执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这种情况下,CachedThreadPool将会创建一个新线程执行任务。
  3. 步骤2中新创建的线程将任务执行完后,会执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这个poll操作会让空闲线程最多在SynchronousQueue中等待60秒钟。如果60秒钟内主线程提交了一个新任务(主线程执行步骤1),那么这个空闲线程将执行主线程提交的新任务;否则,这个空闲线程将终止。由于空闲60秒的空闲线程会被终止,因此长时间保持空闲的CachedThreadPool不会使用任何资源。

向线程池提交任务

可以使用两个方法向线程池提交任务,分别为execute()和submit()方法。

  1. execute()方法用于提交 不需要返回值的任务 ,所以无法判断任务是否被线程池执行成功。一般execute()方法输入的任务是一个Runnable类的实例。
  2. submit()方法用于提交 需要返回值的任务 。线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过 future的get()方法来获取返回值get()方法会阻塞当前线程直到任务完成 ,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。

关闭线程池

可以通过调用线程池的shutdown或者shutdownNow方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。但是它们存在一定的区别。

  1. shutdown首先将线程池的状态设置成SHUTDOWN。然后 阻止新提交的任务 ,对于新提交的任务,如果测试到状态不为RUNNING,则抛出rejectedExecution 。对于 已经提交 (正在运行的以及在任务队列中的) 任务不会产生任何影响 。同时会将那些 闲置的线程(idleWorkers)进行中断
  2. shutdownNow首先将线程池的状态设置成STOP。然后 阻止新提交的任务 ,对于新提交的任务,如果测试到状态不为RUNNING,则抛出rejectedExecution 同时会 中断当前正在运行的线程 。另外它还将BolckingQueue中的任务给移除,并将 这些任务添加到列表中进行返回

线程池的监控

可以通过线程池提供的参数进行监控,在监控线程池的时候可以使用以下属性:

  1. taskCount :线程池需要执行的任务数量。
  2. completedTaskCount :线程池在运行过程中已完成的任务数量,小于或等于taskCount。
  3. largestPoolSize :线程池里曾经创建过的最大线程数量。通过这个数据可以知道线程池是 否曾经满过。如该数值等于线程池的最大大小,则表示线程池曾经满过。
  4. getPoolSize :线程池的线程数量。如果线程池不销毁的话,线程池里的线程不会自动销 毁,所以这个大小只增不减。
  5. getActiveCount :获取活动的线程数。

另外, 通过扩展线程池进行监控 。可以通过继承线程池来自定义线程池,重写线程池的beforeExecute、afterExecute和terminated方法,也可以在任务执行前、执行后和线程池关闭前执行一些代码来进行监控。例如,监控任务的平均执行时间、最大执行时间和最小执行时间等。这几个方法在线程池里是空方法。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK