0

ThreadPoolExecutor——高效处理并发任务的必备良器

 1 year ago
source link: https://zxs.io/article/1916
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

ThreadPoolExecutor——高效处理并发任务的必备良器 | XINDOO

@TOC
  ThreadPoolExecutor是Java concurrent中用于管理线程池的类,它是Executor框架的一个实现。线程池是一种提高应用程序性能和可靠性的技术,它将多个任务分配给多个线程执行,从而实现并发处理。ThreadPoolExecutor提供了一种灵活的方式来管理线程池,可以控制线程池的大小、阻塞队列的大小、线程池的状态、线程的创建和销毁等。使用ThreadPoolExecutor可以帮助我们避免线程创建和销毁的开销,提高应用程序的性能和可伸缩性。具体来说,它有以下这些优点:

  • 复用线程:线程池可以重用已经创建的线程,避免了线程创建和销毁的开销。
  • 控制线程数量:线程池可以控制线程的数量,避免了线程数量过多或过少的问题。
  • 提高响应速度:线程池可以提高应用程序的响应速度,因为线程可以立即执行任务,而不需要等待线程创建和启动。
  • 提高可伸缩性:线程池可以提高应用程序的可伸缩性,因为它可以自动调整线程的数量,以适应不同的工作负载。
  • 提高可靠性:线程池可以提高应用程序的可靠性,因为它可以避免线程崩溃或死锁的问题,从而保证应用程序的稳定性。

  总之,ThreadPoolExecutor在多线程开发中是绕不开的一个类,用的好它可以显著提升代码的性能,用不好就有可能代理一些其他的问题,比如我曾经见过因错误设置阻塞队列大小,导致严重的业务故障。这也是阿里巴巴Java开发手册中不允许用Executors去创建线程池的原因。

  接下来,我们通过一个完整的代码示例来看下ThreadPoolExecutor具体如何使用:

复制
import java.util.concurrent.*;

public class ThreadPoolExecutorExample {

    public static void main(String[] args) {
        int corePoolSize = 2;
        int maximumPoolSize = 4;
        long keepAliveTime = 10;
        TimeUnit unit = TimeUnit.SECONDS;
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();

        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                unit,
                workQueue,
                threadFactory,
                handler
        );

        for (int i = 1; i <= 10; i++) {
            executor.execute(new Task(i));
        }

        executor.shutdown();
    }

    static class Task implements Runnable {
        private int taskId;

        public Task(int taskId) {
            this.taskId = taskId;
        }

        @Override
        public void run() {
            System.out.println("Task #" + taskId + " is running on " + Thread.currentThread().getName());
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Task #" + taskId + " is completed on " + Thread.currentThread().getName());
        }
    }
}

  上面代码也很简单,就是构造了一个线程,让其多线程打印一些信息。从上面代码来看,它构造方法一共有7个参数,其中keepAliveTime和timeUnit其实是搭配使用的。我们来看下具体每个参数的含义和作用:

  • corePoolSize:线程池的核心线程数。当有新的任务提交到线程池时,如果当前线程池中的线程数小于corePoolSize,那么线程池会创建新的线程来执行任务。如果当前线程池中的线程数大于或等于corePoolSize,那么线程池会将任务添加到阻塞队列中等待执行。默认值为1。

  • maximumPoolSize:线程池的最大线程数。如果阻塞队列已满,且当前线程池中的线程数小于maximumPoolSize,那么线程池会创建新的线程来执行任务。如果当前线程池中的线程数已经达到maximumPoolSize,那么线程池会根据拒绝策略来处理新的任务。默认值为Integer.MAX_VALUE。

  • keepAliveTime和timeUnit:线程的空闲时间。当线程池中的线程数大于corePoolSize时,如果一个线程在keepAliveTime后没有执行任何任务,那么该线程将被终止。默认值为0,表示线程池中的所有线程都是长期存活的。

  • workQueue:阻塞队列。当线程池中的线程数已经达到corePoolSize时,新的任务将被添加到阻塞队列中等待执行。ThreadPoolExecutor提供了多种类型的阻塞队列,包括SynchronousQueue、LinkedBlockingQueue和ArrayBlockingQueue等。默认值为LinkedBlockingQueue。

  • threadFactory:线程工厂。用于创建新的线程。如果没有指定线程工厂,ThreadPoolExecutor将使用默认的线程工厂来创建线程。默认值为DefaultThreadFactory。

  • handler:拒绝策略。当阻塞队列已满,且当前线程池中的线程数已经达到maximumPoolSize时,ThreadPoolExecutor会根据指定的拒绝策略来处理新的任务。ThreadPoolExecutor提供了多种拒绝策略,包括AbortPolicy、CallerRunsPolicy、DiscardOldestPolicy和DiscardPolicy等。默认值为AbortPolicy。

  在不同的应用场景下,我们可能需要调整这些参数,以提高应用程序的性能和可伸缩性。例如,可以通过增加corePoolSize和maximumPoolSize来提高线程池的并发能力,或者通过增加阻塞队列的大小来提高线程池的缓冲能力。同时,也可以通过指定不同的拒绝策略来处理新的任务,以避免任务丢失或应用程序崩溃的问题。

线程池状态

  ThreadPoolExecutor在实际使用中有四种状态,分别是RUNNING、SHUTDOWN、STOP和TERMINATED。

  • RUNNING:线程池正在运行。在这个状态下,线程池可以接受新的任务,并且会将任务添加到阻塞队列中等待执行,或者直接创建新的线程来执行任务。

  • SHUTDOWN: 线程池正在关闭。在这个状态下,线程池不会接受新的任务,但会将阻塞队列中的任务继续执行完毕。如果有新的任务提交到线程池,那么线程池会拒绝这些任务并抛出RejectedExecutionException异常。

  • STOP: 线程池已经停止。在这个状态下,线程池不会接受新的任务,并且会中断正在执行的任务。如果有新的任务提交到线程池,那么线程池会拒绝这些任务并抛出RejectedExecutionException异常。

  • TERMINATED: 线程池已经终止。在这个状态下,线程池中的所有任务都已经执行完毕,并且所有的线程都已经被销毁。如果需要重新使用线程池,那么需要重新创建一个新的线程池。

  通过合理地控制线程池的状态,可以避免任务丢失或线程池崩溃的问题,并且可以提高应用程序的性能和可靠性。例如,在应用程序关闭时,可以先将线程池的状态设置为SHUTDOWN,等待线程池中的所有任务执行完毕后再将状态设置为STOP,最终将线程池状态设置为TERMINATED,以确保线程池中的所有任务都能够得到执行。

线程池执行任务的过程

  ThreadPoolExecutor如何执行任务涉及到任务的提交、执行、取消和完成等多个方面。

  • 任务的提交:可以通过execute()方法将任务提交到线程池中,execute()方法会将任务添加到阻塞队列中等待执行。也可以通过submit()方法将任务提交到线程池中,submit()方法会返回一个Future对象,可以用于获取任务的执行结果。

  • 任务的执行:线程池会从阻塞队列中取出任务,并将任务分配给空闲的线程执行。如果当前线程池中的线程数小于corePoolSize,那么线程池会创建新的线程来执行任务。如果当前线程池中的线程数已经达到corePoolSize,那么线程池会将任务添加到阻塞队列中等待执行。如果阻塞队列已满,且当前线程池中的线程数小于maximumPoolSize,那么线程池会创建新的线程来执行任务。如果当前线程池中的线程数已经达到maximumPoolSize,那么线程池会根据拒绝策略来处理新的任务。

  • 任务的取消:可以通过cancel()方法将任务从阻塞队列中移除,如果任务还没有开始执行,那么任务将被取消。如果任务已经在执行,那么可以通过interrupt()方法中断任务的执行。

  • 任务的完成:可以通过Future对象来获取任务的执行结果,也可以通过isDone()方法来判断任务是否已经执行完毕。当任务执行完毕后,线程池会将任务从阻塞队列中移除,并将线程返回到线程池中等待下一个任务的执行。

  通过合理地控制任务的提交、执行、取消和完成等方面的内容,可以提高线程池的性能和可靠性,避免任务丢失或线程池崩溃的问题。例如,在提交任务时,可以根据任务的类型和优先级来选择合适的阻塞队列,以确保任务能够得到及时执行。在取消任务时,可以先使用isCancelled()方法来判断任务是否已经被取消,以避免重复取消任务的问题。在获取任务的执行结果时,可以使用get()方法来等待任务的执行结果,或者使用get(timeout, unit)方法来设置超时时间,以避免任务执行时间过长导致线程池阻塞的问题。

  ThreadPoolExecutor提供了多种类型的阻塞队列,用于存储等待执行的任务。不同类型的阻塞队列有不同的特点和适用场景。接下来介绍ThreadPoolExecutor的三种常用阻塞队列:

  • SynchronousQueue:同步队列。SynchronousQueue是一个没有容量的阻塞队列,它的作用是将任务直接交给线程来执行,而不是先将任务存储在队列中等待执行。如果当前没有空闲的线程来执行任务,那么SynchronousQueue会阻塞任务的提交,直到有线程空闲为止。SynchronousQueue适用于任务执行时间短、任务量大的场景,可以避免任务在队列中等待的时间,提高线程池的响应速度。

  • LinkedBlockingQueue:链表阻塞队列。LinkedBlockingQueue是一个有容量的阻塞队列,它的作用是将任务存储在队列中等待执行。如果队列已满,那么新的任务将被阻塞,直到队列中有空闲位置为止。LinkedBlockingQueue适用于任务执行时间长、任务量大的场景,可以避免任务在线程池中等待执行的时间过长,提高线程池的缓冲能力。

  • ArrayBlockingQueue:数组阻塞队列。ArrayBlockingQueue是一个有容量的阻塞队列,它的作用和LinkedBlockingQueue类似,但是它是一个基于数组的队列,而不是基于链表的队列。ArrayBlockingQueue适用于任务执行时间长、任务量大的场景,可以避免任务在线程池中等待执行的时间过长,提高线程池的缓冲能力。与LinkedBlockingQueue相比,ArrayBlockingQueue的吞吐量更高,但是它的性能可能会受到数组大小的限制。

  通过合理地选择不同类型的阻塞队列,可以根据应用程序的需求来提高线程池的性能和可靠性。例如,对于任务执行时间短、任务量大的场景,可以选择SynchronousQueue来避免任务在队列中等待的时间;对于任务执行时间长、任务量大的场景,可以选择LinkedBlockingQueue或ArrayBlockingQueue来提高线程池的缓冲能力。同时,也可以通过调整阻塞队列的大小来控制线程池的缓冲能力,以适应不同的工作负载。

  ThreadPoolExecutor的拒绝策略用于处理新的任务提交到线程池时,如果线程池已经达到最大线程数和阻塞队列已满的情况下,线程池应该如何处理这些新的任务。ThreadPoolExecutor提供了四种常用的拒绝策略:

  • AbortPolicy:抛出RejectedExecutionException异常。这是ThreadPoolExecutor的默认拒绝策略,如果线程池已经达到最大线程数和阻塞队列已满的情况下,新的任务将被拒绝并抛出异常。

  • CallerRunsPolicy: 由提交任务的线程来执行任务。如果线程池已经达到最大线程数和阻塞队列已满的情况下,新的任务将被提交到线程池的调用线程中执行。这种策略可以避免任务丢失,但是会降低线程池的吞吐量。

  • DiscardOldestPolicy:丢弃最老的任务。如果线程池已经达到最大线程数和阻塞队列已满的情况下,新的任务将被丢弃,而不是抛出异常或者执行任务。这种策略可以避免线程池阻塞,但是可能会丢失一些重要的任务。

  • DiscardPolicy:直接丢弃任务。如果线程池已经达到最大线程数和阻塞队列已满的情况下,新的任务将被直接丢弃,而不是抛出异常或者执行任务。这种策略可以避免线程池阻塞,但是会丢失所有的任务。

  通过合理地选择不同类型的拒绝策略,可以根据应用程序的需求来处理新的任务提交到线程池时的情况。例如,对于重要的任务,可以选择CallerRunsPolicy来确保任务能够得到执行;对于不重要的任务,可以选择DiscardPolicy来避免线程池阻塞。同时,也可以通过实现RejectedExecutionHandler接口来自定义拒绝策略,例如我们可以丢下任务前将其记录下:

复制
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class DiscardOldestWithKafkaRejectedExecutionHandler implements RejectedExecutionHandler {

    private String kafkaTopic;

    public DiscardOldestWithKafkaRejectedExecutionHandler(String kafkaTopic) {
        this.kafkaTopic = kafkaTopic;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // 获取最早的未执行任务
        Runnable earliestTask = executor.getQueue().peek();
        if (earliestTask != null) {
            // 将最早的未执行任务发到kafka里
            KafkaProducer.send(kafkaTopic, earliestTask.toString());
        }
        // 将当前任务提交到线程池
        executor.execute(r);
    }
}

  在这个示例中,我们实现了一个DiscardOldestWithKafkaRejectedExecutionHandler,它继承了RejectedExecutionHandler接口,并重写了rejectedExecution()方法。它的作用是当线程池中的任务队列和线程池都满了,并且新的任务被拒绝时,这个方法会将最早未运行的任务拿出来,丢弃到kafka中,然后提交当前的任务。

线程池的监控和调优

  线程池稳定和高效的运行也是非常重要的,要想它稳定高效运行,就离不开监控和调优。下面给出一些监控和调优ThreadPoolExecutor的方法论:

  • 监控指标: 可以通过ThreadPoolExecutor提供的一些监控指标来了解线程池的状态和性能,例如线程池大小、活跃线程数、任务队列大小、已完成任务数、拒绝任务数等。可以通过调用ThreadPoolExecutor的getPoolSize()、getActiveCount()、getQueueSize()、getCompletedTaskCount()、getRejectedExecutionCount()等方法来获取这些监控指标,以便及时发现和解决线程池的问题。

  • 调优策略: 可以通过调整线程池的参数来优化线程池的性能和可靠性,例如调整corePoolSize和maximumPoolSize来提高线程池的并发能力,调整keepAliveTime来控制线程的空闲时间,调整阻塞队列的大小来提高线程池的缓冲能力,选择合适的拒绝策略来处理新的任务提交时的情况等。同时,也可以通过监控指标来实时调整线程池的参数,以适应不同的工作负载。

  • 线程池的优化: 可以通过线程池的优化来提高线程池的性能和可靠性,例如使用线程池前先评估任务的类型和优先级,选择合适的阻塞队列和拒绝策略,避免任务的等待和丢失;使用线程池时避免过度提交任务,控制任务的数量和质量;使用线程池时避免长时间的空闲或者过度使用线程池,以避免资源浪费和线程池崩溃的问题。

  通过合理地监控和调优ThreadPoolExecutor,可以提高线程池的性能和可靠性,避免任务丢失或线程池崩溃的问题,并且可以提高应用程序的性能和可靠性。

  使用ThreadPoolExecutor的最佳实践涉及到线程池的参数设置、任务处理、异常处理等多个方面。下面介绍一些使用ThreadPoolExecutor的最佳实践:

  • 线程池参数设置:在创建ThreadPoolExecutor时,需要根据应用程序的需求来设置线程池的参数,例如corePoolSize、maximumPoolSize、keepAliveTime、阻塞队列类型和大小、拒绝策略等。可以根据任务的类型和优先级来选择合适的阻塞队列和拒绝策略,以避免任务的等待和丢失。

  • 任务处理:在提交任务时,需要根据任务的类型和优先级来选择合适的提交方式,例如使用execute()方法或submit()方法提交任务。同时,还需要注意任务的异常处理,可以通过实现UncaughtExceptionHandler接口来自定义异常处理方式,以避免任务异常导致线程池崩溃的问题。

  • 线程池的关闭:在关闭线程池时,需要注意线程池的状态和任务的处理。可以通过调用shutdown()方法或shutdownNow()方法来关闭线程池,前者会等待所有任务执行完毕再关闭线程池,后者会立即关闭线程池并中断所有任务的执行。同时,还需要注意线程池的状态,可以通过isShutdown()方法和isTerminated()方法来判断线程池是否已经关闭。

  • 线程池的监控和调优:在使用ThreadPoolExecutor时,需要及时监控线程池的状态和性能,以便及时发现和解决线程池的问题。可以通过监控指标和调优策略来优化线程池的性能和可靠性,例如调整线程池的参数、选择合适的阻塞队列和拒绝策略、避免任务的等待和丢失等。

  通过遵循ThreadPoolExecutor的最佳实践,可以提高线程池的性能和可靠性,避免任务丢失或线程池崩溃的问题,并且可以提高应用程序的性能和可靠性。

  ThreadPoolExecutor是Java中用于管理线程池的一个类,它能够创建和管理线程池,以提高应用程序的性能和可靠性。它具有灵活的线程池管理、高效的任务处理和可靠的异常处理等特点。尤其适用于任务量大、执行时间长、任务类型多样的应用场景,例如Web服务器、数据库连接池、文件处理等。通过合理地设置线程池参数、处理任务和异常、监控和调优线程池,可以提高应用程序的性能和可靠性,避免任务丢失或线程池崩溃的问题。

  为了充分利用ThreadPoolExecutor提高应用程序的性能和可靠性,我们可以采取以下措施:

  1. 合理设置线程池参数,例如corePoolSize(核心线程数)、maximumPoolSize(最大线程数)、keepAliveTime(线程空闲时间)、阻塞队列类型和大小、拒绝策略等,以适应不同的工作负载和应用场景。
  2. 处理任务和异常,例如选择合适的任务提交方式(如execute或submit),实现UncaughtExceptionHandler接口来自定义异常处理方式,以避免任务异常导致线程池崩溃的问题。
  3. 监控和调优线程池,例如及时监控线程池的状态和性能、调整线程池的参数、选择合适的阻塞队列和拒绝策略、避免任务的等待和丢失等,以提高线程池的性能和可靠性。

  通过以上措施,我们可以充分发挥ThreadPoolExecutor的优势,提高应用程序的性能和可靠性,避免任务丢失或线程池崩溃的问题。同时,这也有助于我们在面临大量任务、长时间执行和多种任务类型的应用场景时,更好地应对挑战,实现高效、稳定的应用程序运行。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK