1

供应链时效域接口性能进阶之路

 1 year ago
source link: https://www.51cto.com/article/722864.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

供应链时效域历经近一年的发展,在预估时效方面沉淀出了一套理论和两把利器(预估模型和路由系统)。以现货为例,通过持续的技术方案升级,预估模型的准确率最高接近了90%,具备了透出给用户的条件。但在接入前台场景的过程中,前台对我们提出接口性能的要求。

以接入的商详浮层场景为例,接口调用链路经过商详、出价、交易,给到我们供应链只有15ms的时间,在15ms内完成所有的业务逻辑处理是一个不小的挑战。

图片
图片

二、初始状态 - 春风得意马蹄疾

抛开业务场景聊接口性能就是耍流氓。时效预估接口依赖于很多数据源:模型基础数据、模型兜底数据、仓库数据、SPU类目数据、卖家信息数据等,如何快速批量获取到内存中进行逻辑运算,是性能提升的关键。

最先接入时效表达的是现货业务,最初的查询单个现货SKU时效的接口调用链路如下:

图片

根据trace分析,接口性能的瓶颈在于数据查询,而不在于逻辑处理,数据查询后的逻辑处理耗时只占0.6%。

数据查询又分为外部查询和内部查询。外部查询为3次RPC调用(耗时占比27%),内部查询为11次DB查询(耗时占比73%)。

为什么会有这么多次内部查询?因为预估模型是分段的,每段又根据不同的影响因子有不同的兜底策略,无法聚合成一次查询。

单个SKU时效查询都达到了76.5ms,以商详浮层页30个现货SKU时效批量查询估算,一次请求需要76.5*30=2295ms,这是不可接受的,性能提升刻不容缓。

三、优化Round

1 - 昨夜西风凋碧树

3.1 内部查询优化

由于内部查询需要的预估模型数据都是离线清洗,按天级别同步的,对实时性要求不高,有多种方案可以选择:

离线处理好后刷MySQL

现有方案,无开发成本

查询性能一般

查询性能不满足要求,不采用

离线处理好后刷到Redis

查询性能好

数据量过大时成本较高

离线处理好后刷到本地内存

查询性能很好

对数据量有限制

模型数据量约为15G,方案不可行

最终选择方案二,离线数据同步到Redis中。由于模型数据量增幅不大,每天的同步更多的是覆盖,故采用32G实例完全能满足要求。

图片

3.2 外部查询优化

将三个RPC查询接口逐个分析,找到优化方案:

城市名称转code

由于城市名称和code 的映射关系数据仅约20K左右,可以在应用启动时请求一次后放入本地缓存。另外城市名称和code发生变化的频率很低,通过jetcache的@CacheRefresh每隔8小时自动刷新完全满足要求

获取卖家信息

Redis缓存

由于得物全量卖家数据量较大,不适合放在本地缓存,且卖家信息是低频变化数据,可以采用T+1同步到Redis

获取商品类目

Redis缓存

同样商品类目数据也是低频变化数据,采用T+1同步到Redis

3.3优化后效果

图片

优化后的效果很明显,单个SKU时效查询RT已从76.5ms降低至27ms,同时减少了对外部域的直接依赖,一定程度上提升了稳定性。

27ms仍然没法满足要求。当前的瓶颈在查询Redis上(耗时占比96%),是否可以再进一步优化?

四、优化Round

2 - 衣带渐宽终不悔

通过上述分析,可以看到目前的耗时集中在一次次的Redis I/O操作中,如果将一组Redis命令进行组装,通过一次传输给Redis并返回结果,可以大大地减少耗时。

4.1 pipeline原理

Redis客户端执行一条命令分为如下四个过程:

1)发送命令

2)命令排队

3)执行命令

4)返回结果

其中1-4称为Round Trip Time(RTT,往返时间)。pipeline通过一次性将多Redis命令发往Redis服务端,大大减少了RTT。

图片

4.2优化和效果

虽然Redis提供了像mget、mset这种批量接口,但Redis不支持hget批量操作,且不支持mget、hget混合批量查询,只能采用pipeline。另外我们的场景是多key读场景,并且允许一定比例(少概率事件)读失败,且pipeline中的其中一条读失败(pipeline是非原子性的),也不会影响时效预估,因为有兜底策略,故非常适合。

图片

由于Redis查询之间存在相互依赖,上次查询的结果需要作为下次查询的入参,故无法将所有redis查询合并成一个Redis pipeline。虽然最终仍然存在3次Redis I/O,但7ms的RT满足了要求。

4.3 代码

// pipeline查询类public class RedisBasePipelineRegister {    // 存放查到的数据    private ThreadLocal<Map<String, Object>> context = ThreadLocal.withInitial(HashMap::new);
    // 查询    public void fetch(final RedisConsumers redisConsumers){        if (redisConsumers.isNotEmpty()){            List<Object> ret = redisClient.executePipelined((RedisCallback<Object>) connection -> {                connection.openPipeline();                redisConsumers.get().forEach(t -> t.accept(connection));                return null;            });            addValueToContext(ret,redisConsumers.getKeyList());        }    }
    /**     * 将pipeline查到的数据存入threadlocal中     * 注意,redis读取的数据可能是空的,如果是空,会填充一个null obj,这样可以防止后面用的时候,发现thread local里面没有数据,重新查redis     */    private void addValueToContext(List<Object> val, List<String> keys) {        Map<String, Object> t = context.get();        IntStream.range(0, keys.size())                .forEach(i -> t.put(keys.get(i), val.get(i) == null ? NULL_OBJ : val.get(i)));    }
    public Object get(String key) {        return context.get().get(key);    }}

// redis查询类public class RedisClient {
    // threadlocal没查到,再查redis(兜底)    public Object get(String key) {        Object value = Optional.ofNullable(redisBatchPipelineRegister.get(key)).orElseGet(()->                redisTemplate.opsForValue().get(key)        );        return value;    }}

即使pipeline部分失败后,可用Redis单指令查询作为兜底。

五、优化Round

3 - 众里寻他千百度

5.1 背景

随着时效预估的准确率在寄售、品牌直发、保税等业务场景中满足要求后,越来越多的业务类型需要接入时效表达接口。最初为了快速上线,交易在内部根据出价类型串行多次调时效预估接口,导致RT压力越来越大。出于领域内聚考虑,与交易开发讨论后,由时效域提供不同出价类型的聚合接口,同时保证聚合接口的RT性能。

图片

自此,进入并发区域。

5.2 ForkJoinPool vs ThreadPoolExecutor

Java7 提供了ForkJoinPool来支持将一个任务拆分成多个“小任务”并行计算,再把多个“小任务的结果合并成总的计算结果。ForkJoinPool的工作窃取是指在每个线程中会维护一个队列来存放需要被执行的任务。当线程自身队列中的任务都执行完毕后,它会从别的线程中拿到未被执行的任务并帮助它执行,充分利用多核CPU的优势。下图为ForkJoinPool执行示意:

图片

而Java8的并行流采用共享线程池(默认也为ForkJoinPool线程池),性能不可控,故不考虑。

ForkJoinPool

ForkJoinPool能用使用数据有限的线程来完成非常多的父子关系任务。由于工作窃取机制,在多任务且任务分配不均情况具有优势。

1.不存在父子关系任务。

2.获取不同出价类型的时效RT相近,不存在任务分配不均匀情况。

ThreadPoolExecutor

ThreadPoolExecutor不会像ForkJoinPool一样创建大量子任务,不会进行大量GC,因此单线程或任务分配均匀情况下具有优势。

选定ThreadPoolExecutor后,需要考虑如何设计参数。根据实际情况分析,交易请求时效QPS峰值为1000左右,而我们一个请求一般会拆分3~5个线程任务,不考虑机器数的情况下,每秒任务数量:taskNum = 3000~5000。单个任务耗时taskCost = 0.01s 。上游容忍最大响应时间 responseTime = 0.015s。

1)核心线程数 = 每秒任务数 * 单个任务耗时

corePoolSize = taskNum * taskCost = (3000 ~ 5000) * 0.01 = 30 ~ 50,取40

2)任务队列容量 = 核心线程数 / 单个任务耗时 * 容忍最大响应时间

queueCapacity = corePoolSize / taskCost * responseTime = 40 / 0.01 * 0.015 = 60

3)最大线程数 = (每秒最大任务数 - 任务队列容量)* 每个任务耗时

maxPoolSize = (5000 - 60) * 0.01 ≈  50

当然上述计算都是理论值,实际有可能会出现未达最大线程数,cpu load就打满的情况,需要根据压测数据来最终确定ThreadPoolExecutor的参数。

5.3优化和压测

经优化和压测后聚合接口平均RT从22.8ms(串行)降低为8.52ms(并行),99线为13.22ms,满足要求。

图片
图片

按单机300QPS(高于预估峰值QPS两倍左右)进行压测,接口性能和线程池运行状态均满足。

图片
图片
图片

最终优化后应用内调用链路示意图如下:

图片

5.4 代码

// 并行时效预估类public class ConcurrentEstimateCaller {    // 自定义线程池    private Executor executor;    // 时效预估策略工厂    private EstimateStrategyFactory estimateStrategyFactory;
    //存放异步返回的结果,KEY为出价类型,VALUE为对应的时效结果        private ConcurrentHashMap<String, CompletableFuture<List<PromiseEstimateRes>>> futures = new ConcurrentHashMap<>();
    // 提交并行任务public ConcurrentEstimateCaller submit(PromiseEstimateAggreRequest request)        for (String scene : request.getMap().keySet()) {            futures.put(scene, CompletableFuture.supplyAsync(() -> {
                EstimateStrategy estimateStrategy = estimateStrategyFactory.getStrategy(scene);
                if (estimateStrategy != null) {                    Result<PromiseEstimateBatchResponse> tmp =                            estimateStrategy.promiseEstimateBatch(EstimateAggreConvertor.INSTANCE.convertBatchRequest(request, scene));                    if (Result.SUCCESS_CODE.equals(tmp.getCode())) {                        return tmp.getData().getEstimateRes();                    }                }                return null;
            }, executor));        }    }
    // 指定时间内等待和获取所有子任务返回结果    public Map<String, List<PromiseEstimateRes>> join(long timeout, TimeUnit unit) throws Exception {        // 等待所有的子任务执行完成        CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[]{})).get(timeout, unit);
        Map<String, List<PromiseEstimateRes>> res = new HashMap<>();
        for (Map.Entry<String, CompletableFuture<List<PromiseEstimateRes>>> entry : futures.entrySet()) {
            if (entry.getValue().get() != null) {                res.put(entry.getKey(), entry.getValue().get());            }        }
        return res;    }}

接口性能进阶之路随着业务的变化和技术的升级永无止境。

分享一些建设过程中的Tips:

如果Redis和服务机器不在同一个区域会增加几ms的跨区传输耗时,所以对RT敏感的场景,如果机器不同于Redis区域,可以让运维帮忙重建机器。

阻塞队列可以采用SynchronousQueue来提高响应时间,但需要保证有足够多的消费者(线程池里的消费者),并且总是有一个消费者准备好获取交付的工作,才适合使用。

后续建设的一些思路:随着业务和流量的增长,线程池参数如何在不重启机器的情况下自动调整,可以参考美团开源的DynamicTp项目对线程池动态化管理,同时添加监控、告警等功能。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK