【Go】使用压缩文件优化io (二)
2019-07-07
上一篇文章《使用压缩文件优化io (一)》中记录了日志备份 io 优化方案,使用文件流数据压缩方案优化 io 性能,效果十分显著。这篇文章记录数据分析前置清洗、格式化数据的 io 优化方案,我们有一台专用的日志前置处理服务器,所有业务日志通过这台机器从 OSS 拉取回来清洗、格式化,最后进入到数据仓储中便于后续的分析。
随着业务扩展这台服务器压力越来越大,高峰时数据延迟越来越厉害,早期也是使用 Python 脚本 + awk 以及一些 shell 命令完成相关工作,在数据集不是很大的时候这种方案很好,效率也很高,随着数据集变大,发现服务器负载很高,经过分析是还是 io 阻塞,依旧采用对数据流进行处理的方案优化io,以下记录优化的过程。
服务器配置:4 核 8G; 磁盘:1T
分析前置服务会根据业务不同分为十分钟、一小时两个阶段拉取分析日志,每隔一个阶段会去 OSS 拉取日志回到服务器进行处理,处理过程因 io 阻塞,导致 CPU 和 load 异常高,且处理效率严重下降,这次优化主要就是降低 io 阻塞,提升 CPU 利用率 (处理业务逻辑而不是等待 io) 和处理效率。
后文中会详细描述优化前后的方案,并用 go 编写测试,使用一台 2 核4G的服务器进行测试,测试数据集大小为:
- 文件数量:432个
- 压缩文件:17G
- 解压后文件:63G
- 压缩方案:lzo
- Goroutine 数量:20
优化前日志处理流程:
- 获取待处理文件列表
- 拉取 OSS 日志到本地磁盘 (压缩文件)
- 解压缩日志文件
- 读取日志数据
- 业务处理……
- 导入到数据仓储中
导致 io 阻塞的部分主要是: 拉取 OSS 日志、解压缩日志文件及读取日志数据,优化也主要从这三块着手。
这里有一段公共的日志读取方法,该方法接收一个 io.Reader
, 并按行读取日志,并简单切分日志字段,并没有实质的处理日志数据,后面的优化方案也将使用这个方法读取日志。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| package main
import ( "bufio" "bytes" "io"
"github.com/thinkeridea/go-extend/exbytes" )
func Read(r io.Reader) { rawBuffer := make([]byte, 512) buf := bufio.NewReader(r) for { line, ok, err := readLine(buf, rawBuffer) if err == io.EOF { return }
if err != nil { panic(nil) }
if ok { rawBuffer = line }
c := bytes.Count(line, []byte{'\x01'}) if c != 65 { panic("无效的行") } } }
func readLine(r *bufio.Reader, rawBuffer []byte) ([]byte, bool, error) { var ok bool line, err := r.ReadSlice('\n') if (err == bufio.ErrBufferFull || len(line) < 3 || exbytes.ToString(line[len(line)-3:]) != "\r\r\n") && err != io.EOF { rawBuffer = append(rawBuffer[:0], line...) for (err == bufio.ErrBufferFull || len(line) < 3 || exbytes.ToString(line[len(line)-3:]) != "\r\r\n") && err != io.EOF { line, err = r.ReadSlice('\n') rawBuffer = append(rawBuffer, line...) } line = rawBuffer ok = true }
if len(line) > 0 && err == io.EOF { err = nil }
return line, ok, err }
|
日志按 \r\r\n
分隔行,使用 \x01
切分字段,读取方法使用 bufio.ReadSlice
方法,避免内存分配,且当 bufio
缓冲区满之后使用 rwaBuffer
作为本地可扩展缓冲,每次扩展之后会保留最大的扩展空间,因为业务日志每行大小差不多,这样可以极大的减少内存分配,效率是 bufio.ReadLine
方法的好几倍。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110
| package main
import ( "fmt" "os" "os/exec" "path/filepath" "strings" "sync" "time"
".../pkg/aliyun_oss" // 虚假的包 )
func main() { var oss *aliyun_oss.AliyunOSS // 对接阿里云 OSS,需要自行封装 files := list() // 这是一个虚构的方法,用来获取待处理的文件列表
fmt.Printf("待处理文件数量:%d\n", len(files))
start := time.Now() defer func(t time.Time) { fmt.Printf("共耗时:%0.6f\n", time.Now().Sub(t).Seconds()) }(start)
// 下载日志文件 n := 20 var wg sync.WaitGroup c := make(chan string) wg.Add(n) for i := 0; i < n; i++ { go func() { defer wg.Done() for { f, ok := <-c if !ok { return }
if _, err := os.Stat(f); err == nil { return } else if !os.IsNotExist(err) { panic(err) }
dir := filepath.Dir(f) err := os.MkdirAll(dir, 0755) if err != nil { panic(err) }
err = oss.GetObjectToFile(f, f) if err != nil { panic(err) } } }() }
for _, f := range files { c <- f }
close(c) wg.Wait()
fmt.Printf("下载文件耗时:%0.6f\n", time.Now().Sub(start).Seconds())
// 解压日志文件 start = time.Now() shell := exec.Command("/bin/bash", "-c", "lzop -df logs/*/*/*/*/*/*.lzo") err := shell.Run() if err != nil { panic(err) }
fmt.Printf("解压文件耗时:%0.6f\n", time.Now().Sub(start).Seconds())
// 读取日志文件 start = time.Now() c = make(chan string) wg.Add(n) for i := 0; i < n; i++ { go func() { defer wg.Done() for { file, ok := <-c if !ok { return } f, err := os.Open(file) if err != nil { panic(err) }
Read(f) f.Close() } }() }
for _, f := range files { c <- strings.TrimRight(f, ".lzo") }
close(c) wg.Wait()
fmt.Printf("读取文件耗时:%0.6f\n", time.Now().Sub(start).Seconds()) }
|
运行程序输出如下:
1 2 3 4 5
| 待处理文件数量:432 下载文件耗时:303.562865 解压文件耗时:611.236232 读取文件耗时:460.371245 共耗时:1375.187261
|
通过 iostat -m -x 5 10000
分析各个阶段结果如下:
1 2 3 4 5 6
| avg-cpu: %user %nice %system %iowait %steal %idle 7.85 0.00 16.44 11.24 0.00 64.48
Device: rrqm/s wrqm/s r/s w/s rMB/s wMB/s avgrq-sz avgqu-sz await r_await w_await svctm %util vda 0.00 0.00 80.40 7.80 8.98 0.04 209.36 0.40 4.57 4.64 3.77 0.50 4.44 vdb 1.40 761.20 247.60 264.00 14.70 60.92 302.72 9.17 17.92 10.36 25.00 0.52 26.52
|
1 2 3 4 5 6
| avg-cpu: %user %nice %system %iowait %steal %idle 8.54 0.00 8.33 68.39 0.00 14.74
Device: rrqm/s wrqm/s r/s w/s rMB/s wMB/s avgrq-sz avgqu-sz await r_await w_await svctm %util vda 0.00 1.20 3.40 11.80 0.01 0.05 8.95 0.30 20.03 0.41 25.68 0.55 0.84 vdb 0.00 22037.80 107.80 243.20 26.45 107.01 778.71 83.52 236.68 74.31 308.65 2.52 88.54
|
1 2 3 4 5 6
| avg-cpu: %user %nice %system %iowait %steal %idle 2.74 0.00 5.07 92.19 0.00 0.00
Device: rrqm/s wrqm/s r/s w/s rMB/s wMB/s avgrq-sz avgqu-sz await r_await w_await svctm %util vda 0.00 2.40 3.80 23.60 0.01 0.14 11.85 0.12 4.48 1.95 4.89 0.33 0.90 vdb 1.80 4.60 347.20 6.20 139.97 0.08 811.60 126.62 358.04 360.79 203.48 2.83 100.00
|
通过 iostat
结果可以看出,在解压和读取日志时 io
阻塞比较严重,且运行时间较长,下载时 io
阻塞也存在,但还可以接受,通过下面两个方案逐渐消除掉 io
。
优化方案一
优化前的方案反应出在解压和读取日志时 io
阻塞比较严重,那么是否可以通过读取 lzo
压缩文件,以此来消除解压缩日志耗时太大、io
太高的问题呢?并且读取 lzo
压缩文件远比解压后文件小,来降低读取日志耗时太大、io
太高的问题呢?
优化后日志处理流程:
- 获取待处理文件列表
- 拉取 OSS 日志到本地磁盘 (压缩文件)
- 读取压缩日志数据
- 业务处理……
- 导入到数据仓储中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
| package main
import ( "fmt" "os" "path/filepath" "sync" "time"
".../pkg/aliyun_oss" // 虚假的包 "github.com/cyberdelia/lzo" )
func main() { var oss *aliyun_oss.AliyunOSS // 对接阿里云 OSS,需要自行封装 files := list() // 这是一个虚构的方法,用来获取待处理的文件列表
fmt.Printf("待处理文件数量:%d\n", len(files)) start := time.Now() defer func(t time.Time) { fmt.Printf("共耗时:%0.6f\n", time.Now().Sub(t).Seconds()) }(start)
// 下载日志文件 n := 20 var wg sync.WaitGroup c := make(chan string) wg.Add(n) for i := 0; i < n; i++ { go func() { defer wg.Done() for { f, ok := <-c if !ok { return }
if _, err := os.Stat(f); err == nil { return } else if !os.IsNotExist(err) { panic(err) }
dir := filepath.Dir(f) err := os.MkdirAll(dir, 0755) if err != nil { panic(err) }
err = oss.GetObjectToFile(f, f) if err != nil { panic(err) } } }() }
for _, f := range files { c <- f }
close(c) wg.Wait()
fmt.Printf("下载文件耗时:%0.6f\n", time.Now().Sub(start).Seconds())
start = time.Now() c = make(chan string) wg.Add(n) for i := 0; i < n; i++ { go func() { defer wg.Done() for { file, ok := <-c if !ok { return } f, err := os.Open(file) if err != nil { panic(err) }
r, err := lzo.NewReader(f) if err != nil { panic(err) }
Read(r) r.Close() f.Close() } }() }
for _, f := range files { c <- f }
close(c) wg.Wait()
fmt.Printf("读取文件耗时:%0.6f\n", time.Now().Sub(start).Seconds()) }
|
这个方案消除了解压缩日志,并且直接读取压缩日志,使用 github.com/cyberdelia/lzo
包对压缩文件数据流进行边读取边解压,这次不用单独封装新的方法了,直接使用 lzo
包中的接口即可。
程序运行结果如下:
1 2 3 4
| 待处理文件数量:432 下载文件耗时:286.146603 读取文件耗时:132.787345 共耗时:418.942862
|
这个方案效果非常明显,总耗时从 1375.187261
降低到 418.942862
提升了 3 倍的效率,不仅消除了压缩的时间,还大大缩短了读取文件耗时,成果显著。
通过 iostat -m -x 5 10000
分析各个阶段结果如下:
1 2 3 4 5 6
| avg-cpu: %user %nice %system %iowait %steal %idle 5.08 0.00 13.24 29.34 0.00 52.33
Device: rrqm/s wrqm/s r/s w/s rMB/s wMB/s avgrq-sz avgqu-sz await r_await w_await svctm %util vda 0.00 2.80 1.40 11.80 0.01 0.07 12.00 0.02 1.85 1.14 1.93 0.18 0.24 vdb 0.00 17207.60 0.60 212.40 0.00 75.06 721.74 55.81 236.34 84.33 236.77 2.49 53.14
|
1 2 3 4 5 6
| avg-cpu: %user %nice %system %iowait %steal %idle 80.66 0.00 4.83 14.50 0.00 0.00
Device: rrqm/s wrqm/s r/s w/s rMB/s wMB/s avgrq-sz avgqu-sz await r_await w_await svctm %util vda 0.00 0.00 6.20 0.20 0.06 0.00 20.00 0.01 1.69 1.71 1.00 0.62 0.40 vdb 0.00 6.80 390.40 19.20 118.78 0.23 595.04 74.87 190.55 197.95 40.08 1.85 75.90
|
通过 iostat
结果分析,下载时 io
阻塞和优化前波动不是很大,读取时的 io
优化已经非常好了,iowait
从 92.19%
降低到 14.5%
,CPU 更多的任务用来处理解压缩日志,而不是处理 io
阻塞。
优化方案二
本来优化到上面的效果已经非常满意了,不过既然开始做优化就不能草草结束了,仔细思考业务场景,需要 本地 lzo
文件?重新处理日志的频率高吗?本地 lzo
日志清理方便吗?
通过上面的几个问题发现,除非程序出现问题或者数据存储出现故障,否者极少需要重新处理日志,一年里面这种情况也是极少的,甚至不会发生。
那么思考一下,不下载日志,直接读取网络数据流,实现边下边解压边读取,这样岂不是没有 io
了吗?
优化后日志处理流程:
- 获取待处理文件列表
- 拉取 OSS 日志,在内存中解压并读取分析日志
- 业务处理……
- 导入到数据仓储中
具体实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| package main
import ( "fmt" "sync" "time"
".../pkg/aliyun_oss" // 虚假的包 "github.com/cyberdelia/lzo" )
func main() { var oss *aliyun_oss.AliyunOSS // 对接阿里云 OSS,需要自行封装 files := list() // 这是一个虚构的方法,用来获取待处理的文件列表
fmt.Printf("待处理文件数量:%d\n", len(files))
start := time.Now() defer func(t time.Time) { fmt.Printf("共耗时:%0.6f\n", time.Now().Sub(t).Seconds()) }(start)
n := 20 var wg sync.WaitGroup c := make(chan string) wg.Add(n) for i := 0; i < n; i++ { go func() { defer wg.Done() for { f, ok := <-c if !ok { return }
r1, err := oss.GetObject(f) if err != nil { panic(err) }
r, err := lzo.NewReader(r1) if err != nil { panic(err) }
Read(r) r.Close() r1.Close() } }() }
for _, f := range files { c <- f }
close(c) wg.Wait()
fmt.Printf("读取文件耗时:%0.6f\n", time.Now().Sub(start).Seconds()) }
|
优化后只有一个流程了,代码简洁了不少,看看效率如何吧!
程序运行结果如下:
1 2 3
| 待处理文件数量:432 读取文件耗时:285.993661 共耗时:285.993717
|
天啊发生了什么,我使劲擦了擦眼睛,太不可思议了,居然只消耗了下载日志的耗时,较上一个方案总耗时从 418.942862
降低到 285.993717
,提升了近 2 倍的效率,让我们看看上个方案下载文件耗时 286.146603
,而新方案总耗时是 285.993717
居然只用了上个优化版本的下载时间,究竟发生了什么?
通过 iostat -m -x 5 10000
分析结果如下:
1 2 3 4 5 6
| avg-cpu: %user %nice %system %iowait %steal %idle 43.73 0.00 9.64 0.31 0.00 46.32
Device: rrqm/s wrqm/s r/s w/s rMB/s wMB/s avgrq-sz avgqu-sz await r_await w_await svctm %util vda 0.00 1.20 4.40 3.80 0.02 0.03 10.93 0.01 1.49 0.59 2.53 0.71 0.58 vdb 0.00 6.80 0.00 24.60 0.00 0.27 22.83 0.02 0.84 0.00 0.84 0.28 0.68
|
通过 iostat
结果分析,在程序运行期间没有任何 io
开销,CPU 居然还有一半的空闲,前面两个版本 CPU 是没有空闲的啊,由此看来之前 CPU 更多的消耗在 io
阻塞上了,并没有用来处理业务逻辑。
由此来看也就不足为奇了,为啥优化后只需要下载日志的时间就能处理完所有日志了,没有了 io
阻塞,CPU 更多了用来处理业务,把之前下载时写文件 io
的耗时,用来解压缩数据,读取数据,且还有更多的空闲,跑出这样的结果也就很正常了。
从优化前耗时 1375.187261
秒到 285.993717
秒,性能提升 80%, 从 iowait
92.19%
到 0.31%
提升近 100%
,从没有任何 CPU 空闲到有一半空闲,这个过程中有很多值得总结的事情。
io
对性能的影响非常大,对 CPU 占用非常严重,导致 CPU 处理业务逻辑的时间片非常少。从 io
转移到 CPU 对性能提升非常明显。CPU 计算效率十分的高,从 io
密集到密集计算,只要符合业务场景,往往能给我们带来意想不到的效果。
往往优化业务并不需要十分高大上的技术,只是转变一下思路,不仅代码更少,且程序更简短、好维护、逻辑更清晰。
一定要结合实际业务场景进行思考,减少理所当然和业务无关的中间环节,往往就可以极大的提升程序效率。