4

Go Retry ——Go 重试方法实现

 2 years ago
source link: https://ppsteven.github.io/pages/94edd4/#%E5%8F%82%E8%80%83%E6%95%99%E7%A8%8B
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Go Retry ——Go 重试方法实现

# 简单循环

实现重试最简单的方法是,使用一个函数将要执行的代码包起来,使用循环重试3次。

但是这种方法存在三个问题:

  • 多次尝试之间没有存在时间间隔

    当在短时间内,因为网络等原因造成某些请求的失败,如果请求时间间隔很短,那这种重试是没有效果的

  • 当错误发生后,无法根据错误的类型调整重试策略

    比如我们插入一端SQL代码,当SQL代码因为语法错误报错后,就没有必要再进行重试

  • 惊群问题 (Thundering Herd Problem)

    当服务端一次断开大量连接,客户端会同时发送重试请求,这容易造成 Thundering Herd Problem,简单说就是,当许多进程都在等待被同一事件唤醒的时候,当事件发生后最后只有一个进程能获得处理。其余进程又造成阻塞,这会造成上下文切换的浪费。

package main

import "fmt"

func DoSomeThing(body interface{}) (interface{}, error) {
	var ret interface{}
	var err error
	// ...
	return ret, err
}

func SimpleRetry(body interface{}) (interface{}, error) {
	var ret interface{}
	var err error
	for i := 0; i < 3; i++ {
		ret, err = DoSomeThing(body)
		if err == nil {
			break
		}
	}
	return ret, err
}

func main() {
	var body interface{}
	ret, err := SimpleRetry(body)
	if err != nil {
		fmt.Printf("SimpleRetry failed, err msg:[%v]", err)
	}
	fmt.Printf("ret: [%v]", ret)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

# 改进的简单重试策略

package main

import (
	"math/rand"
	"time"
)

// set rand seed
func init(){
	rand.Seed(time.Now().UnixNano())
}

type stopErr struct {
	error
}

func StopRetry(err error) {
  return stopErr{err}
}

func RetryDo(attempts int, sleep time.Duration, f func() error) error {
	if err := f(); err != nil {
		if s, ok := err.(stopErr); ok {
			return s.error
		}

		if attempts--; attempts > 0 {
			jitter := time.Duration(rand.Int63n(int64(sleep)))
			sleep = sleep + jitter/2

			time.Sleep(sleep)
			return RetryDo(attempts, 2 * sleep, f, params)
		}
		return err
	}

	return nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

针对上面的三个问题,这里 RetryDO 使用了递归调用本身实现重试,并且自定义了错误类型 stopErr 类型,当错误发生时,可以通过 StopRetry(err) 简单封装后,重试就会停止。

对于 惊群问题,这里加入了 扰动(jitter) 策略,睡眠时间由两部分构成,2 * sleep 保证了睡眠等待时间指数级上升,time.Duration(rand.Int63n(int64(sleep))) 保证了一定的扰动量。最后睡眠时间的长度为

t1 = t0 + 1/2 * [0, t0)
t2 = 2*t1 + [0, t0)
t3 = 2*t2 + [0, t2)

# 复杂重试策略

以上提供的代码较为简单,对于实现基础的功能来说就足够,目前有一些开源的 go retry 库也值得学习,如 https://github.com/avast/retry-go

# 简单使用

url := "http://example.com"
var body []byte

err := retry.Do(
	func() error {
		resp, err := http.Get(url)
		if err != nil {
			return err
		}
		defer resp.Body.Close()
		body, err = ioutil.ReadAll(resp.Body)
		if err != nil {
			return err
		}

		return nil
	},
)

fmt.Println(body)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

程序直接通过添加函数的方式来添加方法和配置,这样避免了 go 中参数调用的麻烦,不然上面的方法需要写成如下方式,使用起来不是很方便。

func RetryDo(attempts int, sleep time.Duration, f func(params ...interface{}) error, params interface{}) error {}

# 源码分析

程序的主体部分很简单

func Do(retryableFunc RetryableFunc, opts ...Option) error {
	var n uint

	//默认配置
	config := newDefaultRetryConfig()

	//添加自定义配置
	for _, opt := range opts {
		opt(config)
	}

	if err := config.context.Err(); err != nil {
		return err
	}

  // Error 是 []error类型,会将所有的重试的错误都储存下来
	var errorLog Error

  // lastErrorOnly = true 只保留最后一个错误结果
	if !config.lastErrorOnly {
		errorLog = make(Error, config.attempts)
	} else {
		errorLog = make(Error, 1)
	}

	lastErrIndex := n
	for n < config.attempts {
		err := retryableFunc()

		if err != nil {
			errorLog[lastErrIndex] = unpackUnrecoverable(err)

      // 是否停止重试
			if !config.retryIf(err) {
				break
			}
      // 重试策略
			config.onRetry(n, err)

      // 如果是最后一个重试,不用等待立即执行
			if n == config.attempts-1 {
				break
			}
      // 调整重试等待时间
			delayTime := config.delayType(n, err, config)
			if config.maxDelay > 0 && delayTime > config.maxDelay {
				delayTime = config.maxDelay
			}

			select {
			case <-time.After(delayTime):
			case <-config.context.Done():
				return config.context.Err()
			}

		} else {
			return nil
		}

		n++
		if !config.lastErrorOnly {
			lastErrIndex = n
		}
	}

	if config.lastErrorOnly {
		return errorLog[lastErrIndex]
	}
	return errorLog
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70

对于重试等待时间,是由两部分构成,一部分是 BackOffDelay 指数级增加等待时间,另一部是 RandomDelay 增加扰动等待时间

func newDefaultRetryConfig() *Config {
	return &Config{
		attempts:      uint(10),
		delay:         100 * time.Millisecond,
		maxJitter:     100 * time.Millisecond,
		onRetry:       func(n uint, err error) {},
		retryIf:       IsRecoverable,
+ 	delayType:     CombineDelay(BackOffDelay, RandomDelay),
		lastErrorOnly: false,
		context:       context.Background(),
	}
}
1
2
3
4
5
6
7
8
9
10
11
12

对于BackOffDelay实现比较细致,使用移位操作来指数级增加等待时间,同时考虑了uint的位数,对可能的越界问题进行了限制。

func BackOffDelay(n uint, _ error, config *Config) time.Duration {
	// 1 << 63 would overflow signed int64 (time.Duration), thus 62.
	const max uint = 62

	if config.maxBackOffN == 0 {
		if config.delay <= 0 {
			config.delay = 1
		}
    // uint(math.Floor(math.Log2(float64(config.delay)))) 计算出当前等待时间的位数
    // 如 config.delay 是 1s,那就是 1000 * 1000 * 1000, 即 9 位
    // 那 n 绝对不能超过 62 - 9 = 53,否则就会越界
		config.maxBackOffN = max - uint(math.Floor(math.Log2(float64(config.delay))))
	}
  // 如果输入 n 超过了 maxBackOffN,那在执行 config.delay << n 就会发生越位问题
	if n > config.maxBackOffN {
		n = config.maxBackOffN
	}

	return config.delay << n

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

重试策略也是一样的扰动规则

func RandomDelay(_ uint, _ error, config *Config) time.Duration {
	return time.Duration(rand.Int63n(int64(config.maxJitter)))
}

结束重试的方法

// IsRecoverable checks if error is an instance of `unrecoverableError`
// 当 err 包装的类型是 unrecoverableError 的时候,就结束重试
func IsRecoverable(err error) bool {
	_, isUnrecoverable := err.(unrecoverableError)
	return !isUnrecoverable
}
1
2
3
4
5
6

更多的用法可以去看源码

# 参考教程

  • https://upgear.io/blog/simple-golang-retry-function/
  • https://github.com/avast/retry-go

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK