0

go自学记录——小型日志系统(基于go基础)

 2 years ago
source link: https://studygolang.com/articles/35300
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

go自学记录——小型日志系统(基于go基础)

VacancyS · 28分钟之前 · 32 次点击 · 预计阅读时间 7 分钟 · 大约8小时之前 开始浏览    

一、系统实现功能:

  1. 文件读取
  2. 正则匹配,规范化部分数据
  3. 录入数据库
  4. 简单输出到web端

二、代码部分:

图:简单架构(8a29186facfceb7725dc2005435a54c8.png)

代码部分:

package main

//系统主体部分
import (
	"bufio"
	"fmt"
	"io"
	"log"
	"net/http"
	"net/url"
	"os"
	"regexp"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/gin-gonic/gin"
	_ "github.com/go-sql-driver/mysql"
	"github.com/jmoiron/sqlx"
)
// Reader is the read side of the pipeline. An implementation's Read is handed
// the channel rc; ReadFromFile, the only implementation in this file, sends
// one log line per message on it.
type Reader interface {
	Read(rc chan string)
}
// Writer is the write side of the pipeline. An implementation's Write is
// handed the channel wc of parsed messages; WriteToDB, the only implementation
// in this file, drains it into a MySQL table.
type Writer interface {
	Write(wc chan *Message)
}
// LogProcess is the processing stage of the pipeline. It holds the inbound
// channel of raw lines, the outbound channel of parsed messages, and the
// Reader and Writer implementations attached to those channels.
type LogProcess struct {
	// Raw log lines; consumed by Porcess.
	readChanel  chan string
	// Parsed messages; produced by Porcess.
	writeChanel chan *Message
	read        Reader
	write       Writer
}
// ReadFromFile implements Reader by reading lines from the file at path.
type ReadFromFile struct {
	path string
}
// WriteToDB implements Writer by inserting messages into a database.
type WriteToDB struct {
	// NOTE(review): DBinfo is not read by Write, which uses a hard-coded
	// data source name instead.
	DBinfo string
}
// Message is the structured form of one log line, as filled in by
// LogProcess.Porcess from the regular-expression capture groups. The db tags
// name the corresponding columns.
type Message struct {
	// Timestamp taken from the bracketed time field of the line.
	TimeLocal    time.Time `db:"time"`
	// Response size parsed from the byte-count field.
	BytesSent    int       `db:"bytesSent"`
	// Path component of the request URL.
	Path         string    `db:"path"`
	// First token of the quoted request field (e.g. GET).
	Method       string    `db:"method"`
	// Lower-case word preceding the quoted request (e.g. http).
	Scheme       string    `db:"scheme"`
	// Three-digit status code, kept as text.
	Status       string    `db:"status"`
	// Second-to-last numeric field of the line.
	UpstreamTime float64   `db:"upstreamtime"`
	// Last numeric field of the line.
	RequestTime  float64   `db:"requestTime"`
}
// systemInfo holds the runtime statistics of the pipeline. The monitor keeps
// it up to date and returns it as the JSON body of GET /monitor.
type systemInfo struct {
	// Throughput derived from the two most recent HandleLine samples.
	Tps          float64 `json:"tps"`
	// Time elapsed since the monitor's startTime, formatted as a string.
	RunTime      string  `json:"runtime"`
	// Number of TypeHandleLine events received so far.
	HandleLine   int     `json:"handleline"`
	// Current number of items queued in the read channel.
	ReadChanLen  int     `json:"readchanlen"`
	// Current number of items queued in the write channel.
	WriteChanLen int     `json:"writechanlen"`
	// Number of TypeErrNum events received so far.
	ErrNum       int     `json:"errnum"`
}
// ErrNum is a buffered channel (capacity 200) through which the pipeline
// stages report events to the monitor. Despite its name it carries both event
// codes: TypeErrNum for an error and TypeHandleLine for a line that was read.
var ErrNum chan int = make(chan int, 200)
// Event codes sent on ErrNum.
const (
	// TypeHandleLine reports that one line was read from the log file.
	TypeHandleLine = 0
	// TypeErrNum reports that an error occurred in one of the stages.
	TypeErrNum     = 1
)
// monitor collects pipeline statistics and exposes them over HTTP.
type monitor struct {
	// Start of the run; RunTime is reported relative to it.
	startTime time.Time
	// Statistics returned to the client.
	data      *systemInfo
	// Most recent HandleLine samples (start keeps at most two of them).
	tpsli     []int
}
// start launches the monitoring goroutines and then serves the collected
// statistics as JSON on GET /monitor, listening on :8080. It blocks for as
// long as the HTTP server is running.
//
// lp is the pipeline whose channel backlogs are reported.
func (m *monitor) start(lp *LogProcess) {
	// Interval between two HandleLine samples. TPS is the difference of the
	// last two samples divided by this interval.
	const sampleInterval = 5 * time.Second

	// mu guards m.data and m.tpsli, which are shared by the event counter,
	// the sampler and the HTTP handler below.
	var mu sync.Mutex

	// Count the events reported through ErrNum.
	go func() {
		for n := range ErrNum {
			mu.Lock()
			switch n {
			case TypeErrNum:
				m.data.ErrNum++
			case TypeHandleLine:
				m.data.HandleLine++
			}
			mu.Unlock()
		}
	}()

	// Sample HandleLine periodically, keeping only the two latest samples.
	ticker := time.NewTicker(sampleInterval)
	defer ticker.Stop()
	go func() {
		for range ticker.C {
			mu.Lock()
			m.tpsli = append(m.tpsli, m.data.HandleLine)
			if len(m.tpsli) > 2 {
				m.tpsli = m.tpsli[1:]
			}
			mu.Unlock()
		}
	}()

	// HTTP endpoint returning the current statistics.
	engine := gin.Default()
	engine.GET("/monitor", func(c *gin.Context) {
		mu.Lock()
		m.data.RunTime = time.Since(m.startTime).String()
		m.data.ReadChanLen = len(lp.readChanel)
		m.data.WriteChanLen = len(lp.writeChanel)
		if len(m.tpsli) >= 2 {
			// Convert before dividing: integer division would truncate
			// the rate (e.g. 4 lines in 5 s would be reported as 0).
			m.data.Tps = float64(m.tpsli[1]-m.tpsli[0]) / sampleInterval.Seconds()
		}
		// Serialise a copy so encoding happens outside the lock.
		snapshot := *m.data
		mu.Unlock()

		c.JSON(http.StatusOK, snapshot)
	})
	if err := engine.Run(":8080"); err != nil {
		log.Printf("monitor server stopped: %v", err)
	}
}

// Example of the log line format parsed by Porcess:
// 172.0.0.12 - - [04/Mar/2018:13:49:52 +0000] http "GET /foo?query=t HTTP/1.0" 200 2133 "-" "KeepAliveClient" "-" 1.005 1.854

// Porcess consumes raw log lines from l.readChanel, parses each one into a
// Message and sends it on l.writeChanel. It returns when l.readChanel is
// closed.
//
// A line is counted via ErrNum, logged and skipped when it does not match the
// expected layout, when its request field is not made of exactly three
// space-separated tokens, or when its timestamp or URL cannot be parsed.
//
// Expected line layout:
//
//	172.0.0.12 - - [04/Mar/2018:13:49:52 +0000] http "GET /foo?query=t HTTP/1.0" 200 2133 "-" "KeepAliveClient" "-" 1.005 1.854
func (l *LogProcess) Porcess() {
	fmt.Println("LogProcess is transforming data")

	// Capture groups used below: 4 timestamp, 5 scheme, 6 request,
	// 7 status, 8 bytes sent, 12 upstream time, 13 request time.
	r := regexp.MustCompile(`([\d\.]+)\s+([^ \[]+)\s+([^ \[]+)\s+\[([^\]]+)\]\s+([a-z]+)\s+\"([^"]+)\"\s+(\d{3})\s+(\d+)\s+\"([^"]+)\"\s+\"(.*?)\"\s+\"([\d\.-]+)\"\s+([\d\.-]+)\s+([\d\.-]+)`)

	// Load the zone once rather than once per line.
	loc, err := time.LoadLocation("Asia/Shanghai")
	if err != nil {
		// A nil location would make ParseInLocation panic, so fall back
		// to a fixed UTC+8 zone when the zone database is unavailable.
		ErrNum <- TypeErrNum
		log.Printf("LoadLocation error %v \n", err)
		loc = time.FixedZone("Asia/Shanghai", 8*60*60)
	}

	for data := range l.readChanel {
		ret := r.FindStringSubmatch(data)
		if len(ret) != 14 {
			ErrNum <- TypeErrNum
			log.Println("FindStringSubmatch error ", ret)
			continue
		}

		// NOTE(review): "+0000" in the layout is literal text, so the
		// offset is not interpreted and the clock time is taken to be in
		// loc — confirm this is intended.
		ts, err := time.ParseInLocation("02/Jan/2006:15:04:05 +0000", ret[4], loc)
		if err != nil {
			ErrNum <- TypeErrNum
			log.Printf("ParseInLocation error %v \n", err)
			continue
		}

		// The request field must be "METHOD URL PROTOCOL"; skipping the
		// line here prevents an out-of-range index below.
		retslice := strings.Split(ret[6], " ")
		if len(retslice) != 3 {
			ErrNum <- TypeErrNum
			log.Printf("Split error %v \n", retslice)
			continue
		}

		// url.Parse returns a nil *URL on failure, so the line must be
		// skipped before the path is read.
		u, err := url.Parse(retslice[1])
		if err != nil {
			ErrNum <- TypeErrNum
			log.Printf("Parse error %v \n", err)
			continue
		}

		// Numeric fields are best effort: the pattern also admits values
		// such as "-", which are stored as zero.
		bytesSent, _ := strconv.Atoi(ret[8])
		upstreamTime, _ := strconv.ParseFloat(ret[12], 64)
		requestTime, _ := strconv.ParseFloat(ret[13], 64)

		fmt.Println(ret[4])
		l.writeChanel <- &Message{
			TimeLocal:    ts,
			BytesSent:    bytesSent,
			Path:         u.Path,
			Method:       retslice[0],
			Scheme:       ret[5],
			Status:       ret[7],
			UpstreamTime: upstreamTime,
			RequestTime:  requestTime,
		}
	}
}
// Read tails the file at r.path: it seeks to the end of the file and then
// sends every subsequently appended line, without its trailing '\n', on rc.
// Each delivered line is also reported on ErrNum as TypeHandleLine.
//
// If the file cannot be opened or positioned, the error is counted, logged
// and Read returns. At end of file Read sleeps for one second and retries.
// Any other read error is counted and then panics.
func (r *ReadFromFile) Read(rc chan string) {
	// os.Open opens read-only. The flag argument of os.OpenFile takes
	// os.O_* constants, not a C-style mode character such as 'r'.
	f, err := os.Open(r.path)
	if err != nil {
		ErrNum <- TypeErrNum
		log.Printf("open %s: %v", r.path, err)
		return
	}
	defer f.Close()

	// Start at the end so only newly appended lines are delivered.
	if _, err := f.Seek(0, io.SeekEnd); err != nil {
		ErrNum <- TypeErrNum
		log.Printf("seek %s: %v", r.path, err)
		return
	}

	freader := bufio.NewReader(f)
	// pending accumulates the bytes of a line whose terminating '\n' has
	// not been written yet.
	var pending []byte
	for {
		data, err := freader.ReadBytes('\n')
		if err == io.EOF {
			// ReadBytes returns whatever it read before EOF; keep it so
			// a line written in several pieces is not lost or split.
			pending = append(pending, data...)
			time.Sleep(1 * time.Second)
			continue
		}
		if err != nil {
			ErrNum <- TypeErrNum
			panic(fmt.Sprintf("ReadLine panic %v", err.Error()))
		}

		pending = append(pending, data...)
		line := string(pending[:len(pending)-1]) // drop the trailing '\n'
		pending = pending[:0]

		ErrNum <- TypeHandleLine
		rc <- line
	}
}
// Write inserts every message received on wc into the logdata table and
// returns when wc is closed.
//
// Failure to open the database handle or to prepare the insert statement is
// counted on ErrNum and then panics. A failed insert is counted and logged,
// and the loop continues with the next message.
//
// NOTE(review): the data source name is hard-coded; w.DBinfo is not used.
func (w *WriteToDB) Write(wc chan *Message) {
	// The ? placeholders are bound to the arguments of stmt.Exec, which
	// keeps the values out of the SQL text.
	query := "insert into logdata values(?,?,?,?,?,?,?,?) "

	// DSN layout: user:password@protocol(ip:port)/databasename
	DB, err := sqlx.Open("mysql", "root:123456@tcp(127.0.0.1:3306)/golang_test")
	if err != nil {
		ErrNum <- TypeErrNum
		panic("open mysql failed " + err.Error())
	}
	defer DB.Close()

	// Prepare once and reuse: preparing per message without closing the
	// statement leaks one prepared statement for every row.
	stmt, err := DB.Prepare(query)
	if err != nil {
		ErrNum <- TypeErrNum
		panic(err.Error())
	}
	defer stmt.Close()

	for data := range wc {
		fmt.Printf("write data is %v", data)
		_, err := stmt.Exec(data.TimeLocal, data.BytesSent, data.Path, data.Method, data.Scheme, data.Status, data.UpstreamTime, data.RequestTime)
		if err != nil {
			ErrNum <- TypeErrNum
			log.Printf("insert error %v \n", err)
		}
	}
}

// main wires the pipeline together — one file reader, several parser
// goroutines and several database writers connected by unbuffered channels —
// and then blocks in the monitor's HTTP server.
func main() {
	const (
		processWorkers = 3
		writeWorkers   = 5
	)

	readerIO := &ReadFromFile{
		path: "./access.log",
	}
	writeIO := &WriteToDB{
		DBinfo: "username&&password",
	}
	LProcess := &LogProcess{
		readChanel:  make(chan string),
		writeChanel: make(chan *Message),
		read:        readerIO,
		write:       writeIO,
	}
	m := &monitor{
		startTime: time.Now(),
		data:      &systemInfo{},
	}

	// Exactly one reader: each Read call opens and tails the file on its
	// own, so a second reader would deliver every line a second time and
	// duplicate the stored rows.
	go LProcess.read.Read(LProcess.readChanel)

	for i := 0; i < processWorkers; i++ {
		go LProcess.Porcess()
	}
	for i := 0; i < writeWorkers; i++ {
		go LProcess.write.Write(LProcess.writeChanel)
	}

	m.start(LProcess)
}

模拟数据部分:

package main

import (
	"fmt"
	"os"
	"time"
)

// main appends one sample access-log line per second to ./others/access.log
// so that the log pipeline has data to read. It loops until the process is
// terminated; the file stays open for the lifetime of the process.
func main() {
	// The flag argument takes os.O_* constants, not a C-style mode
	// character such as 'w'. Open write-only in append mode, creating the
	// file if needed. 0644 gives the owner read/write and everyone else
	// read-only access.
	f, err := os.OpenFile("./others/access.log", os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
	if err != nil {
		fmt.Fprintf(os.Stderr, "%v\n", err)
		os.Exit(1)
	}
	for {
		n, err := f.WriteString(`172.0.0.12 - - [04/Mar/2018:13:49:52 +0000] http "GET /foo?query=t HTTP/1.0" 200 2133 "-" "KeepAliveClient" "-" 1.005 1.854` + "\n")
		if err != nil {
			// Report the failure right away and keep trying.
			fmt.Printf("%v\n", err)
		}
		fmt.Println(n)
		time.Sleep(1 * time.Second)
	}
}



有疑问加站长微信联系(非本文作者)

280

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:701969077


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK