8

Dubbo源码跟踪实录-集群容错:负载均衡LoadBalance

 3 years ago
source link: https://nnkwrik.github.io/2019/04/21/20190421-3/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Dubbo 提供了4种负载均衡实现,分别是

  • 基于权重随机算法的 RandomLoadBalance
  • 基于最少活跃调用数算法的 LeastActiveLoadBalance
  • 基于 hash 一致性的 ConsistentHashLoadBalance
  • 基于加权轮询算法的 RoundRobinLoadBalance

以下是Dubbo 2.6.4的代码分析.之后的版本优化了部分代码.

AbstractLoadBalance

所有负载均衡类的父类.首先来看一下负载均衡的入口方法 select,如下:

//com.alibaba.dubbo.rpc.cluster.loadbalance.AbstractLoadBalance#select
@Override
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
if (invokers == null || invokers.isEmpty())
return null;
// 如果 invokers 列表中仅有一个 Invoker,直接返回即可,无需进行负载均衡
if (invokers.size() == 1)
return invokers.get(0);
// 调用 doSelect 方法进行负载均衡,该方法为抽象方法,由子类实现
return doSelect(invokers, url, invocation);
}

除此外还封装了一些公共逻辑,比如获取服务提供者权重的逻辑。具体实现如下:

protected int getWeight(Invoker<?> invoker, Invocation invocation) {
// 从 url 中获取权重 weight 配置值
int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
if (weight > 0) {
// 获取服务提供者启动时间戳
long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
if (timestamp > 0L) {
// 计算服务提供者运行时长
int uptime = (int) (System.currentTimeMillis() - timestamp);
// 获取服务预热时间,默认为10分钟
int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);
// 如果服务运行时间小于预热时间,则重新计算服务权重,即降权
if (uptime > 0 && uptime < warmup) {
// 重新计算服务权重
weight = calculateWarmupWeight(uptime, warmup, weight);
}
}
}
return weight;
}

static int calculateWarmupWeight(int uptime, int warmup, int weight) {
// 计算权重,下面代码逻辑上形似于 (uptime / warmup) * weight。
// 随着服务运行时间 uptime 增大,权重计算值 ww 会慢慢接近配置值 weight
int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
return ww < 1 ? 1 : (ww > weight ? weight : ww);
}

上面的getWeight()用于获取invoker的权重.如果Invoker的服务启动时间未满10分钟.表示该服务还在预热的状态,此时降低它的权重.

RandomLoadBalance

先看一下它的算法思想,简单又容易实现:

RandomLoadBalance 是加权随机算法的具体实现,它的算法思想很简单。假设我们有一组服务器 servers = [A, B, C],他们对应的权重为 weights = [5, 3, 2],权重总和为10。现在把这些权重值平铺在一维坐标值上,[0, 5) 区间属于服务器 A,[5, 8) 区间属于服务器 B,[8, 10) 区间属于服务器 C。接下来通过随机数生成器生成一个范围在 [0, 10) 之间的随机数,然后计算这个随机数会落到哪个区间上。比如数字3会落到服务器 A 对应的区间上,此时返回服务器 A 即可。权重越大的机器,在坐标轴上对应的区间范围就越大,因此随机数生成器生成的数字就会有更大的概率落到此区间内。只要随机数生成器产生的随机数分布性很好,在经过多次选择后,每个服务器被选中的次数比例接近其权重比例。比如,经过一万次选择后,服务器 A 被选中的次数大约为5000次,服务器 B 被选中的次数约为3000次,服务器 C 被选中的次数约为2000次。

//com.alibaba.dubbo.rpc.cluster.loadbalance.RandomLoadBalance#doSelect
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size();
int totalWeight = 0;
boolean sameWeight = true;
// 下面这个循环有两个作用,第一是计算总权重 totalWeight,
// 第二是检测每个服务提供者的权重是否相同
for (int i = 0; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
totalWeight += weight;
if (sameWeight && i > 0
&& weight != getWeight(invokers.get(i - 1), invocation)) {
sameWeight = false;
}
}
if (totalWeight > 0 && !sameWeight) {//有必要做基于权重的负载均衡.就是之前的区间算法

int offset = random.nextInt(totalWeight);

for (int i = 0; i < length; i++) {
offset -= getWeight(invokers.get(i), invocation);
if (offset < 0) {
return invokers.get(i);
}
}
}

return invokers.get(random.nextInt(length));//权重都一样,随便取一个
}

当各个Invoker有不同的权重时需要进行基于算法的负载均衡.否则表示没有必要进行基于权重的负载均衡,则随便选一个.

当然 RandomLoadBalance 也存在一定的缺点,当调用次数比较少时,Random 产生的随机数可能会比较集中,此时多数请求会落到同一台服务器上。这个缺点并不是很严重,多数情况下可以忽略。RandomLoadBalance 是一个简单,高效的负载均衡实现,因此 Dubbo 选择它作为默认实现。

LeastActiveLoadBalance

说白了就是,哪个Invoker最闲就用哪个

每个服务提供者对应一个活跃数 active。初始情况下,所有服务提供者活跃数均为0。每收到一个请求,活跃数加1,完成请求后则将活跃数减1。在服务运行一段时间后,性能好的服务提供者处理请求的速度更快,因此活跃数下降的也越快,此时这样的服务提供者能够优先获取到新的服务请求、这就是最小活跃数负载均衡算法的基本思想。除了最小活跃数,LeastActiveLoadBalance 在实现上还引入了权重值。所以准确的来说,LeastActiveLoadBalance 是基于加权最小活跃数算法实现的。

//com.alibaba.dubbo.rpc.cluster.loadbalance.LeastActiveLoadBalance#doSelect
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size();
// 最小的活跃数
int leastActive = -1;
// 具有相同“最小活跃数”的服务者提供者(以下用 Invoker 代称)数量
int leastCount = 0;
// leastIndexs 用于记录具有相同“最小活跃数”的 Invoker 在 invokers 列表中的下标信息
int[] leastIndexs = new int[length];
int totalWeight = 0;
// 第一个最小活跃数的 Invoker 权重值,用于与其他具有相同最小活跃数的 Invoker 的权重进行对比,
// 以检测是否“所有具有相同最小活跃数的 Invoker 的权重”均相等
int firstWeight = 0;
boolean sameWeight = true;

// 1. 遍历 invokers 列表,寻找活跃度最小的invoker
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
// 获取 Invoker 对应的活跃数
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
// 获取权重 - ⭐️
int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
// 这个Invoker的active比之前的都要小,重新开始
if (leastActive == -1 || active < leastActive) {
// 使用当前活跃数 active 更新最小活跃数 leastActive
leastActive = active;
// 更新 leastCount 为 1
leastCount = 1;
// 记录当前下标值到 leastIndexs 中
leastIndexs[0] = i;
totalWeight = weight;
firstWeight = weight;
sameWeight = true;

// 当前 Invoker 的活跃数 active 与最小活跃数 leastActive 相同
} else if (active == leastActive) {
// 在 leastIndexs 中记录下当前 Invoker 在 invokers 集合中的下标
leastIndexs[leastCount++] = i;
// 累加权重
totalWeight += weight;
// 检测当前 Invoker 的权重与 firstWeight 是否相等,
// 不相等则将 sameWeight 置为 false
if (sameWeight && i > 0
&& weight != firstWeight) {
sameWeight = false;
}
}
}

// 2. 当只有一个 Invoker 具有最小活跃数,此时直接返回该 Invoker 即可
if (leastCount == 1) {
return invokers.get(leastIndexs[0]);
}

// 3. 有多个 Invoker 具有相同的最小活跃数,但它们之间的权重不同
if (!sameWeight && totalWeight > 0) {
// 随机生成一个 [0, totalWeight) 之间的数字
int offsetWeight = random.nextInt(totalWeight);
// 循环让随机数减去具有最小活跃数的 Invoker 的权重值,
// 当 offset 小于等于0时,返回相应的 Invoker
for (int i = 0; i < leastCount; i++) {
int leastIndex = leastIndexs[i];
// 获取权重值,并让随机数减去权重值 - ⭐️
offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
if (offsetWeight <= 0) // ❌
return invokers.get(leastIndex);
}
}
// 如果权重相同或权重为0时,随机返回一个 Invoker
return invokers.get(leastIndexs[random.nextInt(leastCount)]);
}

下面简单总结一下以上代码所做的事情,如下:

  1. 遍历 invokers 列表,寻找活跃数最小的 Invoker
  2. 如果只有一个 Invoker 具有最小的活跃数,此时直接返回该 Invoker 即可
  3. 如果有多个 Invoker 具有最小活跃数,且它们的权重不相等,之后的处理方式和 RandomLoadBalance 一致

有标记的两个部分是在之后版本中被修复的部分

> // 获取权重 - ⭐️
> int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
>

比如上面的代码直接从 url 中取权重值,在预热阶段时也不会进行降权处理。应该调用它父类的getWeight()方法.

> // afterWarmup 等价于上面的 weight 变量,这样命名是为了强调该变量经过了 warmup 降权处理
> int afterWarmup = getWeight(invoker, invocation);
>

还有一个被标记的部分

> int offsetWeight = random.nextInt(totalWeight);
> for (int i = 0; i < leastCount; i++) {
> int leastIndex = leastIndexs[i];
> // 获取权重值,并让随机数减去权重值 - ⭐️
> offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
> if (offsetWeight <= 0) // ❌
> return invokers.get(leastIndex);
> }
>

这里被修复成了

> int offsetWeight = random.nextInt(totalWeight) + 1;
>

ConsistentHashLoadBalance

基于hash一致性的算法,官方文档中用了大量的篇幅来解释该算法,Dubbo官方文档-ConsistentHashLoadBalance

.我这里就不贴过来了,如果之前没有接触过的话,可能得多读几遍.

下面开始它的源码分析:

public class ConsistentHashLoadBalance extends AbstractLoadBalance {

private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors =
new ConcurrentHashMap<String, ConsistentHashSelector<?>>();

@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String methodName = RpcUtils.getMethodName(invocation);
String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;

// 获取 invokers 原始的 hashcode
int identityHashCode = System.identityHashCode(invokers);
ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
// 如果 invokers 是一个新的 List 对象,意味着服务提供者数量发生了变化,可能新增也可能减少了。
// 此时 selector.identityHashCode != identityHashCode 条件成立
if (selector == null || selector.identityHashCode != identityHashCode) {
// 创建新的 ConsistentHashSelector
selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, identityHashCode));
selector = (ConsistentHashSelector<T>) selectors.get(key);
}

// 调用 ConsistentHashSelector 的 select 方法选择 Invoker
return selector.select(invocation);
}

private static final class ConsistentHashSelector<T> {...}
}

doSelect()方法主要用于完成一系列前置工作,根据服务方法创建或获取ConsistentHashSelector类型的selector.

最终调用selector的select()方法进行选择.在分析 select 方法之前,我们先来看一下一致性 hash 选择器 ConsistentHashSelector 的初始化过程,如下:

private static final class ConsistentHashSelector<T> {

// 使用 TreeMap 存储 Invoker 虚拟节点
private final TreeMap<Long, Invoker<T>> virtualInvokers;

private final int replicaNumber;

private final int identityHashCode;

private final int[] argumentIndex;

ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
this.identityHashCode = identityHashCode;
URL url = invokers.get(0).getUrl();
// 获取虚拟节点数,默认为160
this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
// 获取参与 hash 计算的参数下标值,默认对第一个参数进行 hash 运算
String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
argumentIndex = new int[index.length];
for (int i = 0; i < index.length; i++) {
argumentIndex[i] = Integer.parseInt(index[i]);
}
for (Invoker<T> invoker : invokers) {
String address = invoker.getUrl().getAddress();
for (int i = 0; i < replicaNumber / 4; i++) {
// 对 address + i 进行 md5 运算,得到一个长度为16的字节数组
byte[] digest = md5(address + i);
// 对 digest 部分字节进行4次 hash 运算,得到四个不同的 long 型正整数
for (int h = 0; h < 4; h++) {
// h = 0 时,取 digest 中下标为 0 ~ 3 的4个字节进行位运算
// h = 1 时,取 digest 中下标为 4 ~ 7 的4个字节进行位运算
// h = 2, h = 3 时过程同上
long m = hash(digest, h);
// 将 hash 到 invoker 的映射关系存储到 virtualInvokers 中,
// virtualInvokers 需要提供高效的查询操作,因此选用 TreeMap 作为存储结构
virtualInvokers.put(m, invoker);
}
}
}
}
}

刚看到这段代码的时候一脸懵逼.其实就是构建下图的过程:

ConsistentHashSelector的核心是一个TreeMap:

// 使用 TreeMap 存储 Invoker 虚拟节点
private final TreeMap<Long, Invoker<T>> virtualInvokers;

这个TreeMap保存了上图的hash数据结构

上图是一个List<Invoker<T>>构建后的结果.实际上只有Invoker1,2,3的3个有相同服务的Invoker.其他相同颜色的点都是他们的复制出来的”虚拟节点”.至于为什么需要创建那么多的虚拟节点,文档中有解释.

我们经过一系列的算法从3个Invoker生成为160*3的虚拟节点,通过md5算法分散保存到TreeMap中了.

理解一致性hash的构造过程后,开始看它的select()方法:

public Invoker<T> select(Invocation invocation) {
// 将参数转为 key
String key = toKey(invocation.getArguments());
// 对参数 key 进行 md5 运算
byte[] digest = md5(key);
// 取 digest 数组的前四个字节进行 hash 运算,再将 hash 值传给 selectForKey 方法,
// 寻找合适的 Invoker
return selectForKey(hash(digest, 0));
}

private Invoker<T> selectForKey(long hash) {
// 到 TreeMap 中查找第一个节点值大于或等于当前 hash 的 Invoker
Map.Entry<Long, Invoker<T>> entry = virtualInvokers.tailMap(hash, true).firstEntry();
// 如果 hash 大于 Invoker 在圆环上最大的位置,此时 entry = null,
// 需要将 TreeMap 的头节点赋值给 entry
if (entry == null) {
entry = virtualInvokers.firstEntry();
}
// 返回 Invoker
return entry.getValue();
}

如上,选择的过程相对比较简单了。首先是对参数进行 md5 以及 hash 运算,得到一个 hash 值。然后再拿这个值到 TreeMap 中查找目标 Invoker 即可。

RoundRobinLoadBalance

加权轮询负载均衡,算法介绍:

这里从最简单的轮询开始讲起,所谓轮询是指将请求轮流分配给每台服务器。举个例子,我们有三台服务器 A、B、C。我们将第一个请求分配给服务器 A,第二个请求分配给服务器 B,第三个请求分配给服务器 C,第四个请求再次分配给服务器 A。这个过程就叫做轮询。轮询是一种无状态负载均衡算法,实现简单,适用于每台服务器性能相近的场景下。

但现实情况下,我们并不能保证每台服务器性能均相近。如果我们将等量的请求分配给性能较差的服务器,这显然是不合理的。因此,这个时候我们需要对轮询过程进行加权,以调控每台服务器的负载。经过加权后,每台服务器能够得到的请求数比例,接近或等于他们的权重比。比如服务器 A、B、C 权重比为 5:2:1。那么在8次请求中,服务器 A 将收到其中的5次请求,服务器 B 会收到其中的2次请求,服务器 C 则收到其中的1次请求。

源代码如下,说白了就是根据mod值和权重选择.

@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// key = 全限定类名 + "." + 方法名,比如 com.xxx.DemoService.sayHello
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
int length = invokers.size();
int maxWeight = 0; // 最大权重
int minWeight = Integer.MAX_VALUE; // 最小权重
final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>();
// 权重总和
int weightSum = 0;
// 下面这个循环主要用于查找最大和最小权重,计算权重总和等
for (int i = 0; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
// 获取最大和最小权重
maxWeight = Math.max(maxWeight, weight);
minWeight = Math.min(minWeight, weight);
if (weight > 0) {
// 将 weight 封装到 IntegerWrapper 中.key = invoker,value = 权重
invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight));
// 累加权重
weightSum += weight;
}
}
// 查找 key 对应的对应 AtomicPositiveInteger 实例,为空则创建。
// 这里可以把 AtomicPositiveInteger 看成一个黑盒,大家只要知道
// AtomicPositiveInteger 用于记录服务的调用编号即可。
AtomicPositiveInteger sequence = sequences.get(key);
if (sequence == null) {
sequences.putIfAbsent(key, new AtomicPositiveInteger());
sequence = sequences.get(key);
}
// 获取当前的调用编号
int currentSequence = sequence.getAndIncrement();
// 如果最小权重小于最大权重,表明服务提供者之间的权重是不相等的
if (maxWeight > 0 && minWeight < maxWeight) {
// 使用调用编号对权重总和进行取余操作
int mod = currentSequence % weightSum;
// 进行 maxWeight 次遍历
for (int i = 0; i < maxWeight; i++) {
// 遍历 invokerToWeightMap
for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) {
// 获取 Invoker
final Invoker<T> k = each.getKey();
// 获取权重包装类 IntegerWrapper
final IntegerWrapper v = each.getValue();

// 如果 mod = 0,且权重大于0,此时返回相应的 Invoker
if (mod == 0 && v.getValue() > 0) {
return k;
}

// mod != 0,且权重大于0,此时对权重和 mod 分别进行自减操作
if (v.getValue() > 0) {
v.decrement();
mod--;
}
}
}
}

// 服务提供者之间的权重相等,此时通过轮询选择 Invoker
return invokers.get(currentSequence % length);
}

下面我们举例进行说明,假设我们有三台服务器 servers = [A, B, C],对应的权重为 weights = [2, 5, 1]。接下来对上面的逻辑进行简单的模拟。

mod = 0:满足条件,此时直接返回服务器 A

mod = 1:需要进行一次递减操作才能满足条件,此时返回服务器 B,[1,5,1]

mod = 2:需要进行两次递减操作才能满足条件,此时返回服务器 C,[1,4,1]

mod = 3:需要进行三次递减操作才能满足条件,经过递减后,服务器权重为 [1, 4, 0],此时返回服务器 A

mod = 4:需要进行四次递减操作才能满足条件,经过递减后,服务器权重为 [0, 4, 0],此时返回服务器 B

mod = 5:需要进行五次递减操作才能满足条件,经过递减后,服务器权重为 [0, 3, 0],此时返回服务器 B

mod = 6:需要进行六次递减操作才能满足条件,经过递减后,服务器权重为 [0, 2, 0],此时返回服务器 B

mod = 7:需要进行七次递减操作才能满足条件,经过递减后,服务器权重为 [0, 1, 0],此时返回服务器 B

经过8次调用后,我们得到的负载均衡结果为 [A, B, C, A, B, B, B, B],次数比 A:B:C = 2:5:1,等于权重比。当 sequence = 8 时,mod = 0,此时重头再来。从上面的模拟过程可以看出,当 mod >= 3 后,服务器 C 就不会被选中了,因为它的权重被减为0了。当 mod >= 4 后,服务器 A 的权重被减为0,此后 A 就不会再被选中。

缺点在于如果mod值非常大时,需要做出多次的循环才能将mod减成0.存在性能问题.

后面的版本中对这一部分进行了优化,具体参考官方文档的解释:https://dubbo.incubator.apache.org/zh-cn/docs/source_code_guide/loadbalance.html


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK