3

Spark SQL & Spark streaming

 2 years ago
source link: https://xfliu1998.github.io/2022/01/18/5.6-Spark-SQL/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

一直进步 做喜欢的

Spark SQL & Spark streaming

Created2022-01-18|Updated2022-03-17|Search and Recommendation
Word count:8.2k|Reading time:34min|Post View:88|Comments:

推荐系统学习笔记目录

Spark SQL

Spark SQL概念

  • Spark SQL is Apache Spark’s module for working with structured data.
    • 它是spark中用于处理结构化数据的一个模块

Spark SQL历史

  • Hive是目前大数据领域,事实上的数据仓库标准。
  • Shark:shark底层使用spark的基于内存的计算模型,从而让性能比Hive提升了数倍到上百倍。
  • 底层很多东西还是依赖于Hive,修改了内存管理、物理计划、执行三个模块
  • 2014年6月1日的时候,Spark宣布了不再开发Shark,全面转向Spark SQL的开发

Spark SQL优势

  • Write Less Code
  • Performance

python操作RDD,转换为可执行代码,运行在java虚拟机,涉及两个不同语言引擎之间的切换,进行进程间 通信很耗费性能。

DataFrame

  • 是RDD为基础的分布式数据集,类似于传统关系型数据库的二维表,dataframe记录了对应列的名称和类型
  • dataFrame引入schema和off-heap(使用操作系统层面上的内存)
    • 1、解决了RDD的缺点
    • 序列化和反序列化开销大
    • 频繁的创建和销毁对象造成大量的GC
    • 2、丢失了RDD的优点
    • RDD编译时进行类型检查
    • RDD具有面向对象编程的特性

用scala/python编写的RDD比Spark SQL编写转换的RDD慢,涉及到执行计划

  • CatalystOptimizer:Catalyst优化器
  • ProjectTungsten:钨丝计划,为了提高RDD的效率而制定的计划
  • Code gen:代码生成器

直接编写RDD也可以自实现优化代码,但是远不及SparkSQL前面的优化操作后转换的RDD效率高,快1倍左右

优化引擎:类似mysql等关系型数据库基于成本的优化器

首先执行逻辑执行计划,然后转换为物理执行计划(选择成本最小的),通过Code Generation最终生成为RDD

  • Language-independent API

    用任何语言编写生成的RDD都一样,而使用spark-core编写的RDD,不同的语言生成不同的RDD

  • Schema

    结构化数据,可以直接看出数据的详情

    在RDD中无法看出,解释性不强,无法告诉引擎信息,没法详细优化。

**为什么要学习sparksql **

sparksql特性

  • 1、易整合
  • 2、统一的数据源访问
  • 3、兼容hive
  • 4、提供了标准的数据库连接(jdbc/odbc)

DataFrame

在Spark语义中,DataFrame是一个分布式的行集合,可以想象为一个关系型数据库的表,或者一个带有列名的Excel表格。它和RDD一样,有这样一些特点:

  • Immuatable:一旦RDD、DataFrame被创建,就不能更改,只能通过transformation生成新的RDD、DataFrame
  • Lazy Evaluations:只有action才会触发Transformation的执行
  • Distributed:DataFrame和RDD一样都是分布式的
  • dataframe和dataset统一,dataframe只是dataset[ROW]的类型别名。由于Python是弱类型语言,只能使用DataFrame

DataFrame vs RDD

  • RDD:分布式的对象的集合,Spark并不知道对象的详细模式信息
  • DataFrame:分布式的Row对象的集合,其提供了由列组成的详细模式信息,使得Spark SQL可以进行某些形式的执行优化。
  • DataFrame和普通的RDD的逻辑框架区别如下所示:
  • 左侧的RDD Spark框架本身不了解 Person类的内部结构。

  • 右侧的DataFrame提供了详细的结构信息(schema——每列的名称,类型)

  • DataFrame还配套了新的操作数据的方法,DataFrame API(如df.select())和SQL(select id, name from xx_table where …)。

  • DataFrame还引入了off-heap,意味着JVM堆以外的内存, 这些内存直接受操作系统管理(而不是JVM)。

  • RDD是分布式的Java对象的集合。DataFrame是分布式的Row对象的集合。DataFrame除了提供了比RDD更丰富的算子以外,更重要的特点是提升执行效率、减少数据读取以及执行计划的优化。

  • DataFrame的抽象后,我们处理数据更加简单了,甚至可以用SQL来处理数据了

  • 通过DataFrame API或SQL处理数据,会自动经过Spark 优化器(Catalyst)的优化,即使你写的程序或SQL不仅高效,也可以运行的很快。

  • DataFrame相当于是一个带着schema的RDD

Pandas DataFrame vs Spark DataFrame

  • Cluster Parallel:集群并行执行
  • Lazy Evaluations: 只有action才会触发Transformation的执行
  • Immutable:不可更改
  • Pandas rich API:比Spark SQL api丰富

创建DataFrame

  1. 创建dataFrame的步骤
    调用方法例如:spark.read.xxx方法

  2. 其他方式创建dataframe

  • createDataFrame:pandas dataframe、list、RDD

  • 数据源:RDD、csv、json、parquet、orc、jdbc

      jsonDF = spark.read.json("xxx.json")

    jsonDF = spark.read.format('json').load('xxx.json')

    parquetDF = spark.read.parquet("xxx.parquet")

    jdbcDF = spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/db_name").option("dbtable","table_name").option("user","xxx").option("password","xxx").load()
    PNG

    - Transformation:延迟性操作

    - action:立即操作

    ![s14](s14.PNG)

    ## DataFrame API实现

    **基于RDD创建**

    ```python
    from pyspark.sql import SparkSession
    from pyspark.sql import Row

    spark = SparkSession.builder.appName('test').getOrCreate()
    sc = spark.sparkContext
    # spark.conf.set("spark.sql.shuffle.partitions", 6)
    # ================直接创建==========================
    l = [('Ankit',25),('Jalfaizy',22),('saurabh',20),('Bala',26)]
    rdd = sc.parallelize(l)
    #为数据添加列名
    people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
    #创建DataFrame
    schemaPeople = spark.createDataFrame(people)

从csv中读取数据

# ==================从csv读取======================
#加载csv类型的数据并转换为DataFrame
df = spark.read.format("csv"). \
option("header", "true") \
.load("iris.csv")
#显示数据结构
df.printSchema()
#显示前10条数据
df.show(10)
#统计总量
df.count()
#列名
df.columns

增加一列

# ===============增加一列(或者替换) withColumn===========
#定义一个新的列,数据为其他某列数据的两倍
#如果操作的是原有列,可以替换原有列的数据
df.withColumn('newWidth',df.SepalWidth * 2).show()

删除一列

# ==========删除一列  drop=========================
#删除一列
df.drop('cls').show()

统计信息

#================ 统计信息 describe================
df.describe().show()
#计算某一列的描述信息
df.describe('cls').show()

提取部分列

# ===============提取部分列 select==============
df.select('SepalLength','SepalWidth').show()

基本统计功能

# ==================基本统计功能 distinct count=====
df.select('cls').distinct().count()

分组统计

# 分组统计 groupby(colname).agg({'col':'fun','col2':'fun2'})
df.groupby('cls').agg({'SepalWidth':'mean','SepalLength':'max'}).show()

# avg(), count(), countDistinct(), first(), kurtosis(),
# max(), mean(), min(), skewness(), stddev(), stddev_pop(),
# stddev_samp(), sum(), sumDistinct(), var_pop(), var_samp() and variance()

自定义的汇总方法

# 自定义的汇总方法
import pyspark.sql.functions as fn
#调用函数并起一个别名
df.agg(fn.count('SepalWidth').alias('width_count'),fn.countDistinct('cls').alias('distinct_cls_count')).show()

拆分数据集

#====================数据集拆成两部分 randomSplit ===========
#设置数据比例将数据划分为两部分
trainDF, testDF = df.randomSplit([0.6, 0.4])

采样数据

# ================采样数据 sample===========
#withReplacement:是否有放回的采样
#fraction:采样比例
#seed:随机种子
sdf = df.sample(False,0.2,100)

查看两个数据集在类别上的差异

#查看两个数据集在类别上的差异 subtract,确保训练数据集覆盖了所有分类
diff_in_train_test = testDF.select('cls').subtract(trainDF.select('cls'))
diff_in_train_test.distinct().count()

交叉表

# ================交叉表 crosstab=============
df.crosstab('cls','SepalLength').show()

udf

udf:自定义函数

#================== 综合案例 + udf================
# 测试数据集中有些类别在训练集中是不存在的,找到这些数据集做后续处理
trainDF,testDF = df.randomSplit([0.99,0.01])

diff_in_train_test = trainDF.select('cls').subtract(testDF.select('cls')).distinct().show()

#首先找到这些类,整理到一个列表
not_exist_cls = trainDF.select('cls').subtract(testDF.select('cls')).distinct().rdd.map(lambda x :x[0]).collect()

#定义一个方法,用于检测
def should_remove(x):
if x in not_exist_cls:
return -1
else :
return x

#创建udf,udf函数需要两个参数:
# Function
# Return type (in my case StringType())

#在RDD中可以直接定义函数,交给rdd的transformatioins方法进行执行
#在DataFrame中需要通过udf将自定义函数封装成udf函数再交给DataFrame进行调用执行

from pyspark.sql.types import StringType
from pyspark.sql.functions import udf


check = udf(should_remove,StringType())

resultDF = trainDF.withColumn('New_cls',check(trainDF['cls'])).filter('New_cls <> -1')

resultDF.show()

JSON数据的处理

JSON数据

  • Spark SQL can automatically infer the schema of a JSON dataset and load it as a DataFrame

    Spark SQL能够自动将JSON数据集以结构化的形式加载为一个DataFrame

  • This conversion can be done using SparkSession.read.json on a JSON file

    读取一个JSON文件可以用SparkSession.read.json方法

从JSON到DataFrame

  • 指定DataFrame的schema

    1. 通过反射自动推断,适合静态数据
    2. 程序指定,适合程序运行中动态生成的数据

加载json数据

#使用内部的schema
jsonDF = spark.read.json("xxx.json")
jsonDF = spark.read.format('json').load('xxx.json')

#指定schema
jsonDF = spark.read.schema(jsonSchema).json('xxx.json')

嵌套结构的JSON

  • 重要的方法

    1. get_json_object
    2. get_json
    3. explode

静态json数据的读取和操作

无嵌套结构的json数据

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('json_demo').getOrCreate()
sc = spark.sparkContext

# ==========================================
# 无嵌套结构的json
# ==========================================
jsonString = [
"""{ "id" : "01001", "city" : "AGAWAM", "pop" : 15338, "state" : "MA" }""",
"""{ "id" : "01002", "city" : "CUSHMAN", "pop" : 36963, "state" : "MA" }"""
]

从json字符串数组得到DataFrame

# 从json字符串数组得到rdd有两种方法
# 1. 转换为rdd,再从rdd到DataFrame
# 2. 直接利用spark.createDataFrame(),见后面例子

jsonRDD = sc.parallelize(jsonString) # stringJSONRDD
jsonDF = spark.read.json(jsonRDD) # convert RDD into DataFrame
jsonDF.printSchema()
jsonDF.show()

直接从文件生成DataFrame

# -- 直接从文件生成DataFrame
#只有被压缩后的json文件内容,才能被spark-sql正确读取,否则格式化后的数据读取会出现问题
jsonDF = spark.read.json("xxx.json")
# or
# jsonDF = spark.read.format('json').load('xxx.json')

jsonDF.printSchema()
jsonDF.show()

jsonDF.filter(jsonDF.pop>4000).show(10)
#依照已有的DataFrame,创建一个临时的表(相当于mysql数据库中的一个表),这样就可以用纯sql语句进行数据操作
jsonDF.createOrReplaceTempView("tmp_table")

resultDF = spark.sql("select * from tmp_table where pop>4000")
resultDF.show(10)

动态json数据的读取和操作

指定DataFrame的Schema

3.1节中的例子为通过反射自动推断schema,适合静态数据

下面我们来讲解如何进行程序指定schema

没有嵌套结构的json

jsonString = [
"""{ "id" : "01001", "city" : "AGAWAM", "pop" : 15338, "state" : "MA" }""",
"""{ "id" : "01002", "city" : "CUSHMAN", "pop" : 36963, "state" : "MA" }"""
]

jsonRDD = sc.parallelize(jsonString)

from pyspark.sql.types import *

#定义结构类型
#StructType:schema的整体结构,表示JSON的对象结构
#XXXStype:指的是某一列的数据类型
jsonSchema = StructType() \
.add("id", StringType(),True) \
.add("city", StringType()) \
.add("pop" , LongType()) \
.add("state",StringType())

jsonSchema = StructType() \
.add("id", LongType(),True) \
.add("city", StringType()) \
.add("pop" , DoubleType()) \
.add("state",StringType())

reader = spark.read.schema(jsonSchema)

jsonDF = reader.json(jsonRDD)
jsonDF.printSchema()
jsonDF.show()

带有嵌套结构的json

from pyspark.sql.types import *
jsonSchema = StructType([
StructField("id", StringType(), True),
StructField("city", StringType(), True),
StructField("loc" , ArrayType(DoubleType())),
StructField("pop", LongType(), True),
StructField("state", StringType(), True)
])

reader = spark.read.schema(jsonSchema)
jsonDF = reader.json('data/nest.json')
jsonDF.printSchema()
jsonDF.show(2)
jsonDF.filter(jsonDF.pop>4000).show(10)

前面我们处理的数据实际上都是已经被处理好的规整数据,但是在大数据整个生产过程中,需要先对数据进行数据清洗,将杂乱无章的数据整理为符合后面处理要求的规整数据。

数据去重

'''
1.删除重复数据

groupby().count():可以看到数据的重复情况
'''
df = spark.createDataFrame([
(1, 144.5, 5.9, 33, 'M'),
(2, 167.2, 5.4, 45, 'M'),
(3, 124.1, 5.2, 23, 'F'),
(4, 144.5, 5.9, 33, 'M'),
(5, 133.2, 5.7, 54, 'F'),
(3, 124.1, 5.2, 23, 'F'),
(5, 129.2, 5.3, 42, 'M'),
], ['id', 'weight', 'height', 'age', 'gender'])

# 查看重复记录
#无意义重复数据去重:数据中行与行完全重复
# 1.首先删除完全一样的记录
df2 = df.dropDuplicates()

#有意义去重:删除除去无意义字段之外的完全重复的行数据
# 2.其次,关键字段值完全一模一样的记录(在这个例子中,是指除了id之外的列一模一样)
# 删除某些字段值完全一样的重复记录,subset参数定义这些字段
df3 = df2.dropDuplicates(subset = [c for c in df2.columns if c!='id'])
# 3.有意义的重复记录去重之后,再看某个无意义字段的值是否有重复(在这个例子中,是看id是否重复)
# 查看某一列是否有重复值
import pyspark.sql.functions as fn
df3.agg(fn.count('id').alias('id_count'),fn.countDistinct('id').alias('distinct_id_count')).collect()
# 4.对于id这种无意义的列重复,添加另外一列自增id

df3.withColumn('new_id',fn.monotonically_increasing_id()).show()

缺失值处理

'''
2.处理缺失值
2.1 对缺失值进行删除操作(行,列)
2.2 对缺失值进行填充操作(列的均值)
2.3 对缺失值对应的行或列进行标记
'''
df_miss = spark.createDataFrame([
(1, 143.5, 5.6, 28,'M', 100000),
(2, 167.2, 5.4, 45,'M', None),
(3, None , 5.2, None, None, None),
(4, 144.5, 5.9, 33, 'M', None),
(5, 133.2, 5.7, 54, 'F', None),
(6, 124.1, 5.2, None, 'F', None),
(7, 129.2, 5.3, 42, 'M', 76000),],
['id', 'weight', 'height', 'age', 'gender', 'income'])

# 1.计算每条记录的缺失值情况

df_miss.rdd.map(lambda row:(row['id'],sum([c==None for c in row]))).collect()
[(1, 0), (2, 1), (3, 4), (4, 1), (5, 1), (6, 2), (7, 0)]

# 2.计算各列的缺失情况百分比
df_miss.agg(*[(1 - (fn.count(c) / fn.count('*'))).alias(c + '_missing') for c in df_miss.columns]).show()

# 3、删除缺失值过于严重的列
# 其实是先建一个DF,不要缺失值的列
df_miss_no_income = df_miss.select([
c for c in df_miss.columns if c != 'income'
])

# 4、按照缺失值删除行(threshold是根据一行记录中,缺失字段的百分比的定义)
df_miss_no_income.dropna(thresh=3).show()

# 5、填充缺失值,可以用fillna来填充缺失值,
# 对于bool类型、或者分类类型,可以为缺失值单独设置一个类型,missing
# 对于数值类型,可以用均值或者中位数等填充

# fillna可以接收两种类型的参数:
# 一个数字、字符串,这时整个DataSet中所有的缺失值都会被填充为相同的值。
# 也可以接收一个字典{列名:值}这样

# 先计算均值,并组织成一个字典
means = df_miss_no_income.agg( *[fn.mean(c).alias(c) for c in df_miss_no_income.columns if c != 'gender']).toPandas().to_dict('records')[0]
# 然后添加其它的列
means['gender'] = 'missing'

df_miss_no_income.fillna(means).show()

异常值处理

'''
3、异常值处理
异常值:不属于正常的值 包含:缺失值,超过正常范围内的较大值或较小值
分位数去极值
中位数绝对偏差去极值
正态分布去极值
上述三种操作的核心都是:通过原始数据设定一个正常的范围,超过此范围的就是一个异常值
'''
df_outliers = spark.createDataFrame([
(1, 143.5, 5.3, 28),
(2, 154.2, 5.5, 45),
(3, 342.3, 5.1, 99),
(4, 144.5, 5.5, 33),
(5, 133.2, 5.4, 54),
(6, 124.1, 5.1, 21),
(7, 129.2, 5.3, 42),
], ['id', 'weight', 'height', 'age'])
# 设定范围 超出这个范围的 用边界值替换

# approxQuantile方法接收三个参数:参数1,列名;参数2:想要计算的分位点,可以是一个点,也可以是一个列表(0和1之间的小数),第三个参数是能容忍的误差,如果是0,代表百分百精确计算。

cols = ['weight', 'height', 'age']

bounds = {}
for col in cols:
quantiles = df_outliers.approxQuantile(col, [0.25, 0.75], 0.05)
IQR = quantiles[1] - quantiles[0]
bounds[col] = [
quantiles[0] - 1.5 * IQR,
quantiles[1] + 1.5 * IQR
]

>>> bounds
{'age': [-11.0, 93.0], 'height': [4.499999999999999, 6.1000000000000005], 'weight': [91.69999999999999, 191.7]}

# 为异常值字段打标志
outliers = df_outliers.select(*['id'] + [( (df_outliers[c] < bounds[c][0]) | (df_outliers[c] > bounds[c][1]) ).alias(c + '_o') for c in cols ])
outliers.show()
#
# +---+--------+--------+-----+
# | id|weight_o|height_o|age_o|
# +---+--------+--------+-----+
# | 1| false| false|false|
# | 2| false| false|false|
# | 3| true| false| true|
# | 4| false| false|false|
# | 5| false| false|false|
# | 6| false| false|false|
# | 7| false| false|false|
# +---+--------+--------+-----+

# 再回头看看这些异常值的值,重新和原始数据关联

df_outliers = df_outliers.join(outliers, on='id')
df_outliers.filter('weight_o').select('id', 'weight').show()
# +---+------+
# | id|weight|
# +---+------+
# | 3| 342.3|
# +---+------+

df_outliers.filter('age_o').select('id', 'age').show()
# +---+---+
# | id|age|
# +---+---+
# | 3| 99|
# +---+---+

sparkStreaming

sparkStreaming概述

SparkStreaming是什么

  • 它是一个可扩展,高吞吐具有容错性的流式计算框架

    吞吐量:单位时间内成功传输数据的数量

之前我们接触的spark-core和spark-sql都是处理属于离线批处理任务,数据一般都是在固定位置上,通常我们写好一个脚本,每天定时去处理数据,计算,保存数据结果。这类任务通常是T+1(一天一个任务),对实时性要求不高。

但在企业中存在很多实时性处理的需求,例如:双十一的京东阿里,通常会做一个实时的数据大屏,显示实时订单。这种情况下,对数据实时性要求较高,仅仅能够容忍到延迟1分钟或几秒钟。

实时计算框架对比

Storm

  • 流式计算框架
  • 以record为单位处理数据
  • 也支持micro-batch方式(Trident)

Spark

  • 批处理计算框架
  • 以RDD为单位处理数据
  • 支持micro-batch流式处理数据(Spark Streaming)
  • 吞吐量:Spark Streaming优于Storm
  • 延迟:Spark Streaming差于Storm

SparkStreaming的组件

  • Streaming Context
    • 一旦一个Context已经启动(调用了Streaming Context的start()),就不能有新的流算子(Dstream)建立或者是添加到context中
    • 一旦一个context已经停止,不能重新启动(Streaming Context调用了stop方法之后 就不能再次调 start())
    • 在JVM(java虚拟机)中, 同一时间只能有一个Streaming Context处于活跃状态, 一个SparkContext创建一个Streaming Context
    • 在Streaming Context上调用Stop方法, 也会关闭SparkContext对象, 如果只想仅关闭Streaming Context对象,设置stop()的可选参数为false
    • 一个SparkContext对象可以重复利用去创建多个Streaming Context对象(不关闭SparkContext前提下), 但是需要关一个再开下一个
  • DStream (离散流)
    • 代表一个连续的数据流
    • 在内部, DStream由一系列连续的RDD组成
    • DStreams中的每个RDD都包含确定时间间隔内的数据
    • 任何对DStreams的操作都转换成了对DStreams隐含的RDD的操作
    • 数据源
      • 基本源
        • TCP/IP Socket
        • FileSystem
      • 高级源
        • Kafka
        • Flume

Spark Streaming编码实践

Spark Streaming编码步骤:

  • 创建一个StreamingContext
  • 从StreamingContext中创建一个数据对象
  • 对数据对象进行Transformations操作
  • 开始和停止

利用Spark Streaming实现WordCount

需求:监听某个端口上的网络数据,实时统计出现的不同单词个数。

  1. 需要安装一个nc工具:sudo yum install -y nc
  2. 执行指令:nc -lk 9999 -v
import os
# 配置spark driver和pyspark运行时,所使用的python解释器路径
PYSPARK_PYTHON = "/home/hadoop/miniconda3/envs/datapy365spark23/bin/python"
JAVA_HOME='/home/hadoop/app/jdk1.8.0_191'
SPARK_HOME = "/home/hadoop/app/spark-2.3.0-bin-2.6.0-cdh5.7.0"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
os.environ['JAVA_HOME']=JAVA_HOME
os.environ["SPARK_HOME"] = SPARK_HOME

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":

sc = SparkContext("local[2]",appName="NetworkWordCount")
#参数2:指定执行计算的时间间隔
ssc = StreamingContext(sc, 1)
#监听ip,端口上的上的数据
lines = ssc.socketTextStream('localhost',9999)
#将数据按空格进行拆分为多个单词
words = lines.flatMap(lambda line: line.split(" "))
#将单词转换为(单词,1)的形式
pairs = words.map(lambda word:(word,1))
#统计单词个数
wordCounts = pairs.reduceByKey(lambda x,y:x+y)
#打印结果信息,会使得前面的transformation操作执行
wordCounts.pprint()
#启动StreamingContext
ssc.start()
#等待计算结束
ssc.awaitTermination()

可视化查看效果:http://192.168.199.188:4040

点击streaming,查看效果

Spark Streaming的状态操作

在Spark Streaming中存在两种状态操作

  • UpdateStateByKey
  • Windows操作

使用有状态的transformation,需要开启Checkpoint

  • spark streaming 的容错机制
  • 它将足够多的信息checkpoint到某些具备容错性的存储系统如hdfs上,以便出错时能够迅速恢复

updateStateByKey

Spark Streaming实现的是一个实时批处理操作,每隔一段时间将数据进行打包,封装成RDD,是无状态的。

无状态:指的是每个时间片段的数据之间是没有关联的。

需求:想要将一个大时间段(1天),即多个小时间段的数据内的数据持续进行累积操作

一般超过一天都是用RDD或Spark SQL来进行离线批处理

如果没有UpdateStateByKey,我们需要将每一秒的数据计算好放入mysql中取,再用mysql来进行统计计算

Spark Streaming中提供这种状态保护机制,即updateStateByKey

  • 首先,要定义一个state,可以是任意的数据类型
  • 其次,要定义state更新函数–指定一个函数如何使用之前的state和新值来更新state
  • 对于每个batch,Spark都会为每个之前已经存在的key去应用一次state更新函数,无论这个key在batch中是否有新的数据。如果state更新函数返回none,那么key对应的state就会被删除
  • 对于每个新出现的key,也会执行state更新函数

举例:词统计。

案例:updateStateByKey

需求:监听网络端口的数据,获取到每个批次的出现的单词数量,并且需要把每个批次的信息保留下来

代码

import os
# 配置spark driver和pyspark运行时,所使用的python解释器路径
PYSPARK_PYTHON = "/home/hadoop/miniconda3/envs/datapy365spark23/bin/python"
JAVA_HOME='/home/hadoop/app/jdk1.8.0_191'
SPARK_HOME = "/home/hadoop/app/spark-2.3.0-bin-2.6.0-cdh5.7.0"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
os.environ['JAVA_HOME']=JAVA_HOME
os.environ["SPARK_HOME"] = SPARK_HOME
from pyspark.streaming import StreamingContext
from pyspark.sql.session import SparkSession

# 创建SparkContext
spark = SparkSession.builder.master("local[2]").getOrCreate()
sc = spark.sparkContext

ssc = StreamingContext(sc, 3)
#开启检查点
ssc.checkpoint("checkpoint")

#定义state更新函数
def updateFunc(new_values, last_sum):
return sum(new_values) + (last_sum or 0)

lines = ssc.socketTextStream("localhost", 9999)
# 对数据以空格进行拆分,分为多个单词
counts = lines.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.updateStateByKey(updateFunc=updateFunc)#应用updateStateByKey函数

counts.pprint()

ssc.start()
ssc.awaitTermination()

Windows

  • 窗口长度L:运算的数据量
  • 滑动间隔G:控制每隔多长时间做一次运算

每隔G秒,统计最近L秒的数据

操作细节

  • Window操作是基于窗口长度和滑动间隔来工作的
  • 窗口的长度控制考虑前几批次数据量
  • 默认为批处理的滑动间隔来确定计算结果的频率

相关函数

  • Smart computation
  • invAddFunc

reduceByKeyAndWindow(func,invFunc,windowLength,slideInterval,[num,Tasks])

func:正向操作,类似于updateStateByKey

invFunc:反向操作

例如在热词时,在上一个窗口中可能是热词,这个一个窗口中可能不是热词,就需要在这个窗口中把该次剔除掉

典型案例:热点搜索词滑动统计,每隔10秒,统计最近60秒钟的搜索词的搜索频次,并打印出最靠前的3个搜索词出现次数。

案例

监听网络端口的数据,每隔3秒统计前6秒出现的单词数量

import os
# 配置spark driver和pyspark运行时,所使用的python解释器路径
PYSPARK_PYTHON = "/home/hadoop/miniconda3/envs/datapy365spark23/bin/python"
JAVA_HOME='/home/hadoop/app/jdk1.8.0_191'
SPARK_HOME = "/home/hadoop/app/spark-2.3.0-bin-2.6.0-cdh5.7.0"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
os.environ['JAVA_HOME']=JAVA_HOME
os.environ["SPARK_HOME"] = SPARK_HOME
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql.session import SparkSession

def get_countryname(line):
country_name = line.strip()

if country_name == 'usa':
output = 'USA'
elif country_name == 'ind':
output = 'India'
elif country_name == 'aus':
output = 'Australia'
else:
output = 'Unknown'

return (output, 1)

if __name__ == "__main__":
#定义处理的时间间隔
batch_interval = 1 # base time unit (in seconds)
#定义窗口长度
window_length = 6 * batch_interval
#定义滑动时间间隔
frequency = 3 * batch_interval

#获取StreamingContext
spark = SparkSession.builder.master("local[2]").getOrCreate()
sc = spark.sparkContext
ssc = StreamingContext(sc, batch_interval)

#需要设置检查点
ssc.checkpoint("checkpoint")

lines = ssc.socketTextStream('localhost', 9999)
addFunc = lambda x, y: x + y
invAddFunc = lambda x, y: x - y
#调用reduceByKeyAndWindow,来进行窗口函数的调用
window_counts = lines.map(get_countryname) \
.reduceByKeyAndWindow(addFunc, invAddFunc, window_length, frequency)
#输出处理结果信息
window_counts.pprint()

ssc.start()
ssc.awaitTermination()

Spark Streaming对接Kafka

对接数据的两种方式

在前面的案例中,我们监听了来自网络端口的数据,实现了WordCount,但是在实际开发中并不是这样。我们更多的是接收来自高级数据源的数据,例如Kafka。

下面我们来介绍如何利用Spark Streaming对接Kafka

以下两种方式都是为了数据可靠性:

  • Receiver-based Approach:由Receiver来对接数据,Receiver接收到数据后会将日志预先写入到hdfs上(WAL),同时也会将数据做副本传输到其他的Worker节点。在读取数据的过程中,Receiver是从Zookeeper中获取数据的偏移信息。
  • Direct Approach(No Receivers):没有Receiver接收信息,由Spark Streaming直接对接Kafka的broker,获取数据和数据的偏移信息。

上述两种方式中,Direct Approach方式更加可靠,不需要Spark Streaming自己去保证维护数据的可靠性,而是由善于处理这类工作的Kafka来做。

对应代码

  • KafkaUtils.createStream(ssc,zkQuorum,”spark-streaming-consumer”,{topic:1})
  • KafkaUtils.createDirectStream(ssc,[topic],{“metadata.broker.list”:’localhost:9092’})

Direct API的好处

  • 简化的并行:在Receiver的方式中我们提到创建多个Receiver之后利用union来合并成一个Dstream的方式提高数据传输并行度。而在Direct方式中,Kafka中的partition与RDD中的partition是一一对应的并行读取Kafka数据,这种映射关系也更利于理解和优化。
  • 高效:在Receiver的方式中,为了达到0数据丢失需要将数据存入Write Ahead Log中,这样在Kafka和日志中就保存了两份数据,浪费!而第二种方式不存在这个问题,只要我们Kafka的数据保留时间足够长,我们都能够从Kafka进行数据恢复。
  • 精确一次:在Receiver的方式中,使用的是Kafka的高阶API接口从Zookeeper中获取offset值,这也是传统的从Kafka中读取数据的方式,但由于Spark Streaming消费的数据和Zookeeper中记录的offset不同步,这种方式偶尔会造成数据重复消费。而第二种方式,直接使用了简单的低阶Kafka API,Offsets则利用Spark Streaming的checkpoints进行记录,消除了这种不一致性。

步骤:

  • 配置spark streaming kafka开发环境

      1. 下载spark streaming集成kafka的jar包
        spark-streaming-kafka-0-8-assembly_2.11-2.3.0.jar
      1. 将jar包放置到spark的jars目录下
      1. 编辑spark/conf目录下的spark-defaults.conf,添加如下两条配置
      spark.driver.extraClassPath=$SPAKR_HOME/jars/spark-streaming-kafka-0-8-assembly_2.11-2.3.0.jar
      spark.executor.extraClassPath=$SPARK_HOME/jars/spark-streaming-kafka-0-8-assembly_2.11-2.3.0.jar
      #driver和executor对应的两个路径一致
  • 测试配置是否成功

    • 启动zookeeper

      zkServer.sh start
    • 启动kafka

      kafka-server-start.sh config/server.properties
    • 创建topic

      • bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic test

        replication-factor:副本数量

        partitions:分区数量

        出现Created topic “test”,说明创建成功

    • 查看所有topic

      • bin/kafka-topics.sh –list –zookeeper localhost:2181
    • 通过Pycharm远程连接Centos 创建代码

    • 通过KafkaUtils 成功连接Kafka 创建DStream对象说明连接成功

      import os
      # 配置spark driver和pyspark运行时,所使用的python解释器路径
      PYSPARK_PYTHON = "/home/hadoop/miniconda3/envs/datapy365spark23/bin/python"
      JAVA_HOME='/home/hadoop/app/jdk1.8.0_191'
      SPARK_HOME = "/home/hadoop/app/spark-2.3.0-bin-2.6.0-cdh5.7.0"
      # 当存在多个版本时,不指定很可能会导致出错
      os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
      os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
      os.environ['JAVA_HOME']=JAVA_HOME
      os.environ["SPARK_HOME"] = SPARK_HOME
      from pyspark.streaming import StreamingContext
      from pyspark.streaming.kafka import KafkaUtils
      from pyspark.sql.session import SparkSession

      sc = sparkContext('master[2]','kafkastreamingtest'
      ssc = StreamingContext(sc,3)
      #createDirectStream 连接kafka数据源获取数据
      # 参数1 streamingcontext
      #参数2 topic的名字
      # 参数3 kafka broker地址
      ks = KafkaUtils.createDirectStream(ssc,["topic1"],{"metadata.broker.list":"localhost:9092"})

需求:利用Spark Streaming不断处理来自Kafka生产者生产的数据,并统计出现的单词数量

    1. 编写producer.py,用于生产数据
    from kafka import KafkaProducer
    import time

    #创建KafkaProducer,连接broker
    producer = KafkaProducer(bootstrap_servers='localhost:9092')

    #每隔一段时间发送一端字符串数据到broker
    def send_data():
    for i in range(60):
    # (key,value) 参数2 是value
    producer.send('topic_name',"hello,kafka,spark,streaming,kafka")
    time.sleep(2)
    send_data()
    1. 编辑Spark Streaming代码,统计单词出现的数量
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils
    from pyspark.sql.session import SparkSession

    topic="topic_name"

    spark = SparkSession.builder.master("local[2]").getOrCreate()
    sc = spark.sparkContext
    ssc = StreamingContext(sc,3)

    #创建direct连接,指定要连接的topic和broker地址
    ks = KafkaUtils.createDirectStream(ssc,[topic],{"metadata.broker.list":"localhost:9092"})
    #(None,内容)
    ks.pprint()
    #(key,value)
    #以下代码每操作一次,就打印输出一次
    lines = ks.map(lambda x:x[1])
    lines.pprint()

    words = lines.flatMap(lambda line:line.split(","))
    #words.pprint()

    pairs = words.map(lambda word:(word,1))
    #pairs.pprint()

    counts = pairs.reduceByKey(lambda x,y:x+y)
    counts.pprint()

    ssc.start()
    #等待计算结束
    ssc.awaitTermination()
    1. 开启Spark Streaming消费数据,将产生的日志结果存储到日志中

    spark-submit xxx.py>a.log

    1. 开启producer.py,生产数据

    python3 producer.py

    1. 通过浏览器观察运算过程

    http://node-teach:4040

    1. 分析生成的日志内容
    -------------------------------------------
    Time: 2018-12-11 01:31:21
    -------------------------------------------
    (None, 'hello,kafka,spark,streaming,kafka')
    (None, 'hello,kafka,spark,streaming,kafka')
    (None, 'hello,kafka,spark,streaming,kafka')
    (None, 'hello,kafka,spark,streaming,kafka')

    -------------------------------------------
    Time: 2018-12-11 01:02:33
    -------------------------------------------
    hello,kafka,spark,streaming,kafka
    hello,kafka,spark,streaming,kafka

    -------------------------------------------
    Time: 2018-12-11 01:02:33
    -------------------------------------------
    hello
    kafka
    spark
    streaming
    kafka
    hello
    kafka
    spark
    streaming
    kafka

    -------------------------------------------
    Time: 2018-12-11 01:02:33
    -------------------------------------------
    ('hello', 1)
    ('kafka', 1)
    ('spark', 1)
    ('streaming', 1)
    ('kafka', 1)
    ('hello', 1)
    ('kafka', 1)
    ('spark', 1)
    ('streaming', 1)
    ('kafka', 1)

    -------------------------------------------
    Time: 2018-12-11 01:02:33
    -------------------------------------------
    ('streaming', 2)
    ('hello', 2)
    ('kafka', 4)
    ('spark', 2)

    -------------------------------------------
    Time: 2018-12-11 01:02:36
    -------------------------------------------

    -------------------------------------------
    Time: 2018-12-11 01:02:36
    -------------------------------------------

Spark Streaming对接flume

flume作为日志实时采集的框架,可以与SparkStreaming实时处理框架进行对接,flume实时产生数据,sparkStreaming做实时处理。

Spark Streaming对接FlumeNG有两种方式,一种是FlumeNG将消息Push推给Spark Streaming,还有一种是Spark Streaming从flume 中Pull拉取数据。

Pull方式

    1. 安装flume1.6以上
    1. 下载依赖包

    spark-streaming-flume-assembly_2.11-2.3.0.jar放入到flume的lib目录下

    1. 写flume的agent,注意既然是拉取的方式,那么flume向自己所在的机器上产数据就行
    1. 编写flume-pull.conf配置文件
    simple-agent.sources = netcat-source
    simple-agent.sinks = spark-sink
    simple-agent.channels = memory-channel

    # source
    simple-agent.sources.netcat-source.type = netcat
    simple-agent.sources.netcat-source.bind = localhost
    simple-agent.sources.netcat-source.port = 44444

    # Describe the sink
    simple-agent.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink
    simple-agent.sinks.spark-sink.hostname = localhost
    simple-agent.sinks.spark-sink.port = 41414

    # Use a channel which buffers events in memory
    simple-agent.channels.memory-channel.type = memory

    # Bind the source and sink to the channel
    simple-agent.sources.netcat-source.channels = memory-channel
    simple-agent.sinks.spark-sink.channel=memory-channel
  • 5,启动flume

    flume-ng agent -n simple-agent -f flume-pull.conf -Dflume.root.logger=INFO,console

  • 6,编写word count代码

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils

sc=SparkContext("local[2]","FlumeWordCount_Pull")
#处理时间间隔为2s
ssc=StreamingContext(sc,2)

#利用flume工具类创建pull方式的流
lines = FlumeUtils.createPollingStream(ssc, [("localhost",41414)])

lines1=lines.map(lambda x:x[1])
counts = lines1.flatMap(lambda line:line.split(" "))\
.map(lambda word:(word,1))\
.reduceByKey(lambda a,b:a+b)
counts.pprint()
#启动spark streaming应用
ssc.start()
#等待计算终止
ssc.awaitTermination()

bin/spark-submit --jars xxx/spark-streaming-flume-assembly_2.11-2.3.0.jar xxx/flume_pull.py

push方式

大部分操作和之前一致

flume配置

simple-agent.sources = netcat-source
simple-agent.sinks = avro-sink
simple-agent.channels = memory-channel

simple-agent.sources.netcat-source.type = netcat
simple-agent.sources.netcat-source.bind = localhost
simple-agent.sources.netcat-source.port = 44444

simple-agent.sinks.avro-sink.type = avro
simple-agent.sinks.avro-sink.hostname = localhost
simple-agent.sinks.avro-sink.port = 41414
simple-agent.channels.memory-channel.type = memory
simple-agent.sources.netcat-source.channels = memory-channel

simple-agent.sources.netcat-source.channels = memory-channel
simple-agent.sinks.avro-sink.channel=memory-channel
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils

sc=SparkContext("local[2]","FlumeWordCount_Push")
#处理时间间隔为2s
ssc=StreamingContext(sc,2)
#创建push方式的DStream
lines = FlumeUtils.createStream(ssc, "localhost",41414)
lines1=lines.map(lambda x:x[1].strip())
#对1s内收到的字符串进行分割
words=lines1.flatMap(lambda line:line.split(" "))
#映射为(word,1)元祖
pairs=words.map(lambda word:(word,1))
wordcounts=pairs.reduceByKey(lambda x,y:x+y)
wordcounts.pprint()
#启动spark streaming应用
ssc.start()
#等待计算终止
ssc.awaitTermination()

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK