45

concurrency in go 读书笔记

 5 years ago
source link: http://xargin.com/concurrency-in-go-notes/?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

《concurrency in go》这本书出版于 2017 年八月,里面有些观点还是蛮新颖的,烂大街的我就先不写了,重点写写书里提到的,我之前忽视的观点,以及一些奇技淫巧。

锁的粒度太大的话,有可能造成其它的 goroutine 饥饿

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup
	var sharedLock sync.Mutex
	const runtime = 1 * time.Second

	greedyWorker := func() {
		defer wg.Done()

		var count int
		for begin := time.Now(); time.Since(begin) <= runtime; {
			sharedLock.Lock()
			time.Sleep(3 * time.Nanosecond)
			sharedLock.Unlock()
			count++
		}

		fmt.Printf("Greedy worker was able to execute %v work loops\n", count)
	}

	politeWorker := func() {
		defer wg.Done()

		var count int
		for begin := time.Now(); time.Since(begin) <= runtime; {
			sharedLock.Lock()
			time.Sleep(1 * time.Nanosecond)
			sharedLock.Unlock()

			sharedLock.Lock()
			time.Sleep(1 * time.Nanosecond)
			sharedLock.Unlock()

			sharedLock.Lock()
			time.Sleep(1 * time.Nanosecond)
			sharedLock.Unlock()

			count++
		}

		fmt.Printf("Polite worker was able to execute %v work loops.\n", count)
	}

	wg.Add(2)
	go greedyWorker()
	go politeWorker()

	wg.Wait()
}

在我的 Mac 上跑的结果:

Greedy worker was able to execute 507672 work loops
Polite worker was able to execute 297669 work loops.

锁的粒度大,会得到更多的运行时间,而导致其它 goroutine 获得的执行机会变少。

close 各种状态的 channel 的结果对照表:

f2QJjeE.png!web

这里面 close receive only 的 channel 发现和我的直觉有所不同。。会直接 panic。

利用作用域来约束 channel 的逻辑

channel 一般都有发送端和接收端,所以声明、定义的时候一般是这么写的:

var a = make(chan int, 10)

如果我们的消费者和生产者代码紧跟着上面的代码的话,那么可能就写成这样:

var a = make(chan int, 10)

go func() {
    for {
        select {
            case <- a:
                // do some thing
        }
    }
}()

// producer...

当然,非得这么写也不是说就一定会有问题。但随着项目升级,代码被改得面目全非的时候,保不准会有人在 consumer 里向全局的 channel 里塞数据,从而造成错误的结果。我们还可以有更好的写法来规避这个问题:

consumer := func(ch <-chan int) {
    for {
        select {
            case <- ch:
                // do some thing
        }
    }
}

var a = make(chan int, 10)

go consumer(a)

将 channel 的定义放在 consumer 定义的后面,从作用域上彻底根绝了在 consumer 里直接操作 a channel 的可能性。

通过 done channel 避免 goroutine 泄露

package main

import (
	"fmt"
	"time"
)

func main() {
	doWork := func(
		done <-chan interface{},
		strings <-chan string,
	) <-chan interface{} { // <1>
		terminated := make(chan interface{})
		go func() {
			defer fmt.Println("doWork exited.")
			defer close(terminated)
			for {
				select {
				case s := <-strings:
					// Do something interesting
					fmt.Println(s)
				case <-done: // <2>
					return
				}
			}
		}()
		return terminated
	}

	done := make(chan interface{})
	terminated := doWork(done, nil)

	go func() { // <3>
		// Cancel the operation after 1 second.
		time.Sleep(1 * time.Second)
		fmt.Println("Canceling doWork goroutine...")
		close(done)
	}()

	<-terminated // <4>
	fmt.Println("Done.")
}

这个其实平常用的已经挺多的了。。

多个 ch 中有一个返回结果即退出的 or channel

package main

// if one of the channel is ready
// then the or channel will be ready

import (
	"fmt"
	"time"
)

func main() {
	var or func(chs ...chan interface{}) chan interface{}
	or = func(chs ...chan interface{}) chan interface{} {
		if len(chs) == 0 {
			return nil
		}

		if len(chs) == 1 {
			return chs[0]
		}

		orDone := make(chan interface{})

		go func() {
			defer close(orDone)
			switch len(chs) {
			case 2:
				select {
				case <-chs[0]:
				case <-chs[1]:
				}
			default: // len(chs) > 2
				select {
				case <-chs[0]:
				case <-chs[1]:
				case <-chs[2]:
				case <-or(append(chs[3:], orDone)...):
				}
			}
		}()

		return orDone
	}

	start := time.Now()

	sig := func(after time.Duration) chan interface{} {
		ch := make(chan interface{})

		go func() {
			defer close(ch)
			time.Sleep(after)
		}()

		return ch
	}
	<-or(
		sig(time.Second),
		sig(time.Second*2),
		sig(time.Minute),
	)
	fmt.Println(time.Since(start))

}

作者声称是在多个数据源,有一个返回时即可返回给用户的场景下,这种模式比较好使。不过我好像还没碰到过这种场景 orz。

用 fan-in fan-out 加快整个 pipeline 流程

这个实在没啥好说的,channel 然后又 channel 的场景,哪一步慢了,就在那里做并行。

用 bridge channel 简化 channel in channel 的消费代码

package main

import (
	"fmt"
)

func main() {
	orDone := func(done, c <-chan interface{}) <-chan interface{} {
		valStream := make(chan interface{})
		go func() {
			defer close(valStream)
			for {
				select {
				case <-done:
					return
				case v, ok := <-c:
					if ok == false {
						return
					}
					select {
					case valStream <- v:
					case <-done:
					}
				}
			}
		}()
		return valStream
	}
	bridge := func(
		done <-chan interface{},
		chanStream <-chan <-chan interface{},
	) <-chan interface{} {
		valStream := make(chan interface{}) // <1>
		go func() {
			defer close(valStream)
			for { // <2>
				var stream <-chan interface{}
				select {
				case maybeStream, ok := <-chanStream:
					if ok == false {
						return
					}
					stream = maybeStream
				case <-done:
					return
				}
				for val := range orDone(done, stream) { // <3>
					select {
					case valStream <- val:
					case <-done:
					}
				}
			}
		}()
		return valStream
	}
	genVals := func() <-chan <-chan interface{} {
		chanStream := make(chan (<-chan interface{}))
		go func() {
			defer close(chanStream)
			for i := 0; i < 10; i++ {
				stream := make(chan interface{}, 1)
				stream <- i
				close(stream)
				chanStream <- stream
			}
		}()
		return chanStream
	}

	for v := range bridge(nil, genVals()) {
		fmt.Printf("%v ", v)
	}
}

也是作者声称有些场景下需要在 channel 中套 channel,来保证一定的“发送顺序”,不过原谅我又没见过相关的场景。。。

context 使用

嗯,大致意思就是 context 是不可变的,可以用 WithCancel 和 WithTimeout 得到一个 cancel 方法,然后直接调用来取消整个 call graph,不过在 Go 中,如果在子 goroutine 中要支持取消的话,始终需要写:

select {
case <-ctx.Done():
case ...
}

而且理论上要被取消时尽快返回,所以对下游的代码侵入非常大,不知道其它语言是怎么解决的,抽时间调研一下~


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK