11

ants协程池学习笔记

 3 years ago
source link: https://studygolang.com/articles/31847
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

ants协程池学习笔记

go

ants是一个高性能的协程池

入口函数

首先看看PoolWithFund结构体定义:

// PoolWithFunc accepts the tasks from client,
// it limits the total of goroutines to a given number by recycling goroutines.
type PoolWithFunc struct {
    // capacity of the pool.
    capacity int32

    // running is the number of the currently running goroutines.
    running int32

    // workers is a slice that store the available workers.
    workers []*goWorkerWithFunc

    // state is used to notice the pool to closed itself.
    state int32

    // lock for synchronous operation.
    lock sync.Locker

    // cond for waiting to get a idle worker.
    cond *sync.Cond

    // poolFunc is the function for processing tasks.
    poolFunc func(interface{})

    // workerCache speeds up the obtainment of the an usable worker in function:retrieveWorker.
    workerCache sync.Pool

    // blockingNum is the number of the goroutines already been blocked on pool.Submit, protected by pool.lock
    blockingNum int

    options *Options
}

NewPoolWithFunc

关键代码

p := &PoolWithFunc{
        capacity: int32(size),
        poolFunc: pf,
        lock:     internal.NewSpinLock(), //自旋锁
        options:  opts,
    }
    p.workerCache.New = func() interface{} {
        return &goWorkerWithFunc{
            pool: p,
            args: make(chan interface{}, workerChanCap),
        }
    }
  • capacity是协程池的容量
  • poolFunc是真正执行工作的函数
  • lock是一个自旋锁

自旋锁的作用是什么呢?

对于锁的竞争不激烈,且占用锁时间非常短的代码块来说性能能大幅度的提升,因为自旋的消耗会小于线程阻塞挂起再唤醒的操作的消耗,这些操作会导致线程发生两次上下文切换

  • workerCache是一个sync.Pool类型的临时对象池,它复用的对象是一个goWorkerWithFunc,这个结构体实际是执行工作的worker。它由以下结构组成:
// goWorkerWithFunc is the actual executor who runs the tasks,
// it starts a goroutine that accepts tasks and
// performs function calls.
type goWorkerWithFunc struct {
    // pool who owns this worker.
    pool *PoolWithFunc

    // args is a job should be done.
    args chan interface{}

    // recycleTime will be update when putting a worker back into queue.
    recycleTime time.Time
}

其中需要交给协程池处理的数据都会发送到args管道,该worker从该管道中取值处理即可

会初始化cond变量:

p.cond = sync.NewCond(p.lock)

这里使用的锁是自旋锁

做完初始化操作后,会启动一个后台协程:

// Start a goroutine to clean up expired workers periodically.
    go p.periodicallyPurge()

从注释来看,它的作用是周期性地清理过期的worker。

提交任务给协程池

// Invoke submits a task to pool.
func (p *PoolWithFunc) Invoke(args interface{}) error {
    if atomic.LoadInt32(&p.state) == CLOSED {
        return ErrPoolClosed
    }
    var w *goWorkerWithFunc
    if w = p.retrieveWorker(); w == nil {
        return ErrPoolOverload
    }
    w.args <- args
    return nil
}
  • 由代码可见,首先会判断当前协程池的状态是否为CLOSED,如果是的话返回一个协程池关闭的错误,提交失败
  • 然后会调用retrieveWorker方法获取一个worker,如果没获取到会返回一个池子已满的错误,提交失败
  • 获取worker成功,将提交的参数发送到worker的args管道中即可

获取worker

生产worker

spawnWorker := func() {
        w = p.workerCache.Get().(*goWorkerWithFunc)
        w.run()
}

首先尝试从临时对象池获取worker,若获取不到则会新建一个worker。然后调用worker的run方法让其运行

1. 首先给协程池加锁
2. 检查协程池的workers数组是否>0,workers数组存储的是所有可用的worker,若有空闲的worker,则取空闲worker数组的最后一个,然后将其从worker数组中移除,然后解锁
3. 若running的worker小于协程池的容量,则解锁协程池,生产一个worker
4. 若无可用的worker,running数量也大于等于协程池的容量,若协程池是非阻塞模式则直接返回;
5. 若有设置最大阻塞任务数,超过这个限制则直接返回
6. 阻塞任务数+1,然后协程池的p.Cond开始Wait,等待被别的协程唤醒
7. 若之后别的协程发出了唤醒信号,那么阻塞任务数-1,此时检查running状态的为0,那么生产一个worker
8. running状态数不为0,那么重新尝试从workers数组获取最后一个worker,若workers长度为0,则再次阻塞住,重新跳到`步骤5`处循环

可以看出,作者对协程池的中心思想是一个worker复用的过程:首先尝试从workers数组中取可用的worker,如果取不到则从临时对象池取一个worker,再取不到才会新建一个worker。这里有几个疑问:

1. worker何时被放回到临时对象池中?
2. workers数组何时会进行插入操作?

我们接着看看worker的run方法

worker运行

  1. 协程池的running状态数量+1

然后会启动一个协程

1. 监听args管道,不断从中取出参数进行处理
2. 如果参数为nil,那么这个协程中止
3. 注册的函数执行完参数后,会将自身插入到workers数组中,并将recycleTime更新为当前时间,然后给cond发一个通知Signal,这是为了告诉阻塞在`无可用worker且running数量大于等于协程池容量的获取worker的协程`,现在有worker可用了
4. 如果协程池的状态已为CLOSED或者running数量的协程池已经大于了协程池的容量,那么这个协程也会中止
5. 协程中止后,会进行一些收尾工作:
    1. running数量-1
    2. 将worker自身放回到临时对象池中

可以看出,worker协程的中止有两种情况会触发:一个是arg参数为nil,另一个是协程池状态相关导致worker中止(closed状态或running数超出限制)

其余情况下,worker协程是常驻的,并且每处理完一个任务,就会将自己放回到workers数组中,这样当下次有向协程池提交任务时,就可以从workers数组中取出可用的worker了;

周期性清理worker

周期性清理worker是一个后台协程,整个操作一个ticker里,ticker的每个周期会做这么几件事:

1. 如果协程池的状态为closed,则跳出ticker
2. 获取workers数组中所有recycleTime已经过期的worker
3. 向这些worker的管道中发送nil,让其退出
4. 如果running状态数为0了,有可能出现一种情况,因为Signal是在worker正常处理完一条数据后才会调用,如果是这种发送nil导致worker协程退出是不会调用Signal的,那么可能还有协程在等待cond的通知信号,但是此时running已经为0,之后没机会再被通知了,程序会死锁,所以在这里会广播Broadcast一个信号给那些阻塞的协程

周期性清理worker的作用主要是有的worker长期未被使用,协程存在会占用资源,因此将其清理掉;

有疑问加站长微信联系(非本文作者)

eUjI7rn.png!mobile

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK