

Golang Distributed Applications: etcd
source link: https://qingwave.github.io/golang-distributed-system-x-etcd/
etcd is a reliable distributed key-value store open-sourced by CoreOS. Kubernetes uses etcd as its storage engine, and with the rise of cloud native, etcd has been adopted more and more widely.
Besides serving as an ordinary KV store or configuration store, etcd can also be used in the following distributed scenarios:
- Service discovery
- Distributed locks
- Leader election
- Distributed queues
- Distributed system coordination
This article implements the corresponding middleware in Golang; all code is available at https://github.com/qingwave/gocorex
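Before the distributed scenarios, here is a minimal sketch of plain KV access with the official go.etcd.io/etcd/client/v3 package. It assumes a local etcd listening on localhost:2379; the key /config/demo is purely illustrative.

package main

import (
	"context"
	"log"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	// connect to a local etcd instance (address is an assumption for this sketch)
	client, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"},
		DialTimeout: 3 * time.Second,
	})
	if err != nil {
		log.Fatalf("failed to create etcd client: %v", err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// write a key and read it back
	if _, err := client.Put(ctx, "/config/demo", "hello"); err != nil {
		log.Fatalf("put failed: %v", err)
	}
	resp, err := client.Get(ctx, "/config/demo")
	if err != nil {
		log.Fatalf("get failed: %v", err)
	}
	for _, kv := range resp.Kvs {
		log.Printf("%s = %s", kv.Key, kv.Value)
	}
}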
In a distributed system, service discovery is the problem of finding the services you need to call. With only a few services you can simply hit their IPs, but as the business grows, maintaining those addresses becomes increasingly complex, and when services scale in and out frequently you must be able to sense endpoint changes in real time. There are several common ways to solve this:
- At the infrastructure level, e.g. LVS, DNS, Kubernetes Services, Istio
- Microservice registries, e.g. Eureka in Spring Cloud, Dubbo
- Distributed coordination systems such as etcd, ZooKeeper, Consul
A service discovery component provides:
- Service registration and deregistration
- Automatic deregistration when a service crashes or misbehaves
- Awareness of endpoint changes
Implementing service discovery with etcd (a raw clientv3 sketch follows this list):
- Write every endpoint under the same directory (a common prefix, e.g. /services/job/endpoint1, /services/job/endpoint2) and attach a Lease with an expiration time that is refreshed continuously; if the service dies, the Lease expires and the corresponding endpoint is removed automatically
- Use the Watch API to monitor endpoint changes
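The same idea can be expressed directly with the raw clientv3 Lease and Watch APIs before wrapping it into a reusable component. In this sketch the key /services/job/endpoint1, the value, and the 5-second TTL are illustrative, and client is an existing clientv3 client like the one created above.

// grant a lease with a 5-second TTL
lease, err := client.Grant(context.Background(), 5)
if err != nil {
	log.Fatalf("grant failed: %v", err)
}

// register the endpoint under the shared prefix, bound to the lease
_, err = client.Put(context.Background(), "/services/job/endpoint1", "10.0.0.1", clientv3.WithLease(lease.ID))
if err != nil {
	log.Fatalf("put failed: %v", err)
}

// keep the lease alive; if this process dies, the key expires and is removed
keepAlive, err := client.KeepAlive(context.Background(), lease.ID)
if err != nil {
	log.Fatalf("keepalive failed: %v", err)
}
go func() {
	for range keepAlive { // drain keepalive responses
	}
}()

// any consumer can watch the prefix to observe endpoint changes
for wresp := range client.Watch(context.Background(), "/services/job/", clientv3.WithPrefix()) {
	for _, ev := range wresp.Events {
		log.Printf("event %s: %s = %s", ev.Type, ev.Kv.Key, ev.Kv.Value)
	}
}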
The main code is as follows:
func New(config EtcdDiscoveryConfig) (*EtcdDiscovery, error) {
	// create a session; the session keeps its lease alive automatically
	session, err := concurrency.NewSession(config.Client, concurrency.WithTTL(config.TTLSeconds))
	if err != nil {
		return nil, err
	}
	config.Prefix = strings.TrimSuffix(config.Prefix, "/") + "/"
	return &EtcdDiscovery{
		EtcdDiscoveryConfig: config,
		session:             session,
		myKey:               config.Prefix + config.Key,
		services:            make(map[string]string),
	}, nil
}
func (d *EtcdDiscovery) Register(ctx context.Context) error {
	lease := d.session.Lease()
	// register the service: write our key with the session's lease attached
	_, err := d.Client.Put(ctx, d.myKey, d.Val, clientv3.WithLease(lease))
	return err
}
func (d *EtcdDiscovery) UnRegister(ctx context.Context) error {
	// deregister the service by deleting our key
	_, err := d.Client.Delete(ctx, d.myKey)
	return err
}
// Watch monitors endpoint changes
func (d *EtcdDiscovery) Watch(ctx context.Context) error {
	// the context is used to stop watching
	d.watchContext, d.watchCancel = context.WithCancel(ctx)
	// first list all existing endpoints
	resp, err := d.Client.Get(d.watchContext, d.Prefix, clientv3.WithPrefix())
	if err != nil {
		return err
	}
	services := make(map[string]string)
	for _, kv := range resp.Kvs {
		services[string(kv.Key)] = string(kv.Value)
	}
	d.setServices(services)
	// callback hook, user-defined
	if d.Callbacks.OnStartedDiscovering != nil {
		d.Callbacks.OnStartedDiscovering(d.ListServices())
	}
	defer func() {
		if d.Callbacks.OnStoppedDiscovering != nil {
			d.Callbacks.OnStoppedDiscovering()
		}
	}()
	defer d.watchCancel()
	// watch the directory; WithPrefix includes changes to all sub-keys
	ch := d.Client.Watch(d.watchContext, d.Prefix, clientv3.WithPrefix())
	for {
		select {
		case <-d.watchContext.Done():
			return nil
		case wr, ok := <-ch:
			if !ok {
				return fmt.Errorf("watch closed")
			}
			if wr.Err() != nil {
				return wr.Err()
			}
			// sync the events into the local endpoint list
			for _, ev := range wr.Events {
				key, val := string(ev.Kv.Key), string(ev.Kv.Value)
				switch ev.Type {
				case mvccpb.PUT:
					d.addService(key, val)
				case mvccpb.DELETE:
					d.delService(key)
				}
				if d.Callbacks.OnServiceChanged != nil {
					event := DiscoveryEvent{Type: mvccpb.Event_EventType_name[int32(ev.Type)], Service: d.serviceFromKv(key, val)}
					d.Callbacks.OnServiceChanged(d.ListServices(), event)
				}
			}
		}
	}
}
The main implementation logic:
- Create a Session; the Lease inside the Session is renewed automatically
- On registration, create the corresponding sub-key under the directory, attached to the Lease
- Watch the directory through the Watch API and sync changes into the local cache
A quick test, using workers to simulate different endpoints:
func main() {
	client, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"},
		DialTimeout: 3 * time.Second,
	})
	if err != nil {
		log.Fatalf("failed to create etcd client: %v", err)
	}
	defer client.Close()
	worker := func(i int, run bool) {
		id := fmt.Sprintf("worker-%d", i)
		val := fmt.Sprintf("10.0.0.%d", i)
		sd, err := etcdiscovery.New(etcdiscovery.EtcdDiscoveryConfig{
			Client:     client,
			Prefix:     "/services",
			Key:        id,
			Val:        val,
			TTLSeconds: 2,
			Callbacks: etcdiscovery.DiscoveryCallbacks{
				OnStartedDiscovering: func(services []etcdiscovery.Service) {
					log.Printf("[%s], onstarted, services: %v", id, services)
				},
				OnStoppedDiscovering: func() {
					log.Printf("[%s], onstopped", id)
				},
				OnServiceChanged: func(services []etcdiscovery.Service, event etcdiscovery.DiscoveryEvent) {
					log.Printf("[%s], onchanged, services: %v, event: %v", id, services, event)
				},
			},
		})
		if err != nil {
			log.Fatalf("failed to create service etcdiscovery: %v", err)
		}
		defer sd.Close()
		if !run {
			if err := sd.UnRegister(context.Background()); err != nil {
				log.Fatalf("failed to unregister service [%s]: %v", id, err)
			}
			return
		}
		if err := sd.Register(context.Background()); err != nil {
			log.Fatalf("failed to register service [%s]: %v", id, err)
		}
		if err := sd.Watch(context.Background()); err != nil {
			log.Fatalf("failed to watch service: %v", err)
		}
	}
	wg := group.NewGroup()
	for i := 0; i < 3; i++ {
		id := i
		wg.Go(func() { worker(id, true) })
	}
	// register a new endpoint after 2s
	go func() {
		time.Sleep(2 * time.Second)
		worker(3, true)
	}()
	// unregister worker-2 after 4s
	go func() {
		time.Sleep(4 * time.Second)
		worker(2, false)
	}()
	wg.Wait()
}
The output shows that services register and deregister correctly, and endpoint changes are observed in real time:
2022/08/08 08:44:02 [worker-1], onstarted, services: [{/services/worker-1 worker-1 10.0.0.1} {/services/worker-2 worker-2 10.0.0.2} {/services/worker-0 worker-0 10.0.0.0}]
2022/08/08 08:44:02 [worker-2], onstarted, services: [{/services/worker-0 worker-0 10.0.0.0} {/services/worker-1 worker-1 10.0.0.1} {/services/worker-2 worker-2 10.0.0.2}]
2022/08/08 08:44:02 [worker-0], onstarted, services: [{/services/worker-0 worker-0 10.0.0.0} {/services/worker-1 worker-1 10.0.0.1} {/services/worker-2 worker-2 10.0.0.2}]
2022/08/08 08:44:04 [worker-2], onchanged, services: [{/services/worker-0 worker-0 10.0.0.0} {/services/worker-1 worker-1 10.0.0.1} {/services/worker-2 worker-2 10.0.0.2} {/services/worker-3 worker-3 10.0.0.3}], event: {PUT {/services/worker-3 worker-3 10.0.0.3}}
2022/08/08 08:44:04 [worker-1], onchanged, services: [{/services/worker-0 worker-0 10.0.0.0} {/services/worker-1 worker-1 10.0.0.1} {/services/worker-2 worker-2 10.0.0.2} {/services/worker-3 worker-3 10.0.0.3}], event: {PUT {/services/worker-3 worker-3 10.0.0.3}}
2022/08/08 08:44:04 [worker-0], onchanged, services: [{/services/worker-3 worker-3 10.0.0.3} {/services/worker-0 worker-0 10.0.0.0} {/services/worker-1 worker-1 10.0.0.1} {/services/worker-2 worker-2 10.0.0.2}], event: {PUT {/services/worker-3 worker-3 10.0.0.3}}
2022/08/08 08:44:04 [worker-3], onstarted, services: [{/services/worker-0 worker-0 10.0.0.0} {/services/worker-1 worker-1 10.0.0.1} {/services/worker-2 worker-2 10.0.0.2} {/services/worker-3 worker-3 10.0.0.3}]
2022/08/08 08:44:06 [worker-1], onchanged, services: [{/services/worker-0 worker-0 10.0.0.0} {/services/worker-1 worker-1 10.0.0.1} {/services/worker-3 worker-3 10.0.0.3}], event: {DELETE {/services/worker-2 worker-2 }}
2022/08/08 08:44:06 [worker-3], onchanged, services: [{/services/worker-3 worker-3 10.0.0.3} {/services/worker-0 worker-0 10.0.0.0} {/services/worker-1 worker-1 10.0.0.1}], event: {DELETE {/services/worker-2 worker-2 }}
2022/08/08 08:44:06 [worker-0], onchanged, services: [{/services/worker-0 worker-0 10.0.0.0} {/services/worker-1 worker-1 10.0.0.1} {/services/worker-3 worker-3 10.0.0.3}], event: {DELETE {/services/worker-2 worker-2 }}
2022/08/08 08:44:06 [worker-2], onchanged, services: [{/services/worker-0 worker-0 10.0.0.0} {/services/worker-1 worker-1 10.0.0.1} {/services/worker-3 worker-3 10.0.0.3}], event: {DELETE {/services/worker-2 worker-2 }}
The official etcd library go.etcd.io/etcd/client/v3/concurrency already provides a distributed lock.
The principle is similar to the Redis-based distributed lock covered earlier: if creating the key succeeds, the lock is acquired; unlocking simply deletes the key.
Using the etcd lock:
// create a session; its lease is kept alive automatically
session, err := concurrency.NewSession(client, concurrency.WithTTL(2))
if err != nil {
	return nil, err
}
mutex := concurrency.NewMutex(session, config.Prefix)
if err := mutex.Lock(context.Background()); err != nil {
	return nil, err
}
defer mutex.Unlock(context.Background())

do() // ... business logic protected by the lock
The core locking logic is as follows:
func (m *Mutex) tryAcquire(ctx context.Context) (*v3.TxnResponse, error) {
	s := m.s
	client := m.s.Client()
	m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
	cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
	// put self in lock waiters via myKey; oldest waiter holds lock
	put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
	// reuse key in case this session already holds the lock
	get := v3.OpGet(m.myKey)
	// fetch current holder to complete uncontended path with only one RPC
	getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
	resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
	if err != nil {
		return nil, err
	}
	m.myRev = resp.Header.Revision
	if !resp.Succeeded {
		m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
	}
	return resp, nil
}
tryAcquire performs the locking logic inside a transaction:
- Check whether the current key already exists, i.e. whether its CreateRevision is 0 in the code
- If it does not exist, Put it with the Lease attached
- If it does exist, fetch the current lock owner, i.e. the earliest waiter, which avoids the thundering-herd effect
func (m *Mutex) Lock(ctx context.Context) error {
	resp, err := m.tryAcquire(ctx)
	if err != nil {
		return err
	}
	// if no key on prefix / the minimum rev is key, already hold the lock
	ownerKey := resp.Responses[1].GetResponseRange().Kvs
	if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
		m.hdr = resp.Header
		return nil
	}
	client := m.s.Client()
	// wait for deletion of all keys created before ours
	_, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
	// release lock key if wait failed
	if werr != nil {
		m.Unlock(client.Ctx())
		return werr
	}
	// make sure the session is not expired, and the owner key still exists.
	gresp, werr := client.Get(ctx, m.myKey)
	if werr != nil {
		m.Unlock(client.Ctx())
		return werr
	}
	if len(gresp.Kvs) == 0 { // our key is gone, the session has expired
		return ErrSessionExpired
	}
	m.hdr = gresp.Header
	return nil
}
The Lock method blocks until the lock is acquired or an error occurs:
- Call tryAcquire
- If the lock was acquired, or this session already holds it (re-entrant), return immediately
- Otherwise call waitDeletes (sketched below) and wait until all keys with a Revision smaller than ours are deleted
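For reference, the sketch below paraphrases what waitDeletes does in the concurrency package; pb refers to the etcdserverpb package, and the waitDelete helper, which watches a single key until it is deleted, is omitted here.

// waitDeletes waits until all keys under pfx created no later than maxCreateRev are gone
func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {
	getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))
	for {
		// fetch the newest remaining waiter ahead of us
		resp, err := client.Get(ctx, pfx, getOpts...)
		if err != nil {
			return nil, err
		}
		if len(resp.Kvs) == 0 {
			// nobody ahead of us: the lock is ours
			return resp.Header, nil
		}
		// block until that key is deleted, then re-check
		lastKey := string(resp.Kvs[0].Key)
		if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {
			return nil, err
		}
	}
}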
Distributed Leader Election
For stateful services, to improve the SLA and reduce downtime there are usually multiple replicas; when the leader node crashes, a replica can take over quickly.
Leader election can be implemented with etcd, and it is quite similar to the distributed lock:
- After winning the election, keep reporting heartbeats
- Through the Watch API, when the leader fails, the other nodes compete for leadership (similar to the locking process)
The official etcd library go.etcd.io/etcd/client/v3/concurrency already supports leader election.
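A minimal sketch of using the election API directly; the prefix /my-election, the identity node-1, and the 5-second TTL are illustrative, and client is an existing clientv3 client.

session, err := concurrency.NewSession(client, concurrency.WithTTL(5))
if err != nil {
	log.Fatalf("failed to create session: %v", err)
}
defer session.Close()

election := concurrency.NewElection(session, "/my-election")

// Campaign blocks until this candidate becomes the leader (or ctx is cancelled)
if err := election.Campaign(context.Background(), "node-1"); err != nil {
	log.Fatalf("campaign failed: %v", err)
}
log.Println("I am the leader now")

// ... do leader work, then step down explicitly
if err := election.Resign(context.Background()); err != nil {
	log.Fatalf("resign failed: %v", err)
}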
The core election logic is as follows:
func (e *Election) Campaign(ctx context.Context, val string) error {
	s := e.session
	client := e.session.Client()
	k := fmt.Sprintf("%s%x", e.keyPrefix, s.Lease())
	txn := client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), "=", 0))
	txn = txn.Then(v3.OpPut(k, val, v3.WithLease(s.Lease())))
	txn = txn.Else(v3.OpGet(k))
	resp, err := txn.Commit()
	if err != nil {
		return err
	}
	e.leaderKey, e.leaderRev, e.leaderSession = k, resp.Header.Revision, s
	if !resp.Succeeded {
		kv := resp.Responses[0].GetResponseRange().Kvs[0]
		e.leaderRev = kv.CreateRevision
		if string(kv.Value) != val {
			if err = e.Proclaim(ctx, val); err != nil {
				e.Resign(ctx)
				return err
			}
		}
	}
	_, err = waitDeletes(ctx, client, e.keyPrefix, e.leaderRev-1)
	if err != nil {
		// clean up in case of context cancel
		select {
		case <-ctx.Done():
			e.Resign(client.Ctx())
		default:
			e.leaderSession = nil
		}
		return err
	}
	e.hdr = resp.Header
	return nil
}
The logic above is very similar to the etcd lock implementation:
- Start a transaction and first check whether the candidate's key exists
- If it does not exist, Put the value
- If it does exist, fetch the value with the smallest Revision under the directory, i.e. the current leader
- Wait via waitDeletes until all keys with a Revision smaller than the current candidate's are deleted
A simple wrapper adds callback support, modeled after the Kubernetes leader election implementation:
func New(config LeaderElectionConfig) (*EctdLeaderElection, error) {
	session, err := concurrency.NewSession(config.Client, concurrency.WithTTL(config.LeaseSeconds))
	if err != nil {
		return nil, err
	}
	election := concurrency.NewElection(session, config.Prefix)
	return &EctdLeaderElection{
		LeaderElectionConfig: config,
		session:              session,
		election:             election,
	}, nil
}

// Run starts the leader election
func (le *EctdLeaderElection) Run(ctx context.Context) error {
	defer func() {
		le.Callbacks.OnStoppedLeading()
	}()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	// observe leadership changes
	go le.observe(ctx)
	// campaign for leadership
	if err := le.election.Campaign(ctx, le.Identity); err != nil {
		return err
	}
	// elected: run OnStartedLeading; when it returns, the election exits
	le.Callbacks.OnStartedLeading(ctx)
	return nil
}

// observe watches key changes and fires the callback
func (le *EctdLeaderElection) observe(ctx context.Context) {
	if le.Callbacks.OnNewLeader == nil {
		return
	}
	ch := le.election.Observe(ctx)
	for {
		select {
		case <-ctx.Done():
			return
		case resp, ok := <-ch:
			if !ok {
				return
			}
			if len(resp.Kvs) == 0 {
				continue
			}
			leader := string(resp.Kvs[0].Value)
			if leader != le.Identity {
				go le.Callbacks.OnNewLeader(leader)
			}
		}
	}
}

func (le *EctdLeaderElection) Close() error {
	return le.session.Close()
}
Testing the election service:
func main() {
	client, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"},
		DialTimeout: 3 * time.Second,
	})
	if err != nil {
		log.Fatalf("failed to create etcd client: %v", err)
	}
	defer client.Close()
	prefix := "/worker/election"
	worker := func(i int) {
		id := fmt.Sprintf("worker-%d", i)
		le, err := leaderelection.New(leaderelection.LeaderElectionConfig{
			Client:       client,
			LeaseSeconds: 15,
			Prefix:       prefix,
			Identity:     id,
			Callbacks: leaderelection.LeaderCallbacks{
				OnStartedLeading: func(ctx context.Context) {
					log.Printf("OnStarted[%s]: acquire new leader", id)
					time.Sleep(3 * time.Second)
					log.Printf("OnStarted[%s]: worker done", id)
				},
				OnStoppedLeading: func() {
					log.Printf("OnStopped[%s]: exit", id)
				},
				OnNewLeader: func(identity string) {
					log.Printf("OnNewLeader[%s]: new leader %s", id, identity)
				},
			},
		})
		if err != nil {
			log.Fatalf("failed to create leader election: %v", err)
		}
		defer le.Close()
		le.Run(context.Background())
	}
	wg := sync.WaitGroup{}
	for i := 1; i <= 3; i++ {
		wg.Add(1)
		id := i
		go func() {
			defer wg.Done()
			worker(id)
		}()
	}
	wg.Wait()
}
2022/08/08 09:33:32 OnNewLeader[worker-2]: new leader worker-3
2022/08/08 09:33:32 OnNewLeader[worker-1]: new leader worker-3
2022/08/08 09:33:32 OnStarted[worker-3]: acquire new leader
2022/08/08 09:34:02 OnStarted[worker-3]: worker done
2022/08/08 09:34:02 OnStopped[worker-3]: exit
2022/08/08 09:34:02 OnStarted[worker-2]: acquire new leader
2022/08/08 09:34:02 OnNewLeader[worker-1]: new leader worker-2
2022/08/08 09:34:32 OnStarted[worker-2]: worker done
2022/08/08 09:34:32 OnStopped[worker-2]: exit
2022/08/08 09:34:32 OnStarted[worker-1]: acquire new leader
2022/08/08 09:35:02 OnStarted[worker-1]: worker done
2022/08/08 09:35:02 OnStopped[worker-1]: exit
This article summarized how to implement service discovery, distributed locks, and leader election with etcd in Golang.
All code for this article is available at https://github.com/qingwave/gocorex; feedback and corrections are welcome.
Explore more in https://qingwave.github.io