30

大数据分析工程师入门(九):Spark SQL

 4 years ago
source link: https://www.tuicool.com/articles/yU3myq7
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

E7jQJrf.jpg!web

本文为《大数据分析师入门课程》系列的第9篇,在本系列的第8篇-Spark基础中,已经对Spark做了一个入门介绍,在此基础上本篇拎出Spark SQL,主要站在使用者的角度来进行讲解,需要注意的是本文中的例子的代码均使用Scala语言。

主要包括以下内容:

  • 你该了解的Spark SQL

  • 简单入门操作

  • 不得不说的数据源

你该了解的 Spark SQL

1.什么是Spark SQL?

Spark SQL是Spark专门用来处理结构化数据的模块,是Spark的核心组件,在1.0时发布。

SparkSQL替代的是HIVE的查询引擎,HIVE的默认引擎查询效率低是由于其基于MapReduce实现SQL查询,而MapReduce的shuffle是基于磁盘的。

2.Spark SQL特性

其实最初Spark团队推出的是Shark-基于Hive对内存管理、物理计划、执行做了优化,底层使用Spark基于内存的计算引擎,对比Hive性能提升一个数量级。

即便如此高的性能提升,但是由于Shark底层依赖Hive的语法解析器、查询优化器等组件制约其性能的进一步提升。最终Spark团队放弃了Shark,推出了Spark SQL项目,其具备以下特性:

  • 标准的数据连接,支持多种数据源

  • 多种性能优化技术

  • 组件的可扩展性

  • 支持多语言开发:Scala、Java、Python、R

  • 兼容Hive

3.Spark SQL可以做什么?

  • 大数据处理

使用SQL进行大数据处理,使传统的RDBMS人员也可以进行大数据处理,不需要掌握像mapreduce的编程方法。

  • 使用高级API进行开发

SparkSQL支持SQL API,DataFrame和Dataset API多种API,使用这些高级API进行编程和采用Sparkcore的RDD API 进行编程有很大的不同。

使用RDD进行编程时,开发人员在采用不同的编程语言和不同的方式开发应用程序时,其应用程序的性能千差万别,但如果使用DataFrame和Dataset进行开发时,资深开发人员和初级开发人员开发的程序性能差异很小,这是因为SparkSQL 内部使用Catalyst optimizer 对执行计划做了很好的优化。

简 单 入 门 操 作

1.构建入口

Spark SQL中所有功能的入口点是SparkSession类-Spark 2.0引入的新概念,它为用户提供统一的切入点。

早期Spark的切入点是SparkContext,通过它来创建和操作数据集,对于不同的API需要不同的context。

比如:使用sql-需要sqlContext,使用hive-需要hiveContext,使用streaming-需要StreamingContext。SparkSession封装了SparkContext和SQLContext。

要创建一个 SparkSession使用SparkSession.builder():

import org.apache.spark.sql.SparkSession

val spark = SparkSession

.builder()

.appName("Spark SQL basic example")

.config("spark.some.config.option", "some-value")

.getOrCreate()

2.创建DataFrame

在一个SparkSession中,应用程序可以从结构化的数据文件、Hive的table、外部数据库和RDD中创建一个DataFrame。

举个例子, 下面就是基于一个JSON文件创建一个DataFrame:

val df =spark.read.json("examples/src/main/resources/people.json")

// 显示出DataFrame的内容

df.show()

// +----+-------+

// | age| name|

// +----+-------+

// |null|Michael|

// | 30| Andy|

// | 19| Justin|

// +----+-------+

3.执行SQL查询

// 将DataFrame注册成一个临时视图

df.createOrReplaceTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")

sqlDF.show()

// +----+-------+

// | age| name|

// +----+-------+

// |null|Michael|

// | 30| Andy|

// | 19|Justin|

// +----+-------+

SparkSession的SQL函数可以让应用程序以编程的方式运行SQL查询,并将结果作为一个 DataFrame返回。

例子中createOrReplaceTempView创建的临时视图是session级别的,也就是会随着session的消失而消失。如果你想让一个临时视图在所有session中相互传递并且可用,直到Spark 应用退出,你可以建立一个全局的临时视图,全局的临时视图存在于系统数据库global_temp中,我们必须加上库名去引用它。

// 将一个DataFrame注册成一个全局临时视图

df.createGlobalTempView("people")

// 注意这里的global_temp

spark.sql("SELECT * FROM global_temp.people").show()

// +----+-------+

// | age| name|

// +----+-------+

// |null|Michael|

// | 30| Andy|

// | 19|Justin|

// +----+-------+

// 新的session同样可以访问

spark.newSession().sql("SELECT * FROM global_temp.people").show()

// +----+-------+

// | age| name|

// +----+-------+

// |null|Michael|

// | 30| Andy|

// | 19|Justin|

// +----+-------+

4.DataFrame操作示例

import spark.implicits._ //导入隐式转换的包

//打印schema

df.printSchema()

// root

// |-- age: long (nullable = true)

// |-- name: string (nullable = true)

//选择一列进行打印

df.select("name").show()

// +-------+

// | name|

// +-------+

// |Michael|

// | Andy|

// | Justin|

// +-------+

//年龄加1

df.select($"name", $"age" +1).show()

// +-------+---------+

// | name|(age + 1)|

// +-------+---------+

// |Michael| null|

// | Andy| 31|

// | Justin| 20|

// +-------+---------+

//选取年龄大于21的

df.filter($"age" > 21).show()

// +---+----+

// |age|name|

// +---+----+

// | 30|Andy|

// +---+----+

//聚合操作

df.groupBy("age").count().show()

// +----+-----+

// | age|count|

// +----+-----+

// | 19| 1|

// |null| 1|

// | 30| 1|

// +----+-----+

5.创建DataSet

Dataset和RDD比较类似,与RDD不同的是实现序列化和反序列化的方式,RDD是使用Java serialization或者Kryo,而Dataset是使用Encoder。

Encoder的动态特性使得Spark可以在执行filtering、sorting和hashing等许多操作时无需把字节反序列化为对象。

// 一个简单的Seq转成DataSet,会有默认的schema

val primitiveDS = Seq(1, 2, 3).toDS().show

// +-----+

// |value|

// +-----+

// | 1|

// | 2|

// | 3|

// +-----+

case class Person(name: String, age: Long)

// 通过反射转换为DataSet

val caseClassDS = Seq(Person("Andy",32)).toDS()

caseClassDS.show()

// +----+---+

// |name|age|

// +----+---+

// |Andy| 32|

// +----+---+

// DataFrame指定一个类则为DataSet

val path = "examples/src/main/resources/people.json"

val peopleDS = spark.read.json(path).as[Person]

peopleDS.show()

// +----+-------+

// | age| name|

// +----+-------+

// |null|Michael|

// | 30| Andy|

// | 19| Justin|

// +----+-------+

通过上述的代码可以看出创建DataSet的代码很简单,一个toDs就可以自动推断出schema的类型,读取json这种结构化的数据得到的是一个DataFrame,再指定它的类则为DataSet。

6.RDD的互操作性

RDD的互操作性指的是RDD和DataFrame的相互转换,DataFrame转RDD很简单,复杂的是RDD转DataFrame。

目前Spark SQL有两种方法:

  • 反射推断

Spark SQL 的 Scala 接口支持自动转换一个包含 Case Class的 RDD 为DataFrame。Case Class 定义了表的Schema。Case class 的参数名使用反射读取并且成为了列名。Case class 也可以是嵌套的或者包含像 Seq 或者 Array 这样的复杂类型,这个 RDD 能够被隐式转换成一个 DataFrame 然后被注册为一个表。

// 开启隐式转换

import spark.implicits._

// 读入文本文件并最终转化成DataFrame

val peopleDF = spark.sparkContext

.textFile("examples/src/main/resources/people.txt")

.map(_.split(","))

.map(attributes => Person(attributes(0), attributes(1).trim.toInt))

.toDF()

// 将DataFrame注册成表

peopleDF.createOrReplaceTempView("people")

// 执行一条sql查询

val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

// 通过map操作后得到的是RDD

teenagersDF.map(teenager => "Name: " +teenager(0)).show()

// +------------+

// | value|

// +------------+

// |Name: Justin|

// +------------+

另一种更加简单的操作是将RDD中每一行类型变为tuple类型,然后使用toDF依次赋予字段名,需要注意的是使用tuple最高可以支持22个字段。

// 开启隐式转换

import spark.implicits._

// 读入文本文件并最终转化成DataFrame

val peopleDF = spark.sparkContext

.textFile("examples/src/main/resources/people.txt")

.map(_.split(","))

.map(attributes => (attributes(0), attributes(1).trim.toInt))

.toDF("name","age")

//peopleDF: org.apache.spark.sql.DataFrame = [name:string, age: int]

  • 构造Schema

在无法提前定义schema的情况下,RDD转DataFrame或者DataSet需要构造Schema。

构建一个Schema并将它应用到一个已存在的RDD编程接口需要以下四个步骤:

a.从原始的RDD创建一个tuple或者列表类型的RDD

b.创建一个StructType来匹配RDD中的结构

c.将生成的RDD转换成Row类型的RDD

d.通过createDataFrame方法将Schema应用到RDD

//需要导入类型相关的包

import org.apache.spark.sql.types._

//读取hdfs上的文本文件,保存到rdd中

val peopleRDD =spark.sparkContext.textFile("examples/src/main/resources/people.txt")

// 这里的schema是一个字符串,可以来自于其他未知内容的文件,需要注意的是-这里明确写出来只是为了演示,并不代表提前知道schema信息。

val schemaString = "name age"

// 将有schema信息的字符串转变为StructField类型

val fields = schemaString.split(" ")

.map(fieldName => StructField(fieldName, StringType, nullable =true))

//通过StructType方法读入schema

val schema = StructType(fields)

// 将RDD转换成Row类型的RDD

val rowRDD = peopleRDD

.map(_.split(","))

.map(attributes => Row(attributes(0), attributes(1).trim))

// 应用schema信息到Row类型的RDD

val peopleDF = spark.createDataFrame(rowRDD,schema)

不得不说的数据源

在工作中使用Spark SQL进行处理数据的第一步就是读取数据,Spark SQL通过统一的接口去读取和写入数据。主要是read和write操作,不同的数据源相应的Option(附加设置)会有所不同,下面通过例子来具体说明。

1.数据读取

  • parquet

1)读取Parquet文件

parquet文件自带schema,读取后是DataFrame格式。

val usersDF =spark.read.load("examples/src/main/resources/users.parquet")

//usersDF: org.apache.spark.sql.DataFrame = [name:string, favorite_color: string ... 1 more field]

2)解析分区信息

parquet文件中如果带有分区信息,那么SparkSQL会自动解析分区信息。比如,这样一份人口数据按照gender和country进行分区存储,目录结构如下:

test

└── spark-sql

└── test

├──gender=male

│ │

│ ├── country=US

│ │ └── data.parquet

│ ├── country=CN

│ │ └── data.parquet

│ └── ...

└──gender=female

├── country=US

│ └── data.parquet

├── country=CN

│ └── data.parquet

└── ...

通过spark.read.load读取该目录下的文件SparkSQL将自动解析分区信息,返回的DataFrame的Schema如下:

root

|-- name: string (nullable = true)

|-- age: long (nullable = true)

|-- gender: string (nullable = true)

|-- country: string (nullable = true)

目前自动解析分区支持数值类型和字符串类型。

自动解析分区类型的参数为:spark.sql.sources.partitionColumnTypeInference.enabled,默认值为true。可以关闭该功能,直接将该参数设置为disabled。此时,分区列数据格式将被默认设置为string类型,不会再进行类型解析。

3)Schema合并

如果读取的多个parquet文件中的Schema信息不一致,Spark SQL可以设置参数进行合并,但是Schema合并是一个高消耗的操作,在大多数情况下并不需要,所以Spark SQL从1.5.0开始默认关闭了该功能。

可以通过下面两种方式开启该功能:

a.读取文件的时候,开启合并功能,只对本次读取文件进行合并Schema操作

b.设置全局SQL选项spark.sql.parquet.mergeSchema为true,每次读取文件都会进行合并Schema操作

具体请看下面的例子:

// sqlContext是之前例子中生成的

// 导入隐式转换

import sqlContext.implicits._

// 创建一个简单的DataFrame并保存

val df1 = sc.makeRDD(1 to 5).map(i => (i, i *2)).toDF("single", "double")

df1.write.parquet("data/test_table/key=1")

// 创建另一个DataFrame,注意字段名

val df2 = sc.makeRDD(6 to 10).map(i => (i, i *3)).toDF("single", "triple")

df2.write.parquet("data/test_table/key=2")

// 读取这两个parquet文件,增加开启合并Schema的设置

val df3 =sqlContext.read.option("mergeSchema","true").parquet("data/test_table")

df3.printSchema()

// 不同名称的字段都保留下来了

// root

// |-- single: int (nullable = true)

// |-- double: int (nullable = true)

// |-- triple: int (nullable = true)

// |-- key : int (nullable = true)

关于schema合并,有一点需要特别关注,那就是当不同parquet文件的schema有冲突时,合并会失败,如同名的字段,其类型不一致的情况。这时如果你读取的是hive数据源,可能会出现读取失败或者读取字段值全部为NULL的情况。如果大家遇到类型场景,可以考虑是否是这个因素导致。

  • json

json文件和parquet文件一样也是带有schema信息,不过需要指明是json文件,才能准确的读取。

val peopleDF =spark.read.format("json").load("examples/src/main/resources/people.json")

//peopleDF: org.apache.spark.sql.DataFrame = [age:bigint, name: string]

  • MySQL

读取MySQL中的数据是通过jdbc的方式,需要知道要访问的MySQL数据库、表等信息,具体请看下面的代码:

//MySQL数据的访问ip、端口号和数据库名

val url ="jdbc:mysql://192.168.100.101:3306/testdb"

//要访问的表名

val table = "test"

//建立一个配置变量

val properties = new Properties()

//将用户名存入配置变量

properties.setProperty("user","root")

//将密码存入配置变量

properties.setProperty("password","root")

//需要传入Mysql的URL、表名、配置变量

val df = sqlContext.read.jdbc(url,table,properties)

这里要注意的一个点是,读取MySQL需要运行作业时,classpath下有MySQL的驱动jar,或者通过--jars添加驱动jar。

  • hive

读取hive数据的前提是要进行相关的配置,需要将hive-site.xml、core-site.xml、hdfs-site.xml以及hive的lib依赖放入spark的classpath下,或者在提交作业时通过--files和--jars来指定这些配置文件和jar包。之后,就可以很方便的使用hive的数据表了,示例代码如下:

import java.io.File

import org.apache.spark.sql.Row

import org.apache.spark.sql.SparkSession

case class Record(key: Int, value: String)

// 数仓地址指向默认设置

val warehouseLocation = newFile("spark-warehouse").getAbsolutePath

val spark = SparkSession

.builder()

.appName("Spark Hive Example")

.config("spark.sql.warehouse.dir", warehouseLocation)

.enableHiveSupport() //增加支持hive特性

.getOrCreate()

import spark.implicits._

import spark.sql

//使用sql创建一个表,并将hdfs中的文件导入到表中

sql("CREATE TABLE IF NOT EXISTS src (key INT,value STRING) USING hive")

sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// 使用sql直接指向sql查询

sql("SELECT * FROM src").show()

// +---+-------+

// |key| value|

// +---+-------+

// |238|val_238|

// | 86| val_86|

// |311|val_311|

// ...

2.数据保存

  • write

保存用write方法,先看一个简单的例子,将一个DataFrame保存到parquet文件中。

//选取DataFrame中的两列保存到parquet文件中

usersDF.select("name","favorite_color").write.save("namesAndFavColors.parquet")

  • format

format可以指定保存文件的格式,支持json、csv、orc等

//选取DataFrame中的两列保存到json文件中

usersDF.select("name", "favorite_color").write.format("json").save("namesAndFavColors.json")

  • mode

在保存数据的时候,要不要考虑数据存不存在?是覆盖还是追加呢?通过mode可以进行设置。

//选取DataFrame中的两列保并追加到parquet文件中

usersDF.select("name","favorite_color").write.mode(SaveMode.append)

.save("namesAndFavColors.parquet")

除了append还有下列选项:

选项

含义

ErrorIfExists

如果数据已经存在,则会抛出异常

Append

如果数据已经存在,则会追加

Overwrite

每次都会覆盖

ignore

如果数据已经存在,则不做任何操作

总结

本文通过什么是Spark SQL,有哪些特性,可以做什么让读者对Spark SQL有个整体的了解,然后着重讲解了如何进行入门操作和多种数据源操作。

掌握了以上技能,大数据分析工程师面对Spark SQL相关的工作一定可以游刃有余。

参考文献:

[1] Apache Spark官网文档-http://spark.apache.org/docs/latest/sql-getting-started.html

[2] Spark SQL之前世今生,作者:ZFH__ZJ- https://www.cnblogs.com/linn/p/5325147.html

[3] Spark学习之路 (十八)SparkSQL简单使用,作者:扎心了,老铁 - https://www.cnblogs.com/qingyunzong/p/8987579.html

Mbm2qeq.jpg!web


Recommend

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK