0

近期业务大量突增微服务性能优化总结-1.改进客户端负载均衡算法

 2 years ago
source link: https://www.heapdump.cn/article/2859094
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
近期业务大量突增微服务性能优化总结-1.改进客户端负载均衡算法 | HeapDump性能社区
文章>近期业务大量突增微服务性能优化总结-1.改进客户端负载均衡算法

近期业务大量突增微服务性能优化总结-1.改进客户端负载均衡算法

张哈希
Spring Cloud
13小时前

最近,业务增长的很迅猛,对于我们后台这块也是一个不小的挑战,这次遇到的核心业务接口的性能瓶颈,并不是单独的一个问题导致的,而是几个问题揉在一起:我们解决一个之后,发上线,之后发现还有另一个的性能瓶颈问题。这也是我经验不足,导致没能一下子定位解决;而我又对我们后台整个团队有着固执的自尊,不想通过大量水平扩容这种方式挺过压力高峰,导致线上连续几晚都出现了不同程度的问题,肯定对于我们的业务增长是有影响的。这也是我不成熟和要反思的地方。这系列文章主要记录下我们针对这次业务增长,对于我们后台微服务系统做的通用技术优化,针对业务流程和缓存的优化由于只适用于我们的业务,这里就不再赘述了。本系列会分为如下几篇:

1.改进客户端负载均衡算法

2.开发日志输出异常堆栈的过滤插件

3.针对 x86 云环境改进异步日志等待策略

4.增加对于同步微服务的 HTTP 请求等待队列的监控以及云上部署,需要小心达到实例网络流量上限导致的请求响应缓慢

5.针对系统关键业务增加必要的侵入式监控

改进客户端负载均衡算法

Spring Cloud LoadBalancer 内置轮询算法以及问题

我们是用 Spring Cloud 作为我们的微服务体系,并且针对其中很多组件做了优化改造,请参考我的另一系列。之前我们的客户端负载均衡算法,是不同请求之间相互独立的轮询。由于我们实现的微服务框架会针对可以重试的请求进行重试,重试需要重试与之前不同的实例。没有重试,无法实现在线发布对于用户无感知,并且我们部署同一个微服务的不同实例是处于不同的可用区,并且微服务不会每次都全部出问题,而是某些实例出问题,有重试可以让某些实例出问题的时候,对用户无感知。当某些实例压力过大时,重试也可以让请求重试压力比较小的实例。使用 Spring Cloud LoadBalancer 的内置的负载均衡算法均无法满足我们的需求,所以我们针对其中的轮询算法进行了改进。原有的流程是:

1.获取服务实例列表

2.所有线程共用同一个原子变量 position,每次请求原子加 1

3.position 对实例个数取余,返回对应下标的实例进行调用

这样的算法问题是:假设有微服务 A 有两个实例:实例 1 和实例 2。请求 A 到达时,RoundRobinLoadBalancer 返回实例 1,这时有请求 B 到达,RoundRobinLoadBalancer 返回实例 2。然后如果请求 A 失败重试,RoundRobinLoadBalancer 又返回了实例 1。这不是我们期望看到的。

本次优化前,我们的负载均衡算法以及问题

在本次业务突增很多的改进之前,我们第一版改进后的流程是:

获取服务实例列表,将实例列表按照 ip 端口排序,如果不排序即使 position 是下一个可能也代表的是之前已经调用过的实例

根据请求中的 traceId,从本地缓存中以 traceId 为 key 获取一个初始值为随机数的原子变量 position,这样防止所有请求都从第一个实例开始调用,之后第二个、第三个这样。

position 原子加一,之后对实例个数取余,返回对应下标的实例进行调用

其中请求包含 traceId 是来自于我们使用了 spring-cloud-sleuth 链路追踪,基于这种机制我们能保证请求不会重试到之前已经调用过的实例。源码是:

//一定必须是实现ReactorServiceInstanceLoadBalancer
//而不是ReactorLoadBalancer<ServiceInstance>
//因为注册的时候是ReactorServiceInstanceLoadBalancer
@Log4j2
public class RoundRobinWithRequestSeparatedPositionLoadBalancer implements ReactorServiceInstanceLoadBalancer {
    private final ServiceInstanceListSupplier serviceInstanceListSupplier;
    //每次请求算上重试不会超过1分钟
    //对于超过1分钟的,这种请求肯定比较重,不应该重试
    private final LoadingCache<Long, AtomicInteger> positionCache = Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES)
            //随机初始值,防止每次都是从第一个开始调用
            .build(k -> new AtomicInteger(ThreadLocalRandom.current().nextInt(0, 1000)));
    private final String serviceId;
    private final Tracer tracer;


    public RoundRobinWithRequestSeparatedPositionLoadBalancer(ServiceInstanceListSupplier serviceInstanceListSupplier, String serviceId, Tracer tracer) {
        this.serviceInstanceListSupplier = serviceInstanceListSupplier;
        this.serviceId = serviceId;
        this.tracer = tracer;
    }
    
    //每次重试,其实都会调用这个 choose 方法重新获取一个实例
    @Override
    public Mono<Response<ServiceInstance>> choose(Request request) {
        return serviceInstanceListSupplier.get().next().map(serviceInstances -> getInstanceResponse(serviceInstances));
    }

    private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) {
        if (serviceInstances.isEmpty()) {
            log.warn("No servers available for service: " + this.serviceId);
            return new EmptyResponse();
        }
        return getInstanceResponseByRoundRobin(serviceInstances);
    }

    private Response<ServiceInstance> getInstanceResponseByRoundRobin(List<ServiceInstance> serviceInstances) {
        if (serviceInstances.isEmpty()) {
            log.warn("No servers available for service: " + this.serviceId);
            return new EmptyResponse();
        }
        //为了解决原始算法不同调用并发可能导致一个请求重试相同的实例
        //从 sleuth 的 Tracer 中获取当前请求的上下文
        Span currentSpan = tracer.currentSpan();
        //如果上下文不存在,则可能不是前端用户请求,而是其他某些机制触发,我们就创建一个新的上下文
        if (currentSpan == null) {
            currentSpan = tracer.newTrace();
        }
        //从请求上下文中获取请求的 traceId,用来唯一标识一个请求
        long l = currentSpan.context().traceId();
        AtomicInteger seed = positionCache.get(l);
        int s = seed.getAndIncrement();
        int pos = s % serviceInstances.size();
        log.info("position {}, seed: {}, instances count: {}", pos, s, serviceInstances.size());
        return new DefaultResponse(serviceInstances.stream()
                //实例返回列表顺序可能不同,为了保持一致,先排序再取
                .sorted(Comparator.comparing(ServiceInstance::getInstanceId))
                .collect(Collectors.toList()).get(pos));
    }
}

但是在这次请求突增很多的时候,这种负载均衡算法还是给我们带来了问题。

首先,本次突增,我们并没有采取扩容,导致本次的性能压力对于压力的均衡分布非常敏感。举个例子是,假设微服务 A 有 9 个实例,在业务高峰点来的时候,最理想的情况是保证无论何时这 9 个负载压力都完全均衡,但是由于我们使用了初始值为随机数的原子变量 position,虽然从一天的总量上来看,负责均衡压力肯定是均衡,但是在某一小段时间内,很可能压力全都跑到了某几个实例上,导致这几个实例被压垮,熔断,然后又都跑到了另外的几个实例上,又被压垮,熔断,如此恶性循环。

然后,我们部署采用的是 k8s 部署,同一个虚拟机上面可能会跑很多微服务的 pod。在某些情况下,同一个微服务的多个 pod 可能会跑到同一个虚拟机 Node 上,这个可以从pod 的 ip 网段上看出来:例如某个微服务有如下 7 个实例:10.238.13.12:8181,10.238.13.24:8181,10.238.15.12:8181,10.238.17.12:8181,10.238.20.220:8181,10.238.21.31:8181,10.238.21.121:8181,那么 10.238.13.12:8181 与 10.238.13.24:8181 很可能在同一个 Node 上,10.238.21.31:8181 和 10.238.21.121:8181 很可能在同一个 Node 上。我们重试,需要优先重试与之前重试过的实例尽量不在同一个 Node 上的实例,因为同一个 Node 上的实例只要有一个有问题或者压力过大,其他的基本上也有问题或者压力过大。

最后,如果调用某个实例一直失败,那么这个实例的调用优先级需要排在其他正常的实例后面。这个对于减少快速刷新发布(一下子启动很多实例之后停掉多个老实例,实例个数大于重试次数配置)对于用户的影响,以及某个可用区突然发生异常导致多个实例下线对用户的影响,以及业务压力已经过去,压力变小后,需要关掉不再需要的实例,导致大量实例发生迁移的时候对用户的影响,有很大的作用。

针对以上问题的优化方案

我们针对上面三个问题,提出了一种优化后的解决方案:

1.针对每次请求,记录:

2.本次请求已经调用过哪些实例 -> 请求调用过的实例缓存

3.调用的实例,当前有多少请求在处理中 -> 实例运行请求数

4.调用的实例,最近请求错误率 -> 实例请求错误率

5.随机将实例列表打乱,防止在以上三个指标都相同时,总是将请求发给同一个实例。

6.按照 当前请求没有调用过靠前 -> 错误率越小越靠前 的顺序排序 -> 实例运行请求数越小越靠前

7.取排好序之后的列表第一个实例作为本次负载均衡的实例

具体实现是:以下的代码来自于:https://github.com/JoJoTec/spring-cloud-parent

我们使用了依赖:

<dependency>
    <groupId>io.dropwizard.metrics</groupId>
    <artifactId>metrics-core</artifactId>
</dependency>

记录实例数据的缓存类:

@Log4j2
public class ServiceInstanceMetrics {
	private static final String CALLING = "-Calling";
	private static final String FAILED = "-Failed";

	private MetricRegistry metricRegistry;

	ServiceInstanceMetrics() {
	}

	public ServiceInstanceMetrics(MetricRegistry metricRegistry) {
		this.metricRegistry = metricRegistry;
	}

	/**
 * 记录调用实例
 * @param serviceInstance
 */
	public void recordServiceInstanceCall(ServiceInstance serviceInstance) {
		String key = serviceInstance.getHost() + ":" + serviceInstance.getPort();
		metricRegistry.counter(key + CALLING).inc();
	}
	/**
 * 记录调用实例结束
 * @param serviceInstance
 * @param isSuccess 是否成功
 */
	public void recordServiceInstanceCalled(ServiceInstance serviceInstance, boolean isSuccess) {
		String key = serviceInstance.getHost() + ":" + serviceInstance.getPort();
		metricRegistry.counter(key + CALLING).dec();
		if (!isSuccess) {
			//不成功则记录失败
			metricRegistry.meter(key + FAILED).mark();
		}
	}

	/**
 * 获取正在运行的调用次数
 * @param serviceInstance
 * @return
 */
	public long getCalling(ServiceInstance serviceInstance) {
		String key = serviceInstance.getHost() + ":" + serviceInstance.getPort();
		long count = metricRegistry.counter(key + CALLING).getCount();
		log.debug("ServiceInstanceMetrics-getCalling: {} -> {}", key, count);
		return count;
	}

	/**
 * 获取最近一分钟调用失败次数分钟速率,其实是滑动平均数
 * @param serviceInstance
 * @return
 */
	public double getFailedInRecentOneMin(ServiceInstance serviceInstance) {
		String key = serviceInstance.getHost() + ":" + serviceInstance.getPort();
		double rate = metricRegistry.meter(key + FAILED).getOneMinuteRate();
		log.debug("ServiceInstanceMetrics-getFailedInRecentOneMin: {} -> {}", key, rate);
		return rate;
	}
}

负载均衡核心代码:

private final LoadingCache<Long, Set<String>> calledIpPrefixes = Caffeine.newBuilder()
        .expireAfterAccess(3, TimeUnit.MINUTES)
        .build(k -> Sets.newConcurrentHashSet());
private final String serviceId;
private final Tracer tracer;
private final ServiceInstanceMetrics serviceInstanceMetrics;

//每次重试,其实都会调用这个 choose 方法重新获取一个实例
@Override
public Mono<Response<ServiceInstance>> choose(Request request) {
    Span span = tracer.currentSpan();
    return serviceInstanceListSupplier.get().next()
            .map(serviceInstances -> {
                //保持 span 和调用 choose 的 span 一样
                try (Tracer.SpanInScope cleared = tracer.withSpanInScope(span)) {
                    return getInstanceResponse(serviceInstances);
                }
            });
}


private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) {
    if (serviceInstances.isEmpty()) {
        log.warn("No servers available for service: " + this.serviceId);
        return new EmptyResponse();
    }
    //读取 spring-cloud-sleuth 的对于当前请求的链路追踪上下文,获取对应的 traceId
    Span currentSpan = tracer.currentSpan();
    if (currentSpan == null) {
        currentSpan = tracer.newTrace();
    }
    long l = currentSpan.context().traceId();
    return getInstanceResponseByRoundRobin(l, serviceInstances);
}

@VisibleForTesting
public Response<ServiceInstance> getInstanceResponseByRoundRobin(long traceId, List<ServiceInstance> serviceInstances) {
    //首先随机打乱列表中实例的顺序
    Collections.shuffle(serviceInstances);
    //需要先将所有参数缓存起来,否则 comparator 会调用多次,并且可能在排序过程中参数发生改变(针对实例的请求统计数据一直在并发改变)
    Map<ServiceInstance, Integer> used = Maps.newHashMap();
    Map<ServiceInstance, Long> callings = Maps.newHashMap();
    Map<ServiceInstance, Double> failedInRecentOneMin = Maps.newHashMap();
    serviceInstances = serviceInstances.stream().sorted(
            Comparator
                    //之前已经调用过的网段,这里排后面
                    .<ServiceInstance>comparingInt(serviceInstance -> {
                        return used.computeIfAbsent(serviceInstance, k -> {
                            return calledIpPrefixes.get(traceId).stream().anyMatch(prefix -> {
                                return serviceInstance.getHost().contains(prefix);
                            }) ? 1 : 0;
                        });
                    })
                    //当前错误率最少的
                    .thenComparingDouble(serviceInstance -> {
                        return failedInRecentOneMin.computeIfAbsent(serviceInstance, k -> {
                            double value = serviceInstanceMetrics.getFailedInRecentOneMin(serviceInstance);
                            //由于使用的是移动平均值(EMA),需要忽略过小的差异(保留两位小数,不是四舍五入,而是直接舍弃)
                            return ((int) (value * 100)) / 100.0;
                        });
                    })
                    //当前负载请求最少的
                    .thenComparingLong(serviceInstance -> {
                        return callings.computeIfAbsent(serviceInstance, k ->
                                serviceInstanceMetrics.getCalling(serviceInstance)
                        );
                    })
    ).collect(Collectors.toList());
    if (serviceInstances.isEmpty()) {
        log.warn("No servers available for service: " + this.serviceId);
        return new EmptyResponse();
    }
    ServiceInstance serviceInstance = serviceInstances.get(0);
    //记录本次返回的网段
    calledIpPrefixes.get(traceId).add(serviceInstance.getHost().substring(0, serviceInstance.getHost().lastIndexOf(".")));
    //目前记录这个只为了兼容之前的单元测试(调用次数测试)
    positionCache.get(traceId).getAndIncrement();
    return new DefaultResponse(serviceInstance);
}

一些组内关于方案设计的取舍 Q&A

1. 为何没有使用所有微服务共享的缓存来保存调用数据,来让这些数据更加准确

共享缓存的可选方案包括将这些数据记录放入 Redis,或者是 Apache Ignite 这样的内存网格中。但是有两个问题:

(1)如果数据记录放入 Redis 这样的额外存储,如果 Redis 不可用会导致所有的负载均衡都无法执行。如果放入 Apache Ignite,如果对应的节点下线,那么对应的负载均衡也无法执行。这些都是不能接受的。

(2)假设微服务 A 需要调用微服务 B,可能 A 的某个实例调用 B 的某个实例有问题,但是 A 的其他实例调用 B 的这个实例却没有问题,例如当某个可用区与另一个可用区网络拥塞的时候。如果用同一个缓存 Key 记录 A 所有的实例调用 B 这个实例的数据,显然是不准确的。

每个微服务使用本地缓存,记录自己调用其他实例的数据,在我们这里看来,不仅是更容易实现,也是更准确的做法。

2. 采用 EMA 的方式而不是请求窗口的方式统计最近错误率

采用请求窗口的方式统计,肯定是最准确的,例如我们统计最近一分钟的错误率,就将最近一分钟的请求缓存起来,读取的时候,将缓存起来的请求数据加在一起取平均数即可。但是这种方式在请求突增的时候,可能会占用很多很多内存来缓存这些请求。同时计算错误率的时候,随着缓存请求数的增多也会消耗更大量的 CPU 进行计算。这样做很不值得。

EMA 这种滑动平均值的计算方式,常见于各种性能监控统计场景,例如 JVM 中 TLAB 大小的动态计算,G1 GC Region 大小的伸缩以及其他很多 JVM 需要动态得出合适值的地方,都用这种计算方式。他不用将请求缓存起来,而是直接用最新值乘以一个比例之后加上老值乘以 (1 - 这个比例),这个比例一般高于 0.5,表示 EMA 和当前最新值更加相关。

但是 EMA 也带来另一个问题,我们会发现随着程序运行小数点位数会非常多,会看到类似于如下的值:0.00000000123, 0.120000001, 0.120000003, 为了忽略过于细致差异的影响(其实这些影响也来自于很久之前的错误请求),我们只保留两位小数进行排序


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK