
Source Code Analysis of tunny, a Goroutine Pool for Go

Source: https://studygolang.com/articles/26279

tunny

GitHub: https://github.com/Jeffail/tunny

Project Structure


tunny's project structure is very simple: the core files are tunny.go and worker.go.

Overall Analysis


tunny ties the pool and its workers together through the reqChan channel. The number of workers equals the pool size, which is decided when the pool is initialized. The workers compete to receive requests from reqChan, process them, and return the results to the pool.
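
Before diving into the source, here is a minimal, self-contained sketch (not tunny code) of the same pattern: a fixed number of workers competing to receive from one shared channel.

package main

import (
    "fmt"
    "sync"
)

func main() {
    reqChan := make(chan int) // shared, unbuffered: workers compete to receive
    var wg sync.WaitGroup

    const size = 3 // analogous to the pool size
    for i := 0; i < size; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for job := range reqChan {
                fmt.Printf("worker %d processed job %d\n", id, job)
            }
        }(i)
    }

    for j := 0; j < 5; j++ {
        reqChan <- j // each send rendezvouses with exactly one idle worker
    }
    close(reqChan)
    wg.Wait()
}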

Code Walkthrough

type Pool struct {
    queuedJobs int64

    ctor    func() Worker
    workers []*workerWrapper
    reqChan chan workRequest

    workerMut sync.Mutex
}

The Pool struct:

  • queuedJobs: the number of jobs currently backed up in the pool
  • ctor: the constructor function used to build a worker
  • workers: the workers the pool actually owns
  • reqChan: the channel through which the pool communicates with all of its workers; every worker shares the same reqChan with the pool (the workRequest type it carries is sketched just below)
  • workerMut: a mutex held during SetSize, so that multiple goroutines cannot resize the pool at the same time
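
The workRequest type carried by reqChan is not quoted in the article; based on the tunny source, it looks roughly like this (a sketch):

// workRequest is what a worker sends to the pool to announce that it is
// ready: per-worker channels for receiving a job and returning the result,
// plus a callback for interrupting the job in flight.
type workRequest struct {
    jobChan       chan<- interface{} // the pool sends the payload here
    retChan       <-chan interface{} // the pool reads the result from here
    interruptFunc func()             // the pool calls this to cancel a running job
}
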
type Worker interface {
    // Process will synchronously perform a job and return the result.
    Process(interface{}) interface{}

    // BlockUntilReady is called before each job is processed and must block the
    // calling goroutine until the Worker is ready to process the next job.
    BlockUntilReady()

    // Interrupt is called when a job is cancelled. The worker is responsible
    // for unblocking the Process implementation.
    Interrupt()

    // Terminate is called when a Worker is removed from the processing pool
    // and is responsible for cleaning up any held resources.
    Terminate()
}

In tunny, the worker is designed as an interface because, as the code below shows, workers can have many different implementations. As an earlier post (golang编码技巧总结) argued, we should program against interfaces to keep code decoupled.
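
For illustration, here is a hypothetical custom Worker (dbWorker, NewDBPool, and the query payload are all invented for this sketch) that gives each pool goroutine its own database handle and releases it when the worker is removed:

package dbpool

import (
    "database/sql"

    "github.com/Jeffail/tunny"
)

// dbWorker is a hypothetical Worker holding one DB handle per pool goroutine.
type dbWorker struct {
    db *sql.DB
}

// Process runs the SQL statement carried in the payload on this worker's handle.
func (w *dbWorker) Process(payload interface{}) interface{} {
    query, ok := payload.(string)
    if !ok {
        return nil
    }
    _, err := w.db.Exec(query)
    return err
}

func (w *dbWorker) BlockUntilReady() {}
func (w *dbWorker) Interrupt()       {}

// Terminate releases the handle when the worker leaves the pool.
func (w *dbWorker) Terminate() { w.db.Close() }

// NewDBPool builds a pool whose workers each own one connection.
func NewDBPool(n int, open func() *sql.DB) *tunny.Pool {
    return tunny.New(n, func() tunny.Worker {
        return &dbWorker{db: open()}
    })
}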

两种worker

// closureWorker is a minimal Worker implementation that simply wraps a
// func(interface{}) interface{}
type closureWorker struct {
    processor func(interface{}) interface{}
}

The closure worker is the most commonly used kind: it does its work by running the processor function it was given at initialization.
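
The article omits closureWorker's methods; based on the tunny source, they amount to a straight delegation plus no-ops:

func (w *closureWorker) Process(payload interface{}) interface{} {
    return w.processor(payload)
}

func (w *closureWorker) BlockUntilReady() {}
func (w *closureWorker) Interrupt()       {}
func (w *closureWorker) Terminate()       {}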

type callbackWorker struct{}

func (w *callbackWorker) Process(payload interface{}) interface{} {
    f, ok := payload.(func())
    if !ok {
        return ErrJobNotFunc
    }
    f()
    return nil
}

The callback worker requires the payload to be a function, which it then invokes.
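
tunny exposes this worker type through its NewCallback constructor; a minimal usage sketch:

package main

import (
    "fmt"

    "github.com/Jeffail/tunny"
)

func main() {
    pool := tunny.NewCallback(4)
    defer pool.Close()

    // With a callback pool, the payload itself is the work to run.
    pool.Process(func() {
        fmt.Println("running inside the pool")
    })
}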

// NewFunc creates a new Pool of workers where each worker will process using
// the provided func.
func NewFunc(n int, f func(interface{}) interface{}) *Pool {
    return New(n, func() Worker {
        return &closureWorker{
            processor: f,
        }
    })
}

Initializing the pool takes two arguments: the pool size n and the function the pool should execute. That function is handed to the closure workers, which actually process the data at runtime.
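
A minimal usage sketch of NewFunc and Process (the squaring job is invented for illustration):

package main

import (
    "fmt"
    "runtime"

    "github.com/Jeffail/tunny"
)

func main() {
    // One worker per CPU; every worker runs the same closure.
    pool := tunny.NewFunc(runtime.NumCPU(), func(payload interface{}) interface{} {
        n := payload.(int)
        return n * n
    })
    defer pool.Close()

    // Process blocks until a worker picks the job up and returns its result.
    fmt.Println(pool.Process(7)) // 49
}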

func New(n int, ctor func() Worker) *Pool {
    p := &Pool{
        ctor:    ctor,
        reqChan: make(chan workRequest),
    }
    p.SetSize(n)

    return p
}

Notice that reqChan is created here; in the code that follows it is the core link between the pool and the workers. Because it is created with make(chan workRequest), it is unbuffered: every send on it must rendezvous with a receive.

What does SetSize do?

func (p *Pool) SetSize(n int) {
    p.workerMut.Lock()
    defer p.workerMut.Unlock()

    lWorkers := len(p.workers)
    if lWorkers == n {
        return
    }

    // Add extra workers if N > len(workers)
    for i := lWorkers; i < n; i++ {
        p.workers = append(p.workers, newWorkerWrapper(p.reqChan, p.ctor()))
    }

    // Asynchronously stop all workers > N
    for i := n; i < lWorkers; i++ {
        p.workers[i].stop()
    }

    // Synchronously wait for all workers > N to stop
    for i := n; i < lWorkers; i++ {
        p.workers[i].join()
    }

    // Remove stopped workers from slice
    p.workers = p.workers[:n]
}

First, the function takes the lock, so that multiple goroutines cannot run SetSize concurrently.

If the current number of workers is smaller than the requested size, new workers are added;

if it is larger, the surplus workers are stopped and removed (see the stop/join sketch below).
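
The stop and join methods used above are not quoted in the article; based on the tunny source, they amount to closing the wrapper's closeChan and waiting on its closedChan:

// stop asks the worker goroutine to shut down.
func (w *workerWrapper) stop() {
    close(w.closeChan)
}

// join blocks until the worker goroutine has fully exited.
func (w *workerWrapper) join() {
    <-w.closedChan
}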

How exactly are workers added? The newWorkerWrapper function has several points worth attention; notably, the pool passes its reqChan into this function, and therefore to the worker.

func newWorkerWrapper(
    reqChan chan<- workRequest,
    worker Worker,
) *workerWrapper {
    w := workerWrapper{
        worker:        worker,
        interruptChan: make(chan struct{}),
        reqChan:       reqChan,
        closeChan:     make(chan struct{}),
        closedChan:    make(chan struct{}),
    }

    go w.run()

    return &w
}

As you can see, newWorkerWrapper spawns a goroutine that runs w.run(). The worker here is produced by the constructor passed in earlier, i.e. the closure worker constructor, so this worker is a closure worker.

func (w *workerWrapper) run() {
    jobChan, retChan := make(chan interface{}), make(chan interface{})
    defer func() {
        w.worker.Terminate()
        close(retChan)
        close(w.closedChan)
    }()

    for {
        // NOTE: Blocking here will prevent the worker from closing down.
        w.worker.BlockUntilReady()
        select {
        case w.reqChan <- workRequest{
            jobChan:       jobChan,
            retChan:       retChan,
            interruptFunc: w.interrupt,
        }:
            select {
            case payload := <-jobChan:
                result := w.worker.Process(payload)
                select {
                case retChan <- result:
                case <-w.interruptChan:
                    w.interruptChan = make(chan struct{})
                }
            case _, _ = <-w.interruptChan:
                w.interruptChan = make(chan struct{})
            }
        case <-w.closeChan:
            return
        }
    }
}

Let's read through this run function; it is the core of the whole worker.

First, there is a big for loop with a select nested inside.

On entering the select, the worker unconditionally offers a workRequest on reqChan. This needs to be read together with the pool's receiving function:

func (p *Pool) Process(payload interface{}) interface{} {
    atomic.AddInt64(&p.queuedJobs, 1)

    request, open := <-p.reqChan
    if !open {
        panic(ErrPoolNotRunning)
    }

    request.jobChan <- payload

    payload, open = <-request.retChan
    if !open {
        panic(ErrWorkerClosed)
    }

    atomic.AddInt64(&p.queuedJobs, -1)
    return payload
}

Because each idle worker keeps offering a workRequest on reqChan, the pool is guaranteed to receive one into its request variable. payload is the actual data to process: the pool sends it into the workRequest's jobChan and then blocks waiting for the result on retChan. Since this jobChan is the same channel as the worker's jobChan, the payload is received in the worker's

select {
case payload := <-jobChan:
    result := w.worker.Process(payload)
    select {
    case retChan <- result:
    case <-w.interruptChan:
        w.interruptChan = make(chan struct{})
    }
    ...

case branch, where it is processed. Once processing finishes, the worker enters the next select and pushes the result into retChan. Since the worker's retChan and the pool's retChan are likewise the same channel, the pool receives the result and returns it.
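
The interruptChan branches above pair with the wrapper's interrupt method, which the pool can invoke through workRequest.interruptFunc when a job is cancelled; a sketch based on the tunny source:

// interrupt cancels the in-flight job: closing interruptChan unblocks the
// worker's selects, and the interrupt is forwarded to the Worker itself.
func (w *workerWrapper) interrupt() {
    close(w.interruptChan)
    w.worker.Interrupt()
}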

When there are multiple workers, they compete to receive from reqChan, but at most size workers are ever working at a time, which achieves the goal of bounding the number of goroutines.
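
This interrupt path is what powers tunny's ProcessTimed, which gives up on a job after a timeout instead of blocking forever; a minimal usage sketch:

package main

import (
    "fmt"
    "time"

    "github.com/Jeffail/tunny"
)

func main() {
    pool := tunny.NewFunc(1, func(payload interface{}) interface{} {
        time.Sleep(time.Second) // simulate slow work
        return payload
    })
    defer pool.Close()

    // Returns tunny.ErrJobTimedOut once 10ms elapse; the in-flight job is
    // interrupted via the workRequest's interruptFunc.
    if _, err := pool.ProcessTimed("job", 10*time.Millisecond); err != nil {
        fmt.Println(err)
    }
}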

