
How to cancel a child goroutine in Go

 4 years ago
source link: https://www.tuicool.com/articles/aEBvuan

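I previously wrote a tool that checks whether our GitLab runners can cope with the runner jobs currently being built. Based on Prometheus monitoring, when resource usage is overloaded it temporarily starts extra servers and adds them to the cluster to share the load of the runner job builds. After it had been running for a while, I noticed that memory usage was sometimes rather high (too many goroutines), which led to the step-by-step optimizations below.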

Main text

We start with the following piece of code:

package main

import (
	"fmt"
	"sync"
	"time"
)

var (
	wg sync.WaitGroup
)

func work() error {
	defer wg.Done()

	// Suppose the task has 1000 steps and each step takes 2 seconds
	for i := 0; i < 1000; i++ {
		select {
		case <-time.After(2 * time.Second):
			fmt.Println("Doing some work ", i)
		}
	}
	return nil
}

func main() {
	fmt.Println("Hey, I'm going to do some work")

	wg.Add(1)
	go work()
	wg.Wait()

	fmt.Println("Finished. I'm going home")
}

Output:

$ go run work.go
Hey, I'm going to do some work
Doing some work 0
Doing some work 1
Doing some work 2
Doing some work 3
...
Doing some work 999
Finished. I'm going home

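Now suppose the call to work is triggered by a user interaction or an HTTP request. We probably do not want to wait indefinitely for the goroutine to finish, so a common approach is to add a timeout, as follows: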

package main

import (
	"fmt"
	"log"
	"time"
)

func work() error {
	for i := 0; i < 1000; i++ {
		select {
		case <-time.After(2 * time.Second):
			fmt.Println("Doing some work ", i)
		}
	}
	return nil
}

func main() {
	fmt.Println("Hey, I'm going to do some work")

	// Buffered so that work's goroutine can still send after main has given up
	ch := make(chan error, 1)
	go func() {
		ch <- work()
	}()

	select {
	case err := <-ch:
		if err != nil {
			log.Fatal("Something went wrong :(", err)
		}
	case <-time.After(4 * time.Second):
		fmt.Println("Life is too short to wait that long")
	}

	fmt.Println("Finished. I'm going home")
}

Output:

$ go run work.go
Hey, I'm going to do some work
Doing some work 0
Doing some work 1
Life is too short to wait that long
Finished. I'm going home

This is better than before: main no longer has to wait for work to finish.
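What happens to the worker once the caller stops waiting can be observed with runtime.NumGoroutine. The following is a minimal sketch, not the original tool's code: slowWork, runWithTimeout and demo are hypothetical names, and the durations are shortened to milliseconds so it runs quickly.

```go
package main

import (
	"fmt"
	"runtime"
	"time"
)

// slowWork stands in for the article's work function (hypothetical name):
// it knows nothing about the caller's timeout and runs until it is done.
func slowWork(d time.Duration) error {
	time.Sleep(d)
	return nil
}

// runWithTimeout applies the time.After pattern and reports whether the
// caller gave up before slowWork returned.
func runWithTimeout(workFor, timeout time.Duration) (timedOut bool) {
	ch := make(chan error, 1) // buffered so the worker can always send and exit
	go func() { ch <- slowWork(workFor) }()

	select {
	case <-ch:
		return false
	case <-time.After(timeout):
		return true
	}
}

// demo runs one short timeout and returns whether the caller timed out and
// how many extra goroutines were still alive right afterwards.
func demo() (timedOut bool, extra int) {
	before := runtime.NumGoroutine()
	timedOut = runWithTimeout(300*time.Millisecond, 50*time.Millisecond)
	return timedOut, runtime.NumGoroutine() - before
}

func main() {
	timedOut, extra := demo()
	fmt.Println("timed out:", timedOut)
	fmt.Println("extra goroutines right after the timeout:", extra)
}
```

The caller returns after the timeout, but the goroutine count stays above its starting value until the worker finishes on its own.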

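However, the timeout code above still has a problem. If it ran inside an HTTP service, then even though the timeout lets the caller stop waiting for work, the work goroutine would keep running in the background and consuming resources. We need a way to cancel the child goroutine. The context package came to mind, which led to the following code: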

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

var (
	wg sync.WaitGroup
)

func work(ctx context.Context) error {
	defer wg.Done()

	for i := 0; i < 1000; i++ {
		select {
		case <-time.After(2 * time.Second):
			fmt.Println("Doing some work ", i)

		// we received the cancellation signal on this channel
		case <-ctx.Done():
			fmt.Println("Cancel the context ", i)
			return ctx.Err()
		}
	}
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
	defer cancel()

	fmt.Println("Hey, I'm going to do some work")

	wg.Add(1)
	go work(ctx)
	wg.Wait()

	fmt.Println("Finished. I'm going home")
}

Output:

$ go run work.go
Hey, I'm going to do some work
Doing some work 0
Cancel the context 1
Finished. I'm going home

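This looks very good: the timeout is easy to manage in code, and the function stops working once the deadline passes instead of wasting resources. To make the example more realistic, let's simulate it against a real HTTP service.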
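As an aside before the HTTP example: the same loop can also be stopped on demand rather than by a deadline, using context.WithCancel. The sketch below is only an illustration under assumptions; cancelAfter and the result type are hypothetical names, and the step is shortened to 10 ms.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// work mirrors the loop above with a configurable step and returns how
// many steps it completed before finishing or being cancelled.
func work(ctx context.Context, step time.Duration) (done int, err error) {
	for i := 0; i < 1000; i++ {
		select {
		case <-time.After(step):
			done++
		case <-ctx.Done():
			return done, ctx.Err()
		}
	}
	return done, nil
}

// result carries work's return values back to the parent goroutine.
type result struct {
	done int
	err  error
}

// cancelAfter (hypothetical helper) starts work in a child goroutine,
// cancels it by hand after delayMs milliseconds, and waits for its report.
func cancelAfter(delayMs int) (done int, err error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	res := make(chan result, 1)
	go func() {
		n, err := work(ctx, 10*time.Millisecond)
		res <- result{n, err}
	}()

	time.Sleep(time.Duration(delayMs) * time.Millisecond)
	cancel() // for example, the user pressed "stop"
	r := <-res
	return r.done, r.err
}

func main() {
	done, err := cancelAfter(55)
	fmt.Println("steps completed:", done)
	fmt.Println("cancelled:", errors.Is(err, context.Canceled))
}
```

Calling cancel more than once is safe, so the deferred cancel can stay alongside the explicit one.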

Here is the HTTP server code, which simulates a server that responds slowly some of the time:

package main

// Lazy and Very Random Server
import (
	"fmt"
	"log"
	"math/rand"
	"net/http"
	"time"
)

func main() {
	http.HandleFunc("/", LazyServer)
	// ListenAndServe only returns on failure, so report the error
	log.Fatal(http.ListenAndServe(":1111", nil))
}

// LazyServer is sometimes a really fast server, sometimes a really slow one
func LazyServer(w http.ResponseWriter, req *http.Request) {
	headOrTails := rand.Intn(2)

	if headOrTails == 0 {
		time.Sleep(6 * time.Second)
		fmt.Fprintf(w, "Go! slow %v", headOrTails)
		fmt.Printf("Go! slow %v\n", headOrTails)
		return
	}

	fmt.Fprintf(w, "Go! quick %v", headOrTails)
	fmt.Printf("Go! quick %v\n", headOrTails)
}

Send a few requests with curl to see the result:

$ curl http://localhost:1111/
Go! quick 1
$ curl http://localhost:1111/
Go! quick 1
$ curl http://localhost:1111/
*some seconds later*
Go! slow 0

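Now we will send an HTTP request to this server from a goroutine. If the server is slow, we cancel the request and return quickly, so that the cancellation is handled and the connection is released. The code is as follows: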

package main

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"sync"
	"time"
)

var (
	wg sync.WaitGroup
)

// main is unchanged apart from the shorter timeout
func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	fmt.Println("Hey, I'm going to do some work")

	wg.Add(1)
	go work(ctx)
	wg.Wait()

	fmt.Println("Finished. I'm going home")
}

func work(ctx context.Context) error {
	defer wg.Done()

	client := &http.Client{}

	// anonymous struct to pack and unpack data in the channel
	c := make(chan struct {
		r   *http.Response
		err error
	}, 1)

	// Bind the request to ctx so that client.Do returns as soon as ctx is
	// cancelled (Transport.CancelRequest is deprecated in favor of this)
	req, err := http.NewRequestWithContext(ctx, "GET", "http://localhost:1111", nil)
	if err != nil {
		return err
	}
	go func() {
		resp, err := client.Do(req)
		fmt.Println("Doing http request is a hard job")
		pack := struct {
			r   *http.Response
			err error
		}{resp, err}
		c <- pack
	}()

	select {
	case <-ctx.Done():
		<-c // Wait for client.Do
		fmt.Println("Cancel the context")
		return ctx.Err()
	case ok := <-c:
		err := ok.err
		resp := ok.r
		if err != nil {
			fmt.Println("Error ", err)
			return err
		}

		defer resp.Body.Close()
		out, _ := io.ReadAll(resp.Body)
		fmt.Printf("Server Response: %s\n", out)
	}
	return nil
}

Output:

$ go run work.go
Hey, I'm going to do some work
Doing http request is a hard job
Server Response: Go! quick 1
Finished. I'm going home


$ go run work.go
Hey, I'm going to do some work
Doing http request is a hard job
Cancel the context
Finished. I'm going home

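As the output shows, we avoided waiting for the slow server response. On the client side the TCP connection is cancelled, so it does not sit busy waiting for the slow response and no resources are wasted.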
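When the request itself carries the context (http.NewRequestWithContext, available since Go 1.13), client.Do returns on its own once the context is done, so the extra goroutine and channel are not strictly required. The sketch below illustrates this under assumptions: it uses a throwaway net/http/httptest server instead of the server on port 1111, and fetch and tryServer are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"time"
)

// fetch performs a GET bound to ctx; if ctx expires first, client.Do
// returns an error that wraps the context's error.
func fetch(ctx context.Context, url string) (string, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return "", err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	return string(body), err
}

// tryServer (hypothetical helper) starts a test server that answers after
// serverDelayMs and fetches from it with a budget of timeoutMs.
func tryServer(serverDelayMs, timeoutMs int) (string, error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		select {
		case <-time.After(time.Duration(serverDelayMs) * time.Millisecond):
			fmt.Fprint(w, "Go!")
		case <-r.Context().Done(): // the client went away, stop early
		}
	}))
	defer srv.Close()

	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutMs)*time.Millisecond)
	defer cancel()
	return fetch(ctx, srv.URL)
}

func main() {
	body, err := tryServer(0, 500)
	fmt.Println("fast server:", body, err)

	_, err = tryServer(500, 50)
	fmt.Println("slow server timed out:", errors.Is(err, context.DeadlineExceeded))
}
```

The goroutine-plus-channel form is still useful when the caller has other events to select on while the request is in flight.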

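One more example: a long-running task that needs both to limit the growth of goroutines and to prevent a goroutine from continuing to run in the background, wasting resources, after it times out. Let's look at how to implement it: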

package main

import (
	"context"
	"fmt"
	"runtime"
	"time"
)

//var wg sync.WaitGroup

func work1(ch chan bool) {
	//defer wg.Done()
	ch <- true // take a slot; blocks while the channel is full

	// The job times out after 3 seconds
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	chT := make(chan bool)

	go func() {
		// The actual job, simulated here as 5 seconds of work. It stops
		// early once ctx is done, so it does not linger after the timeout
		select {
		case <-time.After(5 * time.Second):
			close(chT) // finished in time
		case <-ctx.Done():
		}
	}()

	select {
	case <-chT:
		fmt.Println("job1 finish...", runtime.NumGoroutine())
	case <-ctx.Done():
		fmt.Println("job1 timeout...", runtime.NumGoroutine())
	}

	fmt.Println("job1 exit..")
	<-ch // release the slot
}

func work2(ch chan bool) {
	//defer wg.Done()
	ch <- true // take a slot; blocks while the channel is full

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	chT := make(chan bool)

	go func() {
		// The actual job, simulated here as 1 second of work
		select {
		case <-time.After(1 * time.Second):
			close(chT) // finished in time
		case <-ctx.Done():
		}
	}()

	select {
	case <-chT:
		fmt.Println("job2 finish...", runtime.NumGoroutine())
	case <-ctx.Done():
		fmt.Println("job2 timeout...", runtime.NumGoroutine())
	}

	<-ch // release the slot
	fmt.Println("job2 exit..")
}

func main() {
	fmt.Println("Hey, I'm going to do some work")

	// Limit how many jobs run at the same time (2 slots)
	ch := make(chan bool, 2)

	// Run forever
	for {
		// Because this runs forever, sync.WaitGroup is no longer needed here
		//wg.Add(2)
		go work1(ch)
		go work2(ch)
		time.Sleep(2 * time.Second)
	}

	//wg.Wait()
}
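In the code above each job takes its slot from inside its own goroutine, so new goroutines are still created every 2 seconds and simply queue on the channel when it is full. If the aim is to bound the number of goroutines themselves, one option is to take the slot before the go statement. The sketch below shows that variation under assumptions: job and runBounded are hypothetical names, the durations are shortened to milliseconds, and a fixed number of jobs replaces the endless loop.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// job simulates a task that needs `takes` to finish but gives up as soon
// as ctx is done.
func job(ctx context.Context, takes time.Duration) error {
	select {
	case <-time.After(takes):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// runBounded (hypothetical helper) runs n jobs with at most limit in flight.
// It returns the highest number of jobs seen running at once and how many
// of them hit their timeout.
func runBounded(n, limit int) (peak, timedOut int) {
	sem := make(chan struct{}, limit)
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex // guards running, peak and timedOut
		running int
	)

	for i := 0; i < n; i++ {
		sem <- struct{}{} // blocks here, before `go`, once limit jobs are in flight
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			ctx, cancel := context.WithTimeout(context.Background(), 30*time.Millisecond)
			defer cancel()

			// Odd-numbered jobs are too slow and get cut off by the timeout.
			takes := 10 * time.Millisecond
			if i%2 == 1 {
				takes = 100 * time.Millisecond
			}
			err := job(ctx, takes)

			mu.Lock()
			running--
			if err != nil {
				timedOut++
			}
			mu.Unlock()
		}(i)
	}

	wg.Wait()
	return peak, timedOut
}

func main() {
	peak, timedOut := runBounded(6, 2)
	fmt.Println("peak concurrent jobs:", peak)
	fmt.Println("jobs that timed out:", timedOut)
}
```

The trade-off is that the producing loop now blocks while all slots are taken, which may or may not suit a scheduler that has to keep a fixed tick.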

I hope these examples are of some help! Happy coding, gophers!