29

Kubernetes ApiServer 并发安全机制

 4 years ago
source link: http://yangxikun.com/kubernetes/2020/01/30/kubernetes-apiserver-concurrent-safe.html?
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Kubernetes ApiServer 并发安全机制

30 January 2020

笔者在这篇文章中想要探讨的问题是:当向 Kubernetes ApiServer 并发的发起多个请求,对同一个 API 资源对象进行更新时,Kubernetes ApiServer 是如何确保更新不丢失,以及如何做冲突检测的?

将 Kubectl 作为切入点

Kubectl 的命令有很多,其中笔者经常用于修改 API 资源对象的命令是 apply 和 edit。接下来以修改 Deployment 资源对象为例子,通过观察这2个命令发出的请求,发现它们都是使用 HTTP PATCH:

PATCH /apis/extensions/v1beta1/namespaces/default/deployments/nginx HTTP/1.1
Host: 127.0.0.1:8080
User-Agent: kubectl/v0.0.0 (linux/amd64) kubernetes/$Format
Content-Length: 246
Accept: application/json
Content-Type: application/strategic-merge-patch+json
Uber-Trace-Id: 5ca3fde0b9f9aaf1:0c0358897c8e0ef8:0f38135280523f87:1
Accept-Encoding: gzip

{"spec":{"template":{"spec":{"$setElementOrder/containers":[{"name":"nginx"}],"containers":[{"$setElementOrder/env":[{"name":"DEMO_GREETING"}],"env":[{"name":"DEMO_GREETING","value":"Hello from the environment#kubectl edit"}],"name":"nginx"}]}}}}

跟踪 ApiServer 对 PATCH 请求的处理

ApiServer 处理 PATCH 请求的调用栈如下:

restfulPatchResource
    // 参数
    r rest.Patcher // registry.Store
    scope handlers.RequestScope
    admit admission.Interface
    supportedTypes []string // 支持的资源对象 PATCH 类型
    // 返回值
    restful.RouteFunction // go-restful 的路由处理回调

handlers.PatchResource
    // 创建 handlers.patcher 对象
    // 参数
    r rest.Patcher // registry.Store
    scope *RequestScope
    admit admission.Interface
    patchTypes []string // 支持的资源对象 PATCH 类型
    // 返回值
    http.HandlerFunc

k8s.io/apiserver/pkg/endpoints/handlers.(*patcher).patchResource
    // 参数
    ctx context.Context // http.Request.Context
    scope *RequestScope
    // 返回值
    runtime.Object // 更新后的资源对象
    bool // 是否创建了资源对象(当更新一个不存在的资源对象时,可以直接创建)
    error

k8s.io/apiserver/pkg/registry/generic/registry.(*Store).Update
    // 参数
    ctx context.Context
    name string // 资源对象的名称,例如:/deployments/default/nginx
    objInfo rest.UpdatedObjectInfo // 更新资源对象所需要的信息
    createValidation rest.ValidateObjectFunc
    updateValidation rest.ValidateObjectUpdateFunc
    forceAllowCreate bool
    options *metav1.UpdateOptions
    // 返回值
    runtime.Object // 更新后的资源对象
    bool // 是否创建了资源对象(当更新一个不存在的资源对象时,可以直接创建)
    error

k8s.io/apiserver/pkg/registry/generic/registry.(*DryRunnableStorage).GuaranteedUpdate
    // 实现对资源对象进行模拟更新的逻辑
    // 如果 dryRun = true,则执行完后不会再调用持久化操作
    // 参数
    ctx context.Context
    key string // 资源对象的名称,例如:/deployments/default/nginx
    ptrToType runtime.Object, // 资源对象的指针类型,由 registry.Store.NewFunc 创建
    ignoreNotFound bool, // ???
    preconditions *storage.Preconditions, // 执行更新前的检查,检查 UID 和 ResourceVersion
    tryUpdate storage.UpdateFunc, // registry.Store.Update 方法中的闭包函数,执行资源对象的更新操作
    dryRun bool, // 是否空跑(即执行输入检查,在内存中尝试更新,但不持久化)
    suggestion ...runtime.Object
    // 返回值
    error

k8s.io/apiserver/pkg/storage/cacher.(*Cacher).GuaranteedUpdate
    // 从内存缓存 cacher.Cacher.watchCache 中查询 key 是否存在,若存在,作为 suggestion 传递下去
    // 参数
    ctx context.Context
    key string // 资源对象的名称,例如:/deployments/default/nginx
    ptrToType runtime.Object // 资源对象的指针类型,由 registry.Store.NewFunc 创建
    ignoreNotFound bool
    preconditions *storage.Preconditions // 执行更新前的检查,检查 UID 和 ResourceVersion
    tryUpdate storage.UpdateFunc // registry.Store.Update 方法中的闭包函数,执行资源对象的更新操作
    _ ...runtime.Object
    // 返回值
    error

k8s.io/apiserver/pkg/storage/etcd3.(*store).GuaranteedUpdate
    // 调用 tryUpdate 更新资源对象,并通过 Etcd 的事务更新操作,持久化到 Etcd
    // 参数
    ctx context.Context
    key string // 资源对象的名称,例如:/deployments/default/nginx
    out runtime.Object // 资源对象的指针类型,由 registry.Store.NewFunc 创建
    ignoreNotFound bool
    preconditions *storage.Preconditions // 执行更新前的检查,检查 UID 和 ResourceVersion
    tryUpdate storage.UpdateFunc // registry.Store.Update 方法中的闭包函数,执行资源对象的更新操作
    suggestion ...runtime.Object // 如果在内存缓存中能找到,则可以避免直接访问 Etcd
    // 返回值
    error

在整个调用栈中,有几个关键点需要展开分析下:

storage.Preconditions

执行 tryUpdate 前的检查。

// Preconditions must be fulfilled before an operation (update, delete, etc.) is carried out.
type Preconditions struct {
    // Specifies the target UID.
    // +optional
    UID *types.UID `json:"uid,omitempty"`
    // Specifies the target ResourceVersion
    // +optional
    ResourceVersion *string `json:"resourceVersion,omitempty"`
}

storage.Preconditions 只有一个方法:func (p *Preconditions) Check(key string, obj runtime.Object) error,检查 obj 的 UID 和 ResourceVersion 是否与 Preconditions 一致。

在 PATCH 请求处理中,Preconditions 从 rest.UpdatedObjectInfo.Preconditions() 获取。

rest.UpdatedObjectInfo

tryUpdate 闭包中会调用 rest.UpdatedObjectInfo.UpdatedObject() 对资源对象执行更新操作。

// UpdatedObjectInfo provides information about an updated object to an Updater.
// It requires access to the old object in order to return the newly updated object.
type UpdatedObjectInfo interface {
    // Returns preconditions built from the updated object, if applicable.
    // May return nil, or a preconditions object containing nil fields,
    // if no preconditions can be determined from the updated object.
    Preconditions() *metav1.Preconditions

    // UpdatedObject returns the updated object, given a context and old object.
    // The only time an empty oldObj should be passed in is if a "create on update" is occurring (there is no oldObj).
    UpdatedObject(ctx context.Context, oldObj runtime.Object) (newObj runtime.Object, err error)
}

在 PATCH 请求处理中,rest.UpdatedObjectInfo 的实现为 rest.defaultUpdatedObjectInfo:

// defaultUpdatedObjectInfo implements UpdatedObjectInfo
type defaultUpdatedObjectInfo struct {
    // obj is the updated object
    obj runtime.Object

    // transformers is an optional list of transforming functions that modify or
    // replace obj using information from the context, old object, or other sources.
    transformers []TransformFunc
}

// DefaultUpdatedObjectInfo returns an UpdatedObjectInfo impl based on the specified object.
func DefaultUpdatedObjectInfo(obj runtime.Object, transformers ...TransformFunc) UpdatedObjectInfo {
    return &defaultUpdatedObjectInfo{obj, transformers}
}

// Preconditions satisfies the UpdatedObjectInfo interface.
// 注意这里只获取了 UID
func (i *defaultUpdatedObjectInfo) Preconditions() *metav1.Preconditions {
    // Attempt to get the UID out of the object
    accessor, err := meta.Accessor(i.obj)
    if err != nil {
        // If no UID can be read, no preconditions are possible
        return nil
    }

    // If empty, no preconditions needed
    uid := accessor.GetUID()
    if len(uid) == 0 {
        return nil
    }

    return &metav1.Preconditions{UID: &uid}
}

// UpdatedObject satisfies the UpdatedObjectInfo interface.
// It returns a copy of the held obj, passed through any configured transformers.
func (i *defaultUpdatedObjectInfo) UpdatedObject(ctx context.Context, oldObj runtime.Object) (runtime.Object, error) {
    var err error
    // Start with the configured object
    newObj := i.obj

    // If the original is non-nil (might be nil if the first transformer builds the object from the oldObj), make a copy,
    // so we don't return the original. BeforeUpdate can mutate the returned object, doing things like clearing ResourceVersion.
    // If we're re-called, we need to be able to return the pristine version.
    if newObj != nil {
        newObj = newObj.DeepCopyObject()
    }

    // Allow any configured transformers to update the new object
    for _, transformer := range i.transformers {
        newObj, err = transformer(ctx, newObj, oldObj) // if http.method == patch: patcher.applyPatch, patcher.applyAdmission
        if err != nil {
            return nil, err
        }
    }

    return newObj, nil
}

创建时 defaultUpdatedObjectInfo.obj 为 nil,则 defaultUpdatedObjectInfo.Preconditions() 返回 nil,即不会进行 Preconditions 检查。

// patchResource divides PatchResource for easier unit testing
func (p *patcher) patchResource(ctx context.Context, scope *RequestScope) (runtime.Object, bool, error) {
    ......
    p.updatedObjectInfo = rest.DefaultUpdatedObjectInfo(nil, p.applyPatch, p.applyAdmission)
    ......
}
tryUpdate 闭包更新
// Update performs an atomic update and set of the object. Returns the result of the update
// or an error. If the registry allows create-on-update, the create flow will be executed.
// A bool is returned along with the object and any errors, to indicate object creation.
func (e *Store) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
    ......
    err = e.Storage.GuaranteedUpdate(ctx, key, out, true, storagePreconditions, func(existing runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
        // Given the existing object, get the new object
        obj, err := objInfo.UpdatedObject(ctx, existing) // defaultUpdatedObjectInfo.UpdatedObject
        if err != nil {
            return nil, nil, err
        }

        // If AllowUnconditionalUpdate() is true and the object specified by
        // the user does not have a resource version, then we populate it with
        // the latest version. Else, we check that the version specified by
        // the user matches the version of latest storage object.
        resourceVersion, err := e.Storage.Versioner().ObjectResourceVersion(obj)
        if err != nil {
            return nil, nil, err
        }
        doUnconditionalUpdate := resourceVersion == 0 && e.UpdateStrategy.AllowUnconditionalUpdate()

        version, err := e.Storage.Versioner().ObjectResourceVersion(existing)
        if err != nil {
            return nil, nil, err
        }
        // version == 0 说明 key 对应的资源对象不存在
        if version == 0 {
            if !e.UpdateStrategy.AllowCreateOnUpdate() && !forceAllowCreate {
                return nil, nil, kubeerr.NewNotFound(qualifiedResource, name)
            }
            creating = true
            creatingObj = obj
            if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
                return nil, nil, err
            }
            // at this point we have a fully formed object.  It is time to call the validators that the apiserver
            // handling chain wants to enforce.
            if createValidation != nil {
                if err := createValidation(obj.DeepCopyObject()); err != nil {
                    return nil, nil, err
                }
            }
            ttl, err := e.calculateTTL(obj, 0, false)
            if err != nil {
                return nil, nil, err
            }

            return obj, &ttl, nil
        }

        creating = false
        creatingObj = nil
        if doUnconditionalUpdate {
            // Update the object's resource version to match the latest
            // storage object's resource version.
            // 无条件更新
            err = e.Storage.Versioner().UpdateObject(obj, res.ResourceVersion)
            if err != nil {
                return nil, nil, err
            }
        } else {
            // Check if the object's resource version matches the latest
            // resource version.
            if resourceVersion == 0 {
                // TODO: The Invalid error should have a field for Resource.
                // After that field is added, we should fill the Resource and
                // leave the Kind field empty. See the discussion in #18526.
                qualifiedKind := schema.GroupKind{Group: qualifiedResource.Group, Kind: qualifiedResource.Resource}
                fieldErrList := field.ErrorList{field.Invalid(field.NewPath("metadata").Child("resourceVersion"), resourceVersion, "must be specified for an update")}
                return nil, nil, kubeerr.NewInvalid(qualifiedKind, name, fieldErrList)
            }
            // 这里是关键,更新前后的资源对象版本不一致,说明出现了并发更新冲突
            // PATCH 请求中 resourceVersion 始终是等于 version 的!!!
            // PUT 请求中才可能会出现 resourceVersion != version
            if resourceVersion != version {
                return nil, nil, kubeerr.NewConflict(qualifiedResource, name, fmt.Errorf(OptimisticLockErrorMsg))
            }
        }
        if err := rest.BeforeUpdate(e.UpdateStrategy, ctx, obj, existing); err != nil {
            return nil, nil, err
        }
        // at this point we have a fully formed object.  It is time to call the validators that the apiserver
        // handling chain wants to enforce.
        if updateValidation != nil {
            if err := updateValidation(obj.DeepCopyObject(), existing.DeepCopyObject()); err != nil {
                return nil, nil, err
            }
        }
        // Check the default delete-during-update conditions, and store-specific conditions if provided
        if ShouldDeleteDuringUpdate(ctx, key, obj, existing) &&
            (e.ShouldDeleteDuringUpdate == nil || e.ShouldDeleteDuringUpdate(ctx, key, obj, existing)) {
            deleteObj = obj
            return nil, nil, errEmptiedFinalizers
        }
        ttl, err := e.calculateTTL(obj, res.TTL, true)
        if err != nil {
            return nil, nil, err
        }
        if int64(ttl) != res.TTL {
            return obj, &ttl, nil
        }
        return obj, nil, nil
    }, dryrun.IsDryRun(options.DryRun))
    ......
}
持久化到 Etcd
// GuaranteedUpdate implements storage.Interface.GuaranteedUpdate.
func (s *store) GuaranteedUpdate(
    ctx context.Context, key string, out runtime.Object, ignoreNotFound bool,
    preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, suggestion ...runtime.Object) error {
    trace := utiltrace.New(fmt.Sprintf("GuaranteedUpdate etcd3: %s", getTypeName(out)))
    defer trace.LogIfLong(500 * time.Millisecond)

    v, err := conversion.EnforcePtr(out) // out 必须是非 nil 值的指针类型
    if err != nil {
        panic("unable to convert output object to pointer")
    }
    key = path.Join(s.pathPrefix, key)

    // 从 Etcd 存储中获取 key 对象的状态
    getCurrentState := func() (*objState, error) {
        startTime := time.Now()
        getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
        metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
        if err != nil {
            return nil, err
        }
        return s.getState(getResp, key, v, ignoreNotFound)
    }

    var origState *objState
    var mustCheckData bool // = true 说明 origState 有可能不是最新的
    // 如果上层调用提供了 key 对象的值(从 k8s.io/apiserver/pkg/storage/cacher.Cacher.watchCache.GetByKey(key) 获取)
    // 则不需要访问 Etcd
    if len(suggestion) == 1 && suggestion[0] != nil {
        span.LogFields(log.Object("suggestion[0]", suggestion[0]))
        origState, err = s.getStateFromObject(suggestion[0])
        span.LogFields(log.Object("origState", origState))
        if err != nil {
            return err
        }
        mustCheckData = true
    } else {
        origState, err = getCurrentState()
        if err != nil {
            return err
        }
    }
    trace.Step("initial value restored")

    transformContext := authenticatedDataString(key)
    for {
        // 检查 origState.obj 的 UID、 ResourceVersion 是否与 preconditions 一致
        // PATCH 和 PUT 请求都不做检查!!!
        if err := preconditions.Check(key, origState.obj); err != nil {
            // If our data is already up to date, return the error
            // 如果 origState 已经是最新的状态了,则返回错误
            if !mustCheckData {
                return err
            }

            // 可能 origState 不是最新的状态,从 Etcd 获取最新的状态
            // It's possible we were working with stale data
            // Actually fetch
            origState, err = getCurrentState()
            if err != nil {
                return err
            }
            mustCheckData = false
            // Retry
            continue
        }

        // 在 origState.obj 的基础上进行修改
        ret, ttl, err := s.updateState(origState, tryUpdate)
        if err != nil {
            // origState 可能不是最新的状态,会从 Etcd 中获取最新的状态,再尝试更新一次
            // If our data is already up to date, return the error
            if !mustCheckData {
                return err
            }

            // It's possible we were working with stale data
            // Actually fetch
            origState, err = getCurrentState()
            if err != nil {
                return err
            }
            mustCheckData = false
            // Retry
            continue
        }

        data, err := runtime.Encode(s.codec, ret)
        if err != nil {
            return err
        }
        // 目前 origState.stale 始终为 false
        // 判断修改后的资源对象是否有变化
        if !origState.stale && bytes.Equal(data, origState.data) {
            // if we skipped the original Get in this loop, we must refresh from
            // etcd in order to be sure the data in the store is equivalent to
            // our desired serialization
            // mustCheckData = true 说明 origState 有可能不是最新的
            // 必须再确认一遍
            if mustCheckData {
                origState, err = getCurrentState()
                if err != nil {
                    return err
                }
                mustCheckData = false
                if !bytes.Equal(data, origState.data) {
                    // original data changed, restart loop
                    continue
                }
            }
            // recheck that the data from etcd is not stale before short-circuiting a write
            if !origState.stale {
                return decode(s.codec, s.versioner, origState.data, out, origState.rev)
            }
        }

        // 将对象序列化为二进制数据存储到 Etcd
        newData, err := s.transformer.TransformToStorage(data, transformContext)
        if err != nil {
            return storage.NewInternalError(err.Error())
        }

        // 设置 key 的过期时间
        opts, err := s.ttlOpts(ctx, int64(ttl))
        if err != nil {
            return err
        }
        trace.Step("Transaction prepared")

        span.LogFields(log.Uint64("ttl", ttl), log.Int64("origState.rev", origState.rev))

        // Etcd 事务
        // 注意这里会比较 Etcd 中的资源对象版本跟 origState.rev 是否一致
        // 如果一致,则更新
        // 否则,更新失败并获取当前最新的资源对象
        startTime := time.Now()
        txnResp, err := s.client.KV.Txn(ctx).If(
            clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),
        ).Then(
            clientv3.OpPut(key, string(newData), opts...),
        ).Else(
            clientv3.OpGet(key),
        ).Commit()
        metrics.RecordEtcdRequestLatency("update", getTypeName(out), startTime)
        if err != nil {
            return err
        }
        trace.Step("Transaction committed")
        if !txnResp.Succeeded {
            // 事务执行失败
            getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
            klog.V(4).Infof("GuaranteedUpdate of %s failed because of a conflict, going to retry", key)
            // 获取最新的状态,并重试
            origState, err = s.getState(getResp, key, v, ignoreNotFound)
            if err != nil {
                return err
            }
            trace.Step("Retry value restored")
            mustCheckData = false
            continue
        }
        // 事务执行成功
        putResp := txnResp.Responses[0].GetResponsePut()

        return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
    }
}
并发更新是如何确保更新不丢失的?

在将更新后的对象持久化到 Etcd 中时,通过事务保证的,当事务执行失败时会不断重试,事务的伪代码逻辑如下:

// oldObj = FromMemCache(key) or EtcdGet(key)
if inEtcd(key).rev == inMemory(oldObj).rev:
    EtcdSet(key) = newObj
    transaction = success
else:
    EtcdGet(key)
    transaction = fail
并发更新是如何做冲突检测的?

在上面的分析中,可以看到有两处冲突检测的判断:

  1. Preconditions
  2. tryUpdate 中的 resourceVersion != version

对于 kubectl apply 和 edit (发送的都是 PATCH 请求),创建的 Preconditions 是零值,所以不会通过 Preconditions 进行冲突检测,而在 tryUpdate 中调用 objInfo.UpdatedObject(ctx, existing) 得到的 newObj.rv 始终是等于 existing.rv 的,所以也不会进行冲突检测。

那什么时候会进行冲突检测?其实 kubectl 还有个 replace 的命令,通过抓包发现 replace 命令发送的是 PUT 请求,并且请求中会带有 resourceVersion:

PUT /apis/extensions/v1beta1/namespaces/default/deployments/nginx HTTP/1.1
Host: localhost:8080
User-Agent: kubectl/v0.0.0 (linux/amd64) kubernetes/$Format
Content-Length: 866
Accept: application/json
Content-Type: application/json
Uber-Trace-Id: 6e685772cc06fc16:2514dc54a474fe88:4f488c05a7cef9c8:1
Accept-Encoding: gzip

{"apiVersion":"extensions/v1beta1","kind":"Deployment","metadata":{"labels":{"app":"nginx"},"name":"nginx","namespace":"default","resourceVersion":"744603"},"spec":{"progressDeadlineSeconds":600,"replicas":1,"revisionHistoryLimit":10,"selector":{"matchLabels":{"app":"nginx"}},"strategy":{"rollingUpdate":{"maxSurge":"25%","maxUnavailable":"25%"},"type":"RollingUpdate"},"template":{"metadata":{"creationTimestamp":null,"labels":{"app":"nginx"}},"spec":{"containers":[{"env":[{"name":"DEMO_GREETING","value":"Hello from the environment#kubectl replace"}],"image":"nginx","imagePullPolicy":"IfNotPresent","name":"nginx","resources":{},"terminationMessagePath":"/dev/termination-log","terminationMessagePolicy":"File"}],"dnsPolicy":"ClusterFirst","restartPolicy":"Always","schedulerName":"default-scheduler","securityContext":{},"terminationGracePeriodSeconds":30}}}}

PUT 请求在 ApiServer 中的处理与 PATCH 请求类似,都是调用 k8s.io/apiserver/pkg/registry/generic/registry.(*Store).Update,创建的 rest.UpdatedObjectInfo 为 rest.DefaultUpdatedObjectInfo(obj, transformers…),注意这里传了 obj 参数值(通过 decode 请求 body 得到),而不是 nil。

在 PUT 请求处理中,创建的 Preconditions 也是零值,不会通过 Preconditions 做检查。但是在 tryUpdate 中,resourceVersion, err := e.Storage.Versioner().ObjectResourceVersion(obj) 得到的 resourceVersion 是请求 body 中的值,而不像 PATCH 请求一样,来自 existing(看下 defaultUpdatedObjectInfo.UpdatedObject 方法就明白了)。

所以在 PUT 请求处理中,会通过 tryUpdate 中的 resourceVersion != version,检测是否发生了并发写冲突。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK