1

go中控制goroutine数量

 3 years ago
source link: https://studygolang.com/articles/34577?fr=sidebar
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

go中控制goroutine数量

小中01 · 大约12小时之前 · 76 次点击 · 预计阅读时间 16 分钟 · 大约8小时之前 开始浏览    

通过channel+sync

var ( // channel长度

poolCount      = 5

// 复用的goroutine数量

goroutineCount = 10)func pool() {

jobsChan := make(chan int, poolCount) // workers

var wg sync.WaitGroup for i := 0; i < goroutineCount; i++ {

wg.Add(1) go func() { defer wg.Done() for item := range jobsChan { // ...

fmt.Println(item)

} // senders

for i := 0; i < 1000; i++ {

jobsChan <- i

} // 关闭channel,上游的goroutine在读完channel的内容,就会通过wg的done退出

close(jobsChan)

wg.Wait()

通过WaitGroup启动指定数量的goroutine,监听channel的通知。发送者推送信息到channel,信息处理完了,关闭channel,等待goroutine依次退出。

使用semaphore

package mainimport ( "context"

"fmt"

"sync"

"time"

"golang.org/x/sync/semaphore")const ( // 同时运行的goroutine上限

Limit = 3

// 信号量的权重

Weight = 1)func main() {

names := []string{ "小白", "小红", "小明", "小李", "小花",

sem := semaphore.NewWeighted(Limit) var w sync.WaitGroup for _, name := range names {

w.Add(1) go func(name string) {

sem.Acquire(context.Background(), Weight) // ... 具体的业务逻辑

fmt.Println(name, "-吃饭了")

time.Sleep(2 * time.Second)

sem.Release(Weight)

w.Done()

}(name)

w.Wait()

fmt.Println("ending--------")

借助于x包中的semaphore,也可以进行goroutine的数量限制。

不过原本go中的协程已经是非常轻量了,对于协程池还是要根据具体的场景分析。

对于小场景使用channel+sync就可以,其他复杂的可以考虑使用第三方的协程池库。

panjf2000/ants

go-playground/pool

Jeffail/tunny

几个开源的线程池的设计

fasthttp中的协程池实现

fasthttp比net/http效率高很多倍的重要原因,就是利用了协程池。来看下大佬的设计思路。

1、按需增长goroutine数量,有一个最大值,同时监听channel,Server会把accept到的connection放入到channel中,这样监听的goroutine就能处理消费。

2、本地维护了一个待使用的channel列表,当本地channel列表拿不到ch,会在sync.pool中取。

3、如果workersCount没达到上限,则从生成一个workerFunc监听workerChan。

4、对于待使用的channel列表,会定期清理掉超过最大空闲时间的workerChan。

看下具体实现

// workerPool通过一组工作池服务传入的连接// 按照FILO(先进后出)的顺序,即最近停止的工作人员将为下一个工作传入的连接。//// 这种方案能够保持cpu的缓存保持高效(理论上)type workerPool struct { // 这个函数用于server的连接

// It must leave c unclosed.

WorkerFunc ServeHandler // 最大的Workers数量

MaxWorkersCount int

LogAllErrors bool

MaxIdleWorkerDuration time.Duration

Logger Logger

lock         sync.Mutex // 当前worker的数量

workersCount int

// worker停止的标识

mustStop     bool

// 等待使用的workerChan

// 可能会被清理

ready []*workerChan // 用来标识start和stop

stopCh chan struct{} // workerChan的缓存池,通过sync.Pool实现

workerChanPool sync.Pool

connState func(net.Conn, ConnState)}// workerChan的结构type workerChan struct {

lastUseTime time.Time

ch          chan net.Conn

Start

func (wp *workerPool) Start() { // 判断是否已经Start过了

if wp.stopCh != nil { panic("BUG: workerPool already started")

} // stopCh塞入值

wp.stopCh = make(chan struct{})

stopCh := wp.stopCh

wp.workerChanPool.New = func() interface{} { // 如果单核cpu则让workerChan阻塞

// 否则,使用非阻塞,workerChan的长度为1

return &workerChan{

ch: make(chan net.Conn, workerChanCap),

} go func() { var scratch []*workerChan for {

wp.clean(&scratch) select { // 接收到退出信号,退出

case <-stopCh: return

default:

time.Sleep(wp.getMaxIdleWorkerDuration())

}// 如果单核cpu则让workerChan阻塞// 否则,使用非阻塞,workerChan的长度为1var workerChanCap = func() int { // 如果GOMAXPROCS=1,workerChan的长度为0,变成一个阻塞的channel

if runtime.GOMAXPROCS(0) == 1 { return 0

} // 如果GOMAXPROCS>1则使用非阻塞的workerChan

return 1}()

梳理下流程:

1、首先判断下stopCh是否为nil,不为nil表示已经started了;

2、初始化wp.stopCh = make(chan struct{}),stopCh是一个标识,用了struct{}不用bool,因为空结构体变量的内存占用大小为0,而bool类型内存占用大小为1,这样可以更加最大化利用我们服务器的内存空间;

3、设置workerChanPool的New函数,然后可以在Get不到东西时,自动创建一个;如果单核cpu则让workerChan阻塞,否则,使用非阻塞,workerChan的长度设置为1;

4、启动一个goroutine,处理clean操作,在接收到退出信号,退出。

func (wp *workerPool) Stop() { // 同start,stop也只能触发一次

if wp.stopCh == nil { panic("BUG: workerPool wasn't started")

} // 关闭stopCh

close(wp.stopCh) // 将stopCh置为nil

wp.stopCh = nil

// 停止所有的等待获取连接的workers

// 正在运行的workers,不需要等待他们退出,他们会在完成connection或mustStop被设置成true退出

wp.lock.Lock()

ready := wp.ready // 循环将ready的workerChan置为nil

for i := range ready {

ready[i].ch <- nil

ready[i] = nil

wp.ready = ready[:0] // 设置mustStop为true

wp.mustStop = true

wp.lock.Unlock()

梳理下流程:

1、判断stop只能被关闭一次;

2、关闭stopCh,设置stopCh为nil;

3、停止所有的等待获取连接的workers,正在运行的workers,不需要等待他们退出,他们会在完成connection或mustStop被设置成true退出。

clean

func (wp *workerPool) clean(scratch *[]*workerChan) {

maxIdleWorkerDuration := wp.getMaxIdleWorkerDuration() // 清理掉最近最少使用的workers如果他们过了maxIdleWorkerDuration时间没有提供服务

criticalTime := time.Now().Add(-maxIdleWorkerDuration)

wp.lock.Lock()

ready := wp.ready

n := len(ready) // 使用二分搜索算法找出最近可以被清除的worker

// 最后使用的workerChan 一定是放回队列尾部的。

l, r, mid := 0, n-1, 0

for l <= r {

mid = (l + r) / 2

if criticalTime.After(wp.ready[mid].lastUseTime) {

l = mid + 1

} else {

r = mid - 1

i := r if i == -1 {

wp.lock.Unlock() return

} // 将ready中i之前的的全部清除

*scratch = append((*scratch)[:0], ready[:i+1]...)

m := copy(ready, ready[i+1:]) for i = m; i < n; i++ {

ready[i] = nil

wp.ready = ready[:m]

wp.lock.Unlock() // 通知淘汰的workers停止

// 此通知必须位于wp.lock之外,因为ch.ch

// 如果有很多workers,可能会阻塞并且可能会花费大量时间

// 位于非本地CPU上。

tmp := *scratch for i := range tmp {

tmp[i].ch <- nil

tmp[i] = nil

主要是清理掉最近最少使用的workers如果他们过了maxIdleWorkerDuration时间没有提供服务

getCh

获取一个workerChan

func (wp *workerPool) getCh() *workerChan { var ch *workerChan

createWorker := false

wp.lock.Lock()

ready := wp.ready

n := len(ready) - 1

// 如果ready为空

if n < 0 { if wp.workersCount < wp.MaxWorkersCount {

createWorker = true

wp.workersCount++

} else { // 不为空从ready中取一个

ch = ready[n]

ready[n] = nil

wp.ready = ready[:n]

wp.lock.Unlock() // 如果没拿到ch

if ch == nil { if !createWorker { return nil

} // 从缓存中获取一个ch

vch := wp.workerChanPool.Get()

ch = vch.(*workerChan) go func() { // 具体的执行函数

wp.workerFunc(ch) // 再放入到pool中

wp.workerChanPool.Put(vch)

} return ch

梳理下流程:

1、获取一个可执行的workerChan,如果ready中为空,并且workersCount没有达到最大值,增加workersCount数量,并且设置当前操作createWorker = true;

2、ready中不为空,直接在ready获取一个;

3、如果没有获取到则在sync.pool中获取一个,之后再放回到pool中;

4、拿到了就启动一个workerFunc监听workerChan,处理具体的业务逻辑。

workerFunc

func (wp *workerPool) workerFunc(ch *workerChan) { var c net.Conn var err error // 监听workerChan

for c = range ch.ch { if c == nil { break

} // 具体的业务逻辑

c = nil

// 释放workerChan

// 在mustStop的时候将会跳出循环

if !wp.release(ch) { break

wp.lock.Lock()

wp.workersCount--

wp.lock.Unlock()

}// 把Conn放入到channel中func (wp *workerPool) Serve(c net.Conn) bool {

ch := wp.getCh() if ch == nil { return false

ch.ch <- c return true}func (wp *workerPool) release(ch *workerChan) bool { // 修改 ch.lastUseTime

ch.lastUseTime = time.Now()

wp.lock.Lock() // 如果需要停止,直接返回

if wp.mustStop {

wp.lock.Unlock() return false

} // 将ch放到ready中

wp.ready = append(wp.ready, ch)

wp.lock.Unlock() return true}

梳理下流程:

1、workerFunc会监听workerChan,并且在使用完workerChan归还到ready中;

2、Serve会把connection放入到workerChan中,这样workerFunc就能通过workerChan拿到需要处理的连接请求;

3、当workerFunc拿到的workerChan为nil或wp.mustStop被设为了true,就跳出for循环。

panjf2000/ants

先看下示例

package mainimport ( "fmt"

"sync"

"sync/atomic"

"time"

"github.com/panjf2000/ants")func demoFunc() {

time.Sleep(10 * time.Millisecond)

fmt.Println("Hello World!")

}func main() { defer ants.Release()

runTimes := 1000

var wg sync.WaitGroup

syncCalculateSum := func() {

demoFunc()

wg.Done()

} for i := 0; i < runTimes; i++ {

wg.Add(1)

_ = ants.Submit(syncCalculateSum)

wg.Wait()

fmt.Printf("running goroutines: %d\n", ants.Running())

fmt.Printf("finish all tasks.\n")

package mainimport ( "fmt"

"sync"

"sync/atomic"

"time"

"github.com/panjf2000/ants")var sum int32func myFunc(i interface{}) {

n := i.(int32)

atomic.AddInt32(&sum, n)

fmt.Printf("run with %d\n", n)

}func main() { var wg sync.WaitGroup

runTimes := 1000

// Use the pool with a method,

// set 10 to the capacity of goroutine pool and 1 second for expired duration.

p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {

myFunc(i)

wg.Done()

}) defer p.Release() // Submit tasks one by one.

for i := 0; i < runTimes; i++ {

wg.Add(1)

_ = p.Invoke(int32(i))

wg.Wait()

fmt.Printf("running goroutines: %d\n", p.Running())

fmt.Printf("finish all tasks, result is %d\n", sum) if sum != 499500 { panic("the final result is wrong!!!")

整体的设计思路

梳理下思路:

1、先初始化缓存池的大小,然后处理任务事件的时候,一个task分配一个goWorker;

2、在拿goWorker的过程中会存在下面集中情况;

本地的缓存中有空闲的goWorker,直接取出;

本地缓存没有就去sync.Pool,拿一个goWorker;

3、如果缓存池满了,非阻塞模式直接返回nil,阻塞模式就循环去拿直到成功拿出一个;

4、同时也会定期清理掉过期的goWorker,通过sync.Cond唤醒其的阻塞等待;

5、对于使用完成的goWorker在使用完成之后重新归还到pool。

具体的设计细节可参考,作者的文章Goroutine 并发调度模型深度解析之手撸一个高性能 goroutine 池

go-playground/pool

go-playground/pool会在一开始就启动

先放几个使用的demo

Per Unit Work

package mainimport ( "fmt"

"time"

"gopkg.in/go-playground/pool.v3")func main() {

p := pool.NewLimited(10) defer p.Close()

user := p.Queue(getUser(13))

other := p.Queue(getOtherInfo(13))

user.Wait() if err := user.Error(); err != nil { // handle error

} // do stuff with user

username := user.Value().(string)

fmt.Println(username)

other.Wait() if err := other.Error(); err != nil { // handle error

} // do stuff with other

otherInfo := other.Value().(string)

fmt.Println(otherInfo)

}func getUser(id int) pool.WorkFunc { return func(wu pool.WorkUnit) (interface{}, error) { // simulate waiting for something, like TCP connection to be established

// or connection from pool grabbed

time.Sleep(time.Second * 1) if wu.IsCancelled() { // return values not used

return nil, nil

} // ready for processing...

return "Joeybloggs", nil

}func getOtherInfo(id int) pool.WorkFunc { return func(wu pool.WorkUnit) (interface{}, error) { // simulate waiting for something, like TCP connection to be established

// or connection from pool grabbed

time.Sleep(time.Second * 1) if wu.IsCancelled() { // return values not used

return nil, nil

} // ready for processing...

return "Other Info", nil

Batch Work

package mainimport ( "fmt"

"time"

"gopkg.in/go-playground/pool.v3")func main() {

p := pool.NewLimited(10) defer p.Close()

batch := p.Batch() // for max speed Queue in another goroutine

// but it is not required, just can't start reading results

// until all items are Queued.

go func() { for i := 0; i < 10; i++ {

batch.Queue(sendEmail("email content"))

} // DO NOT FORGET THIS OR GOROUTINES WILL DEADLOCK

// if calling Cancel() it calles QueueComplete() internally

batch.QueueComplete()

}() for email := range batch.Results() { if err := email.Error(); err != nil { // handle error

// maybe call batch.Cancel()

} // use return value

fmt.Println(email.Value().(bool))

}func sendEmail(email string) pool.WorkFunc { return func(wu pool.WorkUnit) (interface{}, error) { // simulate waiting for something, like TCP connection to be established

// or connection from pool grabbed

time.Sleep(time.Second * 1) if wu.IsCancelled() { // return values not used

return nil, nil

} // ready for processing...

return true, nil // everything ok, send nil, error if not

来看下实现

workUnit

workUnit作为channel信息进行传递,用来给work传递当前需要执行的任务信息。

// WorkUnit contains a single uint of works valuestype WorkUnit interface { // 阻塞直到当前任务被完成或被取消

Wait() // 执行函数返回的结果

Value() interface{} // Error returns the Work Unit's error

Error() error // 取消当前的可执行任务

Cancel() // 判断当前的可执行单元是否被取消了

IsCancelled() bool}var _ WorkUnit = new(workUnit)// workUnit contains a single unit of works valuestype workUnit struct { // 任务执行的结果

value      interface{} // 错误信息

err        error // 通知任务完成

done       chan struct{} // 需要执行的任务函数

fn         WorkFunc // 任务是会否被取消

cancelled  atomic.Value // 是否正在取消任务

cancelling atomic.Value // 任务是否正在执行

writing    atomic.Value

limitedPool

var _ Pool = new(limitedPool)// limitedPool contains all information for a limited pool instance.type limitedPool struct { // 并发量

workers uint

// work的channel

work    chan *workUnit // 通知结束的channel

cancel  chan struct{} // 是否关闭的标识

closed  bool

// 读写锁

m       sync.RWMutex

}// 初始化一个poolfunc NewLimited(workers uint) Pool { if workers == 0 { panic("invalid workers '0'")

} // 初始化pool的work数量

p := &limitedPool{

workers: workers,

} // 初始化pool的操作

p.initialize() return p

}func (p *limitedPool) initialize() { // channel的长度为work数量的两倍

p.work = make(chan *workUnit, p.workers*2)

p.cancel = make(chan struct{})

p.closed = false

// fire up workers here

for i := 0; i < int(p.workers); i++ {

p.newWorker(p.work, p.cancel)

}// 将工作传递并取消频道到newWorker()以避免任何潜在的竞争状况// 在p.work读写之间func (p *limitedPool) newWorker(work chan *workUnit, cancel chan struct{}) { go func(p *limitedPool) { var wu *workUnit defer func(p *limitedPool) { // 捕获异常,结束掉异常的工作单元,并将其再次作为新的任务启动

if err := recover(); err != nil {

trace := make([]byte, 1<<16)

n := runtime.Stack(trace, true)

s := fmt.Sprintf(errRecovery, err, string(trace[:int(math.Min(float64(n), float64(7000)))]))

iwu := wu

iwu.err = &ErrRecovery{s: s} close(iwu.done) // 重新启动

p.newWorker(p.work, p.cancel)

}(p) var value interface{} var err error // 监听channel,读取内容

for { select { // channel中取出数据

case wu = <-work: // 防止channel 被关闭后读取到零值

if wu == nil { continue

} // 单个和批量的cancellation这个都支持

if wu.cancelled.Load() == nil { // 执行我们的业务函数

value, err = wu.fn(wu)

wu.writing.Store(struct{}{}) // 如果WorkFunc取消了此工作单元,则需要再次检查

// 防止产生竞争条件

if wu.cancelled.Load() == nil && wu.cancelling.Load() == nil {

wu.value, wu.err = value, err // 执行完成,关闭当前channel

close(wu.done)

} // 如果取消了,就退出

case <-cancel: return

}// 放置一个执行的task到channel,并返回channelfunc (p *limitedPool) Queue(fn WorkFunc) WorkUnit { // 初始化一个workUnit类型的channel

w := &workUnit{

done: make(chan struct{}), // 具体的执行函数

fn:   fn,

} go func() {

p.m.RLock() // 如果pool关闭的时候通知channel关闭

if p.closed {

w.err = &ErrPoolClosed{s: errClosed} if w.cancelled.Load() == nil { close(w.done)

p.m.RUnlock() return

} // 将channel传递给pool的work

p.work <- w

p.m.RUnlock()

}() return w

梳理下流程:

1、首先初始化pool的大小;

2、然后根据pool的大小启动对应数量的worker,阻塞等待channel被塞入可执行函数;

3、然后可执行函数会被放入workUnit,然后通过channel传递给阻塞的worker。

同样这里也提供了批量执行的方法

batch

// batch contains all information for a batch run of WorkUnitstype batch struct {

pool    Pool

m       sync.Mutex // WorkUnit的切片

units   []WorkUnit // 结果集,执行完后的workUnit会更新其value,error,可以从结果集channel中读取

results chan WorkUnit // 通知batch是否完成

done    chan struct{}

closed  bool

wg      *sync.WaitGroup

}// 初始化Batchfunc newBatch(p Pool) Batch { return &batch{

pool:    p,

units:   make([]WorkUnit, 0, 4),

results: make(chan WorkUnit),

done:    make(chan struct{}),

wg:      new(sync.WaitGroup),

}// 将WorkFunc放入到WorkUnit中并保留取消和输出结果的参考。func (b *batch) Queue(fn WorkFunc) {

b.m.Lock() if b.closed {

b.m.Unlock() return

} // 返回一个WorkUnit

wu := b.pool.Queue(fn) // 放到WorkUnit的切片中

b.units = append(b.units, wu) // 通过waitgroup进行goroutine的执行控制

b.wg.Add(1)

b.m.Unlock() // 执行任务

go func(b *batch, wu WorkUnit) {

wu.Wait() // 将执行的结果写入到results中

b.results <- wu

b.wg.Done()

}(b, wu)

}// QueueComplete让批处理知道不再有排队的工作单元// 以便在所有工作完成后可以关闭结果渠道。// 警告:如果未调用此函数,则结果通道将永远不会耗尽,// 但会永远阻止以获取更多结果。func (b *batch) QueueComplete() {

b.m.Lock()

b.closed = true

close(b.done)

b.m.Unlock()

}// 取消批次的任务func (b *batch) Cancel() {

b.QueueComplete()

b.m.Lock() // 一个个取消units,倒叙的取消

for i := len(b.units) - 1; i >= 0; i-- {

b.units[i].Cancel()

b.m.Unlock()

}// 输出执行完成的结果集func (b *batch) Results() <-chan WorkUnit { // 启动一个协程监听完成的通知

// waitgroup阻塞直到所有的worker都完成退出

// 最后关闭channel

go func(b *batch) {

<-b.done

b.m.Lock() // 阻塞直到上面waitgroup中的goroutine一个个执行完成退出

b.wg.Wait()

b.m.Unlock() // 关闭channel

close(b.results)

}(b) return b.results

USB Microphone  https://www.soft-voice.com/

Wooden Speakers  https://www.zeshuiplatform.com/

亚马逊测评 www.yisuping.cn

深圳网站建设www.sz886.com


有疑问加站长微信联系(非本文作者)

280

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK