33

Golang多协程并发工作池

 5 years ago
source link: https://studygolang.com/articles/15284?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

本文一个简单的golang工作池,仅供交流和学习使用。golang工作池的作用是可以限制goroutine的启动数量。

//NewTask是放到工作池当中运行的函数。使用的时候需要先实例化他
    w := pool.NewTask(func() error { fmt.Println(time.Now());return nil })
    //实例化工作池
    p := pool.NewPool(3)
    //这里启用另外一个goroutine向worker当中写入,不然会出现all goroutines are asleep,需要从管道中获得一个数据,而这个数据必须是其他goroutine线放入管道的
    go func() {
        for {
            p.Worker <- w //把需要运行的函数依次放入工作池。
        }
    }()
    p.Run()

为什么需要工作池?

一般情况下,goroutine在操作系统上只要你的硬件资源够它是可以无限启动的。但是如果出现大规模的启动goroutine的情况会造成大量占用系统资源,我们知道普通的部署一个golang应用的时候操作系统不仅仅会运行golang程序还有其他辅助的程序运行,所以理论上讲工作池的目的就是为了限制golang的启动数量,保证不会出现硬件计算资源溢出的情况。

实际我们真的需要工作池吗?

理论上来讲,我们其实不需要在golang层面设置工作池的。如果是网络请求大部分时候我们会使用nginx或者其他网关,中间件作为golang程序的代理,我们可以在请求访问流量进入到golang程序之前使用nginx或者其他中间件限制流量或者使用熔断机制来保证我们的golang程序不会开满goroutine造成硬件计算资源溢出的情况。 如果流量小完全没必要限制goroutine 如果流量大没有熔断机制对整个服务都是很危险的。

工作池性能的讨论

我在编写自己的工作池之前使用了很多其他的工作池,我发现工作池本身并不会提升执行效率,反而会拖慢效率,使用工作池程序执行时间大概是原来的2/3,但是有一些pool会在一定程度上节省内存,比如ants,但内存的节省我觉得更多的是在于数据结构的复用,没有大量的重复创建数据对象导致的内存节省。

goroutine是否需要像其他编程语言一样使用I/O多路复用?

根据我在网上可以搜索到的资料,以及我自己的理解,golang本身是有一个pool用来复用goroutine的,所以我们并不需要自己再去实现一个多路复用的功能,反而会拖慢程序。

其他

以上论调并没有严谨的验证过,只是个人遇到的情况分享,希望和大家共同讨论学习,共同进步。

工作池代码如下

package pool

import "sync"

//创建worker,每一个worker抽象成一个可以执行任务的函数
type Worker struct {
    f func() error
}
//通过NewTask来创建一个worker
func NewTask(f func() error)  *Worker {
    return &Worker{
        f:f,
    }
}
//执行worker
func (t *Worker) Run(wg *sync.WaitGroup)  {
    t.f()
    //减少waitGroup计数器的值
    wg.Done()
}
//池
type Pool struct {
    //这个*Worker指针切片用来接受任务,方便外部调用,减少channel异常的问题,这里会整个切片一起提交
    //Workers []*Worker
    //这里的Worker是一个管道,用来接受其他go程带来的数据,实时执行,无限等待数据循环,这里使用另外一个管道还可以隐藏wg的操作。让外部程序使用更方便一些。
    Worker chan *Worker
    //size用来表明池的大小,不能超发。
    size int
    //jobs表示执行任务的通道用于作为队列,我们将任务从切片当中取出来,然后存放到通道当中,再从通道当中取出任务并执行。
    Jobs chan *Worker
    //用于阻塞
    wg sync.WaitGroup
}
//实例化工作池使用
func NewPool(cap int) *Pool {
    return &Pool{
        //Workers:tasks,
        Worker:make(chan *Worker),
        size: cap,
        Jobs:make(chan *Worker),
    }
}
//从jobs当中取出任务并执行。
func (p *Pool) work()  {
    for task := range p.Jobs{
        task.Run(&p.wg)
    }
}
//执行工作池当中的任务
func (p *Pool) Run(){
    //只启动有限大小的协程,协程的数量不可以超过工作池设定的数量,防止计算资源崩溃
    for i:=0;i<p.size;i++{
        go p.work()
    }
    //从worker切片当中把任务取出
    for task := range p.Worker{
        p.wg.Add(1)
        p.Jobs <- task
    }
    //执行完毕就需要关闭jobs
    close(p.Jobs)
    //执行的过程需要阻塞直到有空闲的goroutine可用
    p.wg.Wait()
}

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK