0

试问,如何设计一个高并发的golang日志库

 10 months ago
source link: https://studygolang.com/articles/35285
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

试问,如何设计一个高并发的golang日志库

TangYiMo · 大约1分钟之前 · 60 次点击 · 预计阅读时间 10 分钟 · 大约8小时之前 开始浏览    

golang中有很多优秀的日志库,例如zerolog,see-log,blog4go等。 这些日志首先具备强大的高并发能力,其次具备丰富的接口,输入格式,索引接口。 把外壳去掉,日志库的设计原理本质上是一样的。

我个人认为一个日志系统需要满足的特点:

  • 1 内容不确定(非格式化数据)。为能够方便索引,因此需要额外引入一些namespace和类型,时间戳等的字符串。
  • 2 要求并发量大,写入快,也就是日志需要有实时性,不能对现有业务造成阻塞。 因此日志需要设置成带缓冲的功能、多协程。 在日志的生成、粗加工、传递、归类、输出的过程中,减少传递、减少cpu锁、减少拷贝次数/内容 方面下功夫,持久化优化,能提升日志库的性能。
  • 3 日志需要一定的格式,因此在输出时需要顺序输出到终端、或日志文件。 落盘需要是一个任务去执行。
  • 4 日志文件不能太大,需要定期清理掉无效的日志。 因此日志按天、周为单位,每个单位写入对应以时间戳命名的文件中。
  • 5 参照linux系统自带的系统日志,对于历史日志(例如一周之前的日志),需要压缩成压缩文档。删掉非压缩的文档。
  • 6 输出目标可以灵活指定,可以是终端,也可以是日志目录,也可以是网络流。 输出格式可以是raw原始内容、也可以是json格式。
  • 7 考虑小白用户。例如一些运维人员,vim用的也许不是那么的熟悉,还有一些c端的小白用户估计linux系统都不晓得是啥,因此日志系统如果能带一个简单的web界面,日志索引提供输入框返回结果,这虽然不一定用的上,但是会显得比较完善。

补充:本文比较仓促,是以学习原理为目的(我写的文章侧重原理分析 和 实现。 当然,生产环境是另外一套另当别论,我优先考虑用别人写好的组件)。 因此上述7个特点并未在本文全部实现。

直接上源码:

package main

import (
    "context"
    "fmt"
    "io"
    "os"
    "sync"
    "time"
)

var (
    colorOff    = []byte("\033[0m")
    colorRed    = []byte("\033[0;31m")
    colorGreen  = []byte("\033[0;32m")
    colorOrange = []byte("\033[0;33m")
    colorBlue   = []byte("\033[0;34m")
    colorPurple = []byte("\033[0;35m")
    colorCyan   = []byte("\033[0;36m")
    colorGray   = []byte("\033[0;37m")
)

var (
    plainFatal = []byte("[FATAL] ")
    plainError = []byte("[ERROR] ")
    plainWarn  = []byte("[WARN]  ")
    plainInfo  = []byte("[INFO]  ")
    plainDebug = []byte("[DEBUG] ")
    plainTrace = []byte("[TRACE] ")
)

var (
    MAX_INJECTION = 20000
)

// 着色器
func mixer(data []byte, color []byte) []byte {
    var result []byte
    return append(append(append(result, color...), data...), colorOff...)
}

// Red 将 data 染成红色
func Red(data []byte) []byte {
    return mixer(data, colorRed)
}

func Green(data []byte) []byte {
    return mixer(data, colorGreen)
}

func Orange(data []byte) []byte {
    return mixer(data, colorOrange)
}

func Blue(data []byte) []byte {
    return mixer(data, colorBlue)
}

func Purple(data []byte) []byte {
    return mixer(data, colorPurple)
}

func Cyan(data []byte) []byte {
    return mixer(data, colorCyan)
}

func Gray(data []byte) []byte {
    return mixer(data, colorGray)
}

type event struct {
    Plain []byte
    Color []byte
    Data  []byte
}

type Logger struct {
    ctx    context.Context
    start  time.Time
    mu     sync.Mutex
    prefix string
    cnt    int64

    fsig      chan struct{}
    ch        chan *event
    pevents   *[]*event
    events    [2][]*event
    out       io.Writer
    rootdir   string
    namespace string

    oj bool // output as json format
}

func NewLogger(ctx context.Context, out io.Writer, prefix string, cnt int64, dir string, ns string) *Logger {
    log := &Logger{
        out:       out,
        prefix:    prefix,
        cnt:       cnt,
        fsig:      make(chan struct{}, 1),
        ch:        make(chan *event, MAX_INJECTION),
        ctx:       ctx,
        rootdir:   dir,
        namespace: ns,
    }
    log.pevents = &log.events[0]
    go log.Run()

    return log
}

// SetOutput sets the output destination for the logger.
func (l *Logger) SetOutput(w io.Writer) {
    l.mu.Lock()
    defer l.mu.Unlock()
    l.out = w
}

func (l *Logger) Run() {
    fsync := func() {
        l.mu.Lock()
        if len(*l.pevents) == 0 {
            l.mu.Unlock()
            return
        }
        pcurevents := l.pevents
        for i, _ := range l.events {
            if pcurevents != &l.events[i] {
                l.pevents = &l.events[i]
                break
            }
        }
        l.mu.Unlock()

        for _, v := range *pcurevents {
            if l.out == os.Stderr || l.out == os.Stdout {
                l.out.Write(v.Color)
            } else {
                l.out.Write(v.Plain)
            }
            l.out.Write([]byte(l.prefix))
            l.out.Write([]byte(fmt.Sprint(time.Now())))
            l.out.Write(v.Data)
        }
        *pcurevents = (*pcurevents)[0:0]
    }

    for i := 0; i < 10; i++ {
        go func() {
            for {
                select {
                case <-l.ctx.Done():
                    return
                case req := <-l.ch:
                    l.mu.Lock()
                    *l.pevents = append(*l.pevents, req)
                    l.cnt++
                    l.mu.Unlock()
                }
            }
        }()
    }

    go func() {
        defer func() {
            fsync()
        }()
        for {
            select {
            case <-l.ctx.Done():
                return
            case <-time.After(time.Second * 5):
                fsync()
            case <-l.fsig:
                fsync()
            }
        }
    }()
}

func (l *Logger) Output(e *event) {
    if len(l.ch)+1 >= MAX_INJECTION {
        l.fsig <- struct{}{}
    }
    l.ch <- e
}

func (l *Logger) Debug(v ...interface{}) {
    s := fmt.Sprint(v...)
    s += "\r\n"
    l.Output(&event{
        Plain: plainDebug,
        Color: Purple(plainDebug),
        Data:  []byte(s),
    })
}

func (l *Logger) Info(v ...interface{}) {
    s := fmt.Sprint(v...)
    s += "\r\n"
    l.Output(&event{
        Plain: plainInfo,
        Color: Green(plainInfo),
        Data:  []byte(s),
    })
}

func (l *Logger) Warn(v ...interface{}) {
    s := fmt.Sprint(v...)
    s += "\r\n"
    l.Output(&event{
        Plain: plainWarn,
        Color: Orange(plainWarn),
        Data:  []byte(s),
    })
}

func (l *Logger) Error(v ...interface{}) {
    s := fmt.Sprint(v...)
    s += "\r\n"
    l.Output(&event{
        Plain: plainError,
        Color: Red(plainError),
        Data:  []byte(s),
    })
}

var std = NewLogger(context.Background(), os.Stderr, "mylog->", 0, "", "misc")

func Default() *Logger { return std }

func main() {
    // 功能测试
    std.Info("这里是一条Info日志, a")
    std.Info("这里是一条Info日志, b")
    std.Info("这里是一条Info日志, c")
    std.Info("这里是一条Info日志, d")
    std.Info("这里是一条Info日志, e")
    std.Debug("这里是一条Debuf日志, m")
    std.Error("这里是一条Error日志, n")
    std.Warn("这里是一条Warn日志, n")

    outstr := ""
    tf := func() {
        std.start = time.Now()
        wg := &sync.WaitGroup{}
        for i := 0; i < 200; i++ {
            wg.Add(1)
            go func() {
                for j := 0; j < 100000; j++ {
                    std.Info("-----------------------------------------------------info 日志")
                }
                wg.Done()
            }()
        }
        time.Sleep(time.Microsecond * 200)
        wg.Wait()
        diff := time.Now().Sub(std.start).Seconds()
        outstr = fmt.Sprintf("logger is exit now, total %+v item, used %+v second, ", std.cnt, diff)
    }
    //fmt.Println("输出到终端,性能测试")
    //std.SetOutput(std.out)
    //tf()

    fmt.Println("输出到文件,性能测试")
    ///*
    fd, err := os.OpenFile("./test.log", os.O_CREATE|os.O_APPEND|os.O_RDWR, 0666)
    if err != nil {
        panic(err)
    }
    std.SetOutput(fd)
    tf()
    fd.Sync()
    //*/

    fmt.Println(outstr)
}
[email protected]:~/comment/logger# lscpu 
Architecture:                    x86_64
CPU op-mode(s):                  32-bit, 64-bit
Byte Order:                      Little Endian
Address sizes:                   39 bits physical, 48 bits virtual
CPU(s):                          4
On-line CPU(s) list:             0-3
Thread(s) per core:              1
Core(s) per socket:              4
Socket(s):                       1
NUMA node(s):                    1
Vendor ID:                       GenuineIntel
CPU family:                      6
Model:                           165
Model name:                      Intel(R) Core(TM) i3-10100 CPU @ 3.60GHz
Stepping:                        3
CPU MHz:                         3600.006
BogoMIPS:                        7200.01
Hypervisor vendor:               KVM
Virtualization type:             full
L1d cache:                       128 KiB
L1i cache:                       128 KiB
L2 cache:                        1 MiB
L3 cache:                        24 MiB
NUMA node0 CPU(s):               0-3
Vulnerability Itlb multihit:     KVM: Mitigation: VMX unsupported
Vulnerability L1tf:              Not affected
Vulnerability Mds:               Not affected
Vulnerability Meltdown:          Not affected
Vulnerability Spec store bypass: Vulnerable
Vulnerability Spectre v1:        Mitigation; usercopy/swapgs barriers and __user pointer sanitization
Vulnerability Spectre v2:        Mitigation; Full generic retpoline, STIBP disabled, RSB filling
Vulnerability Srbds:             Not affected
Vulnerability Tsx async abort:   Not affected
Flags:                           fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 ht syscall nx rdtscp lm constant_tsc rep_good nopl xtopology nonstop_tsc cpuid ts
                                 c_known_freq pni pclmulqdq ssse3 cx16 pcid sse4_1 sse4_2 x2apic movbe popcnt aes xsave avx rdrand hypervisor lahf_lm abm 3dnowprefetch invpcid_single fsgsbase avx2 invpcid rdseed 
                                 clflushopt md_clear flush_l1d arch_capabilities
[email protected]:~/comment/logger#
[email protected]:~/comment/logger# free -h
              total        used        free      shared  buff/cache   available
Mem:          7.8Gi       2.1Gi       1.8Gi       1.0Mi       3.9Gi       5.4Gi
Swap:         1.6Gi       1.0Mi       1.6Gi
[email protected]:~/comment/logger#

功能测试: 在这里插入图片描述

压力测试:

参考值20w/秒。 磁盘,显卡,cpu等的性能等有对此时结果有影响。

以日志文件为输出时: (我要去给朋友过生日吃饭,先不写了)


有疑问加站长微信联系(非本文作者))

280

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:701969077


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK