Golang Notes - Pipelines and cancellation
source link: https://greenhathg.github.io/2022/04/19/Golang%E7%AC%94%E8%AE%B0-Pipelines-and-cancellation/
Source: Go Concurrency Patterns: Pipelines and cancellation - The Go Programming Language
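What is a pipeline
A pipeline is a series of stages connected by channels, where each stage is a group of goroutines running the same function. In each stage, the goroutines: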
- receive values from upstream via inbound channels
- perform some function on that data, usually producing new values
- send values downstream via outbound channels
Squaring numbers
- The first stage, gen, follows the generator pattern: it converts a list of integers to a channel that emits the integers in the list
func gen(nums ...int) <-chan int { ... }
- The second stage, sq, receives integers from a channel and returns a channel that emits the square of each received integer
func sq(in <-chan int) <-chan int { ... }
- main sets up the pipeline and runs the final stage: it receives values from the second stage and prints each one until the channel is closed
func main() { ... }
- Because sq has the same type for its inbound and outbound channels, we can compose it any number of times
func main() { ... }
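The function bodies are cut off in this copy of the notes. The following is a minimal sketch of how the three stages could fit together, assuming unbuffered channels and one goroutine per stage; the input values 2 and 3 are chosen only for illustration.

```go
package main

import "fmt"

// gen converts a list of integers into a channel that emits them,
// then closes the channel once every value has been sent.
func gen(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// sq reads integers from in and emits the square of each one.
// It closes its outbound channel when in is closed and drained.
func sq(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

func main() {
	// sq's inbound and outbound types match, so it composes with itself.
	for n := range sq(sq(gen(2, 3))) {
		fmt.Println(n) // prints 16, then 81
	}
}
```

Each stage closes its outbound channel when it has nothing more to send, which is what lets the `range` loops downstream terminate.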
Fan-out, Fan-in
- Multiple functions can read from the same channel until that channel is closed; this is called fan-out. This provides a way to distribute work amongst a group of workers to parallelize CPU use and I/O.
- A function can read from multiple inputs and proceed until all are closed by multiplexing the input channels onto a single channel that’s closed when all the inputs are closed. This is called fan-in.
func merge(cs ...<-chan int) <-chan int { ... }
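A sketch of fan-out and fan-in under the same assumptions as the squaring pipeline: two sq workers read from one channel, and merge copies their outputs onto a single channel, using a sync.WaitGroup to close it only after every input is drained. The gen and sq helpers are repeated here so the example stands alone.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

func gen(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

func sq(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

// merge fans in: it starts one copying goroutine per input channel
// and closes out once all of those goroutines have finished.
func merge(cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)

	output := func(c <-chan int) {
		defer wg.Done()
		for n := range c {
			out <- n
		}
	}
	wg.Add(len(cs))
	for _, c := range cs {
		go output(c)
	}

	// Close out only after every send has completed; sending on a
	// closed channel would panic.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	in := gen(1, 2, 3, 4)

	// Fan-out: two sq workers share the same inbound channel.
	c1 := sq(in)
	c2 := sq(in)

	var results []int
	for n := range merge(c1, c2) {
		results = append(results, n)
	}
	sort.Ints(results)   // arrival order is not deterministic
	fmt.Println(results) // [1 4 9 16]
}
```

The WaitGroup is what makes the close safe: `wg.Add` runs before any copying goroutine starts, and `close(out)` runs only after the last `wg.Done`.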
Resource leak
Stages don’t always receive all the inbound values
- The receiver may only need a subset of values to make progress
- More often, a stage exits early because an inbound value represents an error in an earlier stage
- If a stage fails to consume all the inbound values, the goroutines attempting to send those values will block indefinitely
// Consume the first value from the output.
- This is a resource leak: goroutines consume memory and runtime resources, and heap references in goroutine stacks keep data from being garbage collected. Goroutines are not garbage collected; they must exit on their own.
- One way to avoid the leak is to give the outbound channels a buffer, so that sends can complete even when nobody receives. But this is fragile: it depends on knowing the number of values merge will receive and the number of values downstream stages will consume.
- Instead, we need to provide a way for downstream stages to indicate to the senders that they will stop accepting input.
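The buffering workaround can be sketched as follows. The name genBuffered is hypothetical and used only for this illustration. Because the buffer holds every value, all sends complete immediately and no goroutine is needed, so nothing is left blocked when the receiver stops early.

```go
package main

import "fmt"

// genBuffered (hypothetical name) copies nums into a channel whose
// buffer is large enough to hold all of them. Every send completes
// without a receiver, so no goroutine is required.
func genBuffered(nums ...int) <-chan int {
	out := make(chan int, len(nums))
	for _, n := range nums {
		out <- n
	}
	close(out)
	return out
}

func main() {
	c := genBuffered(2, 3, 4)

	// Consume only the first value. The remaining values stay in the
	// buffer and are garbage collected along with the channel.
	fmt.Println(<-c) // 2
}
```

This only works because the number of values is known when the channel is created. If a stage upstream sends one more value than the buffer holds, the blocked-sender problem returns, which is why explicit cancellation is the more robust fix.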
Explicit cancellation
We need a way to tell an unknown and unbounded number of goroutines to stop sending their values downstream. In Go, we can do this by closing a channel, because a receive operation on a closed channel can always proceed immediately, yielding the element type’s zero value.
func gen(done <-chan struct{}, nums ...int) <-chan int { ... }
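A sketch of how every stage could honor a shared done channel, assuming each send is wrapped in a select that also watches done. Closing done acts as a broadcast: all blocked senders wake up and return, whatever their number.

```go
package main

import (
	"fmt"
	"sync"
)

func gen(done <-chan struct{}, nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			select {
			case out <- n:
			case <-done: // receiver has gone away; stop sending
				return
			}
		}
	}()
	return out
}

func sq(done <-chan struct{}, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			select {
			case out <- n * n:
			case <-done:
				return
			}
		}
	}()
	return out
}

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)

	// output copies from c until c is closed or done is closed.
	output := func(c <-chan int) {
		defer wg.Done()
		for n := range c {
			select {
			case out <- n:
			case <-done:
				return
			}
		}
	}
	wg.Add(len(cs))
	for _, c := range cs {
		go output(c)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	// Closing done on return tells every stage to stop, even though
	// main reads only one value.
	done := make(chan struct{})
	defer close(done)

	in := gen(done, 2, 3)
	out := merge(done, sq(done, in), sq(done, in))
	fmt.Println(<-out) // 4 or 9, depending on scheduling
}
```

The deferred close in each stage guarantees that its outbound channel is closed on every return path, so downstream `range` loops always end.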
Digesting a tree
The program takes a single directory as an argument and prints the digest value of each regular file under that directory, sorted by path name.
func main() { ... }
The serial version of MD5All uses no concurrency: it simply reads and sums each file as it walks the tree.
// MD5All reads all the files in the file tree rooted at root and returns a map ...
Parallel digestion
- We split MD5All into a two-stage pipeline.
- The first stage, sumFiles, walks the tree, digests each file in a new goroutine, and sends the results on a channel with value type result
func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
    // For each regular file, start a goroutine that sums the file and sends
    // the result on c. Send the result of the walk on errc.
    c := make(chan result)
    errc := make(chan error, 1)
    go func() {
        var wg sync.WaitGroup
        err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            wg.Add(1)
            go func() {
                data, err := ioutil.ReadFile(path)
                select {
                case c <- result{path, md5.Sum(data), err}:
                case <-done:
                }
                wg.Done()
            }()
            // Abort the walk if done is closed.
            select {
            case <-done:
                return errors.New("walk canceled")
            default:
                return nil
            }
        })
        // Walk has returned, so all calls to wg.Add are done. Start a
        // goroutine to close c once all the sends are done.
        go func() {
            wg.Wait()
            close(c)
        }()
        // No select needed here, since errc is buffered.
        errc <- err
    }()
    return c, errc
}
- MD5All receives the digest values from c. MD5All returns early on error, closing done via a defer
func MD5All(root string) (map[string][md5.Size]byte, error) {
    // MD5All closes the done channel when it returns; it may do so before
    // receiving all the values from c and errc.
    done := make(chan struct{})
    defer close(done)
    c, errc := sumFiles(done, root)
    m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}
Bounded parallelism
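The parallel MD5All starts a new goroutine for each file. In a directory with many large files, this may allocate more memory than is available on the machine. We can limit these allocations by bounding the number of files read in parallel.
Our pipeline now has three stages: walk the tree, read and digest the files, and collect the digests.
type result struct { ... }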