记fastHTTP协程池的实现
source link: https://www.tuicool.com/articles/qMny2mb
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
golang的net/http包实现了处理连接时比较简单粗暴,相比之下有性能更好的http库,项目里用到过的 fasthttp 就是一个很好的实现, Go开发HTTP的另一个选择fasthttp 中总结了它跟标准库实现的几点不同:
-
net/http
的实现是一个连接新建一个 goroutine,fasthttp
是利用一个 worker pool做了协程池,复用 goroutine,减轻 runtime 调度 goroutine 的压力 -
net/http
解析的请求数据很多放在http.Header或者http.Request.Form中,数据结构map[string]string
或map[string][]string
涉及不必要的 []byte 到 string 的转换,是可以规避的 -
net/http
解析 HTTP 请求每次生成新的*http.Request
和http.ResponseWriter
,fasthttp
解析 HTTP 数据到*fasthttp.RequestCtx
,然后使用sync.Pool
复用结构实例,减少对象的数量 -
fasthttp
会延迟解析 HTTP 请求中的数据,尤其是 Body 部分。这样节省了很多不直接操作 Body 的情况的消耗
workerpool
net包和fasthttp最大的不同可能就是server在处理连接的时候使用了协程池。在并发量大的时候,goroutine数量巨大,runtime层的上下文切换成本对性能有影响。而fasthttp用协程池规避了这个问题,去年在做AOS的时候,项目中后期也引入了workerpool。
func (s *Server) Serve(ln net.Listener) error { // default concurrency set to 256*1024 maxWorkersCount := s.getConcurrency() s.concurrencyCh = make(chan struct{}, maxWorkersCount) wp := &workerPool{ WorkerFunc: s.serveConn, MaxWorkersCount: maxWorkersCount, LogAllErrors: s.LogAllErrors, Logger: s.logger(), } wp.Start() for { if c, err = acceptConn(s, ln, &lastPerIPErrorTime); err != nil { wp.Stop() if err == io.EOF { return nil } return err } if !wp.Serve(c) { s.writeFastError(c, StatusServiceUnavailable, "The connection cannot be served for Server.Concurrency limit exceeded") c.Close() time.Sleep(100 * time.Millisecond) } c = nil } }
workerpool的数据结构中WorkerFunc就是 s.serveConn
,即每条net.conn的处理函数。workerChanPool是个对象池,MaxIdleWorkerDuration是worker的最大空闲时间,ready是可用的worker列表,也就是说所有goroutine worker是存放在一个数组里面的。
这个数组模拟一个类似栈的FILO队列,也就是说我们每次使用的worker都从队列的尾部开始取。
type workerPool struct { WorkerFunc func(c net.Conn) error MaxWorkersCount int LogAllErrors bool MaxIdleWorkerDuration time.Duration Logger Logger lock sync.Mutex workersCount int mustStop bool ready []*workerChan stopCh chan struct{} workerChanPool sync.Pool } type workerChan struct { lastUseTime time.Time ch chan net.Conn }
start & stop
wp.start开启了一个goroutine,定时清理workerpool中未使用时间超过maxIdleWorkerDuration的goroutine。
func (wp *workerPool) Start() { wp.stopCh = make(chan struct{}) stopCh := wp.stopCh go func() { var scratch []*workerChan for { wp.clean(&scratch) select { case <-stopCh: return default: time.Sleep(wp.getMaxIdleWorkerDuration()) } } }() }
stop停止了ready里所有ch清空,并清空ready。 资源清理时chan要置nil 。
func (wp *workerPool) Stop() { close(wp.stopCh) wp.stopCh = nil wp.lock.Lock() ready := wp.ready for i, ch := range ready { ch.ch <- nil ready[i] = nil } wp.ready = ready[:0] wp.mustStop = true wp.lock.Unlock() }
serve
实现中还涉及到如果wp已经stop,那worker退出后channel对象通过临时对象池管理等细节,这里就跳过了。总结 wp.serve到s.serveconn的过程大概如下
-
当ready这个可用worker列表中没有ch可用时,创建一个新ch绑定
wp.Workfunc
的goroutine。即新建了一个协程worker,这个协程从绑定的ch中获取待处理net.Conn。 - wp.Serve把accept的conn发到这个ch上,供绑定的协程worker处理。
-
worker处理完后
release
这个绑定的ch到ready栈里。下一次有连接来时getCh优先从ready栈里找ch,也就是找worker。对ready的读取 FILO ,类似栈。
func (wp *workerPool) Serve(c net.Conn) bool { ch := wp.getCh() if ch == nil { return false } ch.ch <- c return true }
getCh的实现可以理解为一个用来执行workFunc的goroutine都绑定了一个workerChan。把要处理的conn发到这个workerChan,这个goroutine就开始执行。没有要执行的conn则goroutine阻塞,直到下次workerChan有连接发来。
func (wp *workerPool) getCh() *workerChan { var ch *workerChan createWorker := false wp.lock.Lock() ready := wp.ready n := len(ready) - 1 if n < 0 { if wp.workersCount < wp.MaxWorkersCount { createWorker = true wp.workersCount++ } } else { ch = ready[n] ready[n] = nil wp.ready = ready[:n] } wp.lock.Unlock() if ch == nil { if !createWorker { return nil } vch := wp.workerChanPool.Get() if vch == nil { vch = &workerChan{ ch: make(chan net.Conn, workerChanCap), } } ch = vch.(*workerChan) go func() { wp.workerFunc(ch) wp.workerChanPool.Put(vch) }() } return ch }
worker处理完一个连接后,将release这个连接到ready这个可用worker栈。即表示这时worker阻塞,可以交给它任务啦。 同时处理完net.Conn后要置nil。 正常情况下worker是不退出的,除非wp.Stop。
func (wp *workerPool) workerFunc(ch *workerChan) { var c net.Conn var err error for c = range ch.ch { if c == nil { break } if err = wp.WorkerFunc(c); err != nil && err != errHijacked { } c = nil if !wp.release(ch) { break } } wp.lock.Lock() wp.workersCount-- wp.lock.Unlock() } func (wp *workerPool) release(ch *workerChan) bool { ch.lastUseTime = CoarseTimeNow() wp.lock.Lock() if wp.mustStop { wp.lock.Unlock() return false } wp.ready = append(wp.ready, ch) wp.lock.Unlock() return true }
clean
最后看下start中开启的clean定时任务。之所以清理过程只从前遍历清理前面部分,是因为ready是FILO先进后出的,所以ready中越往后的空闲时间最短。
func (wp *workerPool) clean(scratch *[]*workerChan) { maxIdleWorkerDuration := wp.getMaxIdleWorkerDuration() currentTime := time.Now() wp.lock.Lock() ready := wp.ready n := len(ready) i := 0 for i < n && currentTime.Sub(ready[i].lastUseTime) > maxIdleWorkerDuration { i++ } *scratch = append((*scratch)[:0], ready[:i]...) if i > 0 { m := copy(ready, ready[i:]) for i = m; i < n; i++ { ready[i] = nil } wp.ready = ready[:m] } wp.lock.Unlock() tmp := *scratch for i, ch := range tmp { ch.ch <- nil tmp[i] = nil } }
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK