10

图解kubernetes中etcd增删改查的工业实现

 4 years ago
source link: https://studygolang.com/articles/26987
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

kubernetes中基于etcd实现集中的数据存储,今天来学习下基于etcd如何实现数据读取一致性、更新一致性、事务的具体实现

1. 数据的存储与版本

1.1 数据存储的转换

2f182efbaac041d71d19c3ec40759060.png 在k8s中有部分数据的存储是需要经过处理之后才能存储的,比如secret这种加密的数据,既然要存储就至少包含两个操作,加密存储,解密读取,transformer就是为了完成该操作而实现的,其在进行etcd数据存储的时候回对数据进行加密,而在读取的时候,则会进行解密

1.2 资源版本revision

836950ce8d3a4e52ea57781f4002d2af.png 在etcd中进行修改(增删改)操作的时候,都会递增revision,而在k8s中也通过该值来作为k8s资源的ResourceVersion,该机制也是实现watch的关键机制,在操作etcd解码从etcd获取的数据的时候,会通过versioner组件来为资源动态的修改该值

1.3 数据模型的映射

5362825a10540ff3cb6efcaa431c2719.png 将数据从etcd中读取后,数据本身就是一个字节数组,如何将对应的数据转换成我们真正的运行时对象呢?还记得我们之前的scheme与codec么,在这里我们知道对应的数据编码格式,也知道资源对象的类型,则通过codec、字节数组、目标类型,我们就可以完成对应数据的反射

2. 查询接口一致性

03bc5f21adb159dd7953601c00156344.png etcd中的数据写入是基于leader单点写入和集群quorum机制实现的,并不是一个强一致性的数据写入,则如果如果我们访问的节点不存在quorum的半数节点内,则可能造成短暂的数据不一致,针对一些强一致的场景,我们可以通过其revision机制来进行数据的读取, 保证我们读取到更新之后的数据

// 省略非核心代码
func (s *store) Get(ctx context.Context, key string, resourceVersion string, out runtime.Object, ignoreNotFound bool) error {
    // 获取key
    getResp, err := s.client.KV.Get(ctx, key, s.getOps...)

    // 检测当前版本,是否达到最小版本的
    if err = s.ensureMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil {
        return err
    }

    // 执行数据转换
    data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(key))
    if err != nil {
        return storage.NewInternalError(err.Error())
    }
    // 解码数据
    return decode(s.codec, s.versioner, data, out, kv.ModRevision)
}
复制代码

3. 创建接口实现 9fbdbb2aa016701cffd65f5da6df4c85.png

创建一个接口数据则会首先进行资源对象的检查,避免重复创建对象,此时会先通过资源对象的version字段来进行初步检查,然后在利用etcd的事务机制来保证资源创建的原子性操作

// 省略非核心代码
func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
    if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
        return errors.New("resourceVersion should not be set on objects to be created")
    }
    if err := s.versioner.PrepareObjectForStorage(obj); err != nil {
        return fmt.Errorf("PrepareObjectForStorage failed: %v", err)
    }
    // 将数据编码
    data, err := runtime.Encode(s.codec, obj)
    if err != nil {
        return err
    }
    
    // 转换数据
    newData, err := s.transformer.TransformToStorage(data, authenticatedDataString(key))
    if err != nil {
        return storage.NewInternalError(err.Error())
    }

    startTime := time.Now()
    // 事务操作
    txnResp, err := s.client.KV.Txn(ctx).If(
        notFound(key), // 如果之前不存在 这里是利用的etcd的ModRevision即修改版本为0, 寓意着对应的key不存在
    ).Then(
        clientv3.OpPut(key, string(newData), opts...), // put修改数据
    ).Commit()
    metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
    if err != nil {
        return err
    }
    if !txnResp.Succeeded {
        return storage.NewKeyExistsError(key, 0)
    }

    if out != nil {
        // 获取对应的Revision
        putResp := txnResp.Responses[0].GetResponsePut()
        return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
    }
    return nil
}

func notFound(key string) clientv3.Cmp {
    return clientv3.Compare(clientv3.ModRevision(key), "=", 0)
}复制代码

4. 删除接口的实现

8f6ba9390422874870046ff3fe613f3a.png 删除接口主要是通过CAS和事务机制来共同实现,确保在etcd不发生异常的情况,即使并发对同个资源来进行删除操作也能保证至少有一个节点成功

// 省略非核心代码
func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.Object, v reflect.Value, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc) error {
    startTime := time.Now()
    // 获取当前的key的数据
    getResp, err := s.client.KV.Get(ctx, key)
    for {
        // 获取当前的状态
        origState, err := s.getState(getResp, key, v, false)
        if err != nil {
            return err
        }
        txnResp, err := s.client.KV.Txn(ctx).If(
            clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev), // 如果修改版本等于当前状态,就尝试删除
        ).Then(
            clientv3.OpDelete(key), // 删除
        ).Else(
            clientv3.OpGet(key),    // 获取
        ).Commit()
        if !txnResp.Succeeded {
            // 获取最新的数据重试事务操作
            getResp = (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
            klog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key)
            continue
        }
        // 将最后一个版本的数据解码到out里面,然后返回
        return decode(s.codec, s.versioner, origState.data, out, origState.rev)
    }
}复制代码

5. 更新接口的实现

b726a2778e6d40f5c7222994fcef9ba5.png 更新接口实现上与删除接口并无本质上的差别,但是如果多个节点同时进行更新,CAS并发操作必然会有一个节点成功,当发现已经有节点操作成功,则当前节点其实并不需要再做过多的操作,直接返回即可

// 省略非核心代码
func (s *store) GuaranteedUpdate(
    ctx context.Context, key string, out runtime.Object, ignoreNotFound bool,
    preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, suggestion ...runtime.Object) error {
    // 获取当前key的最新数据
    getCurrentState := func() (*objState, error) {
        startTime := time.Now()
        getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
        metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
        if err != nil {
            return nil, err
        }
        return s.getState(getResp, key, v, ignoreNotFound)
    }

    // 获取当前数据
    var origState *objState
    var mustCheckData bool
    if len(suggestion) == 1 && suggestion[0] != nil {
        // 如果提供了建议的数据,则会使用,
        origState, err = s.getStateFromObject(suggestion[0])
        if err != nil {
            return err
        }
        //但是需要检测数据
        mustCheckData = true
    } else {
        // 尝试重新获取数据
        origState, err = getCurrentState()
        if err != nil {
            return err
        }
    }

    transformContext := authenticatedDataString(key)
    for {
        // 检查对象是否已经更新, 主要是通过检测uuid/revision来实现
        if err := preconditions.Check(key, origState.obj); err != nil {
            // If our data is already up to date, return the error
            if !mustCheckData {
                return err
            }
            // 如果检查数据一致性错误,则需要重新获取
            origState, err = getCurrentState()
            if err != nil {
                return err
            }
            mustCheckData = false
            // Retry
            continue
        }

        // 删除当前的版本数据revision
        ret, ttl, err := s.updateState(origState, tryUpdate)
        if err != nil {
            // If our data is already up to date, return the error
            if !mustCheckData {
                return err
            }

            // It's possible we were working with stale data
            // Actually fetch
            origState, err = getCurrentState()
            if err != nil {
                return err
            }
            mustCheckData = false
            // Retry
            continue
        }

        // 编码数据
        data, err := runtime.Encode(s.codec, ret)
        if err != nil {
            return err
        }
        if !origState.stale && bytes.Equal(data, origState.data) {
            // 如果我们发现我们当前的数据与获取到的数据一致,则会直接跳过
            if mustCheckData {
                origState, err = getCurrentState()
                if err != nil {
                    return err
                }
                mustCheckData = false
                if !bytes.Equal(data, origState.data) {
                    // original data changed, restart loop
                    continue
                }
            }
            if !origState.stale {
                // 直接返回数据
                return decode(s.codec, s.versioner, origState.data, out, origState.rev)
            }
        }

        // 砖汉数据
        newData, err := s.transformer.TransformToStorage(data, transformContext)
        if err != nil {
            return storage.NewInternalError(err.Error())
        }

        opts, err := s.ttlOpts(ctx, int64(ttl))
        if err != nil {
            return err
        }
        trace.Step("Transaction prepared")

        startTime := time.Now()
        // 事务更新数据
        txnResp, err := s.client.KV.Txn(ctx).If(
            clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),
        ).Then(
            clientv3.OpPut(key, string(newData), opts...),
        ).Else(
            clientv3.OpGet(key),
        ).Commit()
        metrics.RecordEtcdRequestLatency("update", getTypeName(out), startTime)
        if err != nil {
            return err
        }
        trace.Step("Transaction committed")
        if !txnResp.Succeeded {
            // 重新获取数据
            getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
            klog.V(4).Infof("GuaranteedUpdate of %s failed because of a conflict, going to retry", key)
            origState, err = s.getState(getResp, key, v, ignoreNotFound)
            if err != nil {
                return err
            }
            trace.Step("Retry value restored")
            mustCheckData = false
            continue
        }
        // 获取put响应
        putResp := txnResp.Responses[0].GetResponsePut()

        return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
    }
}复制代码

6. 未曾讲到的地方

transformer的实现和注册地方我并没有找到,只看到了几个覆盖资源类型的地方,还有list/watch接口,后续再继续学习,今天就先到这里,下次再见

微信号:baxiaoshi2020 998959c1ff7881630ebd5e5159151dd3.jpg

关注公告号阅读更多源码分析文章 4656d4c311a561b234e475beae198576.jpg

更多文章关注 www.sreguide.com

本文由博客一文多发平台 OpenWrite 发布


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK