
Java thread pool curiosities: this is not a thread pool

 3 years ago
source link: http://www.cnblogs.com/yougewe/p/14421826.html

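Writing high-performance, highly concurrent applications depends on many things: I/O, algorithms, asynchrony, language features, operating system features, queues, memory, CPU, distributed systems, networking, data structures, and high-performance components.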

Just some rambling to start.

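Back to the topic: thread pools. If multithreading is one of the sharpest tools for improving a system's concurrency, then the thread pool is what makes that tool easier to control. Writing everything directly on top of the basic threading primitives takes a seasoned hand to cope with a complex environment. A thread pool removes that burden: you only need to know how to use it, and multithreading will serve you safely and under control.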

1. Common thread pool usage examples

A thread pool may not be simple on the inside, but using one certainly is.

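There are many thread pool implementations, but we will only talk about ThreadPoolExecutor, simply because it is the most widely used. There is also the scheduling pool ScheduledThreadPoolExecutor, but that is a separate story: it serves different needs, so the two can't really be compared.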

Below, I'll walk through a few levels of usage to show how to get a thread pool going quickly. (Just a formality, nothing more.)

1.1. Basic thread pools

At the basic level, all you need is one utility class: Executors. It provides many static factory methods; pick any one of them and you have a thread pool. For example:

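    // Create a thread pool with a fixed number of threads
    Executors.newFixedThreadPool(8);
    // Create a pool that creates threads on demand, with no upper bound
    Executors.newCachedThreadPool();
    // Create a scheduled (timer) thread pool
    Executors.newScheduledThreadPool(2);
    // There is also newSingleThreadExecutor() for a single thread; it works the same way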

Once a pool has been created this way, just call its execute() or submit() method and you are doing multithreaded programming. Nothing wrong with that!
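
To make the execute()/submit() distinction concrete, here is a minimal sketch that uses only JDK classes; the class name ExecutorsBasics and the numbers are illustrative. execute() returns nothing, while submit() returns a Future whose get() blocks until the result is ready.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ExecutorsBasics {

    // Runs one Runnable via execute() and one Callable via submit(), then shuts the pool down.
    static int run() {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        AtomicInteger counter = new AtomicInteger();
        try {
            // execute(): fire-and-forget, no handle to the outcome
            pool.execute(counter::incrementAndGet);
            // submit(): returns a Future; get() blocks until the value is available
            Future<Integer> sum = pool.submit(() -> {
                int s = 0;
                for (int i = 1; i <= 100; i++) {
                    s += i;
                }
                return s;
            });
            int result = sum.get();                      // 5050
            pool.shutdown();                             // accept no new tasks; queued ones still run
            pool.awaitTermination(5, TimeUnit.SECONDS);  // also waits for the execute() task
            return result + counter.get();               // 5050 + 1
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow(); // harmless if the pool has already terminated
        }
    }
}
```

Worth knowing when choosing a factory method: newFixedThreadPool backs its threads with an unbounded LinkedBlockingQueue, and newCachedThreadPool allows up to Integer.MAX_VALUE threads, so under sustained load either the queue or the thread count can grow without a practical limit.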

1.2. Intermediate thread pools

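What I call intermediate here simply means not using the ultra-simple factory methods above; that is, you already know about ThreadPoolExecutor itself, whatever your reason for using it.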

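    // Customize each pool parameter yourself.
    // Note: an unbounded LinkedBlockingQueue never fills up, so this pool
    // never grows beyond its 4 core threads, whatever maximumPoolSize (20) says.
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4, 20, 20, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());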

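I won't go through the parameters one by one; this isn't a beginner's tutorial. In short, if you use this class, you are starting to know your way around.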
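
One practical consequence of those parameters: threads beyond corePoolSize are only created when the queue refuses an offer, so an unbounded queue keeps the pool at its core size no matter what maximumPoolSize says. A small sketch to check this on your own machine; QueueChoiceDemo is a hypothetical name, and core=2/max=4 are arbitrary values chosen for the demo.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class QueueChoiceDemo {

    // Pool with core=2, max=4; submits `tasks` tasks that all block until released,
    // then reports how many threads the pool has created.
    static int poolSizeAfterSubmitting(BlockingQueue<Runnable> queue, int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS, queue);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep every started worker busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // Threads beyond the core size appear only when queue.offer() fails (queue full).
            return pool.getPoolSize();
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
    }
}
```

With an unbounded LinkedBlockingQueue and 10 tasks, the call returns 2 (eight tasks wait in the queue). With an ArrayBlockingQueue of capacity 2 and 6 tasks, the third and fourth tasks are queued and the fifth and sixth force two extra threads, so it returns 4.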

1.3. Advanced thread pools

Realistically, there is no specific recipe for this level.

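But it might look like this: you know your pool's use case, you know the hardware it runs on, you name your pools, you size your queues, you think about context switching, thread safety and lock performance, and you may even build a wheel or two of your own...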
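
A few items on that list (named threads, a sized queue, a deliberate rejection policy) fit in a handful of lines. The sketch below needs nothing beyond the JDK; PrefixThreadFactory and TunedPool are hypothetical names, and PrefixThreadFactory merely plays the role of the NamedThreadFactory used in the next section, which is not a JDK class. Sizing to the CPU count is only a starting point for CPU-bound work; I/O-heavy tasks may warrant more threads.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: names every worker "<prefix>-1", "<prefix>-2", ...
class PrefixThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger seq = new AtomicInteger(1);

    PrefixThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, prefix + "-" + seq.getAndIncrement());
    }
}

class TunedPool {

    // CPU-sized pool with a bounded queue, named threads and an explicit rejection policy.
    static ThreadPoolExecutor create(String name, int queueCapacity) {
        int cpus = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(
                cpus, cpus,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),    // bounded: back-pressure instead of unbounded growth
                new PrefixThreadFactory(name),               // names show up in thread dumps and logs
                new ThreadPoolExecutor.CallerRunsPolicy());  // when full, the submitting thread runs the task
    }

    // Returns the name of the thread that ran a single task.
    static String workerName(String poolName) {
        ThreadPoolExecutor pool = create(poolName, 16);
        try {
            Future<String> f = pool.submit(() -> Thread.currentThread().getName());
            return f.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

For example, TunedPool.workerName("order-pool") should return "order-pool-1", because the first submission starts the first worker.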

2. This is not a thread pool

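We usually think of a thread pool as a place where multiple tasks run at the same time. But sometimes a thread pool doesn't behave like a pool at all; it behaves like a single thread. Consider a concrete, simple usage scenario: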

    // Initialize the thread pool
    private ExecutorService executor
            = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
                Runtime.getRuntime().availableProcessors(),
                0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(50),
                // NamedThreadFactory is not a JDK class; it comes from a utility library or your own code
                new NamedThreadFactory("test-pool"),
                new ThreadPoolExecutor.CallerRunsPolicy());
    // Process the tasks with the thread pool
    public Integer doTask(String updateIntervalDesc) throws Exception {
        long startTime = System.currentTimeMillis();
        List<TestDto> testList;
        AtomicInteger affectNum = new AtomicInteger(0);
        int pageSize = 1000;
        AtomicInteger pageNo = new AtomicInteger(1);
        Map<String, Object> condGroupLabel = new HashMap<>();
        log.info("start do sth:{}", updateIntervalDesc);
        List<Future<?>> futureList = new ArrayList<>();
        do {
            PageHelper.startPage(pageNo.getAndIncrement(), pageSize);
            List<TestDto> list
                    = testDao.getLabelListNew(condGroupLabel);
            testList = list;
            // Submit the tasks to the pool one item at a time
            for (TestDto s : list) {
                Future<?> future = executor.submit(() -> {
                    try {
                        // do sth...
                        affectNum.incrementAndGet();
                    }
                    catch (Throwable e) {
                        log.error("error:{}", pageNo.get(), e);
                    }
                });
                futureList.add(future);
            }
        } while (testList.size() >= pageSize);
        // Wait for all tasks to finish
        int i = 0;
        for (Future<?> future : futureList) {
            future.get();
            log.info("done:+{} ", i++);
        }
        log.info("doTask done:{}, num:{}, cost:{}ms",
                updateIntervalDesc, affectNum.get(), System.currentTimeMillis() - startTime);
        return affectNum.get();
    }

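The business logic is simple: fetch a large number of tasks from the database and run them in the thread pool. Since the tasks involve I/O such as database calls, processing them with multiple threads is perfectly reasonable.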

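However, one situation can upset this balance: when each task completes very quickly, so quickly that it is already finished before the next task has even been submitted. You may then see a curious phenomenon: only one thread ever seems to run tasks. That is not what a thread pool is for; it looks more like a single-threaded job.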
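
If you want to observe this yourself, the sketch below mimics the pool from the example (core = max = CPU count, an ArrayBlockingQueue of 50, CallerRunsPolicy) but replaces the database work with a single counter bump and records which thread ran each task. TinyTaskExperiment is a hypothetical name. The distribution you get depends on the JVM, OS and hardware, so treat it as an experiment, not a guaranteed result; because of CallerRunsPolicy, the submitting thread can also show up in the map.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

class TinyTaskExperiment {

    // Submits `taskCount` near-empty tasks and returns thread name -> number of tasks it ran.
    static Map<String, Long> run(int taskCount) {
        int cpus = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                cpus, cpus, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(50),
                new ThreadPoolExecutor.CallerRunsPolicy());
        Map<String, LongAdder> perThread = new ConcurrentHashMap<>();
        for (int i = 0; i < taskCount; i++) {
            // The "work" is one counter bump: far cheaper than submitting the task.
            pool.execute(() -> perThread
                    .computeIfAbsent(Thread.currentThread().getName(), k -> new LongAdder())
                    .increment());
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        Map<String, Long> result = new TreeMap<>();
        perThread.forEach((name, adder) -> result.put(name, adder.sum()));
        return result;
    }
}
```

Printing System.out.println(TinyTaskExperiment.run(100_000)) shows how the tasks were spread; a heavily skewed map is the symptom described above.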

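At this point we might start to suspect: is some thread blocked? Is thread scheduling unfair? Did we pick the wrong queue? Did we hit a JDK bug? Is the pool failing to use all of its threads? And so on...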

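But none of that is the cause. When we submit a task to the pool, all we really do is add it to the pool's queue, here the ArrayBlockingQueue; the pool's workers are the ones that take tasks from that queue and run them. (The exception: while fewer than corePoolSize threads exist, execute() starts a new worker and hands it the task directly.) When the queue holds only a few tasks at any moment, naturally only some of the workers have anything to do. As for why it is always the same thread, JVM and OS scheduling may play a part, but the more direct factor is how the workers compete for the queue. A fixed-size pool's workers block in ArrayBlockingQueue.take() (timed workers use poll(timeout) instead), and both are built on a ReentrantLock and its Condition (await/signal), not on Object.wait/notify. That lock is non-fair by default, so a worker that has just finished a task and immediately calls take() again can grab the lock ahead of the workers parked in await().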
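
The "submitting only enqueues" point is easiest to see in a deliberately stripped-down pool. ToyPool below is a hypothetical teaching sketch, not how ThreadPoolExecutor is implemented: it has a fixed set of workers, no rejection policy and no exception handling. submit() never chooses a thread; each worker simply loops on take(), and whichever one wins the race inside the queue gets the next task.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, minimal pool: submit() only enqueues; workers loop on take().
class ToyPool {
    private static final Runnable STOP = () -> { };   // "poison pill" telling a worker to exit
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final List<Thread> workers = new ArrayList<>();

    ToyPool(int size) {
        for (int i = 0; i < size; i++) {
            Thread t = new Thread(this::workLoop, "toy-worker-" + i);
            workers.add(t);
            t.start();
        }
    }

    // Submitting picks no thread at all: it just puts the task in the queue.
    void submit(Runnable task) {
        queue.add(task);
    }

    private void workLoop() {
        try {
            while (true) {
                Runnable task = queue.take(); // whichever worker gets here first wins the task
                if (task == STOP) {
                    return;
                }
                task.run(); // a throwing task kills this toy worker; ThreadPoolExecutor replaces such workers
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Queues one STOP per worker behind all real tasks, then waits for every worker to exit.
    void shutdownAndJoin() throws InterruptedException {
        for (int i = 0; i < workers.size(); i++) {
            queue.add(STOP);
        }
        for (Thread t : workers) {
            t.join();
        }
    }

    // Runs `tasks` counter increments on `size` workers and returns the final count.
    static int countRuns(int size, int tasks) {
        ToyPool pool = new ToyPool(size);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.submit(done::incrementAndGet);
        }
        try {
            pool.shutdownAndJoin();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return done.get();
    }
}
```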

Below is how a pool worker operates, taken from the ThreadPoolExecutor source:

    // java.util.concurrent.ThreadPoolExecutor#runWorker
    /**
     * Main worker run loop.  Repeatedly gets tasks from queue and
     * executes them, while coping with a number of issues:
     *
     * 1. We may start out with an initial task, in which case we
     * don't need to get the first one. Otherwise, as long as pool is
     * running, we get tasks from getTask. If it returns null then the
     * worker exits due to changed pool state or configuration
     * parameters.  Other exits result from exception throws in
     * external code, in which case completedAbruptly holds, which
     * usually leads processWorkerExit to replace this thread.
     *
     * 2. Before running any task, the lock is acquired to prevent
     * other pool interrupts while the task is executing, and then we
     * ensure that unless pool is stopping, this thread does not have
     * its interrupt set.
     *
     * 3. Each task run is preceded by a call to beforeExecute, which
     * might throw an exception, in which case we cause thread to die
     * (breaking loop with completedAbruptly true) without processing
     * the task.
     *
     * 4. Assuming beforeExecute completes normally, we run the task,
     * gathering any of its thrown exceptions to send to afterExecute.
     * We separately handle RuntimeException, Error (both of which the
     * specs guarantee that we trap) and arbitrary Throwables.
     * Because we cannot rethrow Throwables within Runnable.run, we
     * wrap them within Errors on the way out (to the thread's
     * UncaughtExceptionHandler).  Any thrown exception also
     * conservatively causes thread to die.
     *
     * 5. After task.run completes, we call afterExecute, which may
     * also throw an exception, which will also cause thread to
     * die. According to JLS Sec 14.20, this exception is the one that
     * will be in effect even if task.run throws.
     *
     * The net effect of the exception mechanics is that afterExecute
     * and the thread's UncaughtExceptionHandler have as accurate
     * information as we can provide about any problems encountered by
     * user code.
     *
     * @param w the worker
     */
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
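            // The worker keeps taking tasks from the queue and running them.
            // Fetching a task may give up after a timeout or block indefinitely, depending on the pool's sizing configuration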
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
    /**
     * Performs blocking or timed wait for a task, depending on
     * current configuration settings, or returns null if this worker
     * must exit because of any of:
     * 1. There are more than maximumPoolSize workers (due to
     *    a call to setMaximumPoolSize).
     * 2. The pool is stopped.
     * 3. The pool is shutdown and the queue is empty.
     * 4. This worker timed out waiting for a task, and timed-out
     *    workers are subject to termination (that is,
     *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
     *    both before and after the timed wait, and if the queue is
     *    non-empty, this worker is not the last thread in the pool.
     *
     * @return task, or null if the worker must exit, in which case
     *         workerCount is decremented
     */
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                // Either a timed poll() or a blocking take()
                // In a fixed-size pool (without allowCoreThreadTimeOut), the blocking take() is used
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
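
The timed/untimed branch in getTask() can be observed from outside. In this sketch (KeepAliveDemo is a hypothetical name; the 100 ms keep-alive and the wait times are arbitrary), a pool with two core threads finishes its work and then sits idle. With allowCoreThreadTimeOut(true), timed is true, getTask() uses poll(keepAliveTime), and the idle workers exit; with the default false, the core workers block in take() and stay alive.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class KeepAliveDemo {

    // Runs two tasks on a 2-thread pool, idles for `waitMillis`, then returns the pool size.
    static int poolSizeAfterIdle(boolean allowCoreTimeout, long waitMillis) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        // true: core workers also wait with poll(keepAliveTime) and exit when it times out
        pool.allowCoreThreadTimeOut(allowCoreTimeout);
        CountDownLatch done = new CountDownLatch(2);
        try {
            pool.execute(done::countDown); // starts worker 1
            pool.execute(done::countDown); // starts worker 2 (still below corePoolSize)
            done.await();
            Thread.sleep(waitMillis);
            return pool.getPoolSize();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            pool.shutdown();
        }
    }
}
```

Expect poolSizeAfterIdle(true, 1500) to report 0 and poolSizeAfterIdle(false, 500) to report 2. The waits are deliberately generous because the timed case depends on timing.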

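In short, a pool worker just keeps taking tasks from the queue and running them. Taking a task from the queue is in turn governed by the queue's lock: ArrayBlockingQueue guards both take() and put() with a single ReentrantLock plus two Conditions, notEmpty and notFull: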

    // java.util.concurrent.ArrayBlockingQueue#take
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        // Take the lock so the operation is thread-safe
        lock.lockInterruptibly();
        try {
            while (count == 0)
                // Releases the lock while waiting; on wake-up it must re-acquire the lock before continuing
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
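    // java.util.concurrent.ArrayBlockingQueue#put (ThreadPoolExecutor.execute() itself calls offer(), which also goes through enqueue())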
    /**
     * Inserts the specified element at the tail of this queue, waiting
     * for space to become available if the queue is full.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
    /**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        // Wake one thread waiting to take
        notEmpty.signal();
    }
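
To see the one-lock, two-condition design without the JDK's extra bookkeeping, here is a simplified re-implementation of the same idea. TinyBoundedBuffer is a hypothetical name, and this is a sketch rather than a replacement for ArrayBlockingQueue (no timeouts, no offer/poll, no iterators). The fair flag is passed to ReentrantLock in the same way the ArrayBlockingQueue(int capacity, boolean fair) constructor does.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Ring buffer guarded by ONE lock and TWO conditions, like ArrayBlockingQueue.
class TinyBoundedBuffer<E> {
    private final Object[] items;
    private int takeIndex, putIndex, count;
    private final ReentrantLock lock;
    private final Condition notEmpty;
    private final Condition notFull;

    TinyBoundedBuffer(int capacity, boolean fair) {
        items = new Object[capacity];
        lock = new ReentrantLock(fair); // false (the default) allows barging
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
    }

    void put(E e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == items.length) notFull.await(); // releases the lock while waiting
            items[putIndex] = e;
            putIndex = (putIndex + 1) % items.length;
            count++;
            notEmpty.signal();                              // wake one waiting taker
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == 0) notEmpty.await();           // re-acquires the lock before returning
            E e = (E) items[takeIndex];
            items[takeIndex] = null;
            takeIndex = (takeIndex + 1) % items.length;
            count--;
            notFull.signal();                               // wake one waiting producer
            return e;
        } finally {
            lock.unlock();
        }
    }

    // Pushes 1..n through a buffer of `capacity` from a producer thread; returns the sum taken.
    static long transferSum(int n, int capacity) {
        TinyBoundedBuffer<Integer> buf = new TinyBoundedBuffer<>(capacity, false);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) buf.put(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        long sum = 0;
        try {
            for (int i = 0; i < n; i++) sum += buf.take();
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }
}
```

transferSum(1000, 2) pushes 1..1000 through a two-slot buffer, so the producer and consumer repeatedly block on notFull and notEmpty; the returned sum should be 500500.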

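So which worker gets a task comes down to which thread wins the lock, and that in turn depends on the lock's fairness and on how the JVM and OS schedule threads (I'm not certain of the details, but that is what it looks like). At the very least, what we observe is that one thread keeps winning the lock and grabs all the tasks.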

3. Back to the thread pool

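The purpose of a thread pool is to handle asynchronous tasks or to run many unrelated tasks concurrently, taking load off the system. When the cost of submitting a task exceeds the cost of running it, multithreading is pointless; or rather, that is simply the wrong way to use it. We should give the pool heavier work, not lightweight tasks. In the case above, performance is bound to be poor. With a small change, though, things can look quite different: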

    // Initialize the thread pool
    private ExecutorService executor
            = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
                Runtime.getRuntime().availableProcessors(),
                0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(50),
                new NamedThreadFactory("test-pool"),
                new ThreadPoolExecutor.CallerRunsPolicy());
    // Process the tasks with the thread pool
    public Integer doTask(String updateIntervalDesc) throws Exception {
        long startTime = System.currentTimeMillis();
        List<TestDto> testList;
        AtomicInteger affectNum = new AtomicInteger(0);
        int pageSize = 1000;
        AtomicInteger pageNo = new AtomicInteger(1);
        Map<String, Object> condGroupLabel = new HashMap<>();
        log.info("start do sth:{}", updateIntervalDesc);
        List<Future<?>> futureList = new ArrayList<>();
        do {
            PageHelper.startPage(pageNo.getAndIncrement(), pageSize);
            List<TestDto> list
                    = testDao.getLabelListNew(condGroupLabel);
            testList = list;
            // Submit one task per batch (page) instead of one per item
            Future<?> future = executor.submit(() -> {
                for (TestDto s : list) {
                    try {
                        // do sth...
                        affectNum.incrementAndGet();
                    }
                    catch (Throwable e) {
                        log.error("error:{}", pageNo.get(), e);
                    }
                }
            });
            futureList.add(future);
        } while (testList.size() >= pageSize);
        // Wait for all tasks to finish
        int i = 0;
        for (Future<?> future : futureList) {
            future.get();
            log.info("done:+{} ", i++);
        }
        log.info("doTask done:{}, num:{}, cost:{}ms",
                updateIntervalDesc, affectNum.get(), System.currentTimeMillis() - startTime);
        return affectNum.get();
    }

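In other words, make the work each task does heavy enough that the submission overhead becomes negligible. Only then does multithreading pay off.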
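
The same idea can be packaged as a small reusable helper: split the items into chunks and submit one task per chunk. BatchSubmit and processInBatches are hypothetical names, and batchSize is a knob you would have to measure for your own workload: chunks that are too small bring the submission overhead back, while too few chunks leave threads idle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class BatchSubmit {

    // Submits ONE task per chunk of `batchSize` items, then waits for all chunks.
    static <T> void processInBatches(List<T> items, int batchSize,
                                     Consumer<? super T> action, ExecutorService pool) {
        List<Future<?>> futures = new ArrayList<>();
        for (int from = 0; from < items.size(); from += batchSize) {
            List<T> chunk = items.subList(from, Math.min(from + batchSize, items.size()));
            futures.add(pool.submit(() -> chunk.forEach(action)));
        }
        try {
            for (Future<?> f : futures) {
                f.get(); // waits, and rethrows if the chunk's task failed
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    // Processes n integers in chunks of batchSize and returns how many were handled.
    static int demo(int n, int batchSize) {
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < n; i++) data.add(i);
        AtomicInteger processed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        try {
            processInBatches(data, batchSize, x -> processed.incrementAndGet(), pool);
        } finally {
            pool.shutdown();
        }
        return processed.get();
    }
}
```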

