58

Dubbo 源码解析五 —— 集群容错

 5 years ago
source link: http://wsccoder.top/2018/11/23/Dubbo-源码解析五-集群容错/?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

欢迎来我的 Star Followers 后期后继续更新Dubbo别的文章

目录

  • 面试中集群容错的经常的问题
  • Dubbo 官方文档关于集群容错的介绍
  • Dubbo集群容错的架构分析
  • Dubbo集群容错源码解析

面试中集群容错的经常的问题

  • 什么是集群容错
  • Dubbo的集群容错知道吗
  • Dubbo 集群容错是如何配置的
  • 集群容错如何实现的
  • Dubbo 集群容错介绍下
  • 介绍下 几种集群容错方式,分析下其优缺点
  • 你来设计一个容错算法,你会怎样的设计

Dubbo 官方文档关于集群容错的介绍

在集群调用失败时,Dubbo 提供了多种容错方案,缺省为 failover 重试。

veY3y2I.jpg!web

各节点关系:

  • 这里的 InvokerProvider 的一个可调用 Service 的抽象, Invoker 封装了 Provider 地址及 Service 接口信息
  • Directory 代表多个 Invoker ,可以把它看成 List<Invoker> ,但与 List 不同的是,它的值可能是动态变化的,比如注册中心推送变更
  • ClusterDirectory 中的多个 Invoker 伪装成一个 Invoker ,对上层透明,伪装过程包含了容错逻辑,调用失败后,重试另一个
  • Router 负责从多个 Invoker 中按路由规则选出子集,比如读写分离,应用隔离等
  • LoadBalance 负责从多个 Invoker 中选出具体的一个用于本次调用,选的过程包含了负载均衡算法,调用失败后,需要重选

集群容错模式

可以自行扩展集群容错策略,参见: 集群扩展

Failover Cluster

失败自动切换,当出现失败,重试其它服务器 [ 1] 。通常用于读操作,但重试会带来更长延迟。可通过 retries="2" 来设置重试次数(不含第一次)。

重试次数配置如下:

<dubbo:service retries="2" />

<dubbo:reference retries="2" />

<dubbo:reference>
    <dubbo:method name="findFoo" retries="2" />
</dubbo:reference>

Failfast Cluster

快速失败,只发起一次调用,失败立即报错。通常用于非幂等性的写操作,比如新增记录。

Failsafe Cluster

失败安全,出现异常时,直接忽略。通常用于写入审计日志等操作。

Failback Cluster

失败自动恢复,后台记录失败请求,定时重发。通常用于消息通知操作。

Forking Cluster

并行调用多个服务器,只要一个成功即返回。通常用于实时性要求较高的读操作,但需要浪费更多服务资源。可通过 forks="2" 来设置最大并行数。

Broadcast Cluster

广播调用所有提供者,逐个调用,任意一台报错则报错 [ 2] 。通常用于通知所有提供者更新缓存或日志等本地资源信息。

集群模式配置

按照以下示例在服务提供方和消费方配置集群模式

<dubbo:service cluster="failsafe" />

<dubbo:reference cluster="failsafe" />

Dubbo集群容错的架构分析

通过官网上这张图我们能大致的了解到一个请求过来,在集群中的调用过程。那么我们就根据这个调用过程来进行分析吧。

3MFzeqF.png!web

整个在调用的过程中 这三个关键词接下来会贯穿全文,他们就是 Directory , Router , LoadBalance

我们只要牢牢的抓住这几个关键字就能贯穿整个调用链

先看下时序图,来看下调用的过程

RjAFfuA.png!web

最初我们一个方法调用

我们使用的是官方的dubbo-demo dubbo-demo-consumer

public static void main(String[] args) {
	DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy
    String hello = demoService.sayHello("world"); // call remote method
    System.out.println(hello); // get result
   
}

调用 InvokerInvocationHandler#invoker方法代理类的调用

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    String methodName = method.getName();
    Class<?>[] parameterTypes = method.getParameterTypes();
    if (method.getDeclaringClass() == Object.class) {
        return method.invoke(invoker, args);
    }
    if ("toString".equals(methodName) && parameterTypes.length == 0) {
        return invoker.toString();
    }
    if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
        return invoker.hashCode();
    }
    if ("equals".equals(methodName) && parameterTypes.length == 1) {
        return invoker.equals(args[0]);
    }

    RpcInvocation invocation;
    if (RpcUtils.hasGeneratedFuture(method)) {
        Class<?> clazz = method.getDeclaringClass();
        String syncMethodName = methodName.substring(0, methodName.length() - Constants.ASYNC_SUFFIX.length());
        Method syncMethod = clazz.getMethod(syncMethodName, method.getParameterTypes());
        invocation = new RpcInvocation(syncMethod, args);
        invocation.setAttachment(Constants.FUTURE_GENERATED_KEY, "true");
        invocation.setAttachment(Constants.ASYNC_KEY, "true");
    } else {
        invocation = new RpcInvocation(method, args);
        if (RpcUtils.hasFutureReturnType(method)) {
            invocation.setAttachment(Constants.FUTURE_RETURNTYPE_KEY, "true");
            invocation.setAttachment(Constants.ASYNC_KEY, "true");
        }
    }
    //这里使用的是动态代理的方式获取到指定的代理类
   // <1>
    return invoker.invoke(invocation).recreate();
}

**1 执行 invoke 就要开始进入MockClusterInvoker#invoker **

public Result invoke(Invocation invocation) throws RpcException {
    Result result = null;
    // 获得 “mock” 配置项,有多种配置方式
    String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
    //【第一种】无 mock
    if (value.length() == 0 || value.equalsIgnoreCase("false")) {
        //no mock
        // 调用原 Invoker ,发起 RPC 调用
        // 调用 invoker方法,进入到集群也就是CLuster类中
        //<2>
        result = this.invoker.invoke(invocation);
    //【第二种】强制服务降级
    } else if (value.startsWith("force")) {
        if (logger.isWarnEnabled()) {
            logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
        }
        //force:direct mock
        // 直接调用 Mock Invoker ,执行本地 Mock 逻辑
        result = doMockInvoke(invocation, null);
    } else {
        //fail-mock
        try {
            // 【第三种】失败服务降级
            result = this.invoker.invoke(invocation);
        } catch (RpcException e) {
            // 业务性异常,直接抛出
            if (e.isBiz()) {
                throw e;
            } else {
                if (logger.isWarnEnabled()) {
                    logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
                }
                // 失败后,调用 Mock Invoker ,执行本地 Mock 逻辑
                result = doMockInvoke(invocation, e);
            }
        }
    }
    return result;
}

**2 进入到 invoke 就要开始进入到集群,也就是 Cluster **

/**
 * 调用服务提供者
 * @param invocation
 * @return
 * @throws RpcException
 */
@Override
public Result invoke(final Invocation invocation) throws RpcException {
    // 校验是否销毁
    checkWhetherDestroyed();

    //TODO
    // binding attachments into invocation.
    Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        ((RpcInvocation) invocation).addAttachments(contextAttachments);
    }

    // 获得所有服务提供者 Invoker 集合
    // <下面的list方法>
    List<Invoker<T>> invokers = list(invocation);
    // 获得 LoadBalance 对象
    LoadBalance loadbalance = initLoadBalance(invokers, invocation);
    // 设置调用编号,若是异步调用
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    // 执行调用
    return doInvoke(invocation, invokers, loadbalance);
}
/**
 *  获得所有服务提供者 Invoker 集合
 * @param invocation
 * @return
 * @throws RpcException
 */
protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
    // 通过directory 进入到 AbstractDirectory 中选择 directory
    //<3 进入3 里面>
    return directory.list(invocation);
}

3 进入到 AbstractDirectory 进行 directory的选择

/**
 * 获得所有服务 Invoker 集合
 * @param invocation
 * @return Invoker 集合
 * @throws RpcException
 */
@Override
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
    //当销毁时抛出异常
    if (destroyed) {
        throw new RpcException("Directory already destroyed .url: " + getUrl());
    }
    // 获得所有 Invoker 集合
    // <4 RegistryDirectory 选择 invoker>
    List<Invoker<T>> invokers = doList(invocation);
    //根据路由规则,筛选Invoker集合
    List<Router> localRouters = this.routers; // local reference 本地引用,避免并发问题
    if (localRouters != null && !localRouters.isEmpty()) {
        for (Router router : localRouters) {
            try {
                if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
                  
                    invokers = router.route(invokers, getConsumerUrl(), invocation);
                }
            } catch (Throwable t) {
                logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
            }
        }
    }
      //< 6 获取 router 即将进入 MockInvokersSelector 类中>
    return invokers;
}

调用list方法会进入到RegistryDirectory#doList

/**
 *  获得对应的 Invoker 集合。
 * @param invocation
 * @return
 */
@Override
public List<Invoker<T>> doList(Invocation invocation) {
    if (forbidden) {
        // 1. No service provider 2. Service providers are disabled
        throw new RpcException(RpcException.FORBIDDEN_EXCEPTION,
                "No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " + NetUtils.getLocalHost()
                        + " use dubbo version " + Version.getVersion() + ", please check status of providers(disabled, not registered or in blacklist).");
    }
    List<Invoker<T>> invokers = null;
    //从methodInvokerMap中取出invokers
    Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
    // 获得 Invoker 集合
    if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
        // 获得方法名、方法参数
        String methodName = RpcUtils.getMethodName(invocation);
        Object[] args = RpcUtils.getArguments(invocation);
        // 【第一】可根据第一个参数枚举路由
        if (args != null && args.length > 0 && args[0] != null
                && (args[0] instanceof String || args[0].getClass().isEnum())) {
            invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter
        }
        // 【第二】根据方法名获得 Invoker 集合
        if (invokers == null) {
            invokers = localMethodInvokerMap.get(methodName);
        }
        // 【第三】使用全量 Invoker 集合。例如,`#$echo(name)` ,回声方法
        if (invokers == null) {
            invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
        }
        // 【第四】使用 `methodInvokerMap` 第一个 Invoker 集合。防御性编程。
        if (invokers == null) {
            Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
            if (iterator.hasNext()) {
                invokers = iterator.next();
            }
        }
    }
    return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
}

6 进入 MockInvokersSelector 类中根据路由规则拿到正常执行的invokers

/**
 * ,根据 "invocation.need.mock" 路由匹配对应类型的 Invoker 集合:
 * @param invokers Invoker 集合
 * @param url        refer url
 * @param invocation
 * @param <T>
 * @return
 * @throws RpcException
 */
@Override
public <T> List<Invoker<T>> route(final List<Invoker<T>> invokers,
                                  URL url, final Invocation invocation) throws RpcException {
    // 获得普通 Invoker 集合
    if (invocation.getAttachments() == null) {
        //<7> 拿到能正常执行的invokers,并将其返回.也就是序号7
        return getNormalInvokers(invokers);
    } else {
        // 获得 "invocation.need.mock" 配置项
        String value = invocation.getAttachments().get(Constants.INVOCATION_NEED_MOCK);
        // 获得普通 Invoker 集合
        if (value == null)
            return getNormalInvokers(invokers);
        // 获得 MockInvoker 集合
        else if (Boolean.TRUE.toString().equalsIgnoreCase(value)) {
            return getMockedInvokers(invokers);
        }
    }
    // 其它,不匹配,直接返回 `invokers` 集合
    return invokers;
}
/**
 * 获得普通 Invoker 集合
 * @param invokers
 * @param <T>
 * @return
 */
private <T> List<Invoker<T>> getNormalInvokers(final List<Invoker<T>> invokers) {
    // 不包含 MockInvoker 的情况下,直接返回 `invokers` 集合
    if (!hasMockProviders(invokers)) {
        return invokers;
    } else {
        // 若包含 MockInvoker 的情况下,过滤掉 MockInvoker ,创建普通 Invoker 集合
        List<Invoker<T>> sInvokers = new ArrayList<Invoker<T>>(invokers.size());
        for (Invoker<T> invoker : invokers) {
            if (!invoker.getUrl().getProtocol().equals(Constants.MOCK_PROTOCOL)) {
                sInvokers.add(invoker);
            }
        }
        return sInvokers;
    }
}

**8 拿到 invoker 返回到 AbstractClusterInvoker 这个类 **

对于上面的这些步骤,主要用于做两件事

  • Directory 中找出本次集群中的全部 invokers
  • Router 中,将上一步的全部 invokers 挑选出能正常执行的 invokers

在 时序图的序号5和序号7处,做了上诉的处理。

在有多个集群的情况下,而且两个集群都是正常的,那么到底需要执行哪个?

AbstractClusterInvoker#invoke

/**
 * 调用服务提供者
 * @param invocation
 * @return
 * @throws RpcException
 */
@Override
public Result invoke(final Invocation invocation) throws RpcException {
    // 校验是否销毁
    checkWhetherDestroyed();

    //TODO
    // binding attachments into invocation.
    Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        ((RpcInvocation) invocation).addAttachments(contextAttachments);
    }

    // 获得所有服务提供者 Invoker 集合
    List<Invoker<T>> invokers = list(invocation);
    // 获得 LoadBalance 对象
    LoadBalance loadbalance = initLoadBalance(invokers, invocation);
    // 设置调用编号,若是异步调用
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    // 执行调用
    return doInvoke(invocation, invokers, loadbalance);
}
/**
 *  实现子 Cluster 的 Invoker 实现类的服务调用的差异逻辑,
 * @param invocation
 * @param invokers
 * @param loadbalance
 * @return
 * @throws RpcException
 */
 // 抽象方法,子类自行的实现  因为我们使用的默认配置,所以 我们将会是FailoverClusterInvoker 这个类
 //< 8 doInvoker 方法>
protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers,
                                   LoadBalance loadbalance) throws RpcException;

8 进入到 相应的集群容错方案 类中 因为我们使用的默认配置,所以 我们将会是FailoverClusterInvoker 这个类

/**
 * 实际逻辑很简单:循环,查找一个 Invoker 对象,进行调用,直到成功
 * @param invocation
 * @param invokers
 * @param loadbalance
 * @return
 * @throws RpcException
 */
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    List<Invoker<T>> copyinvokers = invokers;
    // 检查copyinvokers即可用Invoker集合是否为空,如果为空,那么抛出异常
    checkInvokers(copyinvokers, invocation);
    String methodName = RpcUtils.getMethodName(invocation);
    // 得到最大可调用次数:最大可重试次数+1,默认最大可重试次数Constants.DEFAULT_RETRIES=2
    int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
    if (len <= 0) {
        len = 1;
    }

    // retry loop.
    // 保存最后一次调用的异常
    RpcException le = null; // last exception.
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
    Set<String> providers = new HashSet<String>(len);
    // failover机制核心实现:如果出现调用失败,那么重试其他服务器
    for (int i = 0; i < len; i++) {
        //Reselect before retry to avoid a change of candidate `invokers`.
        //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
        // 重试时,进行重新选择,避免重试时invoker列表已发生变化.
        // 注意:如果列表发生了变化,那么invoked判断会失效,因为invoker示例已经改变
        if (i > 0) {
            checkWhetherDestroyed();
            // 根据Invocation调用信息从Directory中获取所有可用Invoker
            copyinvokers = list(invocation);
            // check again
            // 重新检查一下
            checkInvokers(copyinvokers, invocation);
        }
        // 根据负载均衡机制从copyinvokers中选择一个Invoker
         //< 9 select ------------------------- 下面将进行 invoker选择----------------------------------->
        Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
        // 保存每次调用的Invoker
        invoked.add(invoker);
        // 设置已经调用的 Invoker 集合,到 Context 中
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            // RPC 调用得到 Result
            Result result = invoker.invoke(invocation);
            // 重试过程中,将最后一次调用的异常信息以 warn 级别日志输出
            if (le != null && logger.isWarnEnabled()) {
                logger.warn("Although retry the method " + methodName
                        + " in the service " + getInterface().getName()
                        + " was successful by the provider " + invoker.getUrl().getAddress()
                        + ", but there have been failed providers " + providers
                        + " (" + providers.size() + "/" + copyinvokers.size()
                        + ") from the registry " + directory.getUrl().getAddress()
                        + " on the consumer " + NetUtils.getLocalHost()
                        + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                        + le.getMessage(), le);
            }
            return result;
        } catch (RpcException e) {
            // 如果是业务性质的异常,不再重试,直接抛出
            if (e.isBiz()) { // biz exception.
                throw e;
            }
            // 其他性质的异常统一封装成RpcException
            le = e;
        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }
    // 最大可调用次数用完还得到Result的话,抛出RpcException异常:重试了N次还是失败,并输出最后一次异常信息
    throw new RpcException(le.getCode(), "Failed to invoke the method "
            + methodName + " in the service " + getInterface().getName()
            + ". Tried " + len + " times of the providers " + providers
            + " (" + providers.size() + "/" + copyinvokers.size()
            + ") from the registry " + directory.getUrl().getAddress()
            + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
            + Version.getVersion() + ". Last error is: "
            + le.getMessage(), le.getCause() != null ? le.getCause() : le);
}

9 select 使用 loadbalance 选择 invoker

/**
 * Select a invoker using loadbalance policy.</br>
 * a) Firstly, select an invoker using loadbalance. If this invoker is in previously selected list, or,
 * if this invoker is unavailable, then continue step b (reselect), otherwise return the first selected invoker</br>
 * <p>
 * b) Reselection, the validation rule for reselection: selected > available. This rule guarantees that
 * the selected invoker has the minimum chance to be one in the previously selected list, and also
 * guarantees this invoker is available.
 *
 * @param loadbalance load balance policy
 * @param invocation  invocation
 * @param invokers    invoker candidates
 * @param selected    exclude selected invokers or not
 * @return the invoker which will final to do invoke.
 * @throws RpcException
 */
/**
 * 使用 loadbalance 选择 invoker.
 * a)
 * @param loadbalance  对象,提供负责均衡策略
 * @param invocation 对象
 * @param invokers 候选的 Invoker 集合
 * @param selected 已选过的 Invoker 集合. 注意:输入保证不重复
 * @return 最终的 Invoker 对象
 * @throws RpcException
 */
protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
    if (invokers == null || invokers.isEmpty())
        return null;
    // 获得 sticky 配置项,方法级
    String methodName = invocation == null ? "" : invocation.getMethodName();

    boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY);
    {
        //ignore overloaded method
        // 若 stickyInvoker 不存在于 invokers 中,说明不在候选中,需要置空,重新选择
        if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
            stickyInvoker = null;
        }
        //ignore concurrency problem
        // 若开启粘滞连接的特性,且 stickyInvoker 不存在于 selected 中,则返回 stickyInvoker 这个 Invoker 对象
        if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
            // 若开启排除非可用的 Invoker 的特性,则校验 stickyInvoker 是否可用。若可用,则进行返回
            if (availablecheck && stickyInvoker.isAvailable()) {
                return stickyInvoker;
            }
        }
    }
    // 执行选择
    //<  10 -----------------------------进行选择------------------------------------->
    Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);

    // 若开启粘滞连接的特性,记录最终选择的 Invoker 到 stickyInvoker
    if (sticky) {
        stickyInvoker = invoker;
    }
    return invoker;
}

10 doSelect 从候选的 Invoker 集合,选择一个最终调用的 Invoker 对象

/**
 *  从候选的 Invoker 集合,选择一个最终调用的 Invoker 对象
 * @param loadbalance
 * @param invocation
 * @param invokers
 * @param selected
 * @return
 * @throws RpcException
 */
private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
    if (invokers == null || invokers.isEmpty())
        return null;
    // 如果只有一个 Invoker ,直接选择
    if (invokers.size() == 1)
        return invokers.get(0);
    // 使用 Loadbalance ,选择一个 Invoker 对象。
    //<11 ------------------ 根据LoadBalance(负载均衡) 选择一个合适的invoke>
    Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);

    //If the `invoker` is in the  `selected` or invoker is unavailable && availablecheck is true, reselect.
    // 如果 selected中包含(优先判断) 或者 不可用&&availablecheck=true 则重试.
    if ((selected != null && selected.contains(invoker))
            || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
        try {
            //重选一个 Invoker 对象
            Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
            if (rinvoker != null) {
                invoker = rinvoker;
            } else {
                //Check the index of current selected invoker, if it's not the last one, choose the one at index+1.
                //看下第一次选的位置,如果不是最后,选+1位置.
                int index = invokers.indexOf(invoker);
                try {
                    //Avoid collision
                    // 最后在避免碰撞
                    invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invokers.get(0);
                } catch (Exception e) {
                    logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);
                }
            }
        } catch (Throwable t) {
            logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t);
        }
    }
    return invoker;
}

11 AbstractLoadBalance#select

此方法是抽象方法,需要各种子类去实现

/**
 *  抽象方法,下面的实现类来实现这个选择invoker 的方法
 *  各个负载均衡的类自行实现提供自定义的负载均衡策略。
 * @param invokers
 * @param url
 * @param invocation
 * @param <T>
 * @return
 */
protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);

**12 RoundRobinLoadBalance 实现父类的抽象方法 **

@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
    int length = invokers.size(); // Number of invokers
    int maxWeight = 0; // The maximum weight
    int minWeight = Integer.MAX_VALUE; // The minimum weight
    final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>();
    int weightSum = 0;
    // 计算最小、最大权重,总的权重和。
    for (int i = 0; i < length; i++) {
        int weight = getWeight(invokers.get(i), invocation);
        maxWeight = Math.max(maxWeight, weight); // Choose the maximum weight
        minWeight = Math.min(minWeight, weight); // Choose the minimum weight
        if (weight > 0) {
            invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight));
            weightSum += weight;
        }
    }
    // 计算最小、最大权重,总的权重和。
    AtomicPositiveInteger sequence = sequences.get(key);
    if (sequence == null) {
        sequences.putIfAbsent(key, new AtomicPositiveInteger());
        sequence = sequences.get(key);
    }
    // 获得当前顺序号,并递增 + 1
    int currentSequence = sequence.getAndIncrement();
    // 权重不相等,顺序根据权重分配
    if (maxWeight > 0 && minWeight < maxWeight) {
        int mod = currentSequence % weightSum;// 剩余权重
        for (int i = 0; i < maxWeight; i++) {// 循环最大权重
            for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) {
                final Invoker<T> k = each.getKey();
                final IntegerWrapper v = each.getValue();
                // 剩余权重归 0 ,当前 Invoker 还有剩余权重,返回该 Invoker 对象
                if (mod == 0 && v.getValue() > 0) {
                    return k;
                }
                // 若 Invoker 还有权重值,扣除它( value )和剩余权重( mod )。
                if (v.getValue() > 0) {
                    v.decrement();
                    mod--;
                }
            }
        }
    }
    // 权重相等,平均顺序获得
    // Round robin
    //<13 --------------------------->
    return invokers.get(currentSequence % length);
}

14 FailoverClusterInvoker # doInvoke 方法

/**
 * 实际逻辑很简单:循环,查找一个 Invoker 对象,进行调用,直到成功
 * @param invocation
 * @param invokers
 * @param loadbalance
 * @return
 * @throws RpcException
 */
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    List<Invoker<T>> copyinvokers = invokers;
    // 检查copyinvokers即可用Invoker集合是否为空,如果为空,那么抛出异常
    checkInvokers(copyinvokers, invocation);
    String methodName = RpcUtils.getMethodName(invocation);
    // 得到最大可调用次数:最大可重试次数+1,默认最大可重试次数Constants.DEFAULT_RETRIES=2
    int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
    if (len <= 0) {
        len = 1;
    }

    // retry loop.
    // 保存最后一次调用的异常
    RpcException le = null; // last exception.
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
    Set<String> providers = new HashSet<String>(len);
    // failover机制核心实现:如果出现调用失败,那么重试其他服务器
    for (int i = 0; i < len; i++) {
        //Reselect before retry to avoid a change of candidate `invokers`.
        //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
        // 重试时,进行重新选择,避免重试时invoker列表已发生变化.
        // 注意:如果列表发生了变化,那么invoked判断会失效,因为invoker示例已经改变
        if (i > 0) {
            checkWhetherDestroyed();
            // 根据Invocation调用信息从Directory中获取所有可用Invoker
            copyinvokers = list(invocation);
            // check again
            // 重新检查一下
            checkInvokers(copyinvokers, invocation);
        }
        // 根据负载均衡机制从copyinvokers中选择一个Invoker
        //< ------------------------- 下面将进行 invoker选择----------------------------------->
        Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
        // 保存每次调用的Invoker
        invoked.add(invoker);
        // 设置已经调用的 Invoker 集合,到 Context 中
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            // RPC 调用得到 Result
            Result result = invoker.invoke(invocation);
            // 重试过程中,将最后一次调用的异常信息以 warn 级别日志输出
            if (le != null && logger.isWarnEnabled()) {
                logger.warn("Although retry the method " + methodName
                        + " in the service " + getInterface().getName()
                        + " was successful by the provider " + invoker.getUrl().getAddress()
                        + ", but there have been failed providers " + providers
                        + " (" + providers.size() + "/" + copyinvokers.size()
                        + ") from the registry " + directory.getUrl().getAddress()
                        + " on the consumer " + NetUtils.getLocalHost()
                        + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                        + le.getMessage(), le);
            }
            return result;
        } catch (RpcException e) {
            // 如果是业务性质的异常,不再重试,直接抛出
            if (e.isBiz()) { // biz exception.
                throw e;
            }
            // 其他性质的异常统一封装成RpcException
            le = e;
        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }
    // 最大可调用次数用完还得到Result的话,抛出RpcException异常:重试了N次还是失败,并输出最后一次异常信息
    throw new RpcException(le.getCode(), "Failed to invoke the method "
            + methodName + " in the service " + getInterface().getName()
            + ". Tried " + len + " times of the providers " + providers
            + " (" + providers.size() + "/" + copyinvokers.size()
            + ") from the registry " + directory.getUrl().getAddress()
            + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
            + Version.getVersion() + ". Last error is: "
            + le.getMessage(), le.getCause() != null ? le.getCause() : le);
}

以上是一个完成调用过程的源码分析 以及架构分析

Dubbo集群容错源码解析


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK