18

spark是怎么从RDD升级到DataFrame的?

 4 years ago
source link: http://www.cnblogs.com/techflow/p/12865965.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

今天是 spark专题的第五篇 ,我们来看看DataFrame。

用过Python做过机器学习的同学对Python当中pandas当中的DataFrame应该不陌生,如果没做过也没有关系,我们简单来介绍一下。DataFrame翻译过来的意思是数据帧,但其实它指的是一种特殊的数据结构,使得数据 以类似关系型数据库当中的表 一样存储。使用DataFrame我们可以非常方便地对整张表进行一些类似SQL的一些复杂的处理。Apache Spark在升级到了1.3版本之后,也提供了类似功能的DataFrame,也就是大名鼎鼎的SparkSQL。

关于SparkSQL的前世今生其实是有 一大段历史 的,这一段历史除了可以充当吹牛的谈资之外,还可以帮助我们理清楚许多技术之间的内在关联。

从优化到重构的血泪史

在程序开发这个行当, 优化和重构 注定是两个无法摆脱的问题。

当一个项目启动的时候,由于投入有限,可能招不到特别匹配的人才,或者是为了快速满足业务的需要。往往会采取一些 不是特别合理 的设计来构建项目,这个应该很好理解,为了图快牺牲一些性能或者是拓展性。而且有时候由于视野和能力的限制,早期的开发者可能也是无法意识到设计中的不合理性的。但是俗话说得好,出来混早晚是要还的。前面挖了坑,后来早晚也会暴露出来。问题就在于暴露了之后我们怎么处理。

一般来说,无论是作为公司也好,还是作为开发者个人也罢。想的肯定都是怎么样 以最小的代价解决问题 ,也就是尽量优化,能不动核心代码就不动。除了因为核心代码太久没有维护或者是文档缺失之外,也涉及到成本问题。现在的项目日进斗金,每天都在运行,一旦要下决心把核心代码翻新一遍,那么会付出巨大的代价,可能整个项目组要暂停一段时间。而且在上层管理层眼中,往往也是看不到重构的必要性的。因为 上层都是以业务为导向的 ,技术做得好不好不重要,能赚钱才是王道。

但问题是优化并不是无止境的,很多时候核心设计的不合理才是大头,边边角角的修补只能聊胜于无。这个时候考验的往往都是技术负责人的担当了,是当个糊裱匠混一年是一年,还是壮士断腕,敢叫日月换新天。一般来说 糊裱起到的效果都是有限的 ,总会有撑不下去要重构的那天。

SparkSQL早期的发展就非常好的印证了这点,SparkSQL诞生之初就是当做一个优化项目诞生的。目的是为了优化Hive中在spark的效率。

这里的Hive可能很多人不太熟悉,它是Hadoop家族结构化查询的工具。将hadoop集群中的数据以表结构的形式存储,让程序员可以以类SQL语句来查询数据。看起来和数据库有些近似,但原理不太一样。Hive 底层是以MapReduce驱动的 ,也就是说会把我们写好的SQL转化成MapReduce执行。由于Hive易用性很好,使用的人很多,所以spark当中也支持Hive。

但其实那个时候spark兴起,MapReduce时代已经逐渐走到了末期。那时的spark是基于前面介绍的RDD的结构处理数据的,性能比MapReduce好得多。但如果在spark上依然使用MapReduce的形式支持Hive,那么就不能体现出spark计算性能的优越性。所以对于Hive on Spark的优化势在必行。我个人觉得这有点抢市场的调调。

最好的办法是对spark彻底重构,重建出一套支持结构化数据查询的计算框架。但估计那时候主负责人没能狠下心,或者是为了赶时间。所以只是对Hive进行了一些优化,大概就是把一些使用MapReduce的计算想办法尽量改成使用RDD,从而提升整体的效率。这样做当然是能够有提升的,但是核心的框架仍然是Hive的那一套机制,这样的 提升是有限的 。大概过了三年左右的时间,基本上所有能压榨出来的性能都被压榨完了,开发组经过激烈的思想斗争之后,终于接受现实,彻底抛弃原本的框架,构建出一套新的架构来。

这套新开发出的架构就是SparkSQL,也就是DataFrame。

SparkSQL的架构

我们来简单看下SparkSQL的架构,大概知道内部是怎么运行的。

整个SparkSQL的模型大概分为三层,最上面是编程模型层,中间是执行优化层,最后是任务执行引擎。

mYbmm2E.jpg!web

这些都是术语,我们简单介绍一下,编程模型层主要有两块一块是SparkSQL一种是DataFrame,这两者只是语法不一样,底层执行的逻辑是一样的。主要做的是对我们写的一些语法进行解析以及一些基本的处理。执行计划层是将SQL语句转化成具体需要执行的逻辑执行计划,根据一些策略进行优化之后输出物理执行策略。最后一层是执行层,负责将物理计划转化成RDD或者是DAG进行执行。

我们观察一下这个架构,可能还有很多细节不是很清楚,但是至少整个执行的过程已经很明白了。进一步可以发现,整个架构当中已经 完全没有MapReduce 的影子了,底层的执行单元就是RDD。也就是说SparkSQL其实是进一步更高层次的封装。

RDD和DataFrame

我们来简单看下DataFrame和RDD的差别,最大最直观的差别就是DataFrame多了schema的概念。也就是多了数据格式的概念,我们拿到DataFrame可以很轻松地获取它其中数据的结构信息。

我们看下下图做个对比,同样一份数据在RDD和DataFrame的样子:

3muayef.jpg!web

不要小瞧这个schema,有了它之后,我们就 可以做一些结构化数据才支持的操作 了。比如groupby、where、sum等等。这些结构化数据操作的灵活度要比RDD的map、filter等操作大得多。

另外一个好处就是效率,如果我们自己写RDD来操作数据的话,那么Python是一定干不过scala和java的。因为spark底层是依托Java实现的,spark的所有计算都执行在JVM当中。scala和java都是直接在JVM当中直接运行的语言,而Python不行,所以之前我们使用Python调用RDD处理spark的速度也会慢很多。因为我们 需要经过多层中转 ,我们可以看下下面这张图。

eIbUFvr.jpg!web

当我们执行pyspark当中的RDD时,spark context会通过Py4j启动一个使用JavaSparkContext的JVM,所有的RDD的转化操作都会被映射成Java中的PythonRDD对象。当我们的任务被传输到Workder进行执行的时候,PythonRDD会启动Python的子进程来传输代码和执行的结果。

上面这段话说起来有点绕,简单理解就是当pyspark调用RDD的时候,Python会转化成Java调用spark集群分发任务。每一个任务具体在机器上执行的时候,还是以Python程序的方式执行。执行结束之后,还是通过Python拿回数据给spark中的JVM。JVM执行结束之后,再把结果包装成Python的类型返回给调用端。

本来Python的执行效率就低,加上中间又经过了若干次转换以及通信开销(占大头),这就导致了pyspark中的RDD操作效率更低。

而现在有了Catalyst优化器之后,会自动帮助我们进行底层的计算优化。并且即使是非原生的Python语言,也可以使用它,因此会带来性能的极大提升。甚至经过官方的测量,使用pyspark写DataFrame的效率已经和scala和java平起平坐了。

bUzeYn3.jpg!web

所以如果我们要选择Python作为操作spark的语言,DataFrame一定是首选。不过Catalyst优化器也有短板,它无法解决跨语言本身带来的问题。比如我们使用Python写一些udf(user defined function),还是会带来性能的损耗。这个时候的整体效率还是会比scala低一些。

写了这么多废话,下面就让我们实际一点,看看究竟pyspark当中的DataFrame要如何使用吧。

创建DataFrame

和RDD一样,DataFrame的创建方法有很多,我们可以基于内存当中的数据进行创建,也可以从本地文件或者是HDFS等其他云存储系统当中进行读取。但怎么读取不重要,使用方法才是关键,为了方便演示,我们先来看看如何从内存当中创建DataFrame。

前文当中曾经说过,DataFrame当中的数据以表结构的形式存储。也就是说我们读入的一般都是结构化的数据,我们经常使用的结构化的存储结构就是json,所以我们先来看看如何 从json字符串当中创建DataFrame

首先,我们创建一个json类型的RDD。

jsonstr = sc.parallelize(("""
{'name': 'xiaoming', 'age': 13, 'score': 100}""",
"""{'name': 'xiaohong', 'age': 15, 'score': 98}"""
))

接着,我们用 spark.read.json 将它转化成一个DataFrame。需要注意的是,如果数据量很大,这个执行会需要一点时间,但是它仍然是一个转化操作。数据其实并没有真正被我们读入,我们读入的只是它的schema而已,只有当我们执行执行操作的时候,数据才会真正读入处理。

studentDf = spark.read.json(jsonstr)

执行完这一句之后,RDD转DataFrame的工作就完成了。严格说起来这是读取操作,并不是真正的转化操作。RDD转DataFrame稍微复杂一些,我们晚点再说。

如果我们想要查看DataFrame当中的内容,我们可以执行show方法,这是一个行动操作。和pandas中的head类似,执行之后,会展示出DataFrame当中前20条数据。我们也可以传入参数,指定我们要求展示的数据条数。

我们来运行一下,看看展示出来的结果:

aaE7nqM.jpg!web

我们也collect一下原本的RDD作为一下对比:

ZNFvm2q.jpg!web

这下一对比我们就发现了,json格式的字符串果然可以被解析,并且RDD被转化成了表格格式的DataFrame。

查询

我们再来看下DataFrame的简单查询功能,其实Dataframe当中的查询功能很多。我们今天先来看其中用得比较多的两种。

先来看第一种,第一种是 通过select接口 查询数据。这里的select其实对应的是SQL语句当中的select,含义也基本相同,不同的是我们是通过函数进行调用的而已。

我们可以在select当中传入我们想要查找的列名。

aaIn6vy.jpg!web

我们可以加上where或者filter函数进行条件判断, where和filter函数是一个意思 ,两者的用法也完全一样。官方提供了两个名字,为了不同习惯的人使用方便而已。我们把下图当中的函数换成filter结果也是一样的。

JBva2ib.jpg!web

另外一种操作方式稍稍复杂一些,则是将DataFrame 注册成pyspark中的一张视图 。这里的视图和数据库中的视图基本上是一个概念,spark当中支持两种不同的视图。第一种是临时视图,第二种是全局视图。两者的用法基本一致,不同的是作用范围。临时视图的作用范围是当前的session,如果当前的session关闭,或者是另外开启了新的session,这个视图就会作废。而全局视图则是跨session的,所有session都可以使用。

如果搞不清楚session的概念也没有关系,在之后的文章当中我们还会遇到的。我们先有这么个印象即可。

我们调用createOrReplaceTempView方法创建一个临时视图,有了视图之后,我们就可以通过 SQL语句 来查询数据了。

studentDf.createOrReplaceTempView("student")

我们通过spark.sql传入一段SQL string即可完成数据的调用,需要注意的是,DataFrame也支持RDD的collect或者take等方法。如果这里的结果我们调用的是collect,那么spark会将所有数据都返回。如果数据集很大的情况下可能会出现问题,所以要 注意show和collect的使用范围和区别 ,在一些场景下搞错了会很危险。

VJJjMjA.jpg!web

结尾

今天这篇文章我们一起来看了pyspark当中目前为止最常用的数据处理工具——DataFrame,还简单了解了一下它和RDD相比的性能优势以及它简单的查询语法的使用方法。

从上面的方法我们也看得出来,相比之前RDD中介绍的那些方法,DataFrame中封装的API提供了更多高级的功能,比写RDD处理数据也要方便很多。再加上性能原因,我们在处理数据时必然首选使用DataFrame。相信大家通过本文对于DataFrame也应该有了一个最初的印象,后续还会有更多文章详细地介绍DataFrame的使用以及内部机制的一些细节,敬请期待吧。

今天的文章就到这里,原创不易, 扫码关注我 ,获取更多精彩文章。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK