20

从源码说一说zookeeper的watcher机制 - fredal的博客

 4 years ago
source link: https://fredal.xin/zookeeper-watcher?
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

从源码说一说zookeeper的watcher机制

更新于 2020-09-01   |   条评论   |   热度 °C

我们可以使用 zookeeper 作为注册中心来实现服务的注册与发现,curator 框架提供了 curator-x-discovery 扩展实现了开箱即用的服务注册发现,但更多时候我们还是选择自己去实现,那这个时候我们需要额外关注 zookeeper 的 1 个特性,即 wathcer。

在微服务场景中,watcher 机制主要提供了服务通知功能,比如 Instance1 这个实例在 Service1 服务节点下注册了一个 emphemeral 子节点后,它的某个服务消费者根据依赖配置在 Service1 节点上注册了一个子节点 watcher,就如图中的红钥匙。子节点类型的 watcher 会观测 Service1 的子节点,即 InstanceX 节点,但不会观测孙子节点 config1。那么当 Instance1 节点挂掉之后,watcher 可以做到通知给注册自身的那个服务消费者,通知完一次后 wacther 也就被销毁了。

2019-12-31-120542.png

wacther 原理框架

2019-12-31-120441.png

zookeeper 的 watcher 主要由 client、server 以及 watchManager 之间配合完成,包括 watcher 注册以及触发 2 个阶段。

在 client 端注册表为 ZkWatchManager,其中包括了 dataWatches、existWatches 以及 childWatches。在 server 端的注册表在 DataTree 类中,封装了 2 类 WatchManager,即 dataWatches 和 existWatches。dataWatches 代表当前节点的数据监听,childWathes 代表子节点监听,与 client 比少的 existWatches 也很容易理解,因为节点是否存在需要客户端去判断。

注册阶段客户端的 getData 和 exists 请求可以注册 dataWatches,getChilden 可以注册 childWatches。而触发阶段,setData 请求会触发当前节点 dataWatches,create 请求会触发当前节点 dataWatches 以及父节点的 childWatches,delete 请求则会触发当前节点、父节点、子节点的 dataWatches,以及父节点的 childWatches。

watchManager包含两个非常重要的数据结构:watchTable和watch2Paths。前者表示path-set<watcher>,后者表示watcher-set<path>。注意这里的watcher含义表示远程连接,所以watchTable表示一个目录下可能有多个消费者的监听连接,而watch2Paths表示一个消费者可能会对多个目录建立监听,显然多目录的监听会复用一个连接。

请求阶段的传输数据(包括 watcher 信息)会封装在 request 和 response 中,比如 getData 请求会封装 getDataRequest/getDataResponse。而触发阶段的 watcher 通知则通过事件 event 进行通信,server 端会发送一个 watcherEvent,而 client 端则会将其转换成 watchedEvent 再进行处理。

每个客户端都会维护 2 个线程,SendThread 负责处理客户端与服务端的请求通信,比如发送 getDataRequest,而 EventThread 则负责处理服务端的事件通知,即 watcher 的事件。

watcher 注册源码

我们来看看 watcher 注册的部分源码。首先是在客户端,以 Zookeeper 中 getData 方法为例,会入队一个 watch 为 true 的 packet。

public byte[] getData(final String path, Watcher watcher, Stat stat)
      throws KeeperException, InterruptedException
   {
      ...
      GetDataRequest request = new GetDataRequest();
      request.setPath(serverPath);
      request.setWatch(watcher != null);
      GetDataResponse response = new GetDataResponse();
      ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
      ...
  }

可以看到这边封装了 GetDataRequest 以及 GetDataResponse,而 request 中设置了 watch 参数为 true,最后将其进行 submitRequest,submitRequest 干的事儿其实就是将这些放入事件队列等待 sendThread 调度发送。

接着这个请求会被服务端所接收到,所有请求的服务端处理都在 FinalRequestProcessor#processRequest 方法中进行。

case OpCode.getData: {
              lastOp = "GETD";
              GetDataRequest getDataRequest = new GetDataRequest();
              ...
              byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
                      getDataRequest.getWatch() ? cnxn : null);
              ...
          }

这边会通过一些 case 来判断请求类型,还是以 getData 为例,最终会调用到 DataTree 的 getData 方法,我们之前讲到 DataTree 里包含了 2 种 watcher,那这边除了获取数据外,自然是注册 dataWatchers 了。

 public byte[] getData(String path, Stat stat, Watcher watcher) throws NoNodeException {
        DataNode n = (DataNode)this.nodes.get(path);
        if (n == null) {
            throw new NoNodeException();
        } else {
            synchronized(n) {
                n.copyStat(stat);
                if (watcher != null) {
                    this.dataWatches.addWatch(path, watcher);
                }
                return n.data;
            }
        }
    }

addWatch 方法主要是将数据节点的路径以及 ServerCnxn(远程通信信息) 信息存储到 WatchManager 的 watchTable 和 watch2Paths 中。至此服务端已经接受到了 watcher 并注册到了 watchManager 中了。

我们将客户端自己也会保存一个 watchManager,这里其实是在接收到 getData 响应后进行的,在 ClientCnxn$SendThread 类的 readResponse->finishPacket 方法中。

 private void finishPacket(ClientCnxn.Packet p) {
        if (p.watchRegistration != null) {
            p.watchRegistration.register(p.replyHeader.getErr());
        }

        if (p.cb == null) {
            synchronized(p) {
                p.finished = true;
                p.notifyAll();
            }
        } else {
            p.finished = true;
            this.eventThread.queuePacket(p);
        }

    }

可以看到这边调用了 watchRegistration 的 register 方法,而它就是根据请求类型来装入对应的 watchManager 中了(dataWatches、existWatches、childWatches)。

整个大致的时序图可以参考下面:

2020-01-01-090255.png

watcher 触发源码

wathcer 触发部分,我们还以 服务端 DataTree 类处理 setData 请求 为例。

public Stat setData(String path, byte data[], int version, long zxid,
           long time) throws KeeperException.NoNodeException {
       ...
       dataWatches.triggerWatch(path, EventType.NodeDataChanged);
       return s;
   }

可以看到在处理完数据后调用了 triggerWatch,它干的事儿是从之前的 watchManager 中获得 watchers,然后一个个调用 process 方法。

public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
       WatchedEvent e = new WatchedEvent(type,
               KeeperState.SyncConnected, path);
       HashSet<Watcher> watchers;
       synchronized (this) {
           watchers = watchTable.remove(path);
           if (watchers == null || watchers.isEmpty()) {
               if (LOG.isTraceEnabled()) {
                   ZooTrace.logTraceMessage(LOG,
                           ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                           "No watchers for " + path);
               }
               return null;
           }
           for (Watcher w : watchers) {
               HashSet<String> paths = watch2Paths.get(w);
               if (paths != null) {
                   paths.remove(path);
               }
           }
       }
       for (Watcher w : watchers) {
           if (supress != null && supress.contains(w)) {
               continue;
           }
           w.process(e);
       }
       return watchers;
   }

获取了需要本次触发的监听后,在 watchTable 和 watch2Paths 中还移除了自身,所以 watcher 是单次的。这里封装好了 watchedEvent 后塞入到了 Watcher的process 方法中,process 方法其实就是发送通知,以 Watcher的一个实现类NioServerCnxn 为例就是调用了其 sendResponse 方法将通知事件发送到客户端,发送前会将 watchedEvent 转换成 watcherEvent 进行发送。

那么客户端首先接收到请求的仍然是 ClientCnxn$sendThread 的 readResponse 方法,这里讲 watcherEvent 转换为 watchedEvent 后入列 eventThread 的事件队列 等待后续进行处理。

        ...
        WatchedEvent we = new WatchedEvent(event);
        if (ClientCnxn.LOG.isDebugEnabled()) {
            ClientCnxn.LOG.debug("Got " + we + " for sessionid 0x" + Long.toHexString(ClientCnxn.this.sessionId));
        }

        ClientCnxn.this.eventThread.queueEvent(we);
        ...

我们直接看下 EventThread 的 run 方法吧,方法很简单,就是不断从 waitingEvents 事件队列中取通知事件。然后调用 processEvent 方法处理事件。

private void processEvent(Object event) {
       try {
           if (event instanceof WatcherSetEventPair) {
               // each watcher will process the event
               WatcherSetEventPair pair = (WatcherSetEventPair) event;
               for (Watcher watcher : pair.watchers) {
                   try {
                       watcher.process(pair.event);
                   } catch (Throwable t) {
                       LOG.error("Error while calling watcher ", t);
                   }
               }
           } else {
                 ...省略


           }

这里就是简单地取出本次事件需要通知的 watcher 集合,然后循环调用每个 watcher 的 process 方法了。那么在自己实现服务注册发现的场景里,显然 watcher 的 process 方法是我们自定义的啦。

整个 watcher 触发的时序图可以参考下面:

2020-01-01-090355.png

至此,zookeeper 的整个 watcher 交互逻辑就已经结束了。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK