42

Spark优化|Spark-SQL性能极致优化: Native Codegen Framework

 3 years ago
source link: http://mp.weixin.qq.com/s?__biz=MzU5OTQ1MDEzMA%3D%3D&%3Bmid=2247487960&%3Bidx=2&%3Bsn=0b0bcc135b5c308a2debee3ac816f9c8
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

EMR团队探索并开发了SparkSQL Native Codegen框架,为SparkSQL换了引擎,新引擎带来最高4倍性能提升,为EMR再次获取世界第一立下汗马功劳。来自阿里云EMR团队的周克勇将详细介绍Native Codegen框架。

本文整理自视频 https://developer.aliyun.com/live/43579

本次分享主要分为三部分,第一做这件事情的动机和背景,第二做的过程中解决的核心问题,最后是总结。

有些同学可能了解到,EMR团队今年4月份打破了大数据领域Benchmark TBCDS的世界纪录。在硬件完全相同的情况下,性能提升了一倍,从520万分提高到1100多万分。这个成绩的背后主要依赖两条技术,增强了Optimizer和Native Runtime。Optimizer层面,我们在之前工作的基础上,又做了诸如 CTE 、动态分区裁剪、小表广播、PK/FK、Fast Decimal等优化。

640?wx_fmt=png

大家如果关注刚结束的SparkSummit,会发现一些类似的技术,如动态分区裁剪已经进了最新的Spark3.0,EMR版本的Spark在几个月之前就支持了。另一条技术是Native Runtime,也是今天分享的主题,涵盖的主要工作,包括 Native Codegen、统一内存布局、Batch化执行框架,后续会详细介绍。

大家都知道 Optimizer的目的是获取最好的执行计划,主要技术包括states收集和Cost Model,难点是静态states不够准确,无法在Plan阶段准确预知Filter或join之后的数据量,因此对后续Plan的代价评估不够准确。

今年SparkSummit发布的adaptive Execution,就把动态stats收集和plan优化结合在一起来解决这个问题。相对应的 Runtime的目的是针对选定的plan,如何使它跑得更快,长期以来 Runtime的主要工作基本上都聚焦在解决当下的新硬件瓶颈。如MapReduce刚出来时,网络带宽是瓶颈,所以Google做了很多locality方面的优化。Spark刚出来时解决的硬件瓶颈是磁盘I/O,它通过内存缓存来提升性能。

再后来 CPU成了新的瓶颈,我们可以看到从10年到20年,磁盘I/O和网络带宽都有了每年数量级的提升,但是CPU的主频基本上保持不变,因此CPU成了新的硬硬件瓶颈,提升CPU性能,成为近年来 Runtime领域重要的优化方向。优化CPU主要有两条技术路线,向量化和Codegen。我们先看一下传统的 SQL执行所应用的火山模型的问题所在,这是一个简单的Select加Filter加Project加Agg的例子。

640?wx_fmt=png

在执行的过程中,在火山模型中,每个算子都是一个迭代器,下游的算子,调上游算子的next方法,next返回当前算子处理之后的中间结果。这个模型最大的问题是每条record在经过每一个算子的时候,都要经过一次虚函数调用,而虚函数调用的开销是非常大的。

第二个问题就是在每个算子之间需要把中间的结果物化到内存。针对这个问题,向量化技术给出的解,是通过批量执行加列式存储,加小循环,来更好的利用 SIMD的指令和CPU的乱序执行,从而最大化数据并行度和指令并行度,从而分摊掉虚函数调用的开销,并提升执行性能。

例如上面例子里Agg 算子计算过程,他把输入 column1,column2以及 Agg的输出结果sum都存在数组里,然后通过一个很紧凑的for循环进行计算。由于循环足够简单,编译器会做循环展开和SIMD的优化。从截图中我们可以看到,编译器生成了很多向量化的指令,此外,由于for循环足够简单,然后for循环内部基本上都是访存指令,如访问colum1的第i个数据,colum2的第i个数据,所以每次放循环最主要的时间都是在进行访存,而因为 for循环足够的短,所以CPU的乱序执行的窗口里,可以同时发射多条漏斗指令,从而解决了 Memory Wall的问题。

这个技术的代表是MonetDB/X100(2005),以及今年SparkSummit宣布的 photon(2020)技术,主要的缺点是中间缓存的数据量比较大,Codegen技术的给出的解释算子融合,他打破了Stage内部算子间的界限,拼出来跟原来的逻辑保持一致的裸的代码通常是一个大的for循环,然后把拼成的代码编译成可执行文件,这里 面展示的跨越的第一个Stage拼出来的代码,可以看到最外层是一个大的for循环,接下来是Filter,表达了 Filter算术的语义,然后在Filter的内部是Agg的语意,拼出的代码完全不存在迭代器和额外的函数调用,就像是一个新手手写的代码,而这种代码不存在任何框架上的Overhead,性能往往是最好的。

Spark的Codegen把拼成的代码交给 Janino模块做编译,在运行的时候直接load即时编译出来的class文件。Codegen技术的好处有几点:

1.用for循环代替了迭代器,完全消除了虚函数调用;

2.没有了雾化,中间数据都保存在寄存器里。它的缺点就是因为 for循环比较大,而且每次迭代执行的逻辑非常的复杂,所以很难应用SIMD的优化。这个技术的代表是Hyper和Apache Spark,尽管Spark的Java Codegen,相比之前有了数量级的提升,但依然有一些不足。首先是Java的性能还是弱于Native Code,二是Java语义的限制,例如无法显示使用 SIMD或Prefetch之类,并且由于机器的存在,无法自主精细化控制内存。

640?wx_fmt=png

3.NativeCode更容易跟新硬件进行交互。基于这个原因,我们决定使用 Native Runtime替换Java Runtime。同时我们不想对现有的Spark做太多的改动,所以最终我们选择了Codegen技术路线,结合起来就是Native Codegen。

接下来介绍我们做Native codegen解决的核心问题,集中在三个方面,我们要生成什么代码,怎么生成这些代码,以及怎么样跟Spark做集成。

640?wx_fmt=png

第一个问题,生成什么?

如今的NativeCode有很多,C/C++。Go Rest,LLVM等。基于我们自己的技能点,其实可以选择的就只有C/C++, C++实现起来相对直观,只需要对照原来生成的Java代码,替换成C++即可。但C++最大的问题是它在编译时间过长,根据HyPer的论文,C++的编译时间比LLVM高出了一个数量级。LLVM的编译时间很短,而且执行的效率跟C++相当,看上去是一个很不错的选择。

640?wx_fmt=png

其实很多Native Codegen这样的系统都选择了LLVM,包括HyPer,Impala以及阿里云自研的MaxCompute,ADB等,但LLVM对我们来说还是过于复杂,它的语法接近汇编,是想用汇编重写SQL算法的工作量会有多大,其实大多数引擎也不会用
LLVM写全量的代码,比如HyPer,解码算子的核心逻辑,用LLVM生成其他通用的功能,包括spill复杂数据结构的管理等,实际上是用C++提前编写好并进行编译。即便如此,LLVM对我们来说依然过于复杂,在广泛调研之后,另外一种可能性出现了 Weld。

先介绍一下Weld

这个是Spark的作者matei的学生的作品,他提供了包括Language+Compiler+Runner的工具链,最终会转化成LLVM,然后用LLVM的工具链编译执行, Weld最初想解决的问题是不同lib之间相互调用时数据传输的开销,例如要在pandas里调用numpy的接口,首先pandas把数据写入内存,然后numpy读取内存进行计算。

640?wx_fmt=png

对于极度优化的Library来说,内存的写入和读取的时间可能会远超计算本身。针对这个问题,Weld开发了Common Runtime,并配套提供了一组IR,再加上惰性求值的特性,只需要简单修改Library,使其符合Weld的语法规范,便可以做到不同Library共用Weld的Runtime,再利用惰性求值实现快Library的Pipeline,从而省去数据物化的开销。Weld Runtime还做了若干优化,如循环融合循环展开,向量化自适应执行等。

此外Weld支持调用C代码,可以方便的调用三方库。我们感兴趣的是Weld提供的IR和对应的Runtime。

Weld IR语法是针对关系代数进行设计的,非常适合表达SQL语句。数据结构层面,Weld IR最核心的数据结构是vec和struct,对应C语言里的数组和struct,能较好的表达Spark SQL的 Row Batch基于struct和vec,可以构造字典数据结构,能够比较好的表达SQL里面重度使用Hash结构,操作层面,Weld IR提供了类函数式语言的语义,如Map,Filter,Iterator等配合Builder语义,能方便地表达Project、Filter、Agg、Broadcast join等算子语义,例如 select加Filter的例子,用Weld IR的表达如下,第一行是函数签名,表示入参是一个数组,数组的元素是一个struct,strut包含两个int32的成员。

接下来就是一个大的 for表达式,跟常见的语法不同,for表达式包含三个参数

1.需要遍历的数组;

2.Build,用来生成最终的结果。Build类型也决定了最终生成的结果的。用什么数据结构来存储。

3.lambda,用来定义针对每个元素的操作,在这个例子里面,第一个参数就是这个函数的入参v第二个参数是append,表示最终构造的结果,存在一个数组里面。第三个,lambda参数是一个if表达式, if的语义跟我们常见的也不太相同,它实际上是把 if的true和false的两个分支都作为参数表达,其中第一个参数是condition,第二个参数是当condition为true的时候,所执行的逻辑。

第三个参数肯定是认为false的时候执行的逻辑,在这个里面可以看到当第二个成员它是从0开始计数,当第二个成员大于10的时候,会把第一个成员 merge到 appender里面。否则的话就什么都不做,直接返回原来的build。Weld的IR。通过 weld_module_compile和weld_module_run,两个接口,分别做编译和执行。由于Weld同时兼顾了语法简洁,编译时间短的特性,因此我们选择Weld作为生成的目标。

第二个问题就是怎么生成?

我们复用了Spark Codegen框架。我们知道 Spark Codegen包含Expression和Stage两个级别,在Expression级别,我们对照原来的doGenCode()的接口,增加了doGenNativeCode(),里面拼出来的是Weld的语法,例如之前可能Java的代码里面就直接是两个变量的相加,然后改造了以后就成了一个struct的两个成员的相加。在WholeStage级别,我们复用了producer/ consumer的框架,熟悉Spark源码的同学应该了解到,在producer/consumer框架下,每个算子都提供了produce和consume接口,produce的职责是生成为下游提供数据的代码,consumer的职责是生成消费上游数据的代码,Spark 中并非所有的算子都支持Codegen,例如outjoin就不支持支持Codegen的算子,继承了CodeGenSupport的接口,我们对整个producer/consumer的框架并没有改动,在他们旁边又新增加了一系列的接口,包括 NativeCodeGenSupport/doProduceNative/doConsumeNative。

640?wx_fmt=png

以一个具体的例子加以说明,还是一个相比较简单的select加Filter加Project的例子, query包含三个算子,Scan、Filter、Project。

640?wx_fmt=png

然后 query他的代码生成的过程是右上角的这张图。首先 project就是最下游的算子,它的produce方法会返回最终生成的代码的字符串。然后它这个是怎么生成的呢?Project 的doProduce。直接调用了 Filter的doProduce方法,然后Filter的doProduce方法直接调用了Scan的doProduce,然后Scan的 doProduce会生成一个框架代码,在框架代码的内部会调用Scan的 doConsume。Scan的doConsume。直接调用Filter的doConsume。Filter的doConsume会生成Filter的逻辑,并在内部调用Project的doConsume,Project的 doConsume。会把最终的数据输出 append的到 output中。

我们看下面这三张图,Scan的 doProduce会生成for循环的一个架子。然后在for循环的每个迭代里面调用 Filter的doConsume方法, Filter的doConsume会生成一个if的表达式的框架,然后在判断为true,也就是if的内部的话,调用的是project的 doConsume。最后project的doConsume拼成一段append 的方法把column1 append到 output里面到此为止。一个完整的Java的Codegen过程就结束了,然后我们就拿得到了直接可以编译的Java代码,当然这个是简化的过程。

640?wx_fmt=png

对于Native Codegen的话,我们是复用了这个逻辑,只是把生成的Java代码替换成了Weld的IR,如底下三张图所示,具体的Weld,语法我就不详细展开了。

640?wx_fmt=png

感兴趣的同学可以到Weld官网上看语法定义,代码生成还有一个问题就是Fallback机制,由于人力有限,我们无法覆盖所有的算子,因此需要实现Fallback机制。这里需要做的决定是应该做算子级别的Fallback,还是Stage级别的Fallback。直观上算子粒力度的Fallback好像更加合理,实际上却会导致更严重的问题。它会导致Stage内部Pipeline的断裂。前面讲了Codegen的一个优势是整个过程不存在物化,而算子力度的Fallback则会导致Stage内部一部分算是走Native Runtime,另一部分走Java Runtime,两者的连接数无可避免存在数据物化,开销通常会大于Native Runtime带来的收益。

640?wx_fmt=png基于这个原因,我们选择Stage级别的Fallback,一旦有任何算子不支持Native Codegen,在整个Stage都Fallback到Java Codegen,代码也已经生成了。

最后的问题,如何跟Spark集成 。

task的执行可以理解为一个黑盒,它的输入是Row Batch或者Row Iterator我们知道在Scan Stage Spark用了向量化读的优化,读出来的是列式存储的 column batch,每一列本质上都用一个数组进行存储,而在Shuffle Stage,Shuffle Fetch回来的数据结构是行式存储的 unsaferowbatch。每个Stage的输出会封装成会封装成Row Iterator。

640?wx_fmt=png

我们前面讲到既然选择了Stage级别的Fallback,意味着黑盒要么是Java Runtime要么是Native Runtime,不存在混合的情况。因此我们关心的如何把输入转化为Weld认识的内存布局,以及如何把Weld的输出包装成Row Iterator。针对列存数据,打开offheap开关,数据天生就是指针数组,Weld可以直接操作。对于行存数据,主要问题是变长数据难以映射到 Weld的 struct右上方的图展示了Spark Row Batch的内存布局,首先是固定长度的,null bitmap,然后是固定长度的列数据,最后是变长数据,由于变长数据的存在,无法直接把一条record映射成 strut。

我们的做法是把定长部分和变长部分分别拷贝出来,并有offset和length来标志变量部分的位置和长度。这样一来record就能映射到strut结构了,而整个Row Batch就映射成了一个 vec strut。例如这个例子,每个record包括两个long和一个String,null bit用一个long表示,紧接着是两个long表示两个列的数据。第三个long保存变长数据的 offset和length,最后是变成部分,我们把变成拷贝出去之后,根据原先的offset和length,计算新的offset和length,最终我们用1个包含5个long的strut表示 record,分别是 null bitmap原先的两个long offset和length。这样一来我们就完成了统一内存布局,并且当且仅当有变长数据存在的时候才需要拷贝,否则的话是不需要拷贝的。Weld输出转换成Row Batch是刚才所说的过逆向过程,这里就不再赘述了,完成了数据转换,最后是Spark的执行流程。首先我们尝试走Native Codegen,若有异常发生,则切换到Java Codegen。若没有异常,则执行StageInit做初始化工作,包括初始化Weld,加载编译好的Weld module,拉取Broadcast数据等。

接着是一个循环,每个循环会读取一个Row Batch,给Native Runtime来执行,输出结果包装成Row Iterator,给Shuffle Write。以上就是EMR团队在Native Runtime上做的探索。总结下来,我们采用Weld的IR作为代码生成的目标语言,复用了Spark Codegen框架,进行代码生成,采用了Stage级别的Fallback机制,并通过统一内存布局跟Spark做了集成。

640?wx_fmt=png

由于时间有限,一些工作没有包含在今天的分享中,例如 Weld的不好表达的算子,如SortMergejoin、Partitionby ,我们其实也用了Native的技术进行了优化。再例如 Weld本身的字典的实现效率比较低,我们也对此进行了比较大的优化。除了Native Runtime,EMR团队在Spark很多技术点都做了工作,欢迎大家交流沟通。

640?wx_fmt=png640?wx_fmt=png

相关阅读推荐:

EMR Spark-SQL性能极致优化揭秘 Native Codegen Framework

E- MapReduce 入门训练营火热报名中,点击文末阅读原文抢入营名额!

报名链接:

https://developer.aliyun.com/learning/trainingcamp/emr/1

640?wx_fmt=jpeg


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK