19

Dubbo系列笔记之服务引用过程

 3 years ago
source link: https://aysaml.com/articles/2020/09/08/1599558950363.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

100

一、引言

服务引用有 直连注册中心 两种方式,一般来说直连方式不推荐用于生产,仅提供测试或预发布的调试使用。所以本篇重点分析通过注册中心引用服务的过程。

二、服务引用的起点

Dubbo 服务引用的起点有两个,一般来说我们都是以 ReferenceBean 对应的服务注入形式使用,例如常用的注解形式 @DubboReference@Reference 注解在新版 Dubbo 中已废弃);另一种是 Spring 容器调用 ReferenceBean#afterPropertiesSet 方法时引用服务,这种方式需要配置 <dubbo:reference> 的 init 属性开启,Dubbo 默认使用第一种方式。

我们从 ReferenceBean 入手:

100

ReferenceBean 实现了 FactoryBeanInitializingBean

  • InitializingBean
    看一下 #afterPropertiesSet 方法:
@Override
 @SuppressWarnings({"unchecked"})
 public void afterPropertiesSet() throws Exception {

     // 初始化 Dubbo config bean
     prepareDubboConfigBeans();

     // lazy init by default.
     if (init == null) {
         init = false;
     }

     // eager init if necessary.
     if (shouldInit()) {
         getObject();
     }
  }
  • FactoryBean
    Spring 通过调用 #getBean 方法可以返回 bean 的实例,而实现了 FactoryBean 接口,会调用 #getObject 方法返回 bean 的实例。
@Override
    public Object getObject() {
	// 调用 ReferenceConfig 的 get 方法获取 bean 实例
        return get();
    }

比较上面两个方法,我们可以知道 Spring 在实例化 Dubbo 的 ReferenceBean 时会调用 ReferenceConfig#get 方法获取 bean 实例,执行 Dubbo 服务引用的过程。

三、配置检查处理

继续跟随 ReferenceConfig#get 方法:

public synchronized T get() {
        if (destroyed) {
            throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
        }
        // 若服务引用代理为空,执行 init 方法
        if (ref == null) {
            // 处理配置,调用 createProxy 生成代理类
            init();
        }
        return ref;
    }
  • ReferenceConfiginit( ) 方法
    public synchronized void init() {
          // 标识是否已经初始化,避免重复初始化
          if (initialized) {
              return;
          }
          // 获取 DubboBootstrap 引导类实例
          if (bootstrap == null) {
              bootstrap = DubboBootstrap.getInstance();
              bootstrap.init();
          }
          // 检查接口、consumer 等配置是否合法,并对相应的配置赋值
          checkAndUpdateSubConfigs();
          // 本地存根检查
          checkStubAndLocal(interfaceClass);
          ConfigValidationUtils.checkMock(interfaceClass, this);
    
          Map<String, String> map = new HashMap<String, String>();
          map.put(SIDE_KEY, CONSUMER_SIDE);
          // 加入运行时参数,Dubbo 版本号、时间戳、进程号等
          ReferenceConfigBase.appendRuntimeParameters(map);
          // 是否为泛化接口
          if (!ProtocolUtils.isGeneric(generic)) {
              String revision = Version.getVersion(interfaceClass, version);
              if (revision != null && revision.length() > 0) {
                  map.put(REVISION_KEY, revision);
              }
              // 获取接口的方法列表,加入 map
              String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
              if (methods.length == 0) {
                  logger.warn("No method found in service interface " + interfaceClass.getName());
                  map.put(METHODS_KEY, ANY_VALUE);
              } else {
                  map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), COMMA_SEPARATOR));
              }
          }
          map.put(INTERFACE_KEY, interfaceName);
          // 将 ApplicationConfig、ConsumerConfig、ReferenceConfig 等对象的字段信息添加到 map 中
          AbstractConfig.appendParameters(map, getMetrics());
          AbstractConfig.appendParameters(map, getApplication());
          AbstractConfig.appendParameters(map, getModule());
          AbstractConfig.appendParameters(map, consumer);
          AbstractConfig.appendParameters(map, this);
          Map<String, AsyncMethodInfo> attributes = null;
          if (CollectionUtils.isNotEmpty(getMethods())) {
              attributes = new HashMap<>();
              for (MethodConfig methodConfig : getMethods()) {
                  AbstractConfig.appendParameters(map, methodConfig, methodConfig.getName());
                  String retryKey = methodConfig.getName() + ".retry";
                  if (map.containsKey(retryKey)) {
                      String retryValue = map.remove(retryKey);
                      if ("false".equals(retryValue)) {
                          map.put(methodConfig.getName() + ".retries", "0");
                      }
                  }
                  AsyncMethodInfo asyncMethodInfo = AbstractConfig.convertMethodConfig2AsyncInfo(methodConfig);
                  if (asyncMethodInfo != null) {
                      attributes.put(methodConfig.getName(), asyncMethodInfo);
                  }
              }
          }
          // 从系统变量中获取服务消费者 ip
          String hostToRegistry = ConfigUtils.getSystemProperty(DUBBO_IP_TO_REGISTRY);
          if (StringUtils.isEmpty(hostToRegistry)) {
              hostToRegistry = NetUtils.getLocalHost();
          } else if (isInvalidLocalHost(hostToRegistry)) {
              throw new IllegalArgumentException("Specified invalid registry ip from property:" + DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
          }
          map.put(REGISTER_IP_KEY, hostToRegistry);
    
          // 存储配置数据
          serviceMetadata.getAttachments().putAll(map);
          // 创建代理
          ref = createProxy(map);
    
          serviceMetadata.setTarget(ref);
          serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);
          ConsumerModel consumerModel = repository.lookupReferredService(serviceMetadata.getServiceKey());
          consumerModel.setProxyObject(ref);
          consumerModel.init(attributes);
    
          initialized = true;
    
          // 发布 ReferenceConfigInitializedEvent 事件
          dispatch(new ReferenceConfigInitializedEvent(this, invoker));
      }

代码较长,主要是各种配置的检查和初始化,并收集这些信息加入 map 存储,以及创建代理。

四、引用服务

接着上面我们继续看 #createProxy 方法,其不仅执行创建代理的逻辑,同时还会调用其他方法创建、合并 Invoker 实例。

private T createProxy(Map<String, String> map) {
    // 判断是否本地暴露,包含指定服务 url 直连的情况判断、或根据参数配置是否进行本地暴露,如协议、scope、injvm 等
    if (shouldJvmRefer(map)) { // 本地引用
        // 创建 URL,协议为 njvm
        URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
        // 调用 refer 方法创建 InjvmInvoker 实例
        invoker = REF_PROTOCOL.refer(interfaceClass, url);
        if (logger.isInfoEnabled()) {
            logger.info("Using injvm service " + interfaceClass.getName());
        }
        // 远程引用
    } else {
        urls.clear();
        // 若 url 不为空
        if (url != null && url.length() > 0) {
            // 配置多个 url 时,用分号分隔
            String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
            if (us != null && us.length > 0) {
                for (String u : us) {
                    URL url = URL.valueOf(u);
                    if (StringUtils.isEmpty(url.getPath())) {
                        // 设置 url 路径为接口全限定名
                        url = url.setPath(interfaceName);
                    }
                    // 协议为 registry 时,指定注册中心
                    if (UrlUtils.isRegistry(url)) {
                        // 将 map 转换为查询字符串,赋值给 refer
                        urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                    } else {
                        // 合并 url,移除服务提供者的一些配置(这些配置来源于用户配置的 url 属性),
                        // 比如线程池相关配置。并保留服务提供者的部分配置,比如版本,group,时间戳等
                        // 最后将合并后的配置设置为 url 查询字符串中。
                        urls.add(ClusterUtils.mergeUrl(url, map));
                    }
                }
            }
        } else { // 从注册中心的配置中组装 URL
            // 协议不是 injvm
            if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
                checkRegistry();
                List<URL> us = ConfigValidationUtils.loadRegistries(this, false);
                if (CollectionUtils.isNotEmpty(us)) {
                    for (URL u : us) {
                        URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);
                        if (monitorUrl != null) {
                            map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                        }
                        // 添加 refer 参数到 url,并加入 urls 集合
                        urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                    }
                }
                // 没有配置注册中心,抛出异常
                if (urls.isEmpty()) {
                    throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                }
            }
        }
        // 只有一个注册中心或者服务提供者
        if (urls.size() == 1) {
            // 构建 invoker 实例
            invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
        } else { // 多个注册中心或多个服务提供者,或者两者混合
            List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
            URL registryURL = null;
            for (URL url : urls) {
                invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
                if (UrlUtils.isRegistry(url)) {
                    // 最后一个注册中心 URL
                    registryURL = url;
                }
            }
            if (registryURL != null) {
                // 如果注册中心链接不为空,则将使用 ZoneAwareCluster
                String cluster = registryURL.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME);
                // 创建 StaticDirectory 实例,并由 Cluster 对多个 Invoker 进行合并
                invoker = Cluster.getCluster(cluster, false).join(new StaticDirectory(registryURL, invokers));
            } else { // not a registry url, must be direct invoke.
                String cluster = CollectionUtils.isNotEmpty(invokers)
                        ? (invokers.get(0).getUrl() != null ? invokers.get(0).getUrl().getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME) : Cluster.DEFAULT)
                        : Cluster.DEFAULT;
                invoker = Cluster.getCluster(cluster).join(new StaticDirectory(invokers));
            }
        }
    }

    if (shouldCheck() && !invoker.isAvailable()) {
        invoker.destroy();
        throw new IllegalStateException("Failed to check the status of the service "
                + interfaceName
                + ". No provider available for the service "
                + (group == null ? "" : group + "/")
                + interfaceName +
                (version == null ? "" : ":" + version)
                + " from the url "
                + invoker.getUrl()
                + " to the consumer "
                + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
    }
    if (logger.isInfoEnabled()) {
        logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
    }
    // create service proxy
    return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}

#createProxy 方法中,首先检查配置是否是本地暴露,如果是,则根据自适应扩展机制获取 InjvmProtocol,并调用 #refer 方法生成 InjvmInvoker 实例,完成服务引用。相反,则读取直连 url 配置,或读取注册中心 url ,并将其存储到 urls 集合中,根据 urls 的大小进行不同的处理。如果 urls 大小为 1,则直接根据自适应扩展调用调用 #refer 方法生成 invoker 。如果 urls 大于 1,则分别根据 url 生成 invoker,然后再通过 Cluster 合并多个 invoker ,最后调用 ProxyFactory 生成代理类。

五、创建 Invoker

讲到服务暴露时,我们同样分析了 Invoker 的创建过程。Invoker 作为 Dubbo 的通用模型,代表着一个可执行体。在服务提供者来看,Invoker 用于调用真实的服务实现类;而在服务消费者来看,Invoker 用于执行远程调用,在上面创建代理的方法中,我们注意到创建 Invoker 的一个关键方法 Protocol#refer(Class<T> type, URL url)

Protocol 的实现有很多,我们还是以常见的 DubboProtocol 和 RegistryProtocol 来分析 refer 方法如何构建 Invoker 。

1. DubboProtocol

DubboProtocol 继承了 AbstractProtocol 抽象类,从其 refer 方法入手:

@Override
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
    }

其调用了模板方法 #protocolBindingRefer(type, url)

回到 DubboProtocol#protocolBindingRefer(type, url) 方法:

@Override
    public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
        optimizeSerialization(url);

        // create rpc invoker.
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);

        return invoker;
    }

上面代码非常简单,关注其中调用了 #getClients(url) 方法用于获取客户端实例,实例类型为 ExchangeClient。

ExchangeClient 实际上并不具备通信能力,它需要基于更底层的客户端实例进行通信。比如 NettyClient、MinaClient 等,默认情况下,Dubbo 使用 NettyClient 进行通信。

对于 DubboProtocol 的引用逻辑我们先大概了解这么多,关于集群、通信后面会详细说。下面再看 RegistryProtocol 的 refer 方法:

@Override
    @SuppressWarnings("unchecked")
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        // 通过参数获取 Registry 的协议,并将其设置为协议头
        url = getRegistryUrl(url);
        // 获取注册中心实例
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }

        // url 查询字符串转换为 map
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
        // 获取 group 配置
        String group = qs.get(GROUP_KEY);
        // 多个 group
        if (group != null && group.length() > 0) {
            if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
                return doRefer(Cluster.getCluster(MergeableCluster.NAME), registry, type, url);
            }
        }

        Cluster cluster = Cluster.getCluster(qs.get(CLUSTER_KEY));
        return doRefer(cluster, registry, type, url);
    }

继续看 #doRefer 方法:

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        // 创建 RegistryDirectory 实例
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        // 设置注册中心和协议
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
        // 创建服务消费者 URL
        URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (directory.isShouldRegister()) {
            directory.setRegisteredConsumerUrl(subscribeUrl);
            registry.register(directory.getRegisteredConsumerUrl());
        }
        directory.buildRouterChain(subscribeUrl);
        directory.subscribe(toSubscribeUrl(subscribeUrl));
        // 将多个服务提供者合并
        Invoker<T> invoker = cluster.join(directory);
        // 注册中心此时没有其他服务提供者
        List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
        if (CollectionUtils.isEmpty(listeners)) {
            return invoker;
        }
        // 多个服务提供者时,通过 Wrapper 包裹,并通知 RegistryProtocol 的监听器
        RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker);
        for (RegistryProtocolListener listener : listeners) {
            listener.onRefer(this, registryInvokerWrapper);
        }
        return registryInvokerWrapper;
    }

如此,生成 Invoker 创建完毕,再根据服务接口生成代理对象,便可执行远程调用,生成代理的部分逻辑和上篇服务暴露的入口一致,即 ProxyFactory 的 getProxy 方法,感兴趣的小伙伴可自行查看。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK