2

java~并行计算~大集合的并行处理

 2 years ago
source link: https://www.cnblogs.com/lori/p/15251165.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

java~并行计算~大集合的并行处理

上一次写了关于《FunctionalInterface~一个批量处理数据的类》和《Future和Callable实现大任务的并行处理》的文章,本讲主要结合实际应用,来封装一个集合并行处理组件,我们的集合分为数据库查询出现的分页集合;还有一个是内存的集合,今天主要说一下内存集合的并行处理。

  • 有一个比较耗时的工作,将top 400的用户的行为信息统计
  • 统计的信息来自很多业务,很多服务,不能使用聚合直接计算
  • 这些业务统计的时间,大概每个人平均需要1秒
  • 这些用户的各种类型,彼此独立,没有关系

如果直接顺序写代码,那1万的用户,需要400秒的时间,这是我们不能接受的,我们使用并行编程8秒就把它搞定。

  • 400的集合,进行拆分,每100个为一组,分为4组(4页)
  • 对每100个集合进行拆分,每2个为1组,将100个分成了50组
  • 对50组数据,开50个线程并行处理,结果为2行完成
  • 400的信息,分成了4页,每页2秒,一共8秒
/**
 * 数据集并行处理工具
 */
public class DataHelper {
    /**
     * 并行处理线程数字
     */
    static final int THREAD_COUNT = 50;
    /**
     * 单线程中处理的集合的长度,50个线程,每个线程处理2条,如果处理时间为1S,则需要2S的时间.
     */
    static final int INNER_LIST_LENGTH = 2;
    static Logger logger = LoggerFactory.getLogger(DataHelper.class);

    /**
     * 大集合拆分.
     *
     * @param list
     * @param len
     * @param <T>
     * @return
     */
    private static <T> List<List<T>> splitList(List<T> list, int len) {
        if (list == null || list.size() == 0 || len < 1) {
            return null;
        }
        List<List<T>> result = new ArrayList<List<T>>();
        int size = list.size();
        int count = (size + len - 1) / len;
        for (int i = 0; i < count; i++) {
            List<T> subList = list.subList(i * len, ((i + 1) * len > size ? size : len * (i + 1)));
            result.add(subList);
        }
        return result;
    }

    /**
     * 并行处理.
     *
     * @param list     大集合
     * @param pageSize 单页数据大小
     * @param consumer 处理程序
     * @param <T>
     */
    public static <T> void fillDataByPage(List<T> list,
                                          int pageSize,
                                          Consumer<T> consumer) {

        List<List<T>> innerList = new ArrayList<>();
        splitList(list, pageSize).forEach(o -> innerList.add(o));
        int totalPage = innerList.size();
        AtomicInteger i = new AtomicInteger();
        innerList.forEach(items -> {
            ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
            i.getAndIncrement();
            Collection<BufferInsert<T>> bufferInserts = new ArrayList<>();
            splitList(items, INNER_LIST_LENGTH).forEach(o -> {
                bufferInserts.add(new BufferInsert(o, consumer));
            });

            try {
                executor.invokeAll(bufferInserts);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            executor.shutdown();
            logger.info("【当前数据页:{}/{}】", i.get(), totalPage);
        });

    }

    /**
     * 多线程并发处理数据.
     *
     * @param <T>
     */
    static class BufferInsert<T> implements Callable<Integer> {
        /**
         * 要处理的数据列表.
         */
        List<T> items;
        /**
         * 处理程序.
         */
        Consumer<T> consumer;

        public BufferInsert(List<T> items, Consumer<T> consumer) {
            this.items = items;
            this.consumer = consumer;
        }

        @Override
        public Integer call() {
            for (T item : items) {
                this.consumer.accept(item);
            }
            return 1;
        }
    }

}
    /**
     * 8秒处理400个任务,每个任务执行时间为1S,并行的威力
     */
    @Test
    public void test() {
        List<Integer> sumList = new ArrayList<>();
        for (int i = 0; i < 400; i++) {
            sumList.add(i);
        }
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        DataHelper.fillDataByPage(sumList, 100, (o) -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        stopWatch.stop();
        System.out.println("time:" + stopWatch.getTotalTimeMillis());
    }

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK