Writing BatchTaskRunner, a Utility Class for Running Tasks Concurrently with ForkJoin

 4 years ago
source link: https://my.oschina.net/javayou/blog/4870860

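In day-to-day programming you often need to run several tasks concurrently, then wait for all of them to finish and return their results.

So I put together a simple utility class using Java's ForkJoin framework: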

package com.gitee.search.utils;

import java.util.List;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.function.Consumer;

/**
 * Batch task action
 * @author Winter Lau<[email protected]>
 */
public final class BatchTaskRunner<T> extends RecursiveAction {

    private final int threshold; // maximum number of tasks handled in one batch
    private final List<T> taskList;
    private final Consumer<List<T>> action;

    /**
     * @param taskList      list of tasks
     * @param threshold     maximum number of tasks handled in one batch
     * @param action        logic that processes one batch of tasks
     */
    private BatchTaskRunner(List<T> taskList, int threshold, Consumer<List<T>> action) {
        this.taskList = taskList;
        this.threshold = threshold;
        this.action = action;
    }

    /**
     * Runs the tasks in batches on multiple threads and blocks until all batches finish.
     * @param taskList      list of tasks
     * @param threshold     maximum number of tasks per batch; must be at least 1
     * @param action        logic that processes one batch of tasks
     */
    public static <T> void execute(List<T> taskList, int threshold, Consumer<List<T>> action) {
        // A threshold below 1 would make a one-element list split forever.
        if (threshold < 1) {
            throw new IllegalArgumentException("threshold must be at least 1");
        }
        new BatchTaskRunner<T>(taskList, threshold, action).invoke();
    }

    @Override
    protected void compute() {
        if (taskList.size() <= threshold) {
            // Small enough: process this batch directly.
            this.action.accept(taskList);
        }
        else {
            this.splitFromMiddle(taskList);
        }
    }

    /**
     * Splits the task list in the middle and runs both halves as subtasks.
     * @param list  the task list to split
     */
    private void splitFromMiddle(List<T> list) {
        int middle = (int)Math.ceil(list.size() / 2.0);
        List<T> leftList = list.subList(0, middle);
        List<T> rightList = list.subList(middle, list.size());
        BatchTaskRunner<T> left = newInstance(leftList);
        BatchTaskRunner<T> right = newInstance(rightList);
        ForkJoinTask.invokeAll(left, right);
    }

    private BatchTaskRunner<T> newInstance(List<T> taskList) {
        return new BatchTaskRunner<T>(taskList, threshold, action);
    }

}
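
To make the splitting in compute() and splitFromMiddle() easier to follow, here is a minimal sketch that applies the same midpoint rule sequentially, with no threads involved, and returns the batches the action would receive. SplitTrace and leafBatches are hypothetical names used only for this illustration and are not part of the utility class. The sketch rejects thresholds below 1, because with such a value a one-element list would never stop splitting.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of BatchTaskRunner: it replays the same
// midpoint split rule sequentially to show which batches the action receives.
public class SplitTrace {

    // Returns the leaf batches in left-to-right order.
    static <T> List<List<T>> leafBatches(List<T> list, int threshold) {
        if (threshold < 1) {
            throw new IllegalArgumentException("threshold must be at least 1");
        }
        List<List<T>> out = new ArrayList<>();
        collect(list, threshold, out);
        return out;
    }

    private static <T> void collect(List<T> list, int threshold, List<List<T>> out) {
        if (list.size() <= threshold) {
            out.add(new ArrayList<>(list)); // small enough: this is one batch
            return;
        }
        int middle = (int) Math.ceil(list.size() / 2.0); // same rule as splitFromMiddle
        collect(list.subList(0, middle), threshold, out);
        collect(list.subList(middle, list.size()), threshold, out);
    }

    public static void main(String[] args) {
        // Five tasks, at most two per batch.
        System.out.println(leafBatches(Arrays.asList(1, 2, 3, 4, 5), 2));
        // prints [[1, 2], [3], [4, 5]]
    }
}
```

Note that the batches are not all of equal size: the threshold is an upper bound, and a list of five tasks with a threshold of 2 ends up as three batches rather than two full ones plus a remainder at the end.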

Usage:

List<Integer> allTasks = Arrays.asList(1,2,3,4,5);
int taskPerThread = 1;
BatchTaskRunner.execute(allTasks, taskPerThread, tasks -> {
    System.out.printf("[%s]: %s\n", Thread.currentThread().getName(), tasks);
}); 

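This example assumes five tasks (allTasks). taskPerThread sets the maximum number of tasks handed to the action in one batch, so with a value of 1 each task is processed on its own.

The body inside { ... } is the logic that processes the tasks.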
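
Because execute() returns nothing, any results have to be gathered inside the action itself. One possible approach, sketched below, is to write into a thread-safe collection; the choice of ConcurrentLinkedQueue is an assumption of this sketch, not something the original utility prescribes. CollectResultsDemo, Runner, and sumOfSquares are hypothetical names, and Runner is only a compact stand-in for BatchTaskRunner so that the file compiles on its own.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.function.Consumer;

public class CollectResultsDemo {

    // Compact stand-in for BatchTaskRunner (hypothetical, for illustration only).
    static final class Runner<T> extends RecursiveAction {
        private final List<T> tasks;
        private final int threshold;
        private final Consumer<List<T>> action;

        Runner(List<T> tasks, int threshold, Consumer<List<T>> action) {
            this.tasks = tasks;
            this.threshold = threshold;
            this.action = action;
        }

        @Override
        protected void compute() {
            if (tasks.size() <= threshold) {
                action.accept(tasks);
                return;
            }
            int middle = (int) Math.ceil(tasks.size() / 2.0);
            ForkJoinTask.invokeAll(
                new Runner<T>(tasks.subList(0, middle), threshold, action),
                new Runner<T>(tasks.subList(middle, tasks.size()), threshold, action));
        }
    }

    // Squares every input in concurrent batches and returns the sum of the squares.
    static int sumOfSquares(List<Integer> inputs, int threshold) {
        // Thread-safe queue: batches may run on different worker threads.
        Queue<Integer> results = new ConcurrentLinkedQueue<>();
        new Runner<Integer>(inputs, threshold, batch -> {
            for (Integer n : batch) {
                results.add(n * n);
            }
        }).invoke(); // blocks until every batch has finished
        int sum = 0;
        for (int r : results) {
            sum += r;
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(Arrays.asList(1, 2, 3, 4, 5), 1)); // prints 55
    }
}
```

The order in which results arrive in the queue depends on thread scheduling, so this pattern fits best when the results are combined in an order-independent way, as with the sum here.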

