26

第八章 并发

 5 years ago
source link: https://studygolang.com/articles/13696?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

1. 并发和并行的区别

并行:多核cpu在同一时间片内并行处理多个任务。

并发:如单核cpu在多个任务间进行时间片切换,并非同一时间片执行多个任务,只是上下文切换时间很短,看似多个任务并行。

多线程和多线程是并行的基本前提条件,单线程也可用协程做到并发。

在golang中是通过goroutine来实现并发的,goroutine并不能简单的归纳为协程,其运行时会创建多个线程来实现并发任务,且任务单元可被调度到其他线程并行执行。所以goroutine更像是多线程和协程的综合体,能最大限度提升执行效率,发挥多核处理能力。

goroutine

关键字go并非执行并发操作,而是创建一个并发任务单元。新建任务被放置在系统队列中,等待调度器安排合适的系统线程去获取执行权。

当前流程不会阻塞,不会等待该任务启动,且运行时也不保证并发任务的执行顺序。

每个任务单元除保存函数指针、调用参数外,还会分配执行所需的栈内存空间。相比系统默认MB级别的线程栈,goroutine自定义栈仅需2KB,所以才能创建成千上万的并发任务。自定义栈采取按需分配策略,在需要时仅需扩容,最大能到GB规模。

与defer一样,goroutine也会因延迟执行而立即计算并复制执行参数。

var c int
func counter()int{
    c++
    return c
}
func main() {
    a:=100
    go func(x,y int) {
        time.Sleep(time.Second)
        fmt.Println("go:",x,y)
    }(a,counter())
    a+=100
    fmt.Println("main:",a,counter())
    time.Sleep(time.Second*3)
}

输出:

main: 200 2
go: 100 1

wait

进程退出时不会等待并发任务结束,可用channel阻塞,然后发出退出信号。

func main() {
    exit:=make(chan interface{})  //创建通道。因为仅是通知,此处channel可为任何类型。
    go func() {
        time.Sleep(time.Second)
        fmt.Println("goroutine done")
        close(exit)                      //关闭通道,发出信号。
    }()

    fmt.Println("main...")
    <-exit                             //通道关闭则立即解除。
    fmt.Println("main exit")
}

输出:

main...
goroutine done
main exit

除了关闭通道外,向通道内写入数据也可解除阻塞。channel的更多信息,后面再做详述。

如要等待多个任务结束,推荐使用sync.WaitGruop。通过设定计数器,让每个goroutine在退出前递减,直至归零时解除阻塞。

func main() {
    var wg sync.WaitGroup

    for i:=0;i<10;i++{
        wg.Add(1)         //累加计数
        go func(id int) {
            defer wg.Done()        //递减计数
            time.Sleep(time.Second)
            fmt.Println("goroutine",id,"done")

        }(i)
    }

    fmt.Println("main...")
    wg.Wait()                   //阻塞,直到计数归零
    fmt.Println("main exit")
}

尽管WaitGroup.Add实现了原子操作,但建议在goroutine外累加计数器,以免Add尚未执行,Wait以及推出。

func main() {
    var wg sync.WaitGroup
    go func() {
        wg.Add(1)           //可以运行试一下,不是每次都能设置上
        defer wg.Done() //递减计数
        fmt.Println("goroutine", "done")

    }()

    fmt.Println("main...")
    wg.Wait() //阻塞,直到计数归零
    fmt.Println("main exit")
}

可在多处用Wait阻塞,他们都能接收到通知。上栗就可在go func前加wg.Wait().

GOMAXPROCS

运行时可能会创建很多线程,但任何时候仅有限的几个线程参与并发任务执行。该数量默认与CPU核数相等,可用runtime.GOMAXPROCS函数(或环境变量)修改。

如参数小于1,GOMAXPROCS仅返回当前设置值,不做任何调整。

import (
    "math"
    "fmt"
    "sync"
    "runtime"
)

//测试目标函数
func count(){
    x:=0
    for i:=0;i<math.MaxUint32;i++{
        x+=i
    }
    fmt.Println(x)
}

//循环执行
func test(n int){
    for i:=0;i<n;i++{
        count()
    }
}

//并发执行
func test2(n int){
    var wg sync.WaitGroup
    wg.Add(n)
    for i:=0;i<n;i++{
        go func() {
            count()
            wg.Done()
        }()
    }
    wg.Wait()
}

func main() {
    n:=runtime.GOMAXPROCS(0)
    n1:=runtime.NumCPU()
    fmt.Println(n1)
    test(n)
}
n:=runtime.GOMAXPROCS(0)
    n1:=runtime.NumCPU()

上述两个都可用来获取当前系统的cpu核数。

Local Storage

与线程不同,goroutine任务无法设置优先级,无法获取编号,没有局部存储(TLS),甚至连返回值都会被抛弃。但除优先级外,其他功能都很容易实现。

func main() {
    var wg sync.WaitGroup
    var gs [5]struct{             //用于实现类似TLS功能
        id  int  //编号
        result  int //返回值
    }
    for i:=0;i<len(gs);i++{
        wg.Add(1)
        go func(id int) {   //使用参数避免参数闭包延迟求值
            defer wg.Done()
            gs[id].id = id
            gs[id].result=(id + 1) * 100
        }(i)
    }
    wg.Wait()
    fmt.Printf("%+v\n",gs)
}

输出:

[{id:0 result:100} {id:1 result:200} {id:2 result:300} {id:3 result:400} {id:4 result:500}]

**如使用map作为局部存储容器,建议做同步处理,因为运行时会对其做并发读写检查。

Gosched

暂停,释放线程去执行其他任务。当前任务被放回队列,等待下次调度时恢复执行。

func main() {
    runtime.GOMAXPROCS(1)
    exit:=make(chan struct{})

    go func() {    //任务a
        defer close(exit)

        go func() {   //任务b,放在此处是为了确保a先执行。
            fmt.Println("b")
        }()
        for i:=0;i<4;i++{
            fmt.Println("a:", i)
            if i==1{
                runtime.Gosched()
            }
        }
    }()

    <-exit
}

输出:

a: 0
a: 1
b
a: 2
a: 3

该函数很少被使用,因为运行时会主动向长时间运行(10ms)的任务发出抢占调度。

Goexit

Goexit立即终止当前任务,运行时确保所有已注册延迟调用被执行。该函数不会影响其他并发任务,不会引发panic,自然也就无法捕获。

func main() {
    exit:=make(chan struct{})

    go func() {
        defer close(exit)
        defer println("a")
        func(){
            defer func() {
                println("b",recover() ==nil)   //执行recover返回nil
            }()
            func(){                                 //在多层调用中执行Goexit
                println("c")
                runtime.Goexit()                    //立即终止整个调用堆栈
                println("c done.")            //不会执行
            }()
            println("b done.") //不会被执行
        }()
        println("a done.")  //不会执行
    }()

    <- exit
    println("main exit.")
}

输出:

c
b true
a
main exit.

如果在main.main里调用Goexit,它会等待其他任务结束,然后让进程直接崩溃。

无论身处哪一层,Goexit都能立即终止整个调用堆栈,这与return仅退出当前函数不同。 标准库函数os.Exit可终止进程,但不会执行延迟调用。

2. 通道

Go并未实现严格的并发安全。

允许全局变量、指针、引用类型这些非安全内存共享操作,就需要开发人员自行维护数据一致性和完整性。Go鼓励使用CSP通道,以通信代替内存共享,实现并发安全。

通过消息来避免竟态的模型除了CSP,还有Actor。但两者区别较大。

作为CSP核心,通道是显式的,要求操作双方必须知道数据类型和具体通道,并不关心另一端操作者身份和数量。可如果另一端未准备妥当,或消息未能及时处理时,会阻塞当前端。

相比起来,Actor是透明的,它不在乎数据类型及通道,只要知道接收者信箱即可。默认就是异步方式,发送方对消息是否被接收和处理并不关心。

从底层实现上来说,通道只是一个队列。同步模式下,发送和接收双方配对,然后直接赋值数据给对方。如配对失败,则置入等待队列,直到另一方出现后才被唤醒。异步模式抢夺的则是数据缓冲槽。发送方要求有空槽可供写入,而接收方则要求有缓冲数据可读。需求不符时,同样加入等待队列,直到有另一方写入数据或腾出空槽后被唤醒。

除传递消息外,通道还被用作时间通知。

func main() {
    done:=make(chan struct{})
    c:=make(chan string)
    go func() {
        s:=<-c
        fmt.Println(s)
        close(done)
    }()

    c<-"hello!"
    <-done   //阻塞,直到有数据或通道关闭
}

同步模式必须有配对操作的goroutine出现,否则会一直阻塞。而异步模式在缓冲区未满或数据未读完前,不会阻塞。

func main() {
    c:=make(chan int,3)     //创建带三个缓冲槽的异步通道
    c<-1                    //缓冲区未满,不会阻塞
    c<-2

    println(<-c)
    println(<-c)
}
输出:
1
2

多数时候,异步通道有助于提升性能,减少排队阻塞。

缓冲区大小仅仅是内部属性,不属于类型组成部分。另外通道变量本身就是指针,可用相等操作符判断是否为同一对象或nil。

func main(){
    var a,b chan int = make(chan int,3),make(chan int)
    var c chan bool
    fmt.Println(a==b)
    fmt.Println(c==nil)

    fmt.Printf("%p,%d\n",a,unsafe.Sizeof(a))
}
输出:
false
true
0xc04207a000,8

虽然可传递指针来避免数据复制,但须额外注意数据并发安全。

内置函数cap和len返回缓冲区大小和当前已缓冲数量;而对于同步通道则都返回0,据此可判断通道是同步还是异步。

func main(){
    a,b:=make(chan int),make(chan int,3)
    b<-1
    b<-2
    println("a",len(a),cap(a))
    println("b",len(b),cap(b))
}

输出:

a 0 0
b 2 3

收发

除使用简单的发送和接收操作符外,还可用ok-idom或range模式处理数据。

func main() {
    done :=make(chan struct{})
    c:=make(chan int)

    go func() {
        defer close(done)
        for{
            x,ok:=<-c
            if !ok{               //据此判断通道是否关闭
                return
            }
            fmt.Println(x)
        }
    }()

    c<-1
    c<-2
    c<-3
    close(c)
    <-done

}

输出:1,2,3

对于循环接收数据,range模式更简洁一些。

[...]
    go func() {
        defer close(done)
        for x:=range c{       //循环获取消息,直到通道被关闭。
            println(x)
        }
    }()

[...]

及时用close函数关闭通道引发结束通知,否则可能会导致死锁。

通知可以是群体性的。也未必就是通知结束,可以是任何需要表达的事件。

一次性事件用close效率更好,没有多余开销。连续或多样性事件,可传递不同数据标志实现。还可使用sync.Cond实现单播或广播事件。

对于closed或nil通道,发送或接收操作都有相应规则:

  • 向已关闭通道发送数据,引发panic。
  • 从已关闭通道接收数据,返回已缓冲数据或零值。
  • 无论收发,nil通道都会阻塞。
func main(){
    c:=make(chan int,3)
    
    c<-10
    c<-20
    close(c)

    for i:=0;i<cap(c)+1;i++{
        x,ok:=<-c
        println(i,":",ok,x)
    }
}

输出:

0 : true 10
1 : true 20
2 : false 0
3 : false 0

重复关闭,或关闭nil通道都会引发panic错误。

单向

通道默认都是双向的,并不区分发送和接收端。但某些时候,我们可限制收发操作的方向来获得更严谨的操作逻辑。

尽管可用make创建单向通道,但那没有任何意义。通常使用类型转换来获取单向通道,并分别赋予操作双方。

func main() {
    var wg sync.WaitGroup
    wg.Add(2)

    c:=make(chan int)
    var send chan<- int =c
    var recv <-chan int =c

    go func() {
        defer wg.Done()
        for x:=range recv{
            println(x)
        }
    }()

    go func() {
        defer wg.Done()
        defer close(c)
        for i:=0;i<3;i++{
            send<-i
        }
    }()

    wg.Wait()

}

不能在单向通道上做逆向操作。close也不能用于接收端。也无法将单向通道重新转换回去。

选择

如要同时处理多个通道,可选用Select语句。它会随机选择一个可用通道做收发操作。

func main() {
    var wg sync.WaitGroup
    wg.Add(2)
    a, b := make(chan int), make(chan int)

    go func() {             //接收端
        defer wg.Done()
        for {
            var (
                name string
                x    int
                ok   bool
            )
            select {
            case x, ok = <-a:            //随机选择可用channel接收数据
                name = "a"
            case x, ok = <-b:
                name = "b"
            }
            if !ok {                     //如果任一通道关闭则终止接收。
                return
            }
            println(name, x)  //输出接收的数据信息

        }
    }()

    go func() {         //发送端
        defer wg.Done()
        defer close(a)
        defer close(b)

        for i := 0; i < 10; i++ {
            select {
            case a <- i:
            case b <- i * 10:

            }
        }

    }()

    wg.Wait()
}

输出:

b 0
a 1
a 2
b 30
a 4
b 50
a 6
a 7
a 8
b 90

如果要等全部的通道消息处理结束,可将已完成的通道设置为nil,这样她就会被阻塞,而不再被Select选中。

以下示例是两个独立的通道,逻辑是等两个通道都结束了收发才最终close,哪个先完成哪个阻塞住在那等待。

func main() {
    var wg sync.WaitGroup
    wg.Add(3)
    a, b := make(chan int), make(chan int)

    go func() { //接收端
        defer wg.Done()
        for {
            select {
            case x, ok := <-a:
                if !ok {
                    a = nil
                    break
                }
                println("a",x)
            case x, ok := <-b:
                if !ok {
                    b = nil
                    break
                }
                println("b", x)
            }


            if a == nil && b == nil {
                return
            }

        }
    }()

    go func() {
        defer wg.Done()
        defer close(a)

        for i := 0; i < 10; i++ {
            a <- i
        }

    }()

    go func() {
        defer wg.Done()
        defer close(b)

        for i := 100; i < 105; i++ {
            b <- i
        }

    }()

    wg.Wait()
}

输出:

a 0
a 1
b 100
a 2
a 3
a 4
b 101
b 102
b 103
a 5
b 104
a 6
a 7
a 8
a 9

即使是同一通道,也会随机选择case执行。

func main() {
    var wg sync.WaitGroup
    wg.Add(2)
    c := make(chan int)

    go func() { //接收端
        defer wg.Done()
        for {
            var x int
            var ok bool
            select {
            case x, ok = <-c:
                println("a1",x)
            case x, ok = <-c:
                println("a2", x)
            }
            if !ok {
                return
            }

        }
    }()

    go func() {
        defer wg.Done()
        defer close(c)

        for i := 0; i < 10; i++ {
            select {
            case c<-i:
            case c<-i*10:
            }
        }
    }()
    wg.Wait()
}

输出:

a1 0
a1 1
a2 2
a2 3
a1 4
a1 50
a1 60
a1 7
a2 80
a2 90
a2 0

当所有通道都不可用时,Select会执行default语句。如此可避开Select阻塞,但须注意处理外层循环,以免陷入空耗。

func main() {
    c:=make(chan int)
    done:=make(chan bool)

    go func() {
        defer close(done)
        for {

            select {
            case x,ok:=<-c:
                if !ok{
                    return
                }
                fmt.Println("data:",x)
            default:                               //避免Select阻塞

            }

            fmt.Println(time.Now())
            time.Sleep(time.Second)
        }
    }()
    time.Sleep(5*time.Second)
    c<-100
    close(c)
    <-done
}

输出:略

也可以用default处理一些默认逻辑。

func main() {
    done := make(chan struct{})
    data := []chan int{           //数据缓冲区
        make(chan int, 3),
    }

    go func() { //生产数据
        defer close(done)
        for i := 0; i < 10; i++ {
            select {
            case data[len(data)-1] <- i:   //生产数据
            default: //数据通道已满则新建chan
                data = append(data, make(chan int, 3))
            }
        }

    }()
    <-done

    for x := 0; x < len(data); x++ {
        c := data[x]
        close(c)  //关闭通道后也能从中读取数据
        for i := range (c) {
            fmt.Println(i)
        }
    }

}

输出:

可以看到,channel缓存满了后的第一个数据会被丢弃,直接走default创建新的通道了。

模式

通常使用工厂方法将goroutine和通道绑定。

type receiver struct {
    wg   sync.WaitGroup
    data chan int
}

func newReceiver() *receiver {
    r := &receiver{
        data: make(chan int),
    }
    r.wg.Add(1)
    go func() {
        defer r.wg.Done()
        for x := range r.data { //接收消息,直到通道关闭
            println("recv:", x)
        }
    }()
    return r
}

func main() {

    r := newReceiver()
    r.data <- 1
    r.data <- 2
    close(r.data)  //关闭通道,发出结束通知
    r.wg.Wait()   //等待接收者处理结束

}

输出:

recv: 1
recv: 2

鉴于通道本身就是一个并发安全的队列,可用作ID generator、Pool等用途。

type pool chan []byte

func newPool(cap int)pool{
    return make(chan []byte,cap)
}

func (p pool)get() []byte{
    var v []byte
    select {
    case v=<-p:                       //返回
    default:
        v=make([]byte,10)    //返回失败,新建
    }
    return v
}

func (p pool)put(b []byte){
    select {
    case p<-b:                //放回
    default:            //放回失败,新建

    }
}

用通道实现信号量(semaphore)。

func main() {
    runtime.GOMAXPROCS(4)
    var wg sync.WaitGroup

    sem:=make(chan struct{}, 2)        //最多允许两个并发同时执行
    for i:=0;i<5;i++{
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            sem<- struct{}{}           // acquire: 获取信号
            defer func() {<-sem}()     //release: 释放信号
            time.Sleep(time.Second * 2)
            fmt.Println(id,time.Now())
        }(i)
    }
    wg.Wait()
}

标准库time提供了timeout和tick channel实现。

package main

import (
    "time"
    "fmt"
    "os"
)

func main() {
    go func() {
        for{
            select {
            case <-time.After(time.Second*5):
                fmt.Println("timeout...")
                os.Exit(0)
            }
        }
    }()

    go func() {
        tick:=time.Tick(time.Second)
        //for _=range tick{
        //  fmt.Println(time.Now(),"test")
        //}
        for {
            select {
            case <-tick:
                fmt.Println(time.Now())
            }
        }
    }()

    <-(chan struct {})(nil)      //直接用nil channel阻塞进程
}

捕获INT、TERM信号,顺便实现一个简易的atexit函数。

atexit函数是一个特殊的函数,它是在正常程序退出时调用的函数,我们把他叫为登记函数(函数原型:int atexit (void (*)(void))):

⼀个进程可以登记若⼲个(具体⾃⼰验证⼀下)个函数,这些函数由exit⾃动调⽤,这些函数被称为终⽌处理函数, atexit函数可以登记这些函数。 exit调⽤终⽌处理函数的顺序和atexit登记的顺序相反(网上很多说造成顺序相反的原因是参数压栈造成的,参数的压栈是先进后出,和函数的栈帧相同),如果⼀个函数被多次登记,也会被多次调⽤。

python中有专门的atexit模块,简介如下:

从模块的名字也可以看出来,atexit模块主要的作用就是在程序即将结束之前执行的代码,atexit模块使用register函数用于注册程序退出时的回调函数,然后在回调函数中做一些资源清理的操作。

注意:

1,如果程序是非正常crash,或通过os._exit()退出,注册的回调函数将不会被调用。

2,也可以通过sys.exitfunc来注册回调,但通过它只能注册一个回调,而且还不支持参数。

3,建议使用atexit来注册回调函数。

import (
    "sync"
    "os"
    "os/signal"
    "syscall"
    "fmt"
)

//type atexits struct {
//  sync.WaitGroup
//  signal chan os.Signal
//  funcs []func()
//}
var exits=&struct {
    sync.RWMutex
    signals chan os.Signal
    funcs []func()
}{}

func atexit(f func()){
    exits.Lock()
    defer exits.Unlock()
    exits.funcs=append(exits.funcs,f)
}

func waitExit(){
    if exits.signals==nil{
        exits.signals = make(chan os.Signal)
        signal.Notify(exits.signals,syscall.SIGINT,syscall.SIGTERM)
        fmt.Println("test")
    }
    exits.RLock()
    for _,f:=range exits.funcs{
        defer f()      //延迟调用函数采用FILO顺序执行。即便某些函数panic,延迟调用也能确保后续函数执行。
    }
    fmt.Println("after range exits.funcs")
    exits.RUnlock()
    fmt.Println("after exits.Runlock")
    <-exits.signals
}

func main() {
    atexit(func() {
        println("exit1...")
    })
    atexit(func() {
        println("exit2...")
    })
    fmt.Println("befor exit")

    waitExit()
}

性能

将发往通道的数据打包,减少传输次数,可有效提升性能。从实现上来说,通道队列依旧使用锁同步机制,单次获取更多数据(批处理),可改善因频繁加锁造成的性能问题。

const (
    max     = 500000 //数据统计上限
    block   = 500    //数据块大小
    bufsize = 100    //缓冲区大小
)

func test() { //普通模式,每次传递一个整数
    done := make(chan struct{})
    c := make(chan int, bufsize)

    go func() {
        count := 0
        for x := range c {
            count += x
        }
        close(done)
    }()

    for i := 0; i < max; i++ {
        c <- i
    }
    close(c)
    <-done
}

func testBlock() { //块模式:每次将500个数字打包成块传输
    done := make(chan struct{})
    c:=make(chan [block]int,bufsize)

    go func() {
        count:=0
        for a:=range c{
            for _,x:=range a{  //a 是[block]int数组
                count +=x
            }
        }
        fmt.Println(count)
        close(done)
    }()

    for i:=0;i<max;i+=block{
        var b [block]int       //使用数组对数据打包
        for n:=0; n<block;n++{
            b[n] = i+n
            if i+n == max -1{
                break
            }
        }

        c <- b
    }

    close(c)
    <-done
}

BenchmarkTest

虽然单次消耗更多内存,但性能提升非常明显。如将数组改成切片会造成更多内存分配次数。

资源泄漏

通道可能会引发goroutine leak,确切地说,是指goroutine处于发送或接受阻塞状态,但一直未被唤醒。垃圾回收器并不手机此类资源,导致它们会在等待队列里长久休眠,形成资源泄漏。

3. 同步

通道并不是用来取代锁的,它们有各自不同的应用场景。通道倾向于解决逻辑层次的并发处理架构,而锁则用来保护局部范围内的数据安全。

标准库sync提供了互斥和读写锁,另有原子操作等。mutex、rwmutex的使用并不复杂,只有几个地方需要注意。

将Mutex作为匿名字段时,相关方法必须实现为pointer-receiver,否则会因复制导致锁机制失效。

type data struct {
    sync.Mutex
}

func (d data)test(s string){
    d.Lock()
    defer d.Unlock()
    for i:=0;i<5;i++{
        fmt.Println(s, i)
        time.Sleep(time.Second)
    }
}
func main() {
    var wg sync.WaitGroup
    var d data
    wg.Add(2)

    go func() {
        defer wg.Done()
        d.test("Read")
    }()

    go func() {
        defer wg.Done()
        d.test("write")
    }()

    wg.Wait()

}
上述代码运行后会发现锁机制已失效,解决方案是将data 改为

data.

也可用嵌入

Mutex来避免复制问题,但那需要专门初始化。

应将Mutex锁粒度控制在最小范围内,及早释放。

Mutex不支持递归锁,即锁里面不允许有锁,否则即使在同一goroutine下也会导致死锁。

在设计并发安全类型时,千万注意此类问题。

import "sync"

type cache struct {
    sync.Mutex
    data []int
}

func (c *cache)count()int{
    c.Lock()
    n:=len(c.data)
    c.Unlock()
    return n
}

func (c *cache)get() int{
    c.Lock()
    defer c.Unlock()
    var d int
    if n:=c.count();n>0{    //锁中套锁
        d=c.data[0]
        c.data=c.data[1:]
    }
    return d
}

func main() {

    c:=cache{data:[]int{1,2,3,4}}
    c.get()
}

输出:

fatal error: all goroutines are asleep - deadlock!

相关建议:

  • 对性能要求较高时,应避免使用defer Unlock.
  • 读写并发时,用RWMutex性能会更好一些。
  • 对单个数据读写保护,可尝试用原子操作。
  • 执行严格测试,尽可能打开数据竞争检查。

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK