go中控制goroutine数量
source link: https://studygolang.com/articles/34577?fr=sidebar
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
go中控制goroutine数量
小中01 · 大约12小时之前 · 76 次点击 · 预计阅读时间 16 分钟 · 大约8小时之前 开始浏览通过channel+sync
var ( // channel长度
poolCount = 5
// 复用的goroutine数量
goroutineCount = 10)func pool() {
jobsChan := make(chan int, poolCount) // workers
var wg sync.WaitGroup for i := 0; i < goroutineCount; i++ {
wg.Add(1) go func() { defer wg.Done() for item := range jobsChan { // ...
fmt.Println(item)
} // senders
for i := 0; i < 1000; i++ {
jobsChan <- i
} // 关闭channel,上游的goroutine在读完channel的内容,就会通过wg的done退出
close(jobsChan)
wg.Wait()
通过WaitGroup启动指定数量的goroutine,监听channel的通知。发送者推送信息到channel,信息处理完了,关闭channel,等待goroutine依次退出。
使用semaphore
package mainimport ( "context"
"fmt"
"sync"
"time"
"golang.org/x/sync/semaphore")const ( // 同时运行的goroutine上限
Limit = 3
// 信号量的权重
Weight = 1)func main() {
names := []string{ "小白", "小红", "小明", "小李", "小花",
sem := semaphore.NewWeighted(Limit) var w sync.WaitGroup for _, name := range names {
w.Add(1) go func(name string) {
sem.Acquire(context.Background(), Weight) // ... 具体的业务逻辑
fmt.Println(name, "-吃饭了")
time.Sleep(2 * time.Second)
sem.Release(Weight)
w.Done()
}(name)
w.Wait()
fmt.Println("ending--------")
借助于x包中的semaphore,也可以进行goroutine的数量限制。
不过原本go中的协程已经是非常轻量了,对于协程池还是要根据具体的场景分析。
对于小场景使用channel+sync就可以,其他复杂的可以考虑使用第三方的协程池库。
panjf2000/ants
go-playground/pool
Jeffail/tunny
几个开源的线程池的设计
fasthttp中的协程池实现
fasthttp比net/http效率高很多倍的重要原因,就是利用了协程池。来看下大佬的设计思路。
1、按需增长goroutine数量,有一个最大值,同时监听channel,Server会把accept到的connection放入到channel中,这样监听的goroutine就能处理消费。
2、本地维护了一个待使用的channel列表,当本地channel列表拿不到ch,会在sync.pool中取。
3、如果workersCount没达到上限,则从生成一个workerFunc监听workerChan。
4、对于待使用的channel列表,会定期清理掉超过最大空闲时间的workerChan。
看下具体实现
// workerPool通过一组工作池服务传入的连接// 按照FILO(先进后出)的顺序,即最近停止的工作人员将为下一个工作传入的连接。//// 这种方案能够保持cpu的缓存保持高效(理论上)type workerPool struct { // 这个函数用于server的连接
// It must leave c unclosed.
WorkerFunc ServeHandler // 最大的Workers数量
MaxWorkersCount int
LogAllErrors bool
MaxIdleWorkerDuration time.Duration
Logger Logger
lock sync.Mutex // 当前worker的数量
workersCount int
// worker停止的标识
mustStop bool
// 等待使用的workerChan
// 可能会被清理
ready []*workerChan // 用来标识start和stop
stopCh chan struct{} // workerChan的缓存池,通过sync.Pool实现
workerChanPool sync.Pool
connState func(net.Conn, ConnState)}// workerChan的结构type workerChan struct {
lastUseTime time.Time
ch chan net.Conn
Start
func (wp *workerPool) Start() { // 判断是否已经Start过了
if wp.stopCh != nil { panic("BUG: workerPool already started")
} // stopCh塞入值
wp.stopCh = make(chan struct{})
stopCh := wp.stopCh
wp.workerChanPool.New = func() interface{} { // 如果单核cpu则让workerChan阻塞
// 否则,使用非阻塞,workerChan的长度为1
return &workerChan{
ch: make(chan net.Conn, workerChanCap),
} go func() { var scratch []*workerChan for {
wp.clean(&scratch) select { // 接收到退出信号,退出
case <-stopCh: return
default:
time.Sleep(wp.getMaxIdleWorkerDuration())
}// 如果单核cpu则让workerChan阻塞// 否则,使用非阻塞,workerChan的长度为1var workerChanCap = func() int { // 如果GOMAXPROCS=1,workerChan的长度为0,变成一个阻塞的channel
if runtime.GOMAXPROCS(0) == 1 { return 0
} // 如果GOMAXPROCS>1则使用非阻塞的workerChan
return 1}()
梳理下流程:
1、首先判断下stopCh是否为nil,不为nil表示已经started了;
2、初始化wp.stopCh = make(chan struct{}),stopCh是一个标识,用了struct{}不用bool,因为空结构体变量的内存占用大小为0,而bool类型内存占用大小为1,这样可以更加最大化利用我们服务器的内存空间;
3、设置workerChanPool的New函数,然后可以在Get不到东西时,自动创建一个;如果单核cpu则让workerChan阻塞,否则,使用非阻塞,workerChan的长度设置为1;
4、启动一个goroutine,处理clean操作,在接收到退出信号,退出。
func (wp *workerPool) Stop() { // 同start,stop也只能触发一次
if wp.stopCh == nil { panic("BUG: workerPool wasn't started")
} // 关闭stopCh
close(wp.stopCh) // 将stopCh置为nil
wp.stopCh = nil
// 停止所有的等待获取连接的workers
// 正在运行的workers,不需要等待他们退出,他们会在完成connection或mustStop被设置成true退出
wp.lock.Lock()
ready := wp.ready // 循环将ready的workerChan置为nil
for i := range ready {
ready[i].ch <- nil
ready[i] = nil
wp.ready = ready[:0] // 设置mustStop为true
wp.mustStop = true
wp.lock.Unlock()
梳理下流程:
1、判断stop只能被关闭一次;
2、关闭stopCh,设置stopCh为nil;
3、停止所有的等待获取连接的workers,正在运行的workers,不需要等待他们退出,他们会在完成connection或mustStop被设置成true退出。
clean
func (wp *workerPool) clean(scratch *[]*workerChan) {
maxIdleWorkerDuration := wp.getMaxIdleWorkerDuration() // 清理掉最近最少使用的workers如果他们过了maxIdleWorkerDuration时间没有提供服务
criticalTime := time.Now().Add(-maxIdleWorkerDuration)
wp.lock.Lock()
ready := wp.ready
n := len(ready) // 使用二分搜索算法找出最近可以被清除的worker
// 最后使用的workerChan 一定是放回队列尾部的。
l, r, mid := 0, n-1, 0
for l <= r {
mid = (l + r) / 2
if criticalTime.After(wp.ready[mid].lastUseTime) {
l = mid + 1
} else {
r = mid - 1
i := r if i == -1 {
wp.lock.Unlock() return
} // 将ready中i之前的的全部清除
*scratch = append((*scratch)[:0], ready[:i+1]...)
m := copy(ready, ready[i+1:]) for i = m; i < n; i++ {
ready[i] = nil
wp.ready = ready[:m]
wp.lock.Unlock() // 通知淘汰的workers停止
// 此通知必须位于wp.lock之外,因为ch.ch
// 如果有很多workers,可能会阻塞并且可能会花费大量时间
// 位于非本地CPU上。
tmp := *scratch for i := range tmp {
tmp[i].ch <- nil
tmp[i] = nil
主要是清理掉最近最少使用的workers如果他们过了maxIdleWorkerDuration时间没有提供服务
getCh
获取一个workerChan
func (wp *workerPool) getCh() *workerChan { var ch *workerChan
createWorker := false
wp.lock.Lock()
ready := wp.ready
n := len(ready) - 1
// 如果ready为空
if n < 0 { if wp.workersCount < wp.MaxWorkersCount {
createWorker = true
wp.workersCount++
} else { // 不为空从ready中取一个
ch = ready[n]
ready[n] = nil
wp.ready = ready[:n]
wp.lock.Unlock() // 如果没拿到ch
if ch == nil { if !createWorker { return nil
} // 从缓存中获取一个ch
vch := wp.workerChanPool.Get()
ch = vch.(*workerChan) go func() { // 具体的执行函数
wp.workerFunc(ch) // 再放入到pool中
wp.workerChanPool.Put(vch)
} return ch
梳理下流程:
1、获取一个可执行的workerChan,如果ready中为空,并且workersCount没有达到最大值,增加workersCount数量,并且设置当前操作createWorker = true;
2、ready中不为空,直接在ready获取一个;
3、如果没有获取到则在sync.pool中获取一个,之后再放回到pool中;
4、拿到了就启动一个workerFunc监听workerChan,处理具体的业务逻辑。
workerFunc
func (wp *workerPool) workerFunc(ch *workerChan) { var c net.Conn var err error // 监听workerChan
for c = range ch.ch { if c == nil { break
} // 具体的业务逻辑
c = nil
// 释放workerChan
// 在mustStop的时候将会跳出循环
if !wp.release(ch) { break
wp.lock.Lock()
wp.workersCount--
wp.lock.Unlock()
}// 把Conn放入到channel中func (wp *workerPool) Serve(c net.Conn) bool {
ch := wp.getCh() if ch == nil { return false
ch.ch <- c return true}func (wp *workerPool) release(ch *workerChan) bool { // 修改 ch.lastUseTime
ch.lastUseTime = time.Now()
wp.lock.Lock() // 如果需要停止,直接返回
if wp.mustStop {
wp.lock.Unlock() return false
} // 将ch放到ready中
wp.ready = append(wp.ready, ch)
wp.lock.Unlock() return true}
梳理下流程:
1、workerFunc会监听workerChan,并且在使用完workerChan归还到ready中;
2、Serve会把connection放入到workerChan中,这样workerFunc就能通过workerChan拿到需要处理的连接请求;
3、当workerFunc拿到的workerChan为nil或wp.mustStop被设为了true,就跳出for循环。
panjf2000/ants
先看下示例
package mainimport ( "fmt"
"sync"
"sync/atomic"
"time"
"github.com/panjf2000/ants")func demoFunc() {
time.Sleep(10 * time.Millisecond)
fmt.Println("Hello World!")
}func main() { defer ants.Release()
runTimes := 1000
var wg sync.WaitGroup
syncCalculateSum := func() {
demoFunc()
wg.Done()
} for i := 0; i < runTimes; i++ {
wg.Add(1)
_ = ants.Submit(syncCalculateSum)
wg.Wait()
fmt.Printf("running goroutines: %d\n", ants.Running())
fmt.Printf("finish all tasks.\n")
package mainimport ( "fmt"
"sync"
"sync/atomic"
"time"
"github.com/panjf2000/ants")var sum int32func myFunc(i interface{}) {
n := i.(int32)
atomic.AddInt32(&sum, n)
fmt.Printf("run with %d\n", n)
}func main() { var wg sync.WaitGroup
runTimes := 1000
// Use the pool with a method,
// set 10 to the capacity of goroutine pool and 1 second for expired duration.
p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
myFunc(i)
wg.Done()
}) defer p.Release() // Submit tasks one by one.
for i := 0; i < runTimes; i++ {
wg.Add(1)
_ = p.Invoke(int32(i))
wg.Wait()
fmt.Printf("running goroutines: %d\n", p.Running())
fmt.Printf("finish all tasks, result is %d\n", sum) if sum != 499500 { panic("the final result is wrong!!!")
整体的设计思路
梳理下思路:
1、先初始化缓存池的大小,然后处理任务事件的时候,一个task分配一个goWorker;
2、在拿goWorker的过程中会存在下面集中情况;
本地的缓存中有空闲的goWorker,直接取出;
本地缓存没有就去sync.Pool,拿一个goWorker;
3、如果缓存池满了,非阻塞模式直接返回nil,阻塞模式就循环去拿直到成功拿出一个;
4、同时也会定期清理掉过期的goWorker,通过sync.Cond唤醒其的阻塞等待;
5、对于使用完成的goWorker在使用完成之后重新归还到pool。
具体的设计细节可参考,作者的文章Goroutine 并发调度模型深度解析之手撸一个高性能 goroutine 池
go-playground/pool
go-playground/pool会在一开始就启动
先放几个使用的demo
Per Unit Work
package mainimport ( "fmt"
"time"
"gopkg.in/go-playground/pool.v3")func main() {
p := pool.NewLimited(10) defer p.Close()
user := p.Queue(getUser(13))
other := p.Queue(getOtherInfo(13))
user.Wait() if err := user.Error(); err != nil { // handle error
} // do stuff with user
username := user.Value().(string)
fmt.Println(username)
other.Wait() if err := other.Error(); err != nil { // handle error
} // do stuff with other
otherInfo := other.Value().(string)
fmt.Println(otherInfo)
}func getUser(id int) pool.WorkFunc { return func(wu pool.WorkUnit) (interface{}, error) { // simulate waiting for something, like TCP connection to be established
// or connection from pool grabbed
time.Sleep(time.Second * 1) if wu.IsCancelled() { // return values not used
return nil, nil
} // ready for processing...
return "Joeybloggs", nil
}func getOtherInfo(id int) pool.WorkFunc { return func(wu pool.WorkUnit) (interface{}, error) { // simulate waiting for something, like TCP connection to be established
// or connection from pool grabbed
time.Sleep(time.Second * 1) if wu.IsCancelled() { // return values not used
return nil, nil
} // ready for processing...
return "Other Info", nil
Batch Work
package mainimport ( "fmt"
"time"
"gopkg.in/go-playground/pool.v3")func main() {
p := pool.NewLimited(10) defer p.Close()
batch := p.Batch() // for max speed Queue in another goroutine
// but it is not required, just can't start reading results
// until all items are Queued.
go func() { for i := 0; i < 10; i++ {
batch.Queue(sendEmail("email content"))
} // DO NOT FORGET THIS OR GOROUTINES WILL DEADLOCK
// if calling Cancel() it calles QueueComplete() internally
batch.QueueComplete()
}() for email := range batch.Results() { if err := email.Error(); err != nil { // handle error
// maybe call batch.Cancel()
} // use return value
fmt.Println(email.Value().(bool))
}func sendEmail(email string) pool.WorkFunc { return func(wu pool.WorkUnit) (interface{}, error) { // simulate waiting for something, like TCP connection to be established
// or connection from pool grabbed
time.Sleep(time.Second * 1) if wu.IsCancelled() { // return values not used
return nil, nil
} // ready for processing...
return true, nil // everything ok, send nil, error if not
来看下实现
workUnit
workUnit作为channel信息进行传递,用来给work传递当前需要执行的任务信息。
// WorkUnit contains a single uint of works valuestype WorkUnit interface { // 阻塞直到当前任务被完成或被取消
Wait() // 执行函数返回的结果
Value() interface{} // Error returns the Work Unit's error
Error() error // 取消当前的可执行任务
Cancel() // 判断当前的可执行单元是否被取消了
IsCancelled() bool}var _ WorkUnit = new(workUnit)// workUnit contains a single unit of works valuestype workUnit struct { // 任务执行的结果
value interface{} // 错误信息
err error // 通知任务完成
done chan struct{} // 需要执行的任务函数
fn WorkFunc // 任务是会否被取消
cancelled atomic.Value // 是否正在取消任务
cancelling atomic.Value // 任务是否正在执行
writing atomic.Value
limitedPool
var _ Pool = new(limitedPool)// limitedPool contains all information for a limited pool instance.type limitedPool struct { // 并发量
workers uint
// work的channel
work chan *workUnit // 通知结束的channel
cancel chan struct{} // 是否关闭的标识
closed bool
// 读写锁
m sync.RWMutex
}// 初始化一个poolfunc NewLimited(workers uint) Pool { if workers == 0 { panic("invalid workers '0'")
} // 初始化pool的work数量
p := &limitedPool{
workers: workers,
} // 初始化pool的操作
p.initialize() return p
}func (p *limitedPool) initialize() { // channel的长度为work数量的两倍
p.work = make(chan *workUnit, p.workers*2)
p.cancel = make(chan struct{})
p.closed = false
// fire up workers here
for i := 0; i < int(p.workers); i++ {
p.newWorker(p.work, p.cancel)
}// 将工作传递并取消频道到newWorker()以避免任何潜在的竞争状况// 在p.work读写之间func (p *limitedPool) newWorker(work chan *workUnit, cancel chan struct{}) { go func(p *limitedPool) { var wu *workUnit defer func(p *limitedPool) { // 捕获异常,结束掉异常的工作单元,并将其再次作为新的任务启动
if err := recover(); err != nil {
trace := make([]byte, 1<<16)
n := runtime.Stack(trace, true)
s := fmt.Sprintf(errRecovery, err, string(trace[:int(math.Min(float64(n), float64(7000)))]))
iwu := wu
iwu.err = &ErrRecovery{s: s} close(iwu.done) // 重新启动
p.newWorker(p.work, p.cancel)
}(p) var value interface{} var err error // 监听channel,读取内容
for { select { // channel中取出数据
case wu = <-work: // 防止channel 被关闭后读取到零值
if wu == nil { continue
} // 单个和批量的cancellation这个都支持
if wu.cancelled.Load() == nil { // 执行我们的业务函数
value, err = wu.fn(wu)
wu.writing.Store(struct{}{}) // 如果WorkFunc取消了此工作单元,则需要再次检查
// 防止产生竞争条件
if wu.cancelled.Load() == nil && wu.cancelling.Load() == nil {
wu.value, wu.err = value, err // 执行完成,关闭当前channel
close(wu.done)
} // 如果取消了,就退出
case <-cancel: return
}// 放置一个执行的task到channel,并返回channelfunc (p *limitedPool) Queue(fn WorkFunc) WorkUnit { // 初始化一个workUnit类型的channel
w := &workUnit{
done: make(chan struct{}), // 具体的执行函数
fn: fn,
} go func() {
p.m.RLock() // 如果pool关闭的时候通知channel关闭
if p.closed {
w.err = &ErrPoolClosed{s: errClosed} if w.cancelled.Load() == nil { close(w.done)
p.m.RUnlock() return
} // 将channel传递给pool的work
p.work <- w
p.m.RUnlock()
}() return w
梳理下流程:
1、首先初始化pool的大小;
2、然后根据pool的大小启动对应数量的worker,阻塞等待channel被塞入可执行函数;
3、然后可执行函数会被放入workUnit,然后通过channel传递给阻塞的worker。
同样这里也提供了批量执行的方法
batch
// batch contains all information for a batch run of WorkUnitstype batch struct {
pool Pool
m sync.Mutex // WorkUnit的切片
units []WorkUnit // 结果集,执行完后的workUnit会更新其value,error,可以从结果集channel中读取
results chan WorkUnit // 通知batch是否完成
done chan struct{}
closed bool
wg *sync.WaitGroup
}// 初始化Batchfunc newBatch(p Pool) Batch { return &batch{
pool: p,
units: make([]WorkUnit, 0, 4),
results: make(chan WorkUnit),
done: make(chan struct{}),
wg: new(sync.WaitGroup),
}// 将WorkFunc放入到WorkUnit中并保留取消和输出结果的参考。func (b *batch) Queue(fn WorkFunc) {
b.m.Lock() if b.closed {
b.m.Unlock() return
} // 返回一个WorkUnit
wu := b.pool.Queue(fn) // 放到WorkUnit的切片中
b.units = append(b.units, wu) // 通过waitgroup进行goroutine的执行控制
b.wg.Add(1)
b.m.Unlock() // 执行任务
go func(b *batch, wu WorkUnit) {
wu.Wait() // 将执行的结果写入到results中
b.results <- wu
b.wg.Done()
}(b, wu)
}// QueueComplete让批处理知道不再有排队的工作单元// 以便在所有工作完成后可以关闭结果渠道。// 警告:如果未调用此函数,则结果通道将永远不会耗尽,// 但会永远阻止以获取更多结果。func (b *batch) QueueComplete() {
b.m.Lock()
b.closed = true
close(b.done)
b.m.Unlock()
}// 取消批次的任务func (b *batch) Cancel() {
b.QueueComplete()
b.m.Lock() // 一个个取消units,倒叙的取消
for i := len(b.units) - 1; i >= 0; i-- {
b.units[i].Cancel()
b.m.Unlock()
}// 输出执行完成的结果集func (b *batch) Results() <-chan WorkUnit { // 启动一个协程监听完成的通知
// waitgroup阻塞直到所有的worker都完成退出
// 最后关闭channel
go func(b *batch) {
<-b.done
b.m.Lock() // 阻塞直到上面waitgroup中的goroutine一个个执行完成退出
b.wg.Wait()
b.m.Unlock() // 关闭channel
close(b.results)
}(b) return b.results
USB Microphone https://www.soft-voice.com/
Wooden Speakers https://www.zeshuiplatform.com/
亚马逊测评 www.yisuping.cn
深圳网站建设www.sz886.com
有疑问加站长微信联系(非本文作者)
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK