47

记fastHTTP协程池的实现

 4 years ago
source link: https://www.tuicool.com/articles/qMny2mb
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

golang的net/http包实现了处理连接时比较简单粗暴,相比之下有性能更好的http库,项目里用到过的 fasthttp 就是一个很好的实现, Go开发HTTP的另一个选择fasthttp 中总结了它跟标准库实现的几点不同:

  • net/http 的实现是一个连接新建一个 goroutine, fasthttp 是利用一个 worker pool做了协程池,复用 goroutine,减轻 runtime 调度 goroutine 的压力
  • net/http 解析的请求数据很多放在http.Header或者http.Request.Form中,数据结构 map[string]stringmap[string][]string 涉及不必要的 []byte 到 string 的转换,是可以规避的
  • net/http 解析 HTTP 请求每次生成新的 *http.Requesthttp.ResponseWriterfasthttp 解析 HTTP 数据到 *fasthttp.RequestCtx ,然后使用 sync.Pool 复用结构实例,减少对象的数量
  • fasthttp 会延迟解析 HTTP 请求中的数据,尤其是 Body 部分。这样节省了很多不直接操作 Body 的情况的消耗

workerpool

net包和fasthttp最大的不同可能就是server在处理连接的时候使用了协程池。在并发量大的时候,goroutine数量巨大,runtime层的上下文切换成本对性能有影响。而fasthttp用协程池规避了这个问题,去年在做AOS的时候,项目中后期也引入了workerpool。

func (s *Server) Serve(ln net.Listener) error {
    // default concurrency set to 256*1024
    maxWorkersCount := s.getConcurrency()
    s.concurrencyCh = make(chan struct{}, maxWorkersCount)
    wp := &workerPool{
        WorkerFunc:      s.serveConn,
        MaxWorkersCount: maxWorkersCount,
        LogAllErrors:    s.LogAllErrors,
        Logger:          s.logger(),
    }
    wp.Start()
    for {
        if c, err = acceptConn(s, ln, &lastPerIPErrorTime); err != nil {
            wp.Stop()
            if err == io.EOF {
                return nil
            }
            return err
        }
        if !wp.Serve(c) {
            s.writeFastError(c, StatusServiceUnavailable,
            "The connection cannot be served for Server.Concurrency limit exceeded")
            c.Close()
            time.Sleep(100 * time.Millisecond)
        }
        c = nil
    }
}

workerpool的数据结构中WorkerFunc就是 s.serveConn ,即每条net.conn的处理函数。workerChanPool是个对象池,MaxIdleWorkerDuration是worker的最大空闲时间,ready是可用的worker列表,也就是说所有goroutine worker是存放在一个数组里面的。

这个数组模拟一个类似栈的FILO队列,也就是说我们每次使用的worker都从队列的尾部开始取。

type workerPool struct {
    WorkerFunc func(c net.Conn) error
    MaxWorkersCount int
    LogAllErrors bool
    MaxIdleWorkerDuration time.Duration
    Logger Logger
  
    lock         sync.Mutex
    workersCount int
    mustStop     bool
    ready []*workerChan
    stopCh chan struct{}
    workerChanPool sync.Pool
}

type workerChan struct {
    lastUseTime time.Time
    ch          chan net.Conn
}

start & stop

wp.start开启了一个goroutine,定时清理workerpool中未使用时间超过maxIdleWorkerDuration的goroutine。

func (wp *workerPool) Start() {
    wp.stopCh = make(chan struct{})
    stopCh := wp.stopCh
    go func() {
        var scratch []*workerChan
        for {
            wp.clean(&scratch)
            select {
            case <-stopCh:
                return
            default:
                time.Sleep(wp.getMaxIdleWorkerDuration())
            }
        }
    }()
}

stop停止了ready里所有ch清空,并清空ready。 资源清理时chan要置nil

func (wp *workerPool) Stop() {
    close(wp.stopCh)
    wp.stopCh = nil
    wp.lock.Lock()
    ready := wp.ready
    for i, ch := range ready {
        ch.ch <- nil
        ready[i] = nil
    }
    wp.ready = ready[:0]
    wp.mustStop = true
    wp.lock.Unlock()
}

serve

实现中还涉及到如果wp已经stop,那worker退出后channel对象通过临时对象池管理等细节,这里就跳过了。总结 wp.serve到s.serveconn的过程大概如下

  1. 当ready这个可用worker列表中没有ch可用时,创建一个新ch绑定 wp.Workfunc 的goroutine。即新建了一个协程worker,这个协程从绑定的ch中获取待处理net.Conn。
  2. wp.Serve把accept的conn发到这个ch上,供绑定的协程worker处理。
  3. worker处理完后 release 这个绑定的ch到ready栈里。下一次有连接来时getCh优先从ready栈里找ch,也就是找worker。对ready的读取 FILO ,类似栈。
func (wp *workerPool) Serve(c net.Conn) bool {
    ch := wp.getCh()
    if ch == nil {
        return false
    }
    ch.ch <- c
    return true
}

getCh的实现可以理解为一个用来执行workFunc的goroutine都绑定了一个workerChan。把要处理的conn发到这个workerChan,这个goroutine就开始执行。没有要执行的conn则goroutine阻塞,直到下次workerChan有连接发来。

func (wp *workerPool) getCh() *workerChan {
    var ch *workerChan
    createWorker := false

    wp.lock.Lock()
    ready := wp.ready
    n := len(ready) - 1
    if n < 0 {
        if wp.workersCount < wp.MaxWorkersCount {
            createWorker = true
            wp.workersCount++
        }
    } else {
        ch = ready[n]
        ready[n] = nil
        wp.ready = ready[:n]
    }
    wp.lock.Unlock()

    if ch == nil {
        if !createWorker {
            return nil
        }
        vch := wp.workerChanPool.Get()
        if vch == nil {
            vch = &workerChan{
                ch: make(chan net.Conn, workerChanCap),
            }
        }
        ch = vch.(*workerChan)
        go func() {
            wp.workerFunc(ch)
            wp.workerChanPool.Put(vch)
        }()
    }
    return ch
}

worker处理完一个连接后,将release这个连接到ready这个可用worker栈。即表示这时worker阻塞,可以交给它任务啦。 同时处理完net.Conn后要置nil。 正常情况下worker是不退出的,除非wp.Stop。

func (wp *workerPool) workerFunc(ch *workerChan) {
    var c net.Conn
    var err error
    for c = range ch.ch {
        if c == nil {
            break
        }
        if err = wp.WorkerFunc(c); err != nil && err != errHijacked {
        }
        c = nil
        if !wp.release(ch) {
            break
        }
    }
    wp.lock.Lock()
    wp.workersCount--
    wp.lock.Unlock()
}

func (wp *workerPool) release(ch *workerChan) bool {
    ch.lastUseTime = CoarseTimeNow()
    wp.lock.Lock()
    if wp.mustStop {
        wp.lock.Unlock()
        return false
    }
    wp.ready = append(wp.ready, ch)
    wp.lock.Unlock()
    return true
}

clean

最后看下start中开启的clean定时任务。之所以清理过程只从前遍历清理前面部分,是因为ready是FILO先进后出的,所以ready中越往后的空闲时间最短。

func (wp *workerPool) clean(scratch *[]*workerChan) {
    maxIdleWorkerDuration := wp.getMaxIdleWorkerDuration()
    currentTime := time.Now()
    wp.lock.Lock()
    ready := wp.ready
    n := len(ready)
    i := 0
    for i < n && currentTime.Sub(ready[i].lastUseTime) > maxIdleWorkerDuration {
        i++
    }
    *scratch = append((*scratch)[:0], ready[:i]...)
    if i > 0 {
        m := copy(ready, ready[i:])
        for i = m; i < n; i++ {
            ready[i] = nil
        }
        wp.ready = ready[:m]
    }
    wp.lock.Unlock()
    tmp := *scratch
    for i, ch := range tmp {
        ch.ch <- nil
        tmp[i] = nil
    }
}

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK