7

Dubbo源码跟踪实录-集群容错:集群Cluster

 3 years ago
source link: https://nnkwrik.github.io/2019/04/21/20190421-2/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Dubbo源码跟踪实录-集群容错:集群Cluster



字数统计: 3.9k阅读时长: 24 min
 2019/04/21  Share

Dubbo的两个集群接口Cluster,Cluster Invoker

Cluster将多个Invoker合并为一个Cluster Invoker,并将这个 Invoker 暴露给服务消费者。

服务消费者只需通过这个Cluster Invoker 进行远程调用即可,至于具体调用哪个Invoker,以及调用失败后如何处理等问题,现在都交给集群模块去处理。

Dubbo 提供了多种集群实现,包含但不限于 Failover Cluster、Failfast Cluster 和 Failsafe Cluster 等。每种集群实现类的用途不同,接下来会一一进行分析。

在对集群相关代码进行分析之前,这里有必要先来介绍一下集群容错的所有组件。包含 Cluster、Cluster Invoker、Directory、Router 和 LoadBalance 等。

分析一下集群工作的流程.分为消费者初始化期间和进行远程调用时

  • 服务消费者初始化期间
    • 集群 Cluster 实现类为服务消费者创建 Cluster Invoker 实例,即上图中的 merge 操作。
  • 服务消费者进行远程调用时.FailoverClusterInvoker 为例
    1. 调用 Directory 的 list 方法列举 Invoker 列表.
      这里的Directory通常是指RegistryDirectory.RegistryDirectory是会动态增删Invoker的,当有新增的Invoker时它会调用Router的route方法进行路由,也就是Invoker的过滤.
    2. 通过LoadBalance选择一个Invoker实例的invoker方法
    3. 把参数传给该invoker方法.进行远程调用.

Dubbo 主要提供了这样几种容错方式:

  • Failover Cluster - 失败自动切换
  • Failfast Cluster - 快速失败
  • Failsafe Cluster - 失败安全
  • Failback Cluster - 失败自动恢复
  • Forking Cluster - 并行调用多个服务提供者

Cluster

cluster接口仅用于生成 Cluster Invoker。所以它只包含下面这一个方法.

public class FailoverCluster implements Cluster {
public final static String NAME = "failover";
@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
// 创建并返回 FailoverClusterInvoker 对象
return new FailoverClusterInvoker<T>(directory);
}

}

除了FailoverCluster,其他的cluster也都如此.仅用于生成各自对应的 Cluster Invoker.在构造方法中参数的directory会被保存.

Cluster Invoker

前面说到,服务消费者初始化期间会调用Cluster的join()来创建ClusterInvoker.其实这部分在前几篇讲服务引入的时候有接触过,在RegistryProtocol#doRefer里面.这里来简单回顾一下方法中相关的部分.

//com.alibaba.dubbo.registry.integration.RegistryProtocol#doRefer
1. 创建RegistryDirectory实例
2. RegistryDirectory中放入zookeeperRegistry.(顺便创建ZK ConsumerURL的节点)
3. RegistryDirectory中放入dubboProtocol
4. directory#subscribe
1. 从zk获取providers,router等
2. (更新路由)
3. 创建对应providerURL的netty客户端,把连接构成invoker
5. cluster.join(directory);//如果该注册中心下有多个服务提供者,合并成一个
6. 返回合并后的invoker

重点是第5步,clusterInvoker就是在那时被创建的.

这一部分就不过多解释了,下面来看服务消费者进行远程调用的部分.

讲服务目录的时候说过,每当远程服务被Consumer调用时,AbstractDirectory的list()都会被调用.其实Consumer是调用AbstractClusterInvoker 的 invoke (),再由它去调用AbstractDirectory的list()的.

//com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker#invoke
@Override
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
LoadBalance loadbalance = null;

// 绑定 attachments 到 invocation 中.
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addAttachments(contextAttachments);
}

// 列举 Invoker
List<Invoker<T>> invokers = list(invocation);
if (invokers != null && !invokers.isEmpty()) {
// 加载 LoadBalance
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(RpcUtils.getMethodName(invocation), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
}
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
// 调用 doInvoke 进行后续操作
return doInvoke(invocation, invokers, loadbalance);
}

重点做了两件事1. 从Directory拿到List<Invoker<T>>,2. 加载LoadBalance.

回顾一下,从Directory拿到List<Invoker<T>>部分是调用AbstractDirectory.list()获取invoker列表.列表的元素是在RegistryDirectory#notify()的时候放进去的.

继续看后续由各个子类实现的doInvoke()方法

FailoverClusterInvoker

FailoverClusterInvoker的特性是在调用失败时,会自动切换 Invoker 进行重试.这是dubbo默认的cluster Invoker.

//com.alibaba.dubbo.rpc.cluster.support.FailoverClusterInvoker#doInvoke
@Override
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyinvokers = invokers;
checkInvokers(copyinvokers, invocation);
// 获取重试次数.默认3
int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
RpcException le = null;
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size());
Set<String> providers = new HashSet<String>(len);
// 循环调用,失败重试
for (int i = 0; i < len; i++) {
if (i > 0) {
checkWhetherDestroyed();
// 在进行重试前重新列举 Invoker,这样做的好处是,如果某个服务挂了,
// 通过调用 list 可得到最新可用的 Invoker 列表
copyinvokers = list(invocation);
// 对 copyinvokers 进行判空检查
checkInvokers(copyinvokers, invocation);
}

// 通过负载均衡选择 Invoker
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
// 添加到 invoker 到 invoked 列表中
invoked.add(invoker);
// 设置 invoked 到 RPC 上下文中
RpcContext.getContext().setInvokers((List) invoked);
try {
// 调用目标 Invoker 的 invoke 方法
Result result = invoker.invoke(invocation);
return result;
} catch (RpcException e) {
if (e.isBiz()) {
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}

// 若重试失败,则抛出异常
throw new RpcException(..., "Failed to invoke the method ...");
}

首先从url中获取retries(重试次数)参数,默认为3.也就是说当Invoker不好使时可以进行3次尝试.每次通过loadbalance来选择一个invoker.如果invoker调用成功,则返回远程调用的结果.如果3次调用都失败了,那么就抛出异常

看一下通过loadbalance选择一个invoker时调用的select()的逻辑.该方法在父类的AbstractClusterInvoker中定义

//com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker#select
protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
if (invokers == null || invokers.isEmpty())
return null;
// 获取调用方法名
String methodName = invocation == null ? "" : invocation.getMethodName();

// 获取 sticky 配置,sticky 表示粘滞连接。所谓粘滞连接是指让服务消费者尽可能的
// 调用同一个服务提供者,除非该提供者挂了再进行切换.如果url中没有配置,则是默认的false
boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY);
{
// 检测 invokers 列表是否包含 stickyInvoker,如果不包含,
// 说明 stickyInvoker 代表的服务提供者挂了,此时需要将其置空
if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {//false 不包含
stickyInvoker = null;
}
// 在 sticky 为 true,且 stickyInvoker != null 的情况下。如果 selected 包含
// stickyInvoker,表明 stickyInvoker 对应的服务提供者可能因网络原因未能成功提供服务。
// 但是该提供者并没挂,此时 invokers 列表中仍存在该服务提供者对应的 Invoker。
if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
// availablecheck 表示是否开启了可用性检查,如果开启了,则调用 stickyInvoker 的
// isAvailable 方法进行检查,如果检查通过,则直接返回 stickyInvoker。
if (availablecheck && stickyInvoker.isAvailable()) {
return stickyInvoker;
}
}
}
// 如果线程走到当前代码处,说明前面的 stickyInvoker 为空,或者不可用。
// 此时继续调用 doSelect 选择 Invoker
Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);

// 如果 sticky 为 true,则将负载均衡组件选出的 Invoker 赋值给 stickyInvoker
if (sticky) {
stickyInvoker = invoker;
}调用同一个服务提供者,除非该提供者挂了再进行切换
return invoker;
}

上面可以看到有粘滞连接的功能.当该功能开启是时尽可能得调用同一个服务提供者,除非该提供者挂了再进行切换.而没有开启该功能时(如demo),则是调用doSelect()来进行选择.

//com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker#doSelect
private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
if (invokers == null || invokers.isEmpty())
return null;
if (invokers.size() == 1)//如果只有一个invoker,则直接返回
return invokers.get(0);
if (loadbalance == null) {
// 如果 loadbalance 为空,这里通过 SPI 加载 Loadbalance,默认为 RandomLoadBalance
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
}

// 通过负载均衡组件选择 Invoker
Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);

// 如果 selected 包含负载均衡选择出过的 Invoker,或者该 Invoker 无法经过可用性检查,此时进行重选
if ((selected != null && selected.contains(invoker))
|| (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
try {
// 进行重选
Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
if (rinvoker != null) {
// 如果 rinvoker 不为空,则将其赋值给 invoker
invoker = rinvoker;
} else {
// rinvoker 为空,定位 invoker 在 invokers 中的位置
int index = invokers.indexOf(invoker);
try {
// 获取 index + 1 位置处的 Invoker,以下代码等价于:
// invoker = invokers.get((index + 1) % invokers.size());
invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invokers.get(0);
} catch (Exception e) {
logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);
}
}
} catch (Throwable t) {
logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t);
}
}
return invoker;
}

主要包含三个工作,

  1. 通过负载均衡组件选择invoker.loadBalancer默认是RandomLoadBalance.

  2. 如果这个Invoker是之前选到过并且不好使的invoker,则调用reselect()进行重选.

  3. 如果重选的结果为null,直接返回下一个Invoker

重选的reselect()方法的代码就不分析了,简单说一下逻辑,它有几个分支

  • 从未选过的invoker集合中进行选择
    从所有Invoker中排除之前选择过得selected集合.通过负载均衡组件从这个未选过的invoker集合中选择

  • 如果所有Invoker都被选择过了,那么从selected集合里面找是否有可用的

  • 如果所有Invoker都被选过了,而且选过的Invoker状态都为不可用,返回null

以上FailoverClusterInvoker就分析结束了

FailbackClusterInvoker

FailbackClusterInvoker 会在调用失败后,返回一个空结果给服务提供者。并通过定时任务对失败的调用进行重传,适合执行消息通知等操作。

简单看这一下关于失败处理的部分代码:

public class FailbackClusterInvoker<T> extends AbstractClusterInvoker<T> {

private static final long RETRY_FAILED_PERIOD = 5 * 1000;

private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2,
new NamedInternalThreadFactory("failback-cluster-timer", true));

private final ConcurrentMap<Invocation, AbstractClusterInvoker<?>> failed = new ConcurrentHashMap<Invocation, AbstractClusterInvoker<?>>();
private volatile ScheduledFuture<?> retryFuture;

@Override
protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
try {
checkInvokers(invokers, invocation);
// 选择 Invoker
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
// 进行调用
return invoker.invoke(invocation);
} catch (Throwable e) {
// 如果调用过程中发生异常,此时仅打印错误日志,不抛出异常
logger.error("Failback to invoke method ...");

// 记录调用信息
addFailed(invocation, this);
// 返回一个空结果给服务消费者
return new RpcResult();
}
}

private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) {
if (retryFuture == null) {
synchronized (this) {
if (retryFuture == null) {
// 创建定时任务,每隔5秒执行一次
retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {

@Override
public void run() {
try {
// 对失败的调用进行重试
retryFailed();
} catch (Throwable t) {
// 如果发生异常,仅打印异常日志,不抛出
logger.error("Unexpected error occur at collect statistic", t);
}
}
}, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS);
}
}
}在调用失败后,返回一个空结果给服务提供者。

// 添加 invocation 和 invoker 到 failed 中
failed.put(invocation, router);
}

void retryFailed() {
if (failed.size() == 0) {
return;
}

// 遍历 failed,对失败的调用进行重试
for (Map.Entry<Invocation, AbstractClusterInvoker<?>> entry : new HashMap<Invocation, AbstractClusterInvoker<?>>(failed).entrySet()) {
Invocation invocation = entry.getKey();
Invoker<?> invoker = entry.getValue();
try {
// 再次进行调用
invoker.invoke(invocation);
// 调用成功后,从 failed 中移除 invoker
failed.remove(invocation);
} catch (Throwable e) {
// 仅打印异常,不抛出
logger.error("Failed retry to invoke method ...");
}
}
}
}

逻辑很简单.总之首次invoke失败时,无论之后在定时任务中Invoker再次调用成功与否,消费者拿到的都是空结果.而且就算失败也不会抛出异常.

FailfastClusterInvoker

FailfastClusterInvoker 只会进行一次调用,失败后立即抛出异常。

public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T> {

@Override
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
checkInvokers(invokers, invocation);
// 选择 Invoker
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
try {
// 调用 Invoker
return invoker.invoke(invocation);
} catch (Throwable e) {
if (e instanceof RpcException && ((RpcException) e).isBiz()) {
// 抛出异常
throw (RpcException) e;
}
// 抛出异常
throw new RpcException(..., "Failfast invoke providers ...");
}
}
}

FailsafeClusterInvoker

FailsafeClusterInvoker 是一种失败安全的 Cluster Invoker。所谓的失败安全是指,当调用过程中出现异常时,FailsafeClusterInvoker 仅会打印异常,而不会抛出异常。

public class FailsafeClusterInvoker<T> extends AbstractClusterInvoker<T> {

@Override
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
try {
checkInvokers(invokers, invocation);
// 选择 Invoker
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
// 进行远程调用
return invoker.invoke(invocation);
} catch (Throwable e) {
// 打印错误日志,但不抛出
logger.error("Failsafe ignore exception: " + e.getMessage(), e);
// 返回空结果忽略错误
return new RpcResult();
}
}
}

ForkingClusterInvoker

ForkingClusterInvoker 会在运行时通过线程池创建多个线程,并发调用多个服务提供者。只要有一个服务提供者成功返回了结果,doInvoke 方法就会立即结束运行。ForkingClusterInvoker 的应用场景是在一些对实时性要求比较高读操作(注意是读操作,并行写操作可能不安全)下使用,但这将会耗费更多的资源。

这部分也非常简单,由于代码比较长.解析的部分可以参考官方文档

https://dubbo.incubator.apache.org/zh-cn/docs/source_code_guide/cluster.html

BroadcastClusterInvoker

BroadcastClusterInvoker 会逐个调用每个服务提供者,如果其中一台报错,在循环调用结束后,BroadcastClusterInvoker 会抛出异常。

public class BroadcastClusterInvoker<T> extends AbstractClusterInvoker<T> {

@Override
public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
checkInvokers(invokers, invocation);
RpcContext.getContext().setInvokers((List) invokers);
RpcException exception = null;
Result result = null;
// 遍历 Invoker 列表,逐个调用
for (Invoker<T> invoker : invokers) {
try {
// 进行远程调用
result = invoker.invoke(invocation);
} catch (RpcException e) {
exception = e;
logger.warn(e.getMessage(), e);
} catch (Throwable e) {
exception = new RpcException(e.getMessage(), e);
logger.warn(e.getMessage(), e);
}
}

// exception 不为空,则抛出异常
if (exception != null) {
throw exception;
}
return result;
}
}

关于cluster 和 cluster Invoker的概念.

Consumer持有的Invoker是cluster Invoker.

ClusterInvoker实际上只是多个Invoker的包装类罢了,让消费者认为自己是在调用单个的Invoker.

而在clusterInvoker被调用时,它将会根据负载均衡等一系列操作,从自己持有的Invoker中选出一个Invoker,作为真正进行远程调用的invoker,把结果返回给Consumer.

总结一下各个ClusterInvoker的特性:

  • Failover Cluster
    在调用失败时,会自动切换其他的 Invoker 进行重试
  • Failfast Cluster
    只会进行一次调用,失败后立即抛出异常。
  • Failsafe Cluster
    当调用过程中出现异常时, 仅会打印异常,而不会抛出异常。
  • Failback Cluster
    在调用失败后,返回一个空结果给服务提供者。并通过定时任务对失败的调用进行重试,适合执行消息通知等操作。
  • Forking Cluster
    在运行时通过线程池创建多个线程,并发调用多个服务提供者。只要有一个服务提供者成功返回了结果,doInvoke 方法就会立即结束运行。

  • BroadcastClusterInvoker
    循环调用每个服务提供者,如果其中一台报错,在循环调用结束后抛出异常


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK