0

Dubbo之服务消费原理

 2 years ago
source link: https://ytao.top/2020/03/08/17-dubbo-reference/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

上篇文章《Dubbo之服务暴露》分析 Dubbo 服务是如何暴露的,本文接着分析 Dubbo 服务的消费流程。主要从以下几个方面进行分析:注册中心的暴露通过注册中心进行服务消费通知直连服务进行消费
服务消费端启动时,将自身的信息注册到注册中心的目录,同时还订阅服务提供方的目录,当服务提供方的 URL 发生更改时,实时获取新的数据。

服务消费端流程

下面是一个服务消费的流程图:

上图中可以看到,服务消费的流程与服务暴露的流程有点类似逆向的。同样,Dubbo 服务也是分为两个大步骤:第一步就是将远程服务通过Protocol转换成Invoker(概念在上篇文章中有解释)。第二步通过动态代理将Invoker转换成消费服务需要的接口。

org.apache.dubbo.config.ReferenceConfig类是ReferenceBean的父类,与生产端服务的ServiceBean一样,存放着解析出来的 XML 和注解信息。类关系如下:

服务初始化中转换的入口

当我们消费端调用本地接口就能实现远程服务的调用,这是怎么实现的呢?根据上面的流程图,来分析消费原理。
在消费端进行初始化时ReferenceConfig#init,会执行ReferenceConfig#createProxy来完成这一系列操作。以下为ReferenceConfig#createProxy主要的代码部分:

private T createProxy(Map<String, String> map) {
// 判断是否为 Jvm 本地引用
if (shouldJvmRefer(map)) {
// 通过 injvm 协议,获取本地服务
URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
invoker = REF_PROTOCOL.refer(interfaceClass, url);
} else {
urls.clear();
// 判断是否有自定义的直连地址,或注册中心地址
if (url != null && url.length() > 0) {
String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (StringUtils.isEmpty(url.getPath())) {
url = url.setPath(interfaceName);
}
if (UrlUtils.isRegistry(url)) {
// 如果是注册中心Protocol类型,则向地址中添加 refer 服务消费元数据
urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
} else {
// 直连服务提供端
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else {
// 组装注册中心的配置
if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
// 检查配置中心
checkRegistry();
List<URL> us = ConfigValidationUtils.loadRegistries(this, false);
if (CollectionUtils.isNotEmpty(us)) {
for (URL u : us) {
URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);
if (monitorUrl != null) {
// 监控上报信息
map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
// 注册中心地址添加 refer 服务消费元数据
urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
}
}
}
}

// 只有一条注册中心数据,即单注册中心
if (urls.size() == 1) {
// 将远程服务转化成 Invoker
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
} else {
// 因为多注册中心就会存在多个 Invoker,这里用保存在 List 中
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
// 将每个注册中心转换成 Invoker 数据
invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
if (UrlUtils.isRegistry(url)) {
// 会覆盖前遍历的注册中心,使用最后一条注册中心数据
registryURL = url;
}
}
if (registryURL != null) {
// 默认使用 zone-aware 策略来处理多个订阅
URL u = registryURL.addParameterIfAbsent(CLUSTER_KEY, ZoneAwareCluster.NAME);
// 将转换后的多个 Invoker 合并成一个
invoker = CLUSTER.join(new StaticDirectory(u, invokers));
} else {
invoker = CLUSTER.join(new StaticDirectory(invokers));
}
}
}
// 利用动态代理,将 Invoker 转换成本地接口代理
return (T) PROXY_FACTORY.getProxy(invoker);
}

上面转换的过程中,主要可概括为:先分为本地引用和远程引用两类。本地就是以 inJvm 协议的获取本地服务,这不做过多说明;远程引用分为直连服务和通过注册中心。注册中心分为单注册中心和多注册中心的情况,单注册中心好解决,直接使用即可,多注册中心时,将转换后的 Invoker 合并成一个 Invoker。最后通过动态代理将 Invoker 转换成本地接口代理。

获取 Invoker 实例

由于本地服务时直接从缓存中获取,这里就注册中心的消费进行分析,上面代码片段中使用的是REF_PROTOCOL.refer进行转换,该方法代码:

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// 获取服务的注册中心url,里面会设置注册中心的协议和移除 registry 的参数
url = getRegistryUrl(url);
// 获取注册中心实例
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}

// 获取服务消费元数据
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
// 从服务消费元数据中获取分组信息
String group = qs.get(GROUP_KEY);
if (group != null && group.length() > 0) {
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
// 执行 Invoker 转换工作
return doRefer(getMergeableCluster(), registry, type, url);
}
}
// 执行 Invoker 转换工作
return doRefer(cluster, registry, type, url);
}

上面主要是获取服务消费的注册中心实例和进行服务分组,最后调用doRefer方法进行转换工作,以下为doRefer的代码:

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// 创建 RegistryDirectory 对象
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
// 设置注册中心
directory.setRegistry(registry);
// 设置协议
directory.setProtocol(protocol);
// directory.getUrl().getParameters() 是服务消费元数据
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
// 消费消息注册到注册中心
registry.register(directory.getRegisteredConsumerUrl());
}

directory.buildRouterChain(subscribeUrl);
// 服务消费者订阅:服务提供端,动态配置,路由的通知
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));

// 多个Invoker合并为一个
Invoker invoker = cluster.join(directory);
return invoker;
}

上面实现主要是完成创建 RegistryDirectory 对象,将消费服务元数据注册到注册中心,通过 RegistryDirectory 对象里的信息,实现服务提供端,动态配置及路由的订阅相关功能。

RegistryDirectory 这个类实现了 NotifyListener 这个通知监听接口,当订阅的服务,配置或路由发生变化时,会接收到通知,进行相应改变:

public synchronized void notify(List<URL> urls) {
// 将服务提供方配置,路由配置,服务提供方的服务分别以不同的 key 保存在 Map 中
Map<String, List<URL>> categoryUrls = urls.stream()
.filter(Objects::nonNull)
.filter(this::isValidCategory)
.filter(this::isNotCompatibleFor26x)
.collect(Collectors.groupingBy(url -> {
if (UrlUtils.isConfigurator(url)) {
return CONFIGURATORS_CATEGORY;
} else if (UrlUtils.isRoute(url)) {
return ROUTERS_CATEGORY;
} else if (UrlUtils.isProvider(url)) {
return PROVIDERS_CATEGORY;
}
return "";
}));

// 更新服务提供方配置
List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);

// 更新路由配置
List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
toRouters(routerURLs).ifPresent(this::addRouters);

// 加载服务提供方的服务信息
List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
/**
* 3.x added for extend URL address
*/
ExtensionLoader<AddressListener> addressListenerExtensionLoader = ExtensionLoader.getExtensionLoader(AddressListener.class);
List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
if (supportedListeners != null && !supportedListeners.isEmpty()) {
for (AddressListener addressListener : supportedListeners) {
providerURLs = addressListener.notify(providerURLs, getUrl(),this);
}
}
// 重新加载 Invoker 实例
refreshOverrideAndInvoker(providerURLs);
}

RegistryDirectory#notify里面最后会刷新 Invoker 进行重新加载,下面是核心代码的实现:

private void refreshOverrideAndInvoker(List<URL> urls) {
// mock zookeeper://xxx?mock=return null
overrideDirectoryUrl();
// 刷新 invoker
refreshInvoker(urls);
}

private void refreshInvoker(List<URL> invokerUrls) {
Assert.notNull(invokerUrls, "invokerUrls should not be null");

if (invokerUrls.size() == 1
&& invokerUrls.get(0) != null
&& EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {

......

} else {
// 刷新之前的 Invoker
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
// 加载新的 Invoker Map
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
// 获取新的 Invokers
List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
// 缓存新的 Invokers
routerChain.setInvokers(newInvokers);
this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
this.urlInvokerMap = newUrlInvokerMap;

try {
// 通过新旧 Invokers 对比,销毁无用的 Invokers
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}

获取刷新前后的 Invokers,将新的 Invokers 重新缓存起来,通过对比,销毁无用的 Invoker。

上面将 URL 转换 Invoker 是在RegistryDirectory#toInvokers中进行。

private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();

Set<String> keys = new HashSet<>();
String queryProtocols = this.queryMap.get(PROTOCOL_KEY);
for (URL providerUrl : urls) {

// 过滤消费端不匹配的协议,及非法协议
......

// 合并服务提供端配置数据
URL url = mergeUrl(providerUrl);
// 过滤重复的服务提供端配置数据
String key = url.toFullString();
if (keys.contains(key)) {
continue;
}
keys.add(key);

// 缓存键是不与使用者端参数合并的url,无论使用者如何合并参数,如果服务器url更改,则再次引用
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);

// 缓存无对应 invoker,再次调用 protocol#refer 是否有数据
if (invoker == null) {
try {
boolean enabled = true;
if (url.hasParameter(DISABLED_KEY)) {
enabled = !url.getParameter(DISABLED_KEY, false);
} else {
enabled = url.getParameter(ENABLED_KEY, true);
}
if (enabled) {
invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
}
} catch (Throwable t) {
logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
}
// 将新的 Invoker 缓存起来
if (invoker != null) { // Put new invoker in cache
newUrlInvokerMap.put(key, invoker);
}
} else {
// 缓存里有数据,则进行重新覆盖
newUrlInvokerMap.put(key, invoker);
}
}
keys.clear();
return newUrlInvokerMap;
}

通过《Dubbo之服务暴露》和本文两篇文章对 Dubbo 服务暴露和服务消费原理的了解。我们可以看到,不管是暴露还是消费,Dubbo 都是以 Invoker 为数据交换主体进行,通过对 Invoker 发起调用,实现一个远程或本地的实现。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK