4

Dubbo源码(五) - 服务目录 - 王谷雨

 1 year ago
source link: https://www.cnblogs.com/konghuanxi/p/16531775.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Dubbo源码(五) - 服务目录

本文基于Dubbo2.6.x版本,中文注释版源码已上传github:xiaoguyu/dubbo

今天,来聊聊Dubbo的服务目录(Directory)。下面是官方文档对服务目录的定义:

服务目录中存储了一些和服务提供者有关的信息,通过服务目录,服务消费者可获取到服务提供者的信息,比如 ip、端口、服务协议等。

服务目录持有Invoker对象集合,Dubbo的服务调用均由Invoker发起。

当服务提供者信息发生变化时(比如某一个服务挂了),服务目录也需要动态调整。

服务目录目前内置的实现有两个,分别为 StaticDirectory 和 RegistryDirectory。它们均继承自AbstractDirectory,而 AbstractDirectory 实现了 Directory 接口。Directory 接口提供了list(Invocation invocation) 方法,这个方法就是用来获取 invoker 集合的。

再看 RegistryDirectory 实现了 NotifyListener 接口,这个接口中只有一个方法,notify(List urls),当注册中心节点信息发生变化后,触发此方法调整服务目录中的配置信息以及 invoker 集合。

上面我们讲了,服务调用需求用到 invoker,而服务目录持有 invoker 集合,并通过 list 方法提供 invoker。下面放上服务消费者Demo中DemoService#sayHello 方法的调用路径

AbstractDirectory 实现了 Directory 接口的 list 方法

public List<Invoker<T>> list(Invocation invocation) throws RpcException { if (destroyed) { throw new RpcException("Directory already destroyed .url: " + getUrl()); } // 调用 doList 方法列举 Invoker,doList 是模板方法,由子类实现 List<Invoker<T>> invokers = doList(invocation); // 获取路由 Router 列表 List<Router> localRouters = this.routers; // local reference if (localRouters != null && !localRouters.isEmpty()) { for (Router router : localRouters) { try { // 获取 runtime 参数,并根据参数决定是否进行路由 if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) { // 进行服务路由 invokers = router.route(invokers, getConsumerUrl(), invocation); } } catch (Throwable t) { logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t); } } } return invokers;}

此方法就两段逻辑:

  1. 通过 doList 获取 invoker 集合
  2. 通过路由选择合适的 invoker

路由非本文重点,略过。

doList 是模板方法,由子类实现。

StaticDirectory

StaticDirectory 是一个静态服务目录,其 invokers 集合通过构造方法注入,不应被改变。

// StaticDirectory的doList啥都没做,直接返回持有的invokersprotected List<Invoker<T>> doList(Invocation invocation) throws RpcException { // 列举 Inovker,也就是直接返回 invokers 成员变量 return invokers;}

StaticDirectory 的其它方法就不分析了,同样很简单。

RegistryDirectory

RegistryDirectory 是动态调整的服务目录,其持有的 invokers 有内部方法生成。

在上篇博文《Dubbo源码(四) - 服务引用(消费者)》中,我留了一个坑,也就是服务引用过程中,创建了注册中心之后,如何订阅节点数据。在RegistryProtocol#doRefer方法中。

其中调用了RegistryDirectory#subscribe(URL url)方法

public void subscribe(URL url) { setConsumerUrl(url); registry.subscribe(url, this);}

我们用的注册中心是 zookeeper,所以 registry 是 ZookeeperRegistry,而 subscribe 方法的实现在其父类FailbackRegistry

public void subscribe(URL url, NotifyListener listener) { super.subscribe(url, listener); removeFailedSubscribed(url, listener); try { // Sending a subscription request to the server side doSubscribe(url, listener); } catch (Exception e) { ...... // 订阅失败处理 addFailedSubscribed(url, listener); }}

模板方法,调用子类的 doSubscribe 方法

protected void doSubscribe(final URL url, final NotifyListener listener) { try { if (Constants.ANY_VALUE.equals(url.getServiceInterface())) { ... } else { List<URL> urls = new ArrayList<URL>(); // 切割路径(providers、configurators、routers等) for (String path : toCategoriesPath(url)) { ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url); if (listeners == null) { zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>()); listeners = zkListeners.get(url); } // 缓存操作,获取节点监听器 ChildListener zkListener = listeners.get(listener); if (zkListener == null) { listeners.putIfAbsent(listener, new ChildListener() { @Override public void childChanged(String parentPath, List<String> currentChilds) { // 这里和方法末尾的 notify(url, listener, urls); 是调用的同一个方法 // 节点变更时触发变更操作 ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)); } }); zkListener = listeners.get(listener); } zkClient.create(path, false); // 注册节点监听器 List<String> children = zkClient.addChildListener(path, zkListener); if (children != null) { urls.addAll(toUrlsWithEmpty(url, path, children)); } } // 触发节点变更操作 notify(url, listener, urls); } } catch (Throwable e) { throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); }}

订阅方法做了3个操作:

  1. 切割url,拆分订阅路径
  2. 创建节点监听器
  3. 触发节点变更操作

这里注意下,订阅时节点数据并没有发生变更,所以需要手动触发 notify 方法。

下面继续看节点变更操作做了什么,调用路径有点深,就不一步一步调试了,直接把路径写在注释上。

// FailbackRegistry#notify(URL url, NotifyListener listener, List<URL> urls) ->// FailbackRegistry#doNotify(URL url, NotifyListener listener, List<URL> urls) ->// AbstractRegistry#notify(URL url, NotifyListener listener, List<URL> urls)protected void notify(URL url, NotifyListener listener, List<URL> urls) { ...... Map<String, List<URL>> result = new HashMap<String, List<URL>>(); // 将urls按分类分组转成map ...... for (Map.Entry<String, List<URL>> entry : result.entrySet()) { String category = entry.getKey(); List<URL> categoryList = entry.getValue(); categoryNotified.put(category, categoryList); saveProperties(url); listener.notify(categoryList); }}

此处的listener变量,就是本节的主角RegistryDirectory,下面来分析 listener.notify(categoryList)

public synchronized void notify(List<URL> urls) { // 定义三个集合,分别用于存放服务提供者 url,路由 url,配置器 url List<URL> invokerUrls = new ArrayList<URL>(); List<URL> routerUrls = new ArrayList<URL>(); List<URL> configuratorUrls = new ArrayList<URL>(); // 根据 category 参数分别对3种url进行处理 ...... // 刷新 Invoker 列表 refreshInvoker(invokerUrls);}

此方法分别对服务提供者 url,路由 url,配置器 url各自进行了处理,这里我省略了对路由 url 和配置器 url 的处理,感兴趣的自行去看源码。咱们聚焦在 Invoker 的处理中

private void refreshInvoker(List<URL> invokerUrls) { // invokerUrls 仅有一个元素,且 url 协议头为 empty,此时表示禁用所有服务 if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) { // 设置 forbidden 为 true this.forbidden = true; // Forbid to access this.methodInvokerMap = null; // Set the method invoker map to null // 销毁所有 Invoker destroyAllInvokers(); // Close all invokers } else { this.forbidden = false; // Allow to access Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) { // 添加缓存 url 到 invokerUrls 中 invokerUrls.addAll(this.cachedInvokerUrls); } else { this.cachedInvokerUrls = new HashSet<URL>(); // 缓存 invokerUrls this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison } if (invokerUrls.isEmpty()) { return; } // 将 url 转成 Invoker Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map // 将 newUrlInvokerMap 转成方法名到 Invoker 列表的映射 Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map // state change // If the calculation is wrong, it is not processed. // 转换出错,直接打印异常,并返回 if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) { logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString())); return; } // 合并多个组的 Invoker this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap; this.urlInvokerMap = newUrlInvokerMap; try { // 销毁无用 Invoker destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker } catch (Exception e) { logger.warn("destroyUnusedInvokers error. ", e); } }}

此方法中的逻辑有点多,

  1. 判断是否要销毁所有 invoker
  2. 创建 invoker
  3. 销毁无用 invoker

我们关注下 invoker 的创建,toInvokers(invokerUrls)

private Map<String, Invoker<T>> toInvokers(List<URL> urls) { ...... // 获取服务消费端配置的协议 String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY); for (URL providerUrl : urls) { ...... // 将本地 Invoker 缓存赋值给 localUrlInvokerMap Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key); if (invoker == null) { // Not in the cache, refer again try { boolean enabled = true; if (url.hasParameter(Constants.DISABLED_KEY)) { // 获取 disable 配置,取反,然后赋值给 enable 变量 enabled = !url.getParameter(Constants.DISABLED_KEY, false); } else { // 获取 enable 配置,并赋值给 enable 变量 enabled = url.getParameter(Constants.ENABLED_KEY, true); } if (enabled) { // 调用 refer 获取 Invoker invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl); } } catch (Throwable t) { logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t); } if (invoker != null) { // Put new invoker in cache // 缓存 Invoker 实例 newUrlInvokerMap.put(key, invoker); } // 缓存命中 } else { // 将 invoker 存储到 newUrlInvokerMap 中 newUrlInvokerMap.put(key, invoker); } } keys.clear(); return newUrlInvokerMap;}

这里的判断有点复杂,会对协议各种判断(是否支持、是否为empty)等,然后如果缓存未命中,则需要创建invoker,也就是protocol.refer(serviceType, url)这一段代码。

此时,我们上一篇文章留下的另一个坑也填上了,也就是DubboProtocol#refer的调用时机。

获取invoker集合

public List<Invoker<T>> doList(Invocation invocation) { ...... List<Invoker<T>> invokers = null; // 获取 Invoker 本地缓存 Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) { // 获取方法名和参数列表 String methodName = RpcUtils.getMethodName(invocation); Object[] args = RpcUtils.getArguments(invocation); // 检测参数列表的第一个参数是否为 String 或 enum 类型 if (args != null && args.length > 0 && args[0] != null && (args[0] instanceof String || args[0].getClass().isEnum())) { // 通过 方法名 + 第一个参数名称 查询 Invoker 列表,具体的使用场景暂时没想到 invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter } if (invokers == null) { // 通过方法名获取 Invoker 列表 invokers = localMethodInvokerMap.get(methodName); } if (invokers == null) { // 通过星号 * 获取 Invoker 列表 invokers = localMethodInvokerMap.get(Constants.ANY_VALUE); } // 冗余逻辑,pull request #2861 移除了下面的 if 分支代码 if (invokers == null) { Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator(); if (iterator.hasNext()) { invokers = iterator.next(); } } } // 返回 Invoker 列表 return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;}

这里的逻辑也很简单,就是从类变量 methodInvokerMap 中获取invoker,所有我们需要去看看 methodInvokerMap 的赋值。

我们在上一小节的 refreshInvoker 方法中,讲了 invoker 的生成。refreshInvoker 方法中还有对methodInvokerMap 的处理。也就是 toMethodInvokers(newUrlInvokerMap) 方法

这里面会将 url-invoker 的映射转成 方法名-invoker 的映射。

Dubbo的服务调用,需要通过服务目录拿到 invoker 才能发起。当注册中心发生变化时,服务目录同样需要动态调整,并刷新持有的 invoker 集合。服务目录是 Dubbo 集群容错的一部分,也是比较基础的部分。

PS:以上讲的不包含本地服务调用,别杠


参考资料

Dubbo开发指南


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK