1

Kubernetes GoRoutineMap工具包代码详解 - 人艰不拆_zmc

 2 years ago
source link: https://www.cnblogs.com/zhangmingcheng/p/17439413.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

Kubernetes GoRoutineMap工具包代码详解

GoRoutineMap 定义了一种类型,可以运行具有名称的 goroutine 并跟踪它们的状态。它防止创建具有相同名称的多个goroutine,并且在上一个具有该名称的 goroutine 完成后的一段退避时间内可能阻止重新创建 goroutine。

使用GoRoutineMap场景:

  • 使用协程的方式运行函数逻辑,如果函数成功执行,则退出该协程;如果函数执行报错,在指数退避的时间内禁止再次执行该函数逻辑。

使用GoRoutineMap大体步骤如下:

1)通过goRoutineMap.NewGoRoutineMap(exponentialBackOffOnError bool) GoRoutineMap {....}方法创建GoRoutineMap结构体对象,用于管理goroutine 并跟踪它们的状态;

2)调用GoRoutineMap结构体对象Run(operationName, operation)方法,其能够防止创建具有相同名称的多个goroutine,并使用协程的方式运行函数逻辑,如果函数成功执行,则退出该协程;如果函数执行报错,在指数退避的时间内禁止再次执行该函数逻辑。

注意 1:本文代码基于Kubernetes 1.24.10版本,包路径kubernetes-1.24.10/pkg/util/goroutinemap/goroutinemap.go。

注意 2:概述中涉及的代码会在下文进行详细解释。

2、goroutinemap工具包代码详解

2.1 相关类型详解

GoRoutineMap工具包接口定义:

type GoRoutineMap interface {
// Run adds operation name to the list of running operations and spawns a
// new go routine to execute the operation.
// If an operation with the same operation name already exists, an
// AlreadyExists or ExponentialBackoff error is returned.
// Once the operation is complete, the go routine is terminated and the
// operation name is removed from the list of executing operations allowing
// a new operation to be started with the same operation name without error.
Run(operationName string, operationFunc func() error) error
// Wait blocks until operations map is empty. This is typically
// necessary during tests - the test should wait until all operations finish
// and evaluate results after that.
Wait()
// WaitForCompletion blocks until either all operations have successfully completed
// or have failed but are not pending. The test should wait until operations are either
// complete or have failed.
WaitForCompletion()
IsOperationPending(operationName string) bool
}

goRoutineMap结构体实现GoRoutineMap接口,定义如下:

// goRoutineMap结构体实现GoRoutineMap接口,
type goRoutineMap struct {
// 用于记录goRoutineMap维护协程的状态
operations                map[string]operation
// 发生错误时是否指数级补偿
exponentialBackOffOnError bool
// 用在多个 Goroutine 等待,一个 Goroutine 通知(事件发生)的场景
cond                      *sync.Cond
lock                      sync.RWMutex
}
// operation结构体对象维护单个goroutine的状态。
type operation struct {
// 是否操作挂起
operationPending bool
// 单个goroutine执行逻辑报错时,实现以指数退避方式
expBackoff       exponentialbackoff.ExponentialBackoff
}

ExponentialBackoff结构体包含最后一次出现的错误、最后一次出现错误的时间以及不允许重试的持续时间。

// ExponentialBackoff contains the last occurrence of an error and the duration
// that retries are not permitted.
type ExponentialBackoff struct {
lastError           error
lastErrorTime       time.Time
durationBeforeRetry time.Duration
}

2.2 GoRoutineMap结构体对象初始化

通过goRoutineMap.NewGoRoutineMap方法创建GoRoutineMap结构体对象,用于管理goroutine 并跟踪它们的状态;  

// NewGoRoutineMap returns a new instance of GoRoutineMap.
func NewGoRoutineMap(exponentialBackOffOnError bool) GoRoutineMap {
g := &goRoutineMap{
operations:                make(map[string]operation),
exponentialBackOffOnError: exponentialBackOffOnError,
}
g.cond = sync.NewCond(&g.lock)
return g
}

2.3  GoRoutineMap.Run方法代码详解

调用GoRoutineMap结构体对象Run(operationName, operation)方法,其能够防止创建具有相同名称的多个goroutine,并使用协程的方式运行函数逻辑,如果函数成功执行,则退出该协程;如果函数执行报错,在指数退避的时间内禁止再次执行该函数逻辑。

// Run函数是外部函数,是goRoutineMap核心方法,其能够防止创建具有相同名称的多个goroutine,并使用协程的方式运行函数逻辑
// 如果函数成功执行,则退出该协程;如果函数执行报错,在指数退避的时间内禁止再次执行该函数逻辑。
func (grm *goRoutineMap) Run(
operationName string,
operationFunc func() error) error {
grm.lock.Lock()
defer grm.lock.Unlock()
// 判断grm.operations这个map中是否存在具有operationName名称的协程
existingOp, exists := grm.operations[operationName]
if exists {
// 如果grm.operations这个map中已经存在operationName名称的协程,并且existingOp.operationPending==true,说明grm.operations中operationName名称这个协程正在执行函数逻辑,在这期间又有一个同名的
// operationName希望加入grm.operations这个map,此时加入map失败并报AlreadyExistsError错误
if existingOp.operationPending {
return NewAlreadyExistsError(operationName)
}
// 到这步说明名称为operationName名称的协程执行函数逻辑失败,此时判断此协程最后一次失败时间 + 指数退避的时间 >= 当前时间,如果不符合条件的话禁止执行该协程函数逻辑。
if err := existingOp.expBackoff.SafeToRetry(operationName); err != nil {
return err
}
}
// 如果grm.operations这个map中不存在operationName名称的协程 或者 此协程最后一次失败时间 + 指数退避的时间 < 当前时间,则在grm.operations这个map中重新维护此协程(注意,operationPending=true)
grm.operations[operationName] = operation{
operationPending: true,
expBackoff:       existingOp.expBackoff,
}
// 以协程方式执行函数逻辑operationFunc()
go func() (err error) {
// 捕获崩溃并记录错误,默认不传参的话,在程序发送崩溃时,在控制台打印一下崩溃日志后再崩溃,方便技术人员排查程序错误。
defer k8sRuntime.HandleCrash()
// 如果执行operationFunc()函数逻辑不报错或者grm.exponentialBackOffOnError=false的话,将从grm.operations这个map中移除此operationName名称协程;
// 如果执行operationFunc()函数逻辑报错并且grm.exponentialBackOffOnError=true,则将产生指数级补偿,到达补偿时间后才能再调用此operationName名称协程的函数逻辑
// Handle completion of and error, if any, from operationFunc()
defer grm.operationComplete(operationName, &err)
// 处理operationFunc()函数发生的panic错误,以便defer grm.operationComplete(operationName, &err)能执行
// Handle panic, if any, from operationFunc()
defer k8sRuntime.RecoverFromPanic(&err)
return operationFunc()
}()
return nil
}

如果给定lastErrorTime的durationBeforeRetry周期尚未过期,则SafeToRetry返回错误。否则返回零。

// SafeToRetry returns an error if the durationBeforeRetry period for the given
// lastErrorTime has not yet expired. Otherwise it returns nil.
func (expBackoff *ExponentialBackoff) SafeToRetry(operationName string) error {
if time.Since(expBackoff.lastErrorTime) <= expBackoff.durationBeforeRetry {
return NewExponentialBackoffError(operationName, *expBackoff)
}
return nil
}

operationComplete是一个内部函数,用于处理在goRoutineMap中已经运行完函数逻辑的协程。

// operationComplete是一个内部函数,用于处理在goRoutineMap中已经运行完函数逻辑的协程
// 如果执行operationFunc()函数逻辑不报错或者grm.exponentialBackOffOnError=false的话,将从grm.operations这个map中移除此operationName名称协程;
// 如果执行operationFunc()函数逻辑报错并且grm.exponentialBackOffOnError=true,则将产生指数级补偿,到达补偿时间后才能再调用此operationName名称协程的函数逻辑
// operationComplete handles the completion of a goroutine run in the
// goRoutineMap.
func (grm *goRoutineMap) operationComplete(
operationName string, err *error) {
// Defer operations are executed in Last-In is First-Out order. In this case
// the lock is acquired first when operationCompletes begins, and is
// released when the method finishes, after the lock is released cond is
// signaled to wake waiting goroutine.
defer grm.cond.Signal()
grm.lock.Lock()
defer grm.lock.Unlock()
if *err == nil || !grm.exponentialBackOffOnError {
// 函数逻辑执行完成无错误或已禁用错误指数级补偿,将从grm.operations这个map中移除此operationName名称协程;
// Operation completed without error, or exponentialBackOffOnError disabled
delete(grm.operations, operationName)
if *err != nil {
// Log error
klog.Errorf("operation for %q failed with: %v",
operationName,
*err)
}
} else {
//  函数逻辑执行完成有错误则将产生指数级补偿,到达补偿时间后才能再调用此operationName名称协程的函数逻辑(注意,指数补充的协程,operationPending=false)
// Operation completed with error and exponentialBackOffOnError Enabled
existingOp := grm.operations[operationName]
existingOp.expBackoff.Update(err)
existingOp.operationPending = false
grm.operations[operationName] = existingOp
// Log error
klog.Errorf("%v",
existingOp.expBackoff.GenerateNoRetriesPermittedMsg(operationName))
}
}

Update是一个外部函数,用于计算指数级别的退避时间。

const (
initialDurationBeforeRetry time.Duration = 500 * time.Millisecond
maxDurationBeforeRetry time.Duration = 2*time.Minute + 2*time.Second
)
func (expBackoff *ExponentialBackoff) Update(err *error) {
if expBackoff.durationBeforeRetry == 0 {
expBackoff.durationBeforeRetry = initialDurationBeforeRetry
} else {
expBackoff.durationBeforeRetry = 2 * expBackoff.durationBeforeRetry
if expBackoff.durationBeforeRetry > maxDurationBeforeRetry {
expBackoff.durationBeforeRetry = maxDurationBeforeRetry
}
}
expBackoff.lastError = *err
expBackoff.lastErrorTime = time.Now()
}

本文对Kubernetes GoRoutineMap工具包代码进行了详解,通过 GoRoutineMap工具包能够防止创建具有相同名称的多个goroutine,并使用协程的方式运行函数逻辑,如果函数成功执行,则退出该协程;如果函数执行报错,在指数退避的时间内禁止再次执行该函数逻辑。使用Kubernetes GoRoutineMap包的好处包括以下几点:

  1. 减轻负载:当出现错误时,使用指数退避时间可以避免过于频繁地重新尝试操作,从而减轻系统的负载。指数退避时间通过逐渐增加重试之间的等待时间,有效地减少了对系统资源的过度使用。

  2. 提高稳定性:通过逐渐增加重试之间的等待时间,指数退避时间可以帮助应对瞬时的故障或错误。这种策略使得系统能够在短时间内自动恢复,并逐渐增加重试频率,直到操作成功为止。这有助于提高应用程序的稳定性和可靠性。

  3. 降低网络拥塞:当网络出现拥塞时,频繁地进行重试可能会加重拥塞问题并影响其他任务的正常运行。指数退避时间通过增加重试之间的等待时间,可以降低对网络的额外负载,有助于缓解网络拥塞问题。

  4. 避免过早放弃:某些错误可能是瞬时的或暂时性的,因此过早放弃重试可能会导致不必要的失败。指数退避时间确保了在错误发生时进行适当的重试,以便系统有更多机会恢复并成功完成操作。

综上所述,使用Kubernetes GoRoutineMap工具包以协程方式处理函数逻辑可以提高系统的可靠性、稳定性和性能,减轻负载并有效应对错误和故障情况。这是在Kubernetes中实施的一种常见的重试策略,常用于处理容器化应用程序中的操作错误。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK