50

Table/SQL SCAN优化:应对source及上游的分区倾斜

 5 years ago
source link: https://www.tuicool.com/articles/buABf2j
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

好吧,终于要发一篇不是纯粹的广告文了。这些广告推文都是前段时间预先编排、排期好的,只是恰好排在在前几天发。

在正文之前,我想再提一下昨天的当当网图书大促推文 号外!当当网图书大促!送你专享优惠码!满减后再减!(400减230) (活动持续到6月3日,时间依然很充裕)。这次大促的优惠力度真的相当大,我是不希望大家错过一年难得一次的机会,优惠满减能帮你省下很多钱。建议大家不要一年散开来买太多次书,囤积起来一年在搞活动的时候买一两次就够了。你可以用省下的钱去做更多有意义的事情,比如这些钱绝对够买很多优秀的极客时间的专栏!

bQvEnmR.jpg!web

极客时间的专栏我买了不少,每天上下班路上都在看。很多工作多年的人,都有相似的感触,你的技能树看起来已经好久没变了,你的实际有效经验增长得也非常缓慢,周围的人或许也无法帮你摆脱技术上认知的束缚。这个时候如果你不觉得你应该多充电的话,那么随着你年龄的增长你会感觉到痛苦的。

言归正传,计算领域早已挂起SQL旋风,SQL是数据处理的标准语言,另一方面作为DSL提供了对一些复杂程序逻辑的良好抽象(抽象语法树[AST]可以表述大部分场景下data flow的DAG逻辑)。

但是从事物的两面性来看,抽象层级越高,你对底层细节的掌控就越困难。比如我们最近就遇到的一种情况:上游中间件中的分区数据出现了倾斜,那么反应到底层DAG中,数据处理的pipeline也会因此而倾斜(group by之前,我们暂且不讨论常规的hash分区造成的倾斜),而从纯粹的Flink SQL层面上我们却不太好控制它们的均衡方式。但如果我们是用DataStream API编写Job,那么事情就会简单很多:因为DataStream API给我们提供了shuffle的API,我们只需要在source后调用 rebalance 或其他shuffle类型的API,然后再接map算子进行数据解析,甚至可以单独提升map算子的并行度来提升数据解析的性能。

抽象程度高,绝大部分情况下是好事,就好像我们操作系统的UI一样。但是当你需要更多的个性化或者更高的控制权,你会一层一层向下找,从控制面板->注册表->BIOS->甚至BIOS高级设置。最后你还是不得不对细节有更多的了解。

其实在这里我们也一样,要解决这个问题,我们就希望控制SQL底层的shuffle机制,我们也需要沿着这个路径一层层向下找。我们从逻辑计划找到DataStream API对应的物理计划,然后在上层开放一个配置让用户来控制它。当然我们除了要从抽象层级路径向下找,我们还需要从SQL对应的语法树向下找,我们常规的Table Source其实对应着from子句(在AST中对应着table scan)。

所以我们很直接地找到table scan对应的物理计划所在的类 StreamTableSourceScan 以及 DataStreamScan (这两个类都继承自 StreamScan 接口)。为什么会有两个类呢,其实他们有各自的场景:

  • StreamTableSourceScan: 这是直接应对用户注册table source所对应的scan

  • DataStreamScan: 这是应对  DataStreamSource 也就是流转表(调用  tEnv#fromDataStream )的情况

无论以上哪个类,都存在 translateToPlan 方法用来转物理计划,我们主要关注这个方法。通过分析它的实现,我们看到真正对DataStream API的调用位于 StreamScan#convertToInternalRow 方法中。

OK,我们再分析一下我们的需求,在source后均衡一下上游分区中不均衡的数据,我们不能做的是什么: 改变SQL本身的分区策略 ,如果用户的SQL语句中包含 groupby 子句,那么我们不能对它的语义产生影响。我们看一下scan对应的DataStream API的调用,核心逻辑如下:

这里从 else/elseif 中我们看到,存在Row->CRow的过程,无论是 input->map 还是 input->process ,我们都可以在input后调用shuffle API,来重均衡数据,从而变成这样的一个处理链:input-> shuffle -> (row->crow),由于我们把shuffle加在转crow之前,所以它并不会破坏SQL本身的partition逻辑。

大致上,我们的改进逻辑如下:

//人为shuffle

//人为shuffle

这里我们可以让用户选择它想要的shuffle策略,但核心的原则是: 不能改变原始的数据量 ,比如像broadcast这种肯定是不能支持的。其实真正最有用的主要是 REBALANCE 。OK, 以上的示例代码展示了我们支持用户选择shuffle的策略,那么我们是如何让用户配置的呢。我们来看一下这个方法的参数列表:

这里我们看到一个 TableConfig 类型的config参数,它一直从Table/SQL 程序上下文穿透到物理计划层,为用户的程序提供相应的配置。我们只需要在它里面定义一个分区的枚举类型,然后开放相应的getter/setter即可。

另外,在你添加shuffle之后,会打破默认的operator chain,这时我们在生成JobGraph之后就可以针对shuffle之后的算子针对性地调整它的并行度,如果你们已经扩展了这个功能的话。没错,我们的Oceanus平台已经提供了这个功能。

AbuMfeA.jpg!web

RnMbmia.jpg!web


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK