50

线程池工厂类Executors的编程的艺术

 5 years ago
source link: http://www.cnblogs.com/yougewe/p/10092311.html?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Executors是一个线程池的工厂类,提供各种有用的线程池的创建,使用得当,将会使我们并发编程变得简单!今天就来聊聊这个工厂类的艺术吧!

Executors只是Executor框架的主要成员组件之一,为java的异步任务调度执行提供了重要的入口!

在说Executors之前,还需要说一下另一个Executor框架的重要成员,ThreadPoolExecutor。

ThreadPoolExecutor 实现了ExecutorService接口,提供了线程池的调度功能!成为纯种池技术的基石!

ThreadPoolExecutor 的主要构造函数如下:

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

咱们此处不是要去分析其源码,只是想看一下怎样使用他!

从构造方法可以看出 ThreadPoolExecutor 的主要参数 7 个,在其注释上也有说明功能,咱们翻译下每个参数的功能:

    corePoolSize: 线程池核心线程数(平时保留的线程数),使用时机: 在初始时刻,每次请求进来都会创建一个线程直到达到该size
    maximumPoolSize: 线程池最大线程数,使用时机: 当workQueue都放不下时,启动新线程,直到最大线程数,此时到达线程池的极限
    keepAliveTime/unit: 超出corePoolSize数量的线程的保留时间,unit为时间单位
    workQueue: 阻塞队列,当核心线程数达到或者超出后,会先尝试将任务放入该队列由各线程自行消费;  
        ArrayBlockingQueue: 构造函数一定要传大小
        LinkedBlockingQueue: 构造函数不传大小会默认为65536(Integer.MAX_VALUE ),当大量请求任务时,容易造成 内存耗尽。
        SynchronousQueue: 同步队列,一个没有存储空间的阻塞队列 ,将任务同步交付给工作线程。
        PriorityBlockingQueue: 优先队列
    threadFactory:线程工厂,用于线程需要创建时,调用其newThread()生产新线程使用
    handler: 饱和策略,当队列已放不下任务,且创建的线程已达到 maximum 后,则不能再处理任务,直接将任务交给饱和策略
        AbortPolicy: 直接抛弃(默认)
        CallerRunsPolicy: 用调用者的线程执行任务
        DiscardOldestPolicy: 抛弃队列中最久的任务
        DiscardPolicy: 抛弃当前任务

所以,虽然参数有点多,但是其实仔细阅读以上的注释,基本就能很好地使用线程池了,不过咱们还可以通过一个流程图来说明这一切,对于喜欢看图说话的同学来说是件好事!

ve2uMjN.png!web

好了,ThreadPoolExecutor 介绍完后,什么感觉呢? 说是简单,其实还是挺麻烦的,所以,是 Executors 工厂出场了!

作为一个工厂类,Executors 简化了各种参数,只忘文生义即可明白其意思!Executors 主要提供4种类型的线程池!

1. FixedThreadPool 创建一个指定数量的线程池,可控制线程最大并发数,超出的线程会在队列中等待。

其实现方式如下:

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

使用固定线程数的worker和无界队列保存多余的task,所以创建的线程数不会超过corePoolSize,所以maxPoolSize是一个无效参数,所以keepAliveTime是一个无效参数,所以不会调用RejectedExecutionHandler.rejectedExecution()方法,即无拒绝策略可用;从而在外部表现来看,就是固定线程,在执行无限的队列!

2. SingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

其实现方式如下:

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

与 FixedThreadPool 原理类似,不过这里只会使用一个线程在运行。使用一个线程可以保证取任务时的顺序一致性,从而表现出先到先得的效果!在要求有执行顺序要求的场景,刚好派上用场!

3. CachedThreadPool, 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

其实现方式如下:

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

使用SyncronousQueue作为线程池的工作队列,这是特殊的队列,它是个没容量的阻塞队列(至于为什么要用这个特性,据说是性能比 LinkedBlockingQueue 更高,详解他),每个插入操作必须有一个线程对应的移除操作,反之一样;当提交一个cached任务时,执行SyncronousQueue.offer(x),如果有等待的空闲线程,则匹配成功,exec立即返回;如果没有匹配,则会创建一个新线程执行任务;任务执行完后使用poll()等待keep即60秒,一直尝试从队列中获取任务,超时后自动释放线程;

使用这种线程池把握好节奏,因为看起来线程池动不动就会创建系列线程池,而且动不动就会释放线程,完全是不可控的,不可监控的。

4. ScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。

其实现如下:

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

看起来,这个在参数方面没有太多的解说,只是依赖于 ScheduledThreadPoolExecutor 的实现了!所以,我们来看一下 ScheduledThreadPoolExecutor是怎么实现的?

    public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor    {}
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

ok, 如上两个,就够了!ScheduledThreadPoolExecutor 也是继承了 ThreadPoolExecutor, 其构造方法也是依赖于父类,只是将 队列实现改为 DelayedWorkQueue,从而实现延时运行或者定期执行的效果!

而相比于 Timer 来说,ScheduledThreadPool会有更好的执行效果,因为Timer只会有一个线程来执行任务,会有局限性。而且 ScheduledThreadPool 提供其他很多的可操作方法!


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK