

Java 几分钟处理完 30 亿个数据?
source link: https://blog.51cto.com/u_15430445/5848550
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

1. 场景说明
现有一个 10G 文件的数据,里面包含了 18-70 之间的整数,分别表示 18-70 岁的人群数量统计。假设年龄范围分布均匀,分别表示系统中所有用户的年龄数,找出重复次数最多的那个数,现有一台内存为 4G、2 核 CPU 的电脑,请写一个算法实现。
2. 模拟数据
Java 中一个整数占 4 个字节,模拟 10G 为 30 亿左右个数据, 采用追加模式写入 10G 数据到硬盘里。
每 100 万个记录写一行,大概 4M 一行,10G 大概 2500 行数据。
import java.io.*;
import java.util.Random;
/**
* @Desc:
* @Author: bingbing
* @Date: 2022/5/4 0004 19:05
*/
public class GenerateData {
private static Random random = new Random();
public static int generateRandomData(int start, int end) {
return random.nextInt(end - start + 1) + start;
}
/**
* 产生10G的 1-1000的数据在D盘
*/
public void generateData() throws IOException {
File file = new File("D:\\ User.dat");
if (!file.exists()) {
try {
file.createNewFile();
} catch (IOException e) {
e.printStackTrace();
}
}
int start = 18;
int end = 70;
long startTime = System.currentTimeMillis();
BufferedWriter bos = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file, true)));
for (long i = 1; i < Integer.MAX_VALUE * 1.7; i++) {
String data = generateRandomData(start, end) + ",";
bos.write(data);
// 每100万条记录成一行,100万条数据大概4M
if (i % 1000000 == 0) {
bos.write("\n");
}
}
System.out.println("写入完成! 共花费时间:" + (System.currentTimeMillis() - startTime) / 1000 + " s");
bos.close();
}
public static void main(String[] args) {
GenerateData generateData = new GenerateData();
try {
generateData.generateData();
} catch (IOException e) {
e.printStackTrace();
}
}
}
上述代码调整参数执行 2 次,凑 10G 数据在 D 盘 User.dat 文件里:

准备好 10G 数据后,接着写如何处理这些数据。
3. 场景分析
10G 的数据比当前拥有的运行内存大的多,不能全量加载到内存中读取。如果采用全量加载,那么内存会直接爆掉,只能按行读取。Java 中的 bufferedReader 的 readLine() 按行读取文件里的内容。
4. 读取数据
首先,我们写一个方法单线程读完这 30 亿数据需要多少时间,每读 100 行打印一次:
BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(FILE_NAME), "utf-8"));
String line;
long start = System.currentTimeMillis();
int count = 1;
while ((line = br.readLine()) != null) {
// 按行读取
if (count % 100 == 0) {
System.out.println("读取100行,总耗时间: " + (System.currentTimeMillis() - start) / 1000 + " s");
System.gc();
}
count++;
}
running = false;
br.close();
}
按行读完 10G 的数据大概 20 秒,基本每 100 行,1 亿多数据花 1 秒,速度还挺快。

5. 处理数据
5.1 思路一
通过单线程处理,初始化一个 countMap,key 为年龄,value 为出现的次数。将每行读取到的数据按照 "," 进行分割,然后获取到的每一项进行保存到 countMap 里。如果存在,那么值 key 的 value+1。
try {
File subFile = new File(dir + "\\" + i + ".dat");
if (!file.exists()) {
subFile.createNewFile();
}
countMap.computeIfAbsent(i + "", integer -> new AtomicInteger(0));
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
单线程读取并统计 countMap:
String[] arr = lineData.split(",");
for (String str : arr) {
if (StringUtils.isEmpty(str)) {
continue;
}
countMap.computeIfAbsent(str, s -> new AtomicInteger(0)).getAndIncrement();
}
}
通过比较找出年龄数最多的年龄并打印出来:
Integer targetValue = 0;
String targetKey = null;
Iterator<Map.Entry<String, AtomicInteger>> entrySetIterator = countMap.entrySet().iterator();
while (entrySetIterator.hasNext()) {
Map.Entry<String, AtomicInteger> entry = entrySetIterator.next();
Integer value = entry.getValue().get();
String key = entry.getKey();
if (value > targetValue) {
targetValue = value;
targetKey = key;
}
}
System.out.println("数量最多的年龄为:" + targetKey + "数量为:" + targetValue);
}
测试结果
总共花了 3 分钟读取完并统计完所有数据。

内存消耗为 2G-2.5G,CPU 利用率太低,只向上浮动了 20%-25% 之间。

要想提高 CPU 利用率,那么可以使用多线程去处理。
下面我们使用多线程去解决这个 CPU 利用率低的问题。
5.2 思路二:分治法
使用多线程去消费读取到的数据。采用生产者、消费者模式去消费数据。
因为在读取的时候是比较快的,单线程的数据处理能力比较差。因此思路一的性能阻塞在取数据的一方且又是同步操作,导致整个链路的性能会变的很差。
所谓分治法就是分而治之,也就是说将海量数据分割处理。根据 CPU 的能力初始化 n 个线程,每一个线程去消费一个队列,这样线程在消费的时候不会出现抢占队列的问题。同时为了保证线程安全和生产者消费者模式的完整,采用阻塞队列。Java 中提供了 LinkedBlockingQueue 就是一个阻塞队列。

初始化阻塞队列
使用 LinkedList 创建一个阻塞队列列表:
在 static 块里初始化阻塞队列的数量和单个阻塞队列的容量为 256。
上面讲到了 30 亿数据大概 2500 行,按行塞到队列里。20 个队列,那么每个队列 125 个,因此可以容量可以设计为 256 即可。
for (int i = 0; i < threadNums; i++) {
blockQueueLists.add(new LinkedBlockingQueue<>(256));
}
生产者
为了实现负载的功能,首先定义一个 count 计数器,用来记录行数:
按照行数来计算队列的下标 long index=count.get()%threadNums。
下面算法就实现了对队列列表中的队列进行轮询的投放:
public static void splitLine(String lineData) {
String[] arr = lineData.split("\n");
for (String str : arr) {
if (StringUtils.isEmpty(str)) {
continue;
}
long index = count.get() % threadNums;
try {
// 如果满了就阻塞
blockQueueLists.get((int) index).put(str);
} catch (InterruptedException e) {
e.printStackTrace();
}
count.getAndIncrement();
}
}
消费者
1) 队列线程私有化
消费方在启动线程的时候根据 index 去获取到指定的队列,这样就实现了队列的线程私有化。
//如果共用一个队列,那么线程不宜过多,容易出现抢占现象
System.out.println("开始消费...");
for (int i = 0; i < threadNums; i++) {
final int index = i;
// 每一个线程负责一个 queue,这样不会出现线程抢占队列的情况。
new Thread(() -> {
while (consumerRunning) {
startConsumer = true;
try {
String str = blockQueueLists.get(index).take();
countNum(str);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
2) 多子线程分割字符串
由于从队列中多到的字符串非常的庞大,如果又是用单线程调用 split(",") 去分割,那么性能同样会阻塞在这个地方。
private static void countNum(String str) {
int[] arr = new int[2];
arr[1] = str.length() / 3;
for (int i = 0; i < 3; i++) {
final String innerStr = SplitData.splitStr(str, arr);
new Thread(() -> {
String[] strArray = innerStr.split(",");
for (String s : strArray) {
countMap.computeIfAbsent(s, s1 -> new AtomicInteger(0)).getAndIncrement();
}
}).start();
}
}
3) 分割字符串算法
分割时从 0 开始,按照等分的原则,将字符串 n 等份,每一个线程分到一份。
用一个 arr 数组的 arr[0] 记录每次的分割开始位置。arr[1] 记录每次分割的结束位置,如果遇到的开始的字符不为 "," 那么就 startIndex-1。如果结束的位置不为 "," 那么将 endIndex 向后移一位。
如果 endIndex 超过了字符串的最大长度,那么就把最后一个字符赋值给 arr[1]。
* 按照 x坐标 来分割 字符串,如果切到的字符不为“,”, 那么把坐标向前或者向后移动一位。
*
* @param line
* @param arr 存放x1,x2坐标
* @return
*/
public static String splitStr(String line, int[] arr) {
int startIndex = arr[0];
int endIndex = arr[1];
char start = line.charAt(startIndex);
char end = line.charAt(endIndex);
if ((startIndex == 0 || start == ',') && end == ',') {
arr[0] = endIndex + 1;
arr[1] = arr[0] + line.length() / 3;
if (arr[1] >= line.length()) {
arr[1] = line.length() - 1;
}
return line.substring(startIndex, endIndex);
}
if (startIndex != 0 && start != ',') {
startIndex = startIndex - 1;
}
if (end != ',') {
endIndex = endIndex + 1;
}
arr[0] = startIndex;
arr[1] = endIndex;
if (arr[1] >= line.length()) {
arr[1] = line.length() - 1;
}
return splitStr(line, arr);
}
测试结果
内存和 CPU 初始占用大小:

启动后,运行时内存稳定在 11.7G,CPU 稳定利用在 90% 以上。

总耗时由 180 秒缩减到 103 秒,效率提升 75%,得到的结果也与单线程处理的一致。

6. 遇到的问题
如果在运行了的时候,发现 GC 突然罢工不工作了,有可能是 JVM 的堆中存在的垃圾太多,没回收导致内存的突增。

解决方法
在读取一定数量后,可以让主线程暂停几秒,手动调用 GC。
Recommend
-
45
这个男孩叫JP Gibson,特别喜爱篮球,尤其喜欢犹他爵士队。可不幸的是,他被诊断患有白血病。爵士队知道这个消息后,当即联系上这位小粉丝,并跟他签了为期1天的合同,同时让他参加比赛。从他萌萌哒的上场到在帮助下完成的一记扣篮,短短几分钟,却深...
-
61
几分钟快速读懂渐进式 Web 应用 PWA
-
66
AdWeek 制作了一期视频展示了 HomePod 广告片《Welcome Home》幕后的故事。苹果拍广告真的是肯花钱肯花精力啊。整个场景都是搭出来实拍的,你们感受一下短短几分钟的广告背后是多大的工作量
-
60
相关专题:爱奇艺登陆纳斯达克:开盘涨1.1%新浪科技讯3月29日晚间消息,爱奇艺正式登陆纳斯达克,股票代码“IQ”,开盘价18.2美元,较18美元的发行价上涨1.11%,按开盘价计算,爱奇艺市值达到128.2亿美元。但开盘仅仅过去几分钟,爱
-
6
或许是为了讨个好意头,端午前夕,刚满百岁的五芳斋,提交了招股书,谋求 A 股上市。目前尚无以粽子为主营业务的上市公司,所以五芳斋有可能成为「粽子第一股」。 你不一定吃过五芳斋粽子,但你也许看过五芳斋的魔性广告片为此捧腹大笑,知道五芳斋和王...
-
3
摘要第一个 .com 域名诞生的五年后,互联网才成为一个世界范围内的事件。也就是你早就听说过的,在1993 年,麻省理工学院教授蒂姆·伯纳斯·李,利用超文本连接,为互联网做了一个方便提取的目录,万维网诞生了。
-
9
【链向FM】Polygon披露了使 90 亿个MATIC面临风险的修补漏洞,BTC的非流动性供应继续触及多年高位_区块链资讯_链向财经【链向FM】Polygon披露了使 90 亿个MATIC面临风险的修补漏洞,BTC的非流动性供应继续触及多年高位【数据情报】
-
5
V2EX › 程序员 如何快速向文件中写入 1 亿个 ip? lsk569937453 · 2 小时 39 分钟...
-
2
2022 年超过 7 亿个账户信息被泄露,2200 万台设备被感染|同一手机号_网易订阅 IT之家 3 月 15 日消息,根据 SpyCloud 公布的《2023 年度身份暴露报告》,安...
-
5
V2EX › 宽带症候群 中国有 3 亿个 IP4 地址所有权,我就是没想明白,怎么会不够用?
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK