8

Apache Doris在京东搜索实时OLAP中的应用实践

 3 years ago
source link: https://mp.weixin.qq.com/s?__biz=MzUyMDA4OTY3MQ%3D%3D&%3Bmid=2247494301&%3Bidx=1&%3Bsn=1d6d8a29af8710dd74d71714b0fef01d
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

NVzuAz.gif!mobile

1、前言

本文讨论了京东搜索在实时流量数据分析方面,利用Apache Flink和Apache Doris进行的探索和实践。流式计算在近些年的热度与日俱增,从Google Dataflow论文的发表,到Apache Flink计算引擎逐渐站到舞台中央,再到Apache Druid等实时分析型数据库的广泛应用,流式计算引擎百花齐放。但不同的业务场景,面临着不同的问题,没有哪一种引擎是万能的。我们希望京东搜索业务在流计算的应用实践,能够给到大家一些启发,也欢迎大家多多交流,给我们提出宝贵的建议。

2、搜索业务形态

京东集团的新使命是“技术为本,致力于更高效和可持续的世界”。京东搜索作为电商平台的一个入口,为众多商家与用户提供连接的纽带。京东搜索发挥着导流的作用,给用户提供表达需求的入口;为了正确理解用户意图,将用户的需求进行高效的转化,线上同时运行着多个AB实验算法,遍及POP形态与自营形态的多个商品,而这些商品所属的品类、所在的组织架构以及品牌店铺等属性,都需要在线进行监控,以衡量转化的效果和承接的能力。

3、实时技术的挑战

目前搜索上层应用业务对实时数据的需求,主要包含三部分内容:

1、 搜索整体数据的实时分析。

2、 AB实验效果的实时监控。

3、 热搜词的Top榜单以反映舆情的变化。

这三部分数据需求,都需要进行深度的下钻,维度细化需要到SKU粒度。同时我们也承担着搜索实时数据平台的建设任务,为下游用户输出不同层次的实时流数据。

我们的用户包括搜索的运营、产品、算法以及采销人员。虽然不同用户关心的数据粒度不同、时间频率不同、维度也不同,但是我们希望能够建立统一的实时OLAP数据仓库,并提供一套安全、可靠的、灵活的实时数据服务。

目前每日新增的曝光日志达到几亿条记录,而拆分到SKU粒度的日志则要翻10倍,再细拆到AB实验的SKU粒度时,数据量则多达上百亿记录,多维数据组合下的聚合查询要求秒级响应时间,这样的数据量也给团队带来了不小的挑战。

m2QJRfI.png!mobile

4、实时技术架构演进

我们之前的方案是以Apache Storm引擎进行点对点的数据处理,这种方式在业务需求快速增长的阶段,可以快速的满足实时报表的需求。但是随着业务的不断发展、数据量逐渐增加以及需求逐渐多样化,弊端随之产生。例如灵活性差、数据一致性无法满足、开发效率较低、资源成本增加等。

IVfYRj.png!mobile

为解决之前架构出现的问题,我们首先进行了架构升级,将storm引擎替换为Apache Flink,用以实现高吞吐、exactly once的处理语义。同时根据搜索数据的特点,将实时数据进行分层处理,构建出PV流明细层、SKU流明细层和AB实验流明细层,期望基于不同明细层的实时流,构建上层的实时OLAP层。

OLAP层的技术选型,需要满足以下几点:

1:数据延迟在分钟级,查询响应时间在秒级

2:标准SQL交互引擎,降低使用成本

3:支持join操作,方便维度增加属性信息

4:流量数据可以近似去重,但订单行要精准去重

5:高吞吐,每分钟数据量在千万级记录,每天数百亿条新增记录

6:前端业务较多,查询并发度不能太低

通过对比目前业界广泛使用的支持实时导入的OLAP引擎,我们在druid、ES、clickhouse和doris之间做了横向比较:

7FrEBbr.png!mobile

通过对比开源的几款实时OLAP引擎,我们发现doris和clickhouse能够满足我们的需求,但是clickhouse的并发度太低是个潜在的风险,而且clickhouse的数据导入没有事务支持,无法实现exactly once语义,对标准sql的支持也是有限的。

最终,我们选定doris作为聚合层,用于实时OLAP分析。对于流量数据,使用聚合模型建表;对于订单行,我们将聚合模型换成Uniq模型,保证同一个订单最终只会存储一条记录,从而达到订单行精准去重的目的。在flink处理时,我们也将之前的任务拆解,将反复加工的逻辑封装,每一次处理都生成新的topic流,明细层细分了不同粒度的实时流。新方案如下:

eqArY3y.png!mobile

目前的技术架构中,flink的任务是非常轻的,state状态非常小,并没有使用KeyedState自定义状态,而OperatorState中只包含kafka的offset信息,这样保证任务的运行开销很小,稳定性大大提升。同时基于生产的数据明细层,我们直接使用了doris来充当聚合层的功能,将原本可以在flink中实现的窗口计算,下沉到doris中完成。利用doris的routine load消费实时数据,虽然数据在导入前是明细粒度,但是基于聚合模型,导入后自动进行异步聚合。而聚合度的高低,完全根据维度的个数与维度的基数决定。通过在base表上建立rollup,在导入时双写或多写并进行预聚合操作,这有点类似于物化视图的功能,可以将数据进行高度的汇总,以提升查询性能。

在明细层采用kafka直接对接到doris,还有一个好处就是这种方式天然的支持数据回溯。数据回溯简单说就是当遇到实时数据的乱序问题时,可以将“迟到”的数据进行重新计算,更新之前的结果。这是因为我们导入的是明细数据,延迟的数据无论何时到达都可以被写入到表中,而查询接口只需要再次进行查询即可获得最新的计算结果。最终方案的数据流图如下:

NRBfUvY.png!mobile

5、技术取舍

实时数据处理的技术实现,需要在低延迟、准确性和成本之间进行取舍。我们采用doris作为实时仓库的聚合层,其实也是在多方面进行了取舍。例如:

1、routine load的导入任务,为了达到更高的写入吞吐量,我们将实时导入任务的最大时间间隔设置了30s,即增加了导入延迟,换来了更大的吞吐

2、为了降低开发成本,节省计算资源,我们通过建立rollup来支持快速的查询需求,但是却增加了存储压力以及写入时的IO压力

3、PV、UV等流量指标在聚合时采用的是HLL计算,降低了精度,换来了更短的查询响应时间

以上几点取舍,是结合业务场景与需求的要求而决定的,并非绝对的情况。所以,面对实时数据大规模、无界、乱序等特点,实时流计算的选型,最终考虑的就是如何取舍。

euuUBrj.png!mobile

6、Doris在大促期间的优化

上文提到我们在doris中建立了不同粒度的聚合模型,包括PV粒度、SKU粒度以及AB实验粒度。我们这里以每日生产数据量最大的曝光AB实验模型为例,阐述在doris中如何支持大促期间每日新增百亿条记录的查询的。

AB实验的效果监控,业务上需要10分钟、30分钟、60分钟以及全天累计等四个时间段,同时需要根据渠道、平台和一二三级品类等维度进行下钻分析,观测的指标则包含曝光PV、UV、曝光SKU件次、点击PV、点击UV等基础指标,以及CTR等衍生指标。

在数据建模阶段,我们将曝光实时数据建立聚合模型,其中K空间包含日期字段、分钟粒度的时间字段、渠道、平台、一二三级品类等,V空间则包含上述的指标列,其中UV和PV进行HLL近似计算,而SKU件次则采用SUM函数,每到来一条记录则加1。由于AB实验数据都是以AB实验位作为过滤条件,因此将实验位字段设置为分桶字段,查询时能够快速定位tablet分片。值得注意的是,HLL的近似度在目前PV和UV的基数下,实际情况误差在0.8%左右,符合预期。

目前doris的集群共30+台BE,存储采用的是支持NVMe协议的SSD硬盘。AB实验曝光topic的分区数为40+,每日新增百亿条数据。在数据导入阶段,我们主要针对导入任务的三个参数进行优化:最大时间间隔、最大数据量以及最大记录数。当这3个指标中任何一个达到设置的阈值时,任务都会触发导入操作。为了更好的了解任务每次触发的条件,达到10分钟消费6亿条记录的压测目标,我们通过间隔采样的方法,每隔3分钟采样一次任务的情况,获取Statistic信息中的receivedBytes、cimmittedTaskNum、loadedRows以及taskExecuteTimeMs数值。通过对上述数值在前后2个时间段的差值计算,确定每个任务触发的条件,并调整参数,以在吞吐和延迟之间进行平衡,最终达到压测的要求。

为了实现快速的多维数据查询,基于base表建立了不同的rollup,同时每个rollup的字段顺序,也要遵循过滤的字段尽可能放到前面的原则,充分利用前缀索引的特性。这里并不是rollup越多越好,因为每个rollup都会有相应的物理存储,每增加一个rollup,在写入时就会增加一份IO。最终我们在此表上建立了2个rollup,在要求的响应时间内尽可能多的满足查询需求。

7、总结与展望

京东搜索是在今年5月份引入doris的,第一个应用的上线到现在已经运行半年时间。目前集群版本是0.11.33,规模是30+台BE,线上同时运行着10+ 个routine load任务,每日新增数据条数在200亿+,已经成为京东体量最大的doris用户。 从结果看,用doris替换flink的窗口计算,既可以提高开发效率,适应维度的变化,同时也可以降低计算资源,用doris充当实时数据仓库的聚合层,并提供统一的接口服务,保证了数据的一致性和安全性。

我们在使用中也遇到了查询相关的、任务调度相关的bug,也在推动京东OLAP平台升级到0.12版本。接下来待版本升级后,我们计划使用bitmap功能来支持UV等指标的精准去重操作,并将推荐实时业务应用doris实现。除此之外,为了完善实时数仓的分层结构,为更多业务提供数据输入,我们也计划使用适当的flink窗口开发聚合层的实时流,增加数据的丰富度和完整度。

作者:李哲,京东搜推数据开发工程师,曾就职于美团点评,主要从事离线数据开发、流计算开发以及OLAP多维查询引擎的应用开发。

6fUf2m2.png!mobile


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK