23

etcd实现服务发现和注册

 4 years ago
source link: https://studygolang.com/articles/24456
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

原理

etcd实现服务发现和注册,使用的是kv存储、租约、watch.

向etcd 注册 该服务(其实就是 存一个值)然后向etcd 发送心跳,当etcd 没有检测到心跳就会 把这个键值对 删了(这整个动作是etcd里的租约模式),网关那边 就只需要 watch 这个 key ,就能够知道 所有服务的所有动态了. 注册的时候可以使用前缀这样在watch的时候可以watch所有的服务器.

服务注册

  1. 租约模式,客户端申请一个租约设置过期时间,keepalive没个固定时间进行租约续期,通过租约存储key.过期不续租则etcd会删除租约上的所有key
  2. 相同服务存储的key的前缀可以设置成一样
  3. 注册服务就是向服务端使用租约模式写入一个key
package main

import (
    "context"
    "fmt"
    "time"
    "go.etcd.io/etcd/clientv3"
)

//创建租约注册服务
type ServiceRegister struct {
    etcdClient    *clientv3.Client //etcd client
    lease         clientv3.Lease //租约
    leaseResp     *clientv3.LeaseGrantResponse //设置租约时间返回
    canclefunc    func() //租约撤销
    //租约keepalieve相应chan
    keepAliveChan <-chan  *clientv3.LeaseKeepAliveResponse
    key           string //注册的key
}

func NewServiceRegister(addr []string, timeNum int64) (*ServiceReg, error) {
    conf := clientv3.Config{
        Endpoints:   addr,
        DialTimeout: 5 * time.Second,
    }

    var (
        client *clientv3.Client
    )

    //连接etcd
    if clientTem, err := clientv3.New(conf); err == nil {
        etcdClient = clientTem
    } else {
        return nil, err
    }

    ser := &ServiceRegister{
        etcdClient: client,
    }

    //申请租约设置时间keepalive
    if err := ser.setLease(timeNum); err != nil {
        return nil, err
    }
    
    //监听续租相应chan
    go ser.ListenLeaseRespChan()
    return ser, nil
}

//设置租约
func (this *ServiceRegister) setLease(timeNum int64) error {
    //申请租约
    lease := clientv3.NewLease(this.etcdClient)

    //设置租约时间
    leaseResp, err := lease.Grant(context.TODO(), timeNum)
    if err != nil {
        return err
    }

    //设置续租 定期发送需求请求
    ctx, cancelFunc := context.WithCancel(context.TODO())
    leaseRespChan, err := lease.KeepAlive(ctx, leaseResp.ID)

    if err != nil {
        return err
    }

    this.lease = lease
    this.leaseResp = leaseResp
    this.canclefunc = cancelFunc
    this.keepAliveChan = leaseRespChan
    return nil
}

//监听 续租情况
func (this *ServiceRegister) ListenLeaseRespChan() {
    for {
        select {
        case leaseKeepResp := <-this.keepAliveChan:
            if leaseKeepResp == nil {
                fmt.Printf("已经关闭续租功能\n")
                return
            } else {
                fmt.Printf("续租成功\n")
            }
        }
    }
}

//通过租约 注册服务
func (this *ServiceRegister) PutService(key, val string) error {
    //带租约的模式写入数据即注册服务
    kv := clientv3.NewKV(this.etcdClient)
    _, err := kv.Put(context.TODO(), key, val, clientv3.WithLease(this.leaseResp.ID))
    return err
}


//撤销租约
func (this *ServiceRegister) RevokeLease() error {
    this.canclefunc()
    time.Sleep(2 * time.Second)
    _, err := this.lease.Revoke(context.TODO(), this.leaseResp.ID)
    return err
}

func main() {
    ser,_ := NewServiceRegister([]string{"127.0.0.1:2379"}, 5)
    ser.PutService("/server/node1", "node1")
    select{}
}

服务发现

  1. 创建一个client 连到etcd.
  2. 匹配到所有相同前缀的 key. 存储信息到本地
  3. watch这个key前缀,当有增加或者删除的时候就修改本地
  4. 本地维护server的列表
import (
    "go.etcd.io/etcd/clientv3"
    "time"
    "context"
    "go.etcd.io/etcd/mvcc/mvccpb"
    "sync"
    "log"
)

type ServiceDiscovery struct {
    client        *clientv3.Client
    serverList    map[string]string
    lock          sync.Mutex
}

func NewServiceDiscovery (addr []string)( *ServiceDiscovery, error){
    conf := clientv3.Config{
        Endpoints:   addr,
        DialTimeout: 5 * time.Second,
    }
    if client, err := clientv3.New(conf); err == nil {
        return &ClientDis{
            client:client,
            serverList:make(map[string]string),
        }, nil
    } else {
        return nil ,err
    }
}


func (this * ServiceDiscovery) GetService(prefix string) ([]string ,error){
    //使用key前桌获取所有的etcd上所有的server
    resp, err := this.client.Get(context.Background(), prefix, clientv3.WithPrefix())
    if err != nil {
        return nil, err
    }
    //解析出所有的server放入本地
    addrs := this.extractAddrs(resp)

    //warch server前缀 将变更写入本地
    go this.watcher(prefix)
    return addrs ,nil
}

// 监听key前缀
func (this *ServiceDiscovery) watcher(prefix string) {
    //监听 返回监听事件chan
    rch := this.client.Watch(context.Background(), prefix, clientv3.WithPrefix())
    for wresp := range rch {
        for _, ev := range wresp.Events {
            switch ev.Type {
            case mvccpb.PUT: //修改或者新增
                this.SetServiceList(string(ev.Kv.Key),string(ev.Kv.Value))
            case mvccpb.DELETE: //删除
                this.DelServiceList(string(ev.Kv.Key))
            }
        }
    }
}

func (this *ServiceDiscovery) extractAddrs(resp *clientv3.GetResponse) []string {
    addrs := make([]string,0)
    if resp == nil || resp.Kvs == nil {
        return addrs
    }
    for i := range resp.Kvs {
        if v := resp.Kvs[i].Value; v != nil {
            this.SetServiceList(string(resp.Kvs[i].Key),string(resp.Kvs[i].Value))
            addrs = append(addrs, string(v))
        }
    }
    return addrs
}

func (this *ServiceDiscovery) SetServiceList(key,val string) {
    this.lock.Lock()
    defer this.lock.Unlock()
    this.serverList[key] = string(val)
    log.Println("set data key :",key,"val:",val)
}

func (this *ServiceDiscovery) DelServiceList(key string) {
    this.lock.Lock()
    defer this.lock.Unlock()
    delete(this.serverList,key)
    log.Println("del data key:", key)
}


func (this *ServiceDiscovery) SerList2Array()[]string {
    this.lock.Lock()
    defer this.lock.Unlock()
    addrs := make([]string,0)

    for _, v := range this.serverList {
        addrs = append(addrs,v)
    }
    return addrs
}

func main () {
    cli,_ := NewServiceDiscovery([]string{"127.0.0.1:2379"})
    cli.GetService("/server")
    select {}
}

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK