5

SparkStreaming_mingyunxiaohai的专栏-CSDN博客

 3 years ago
source link: https://blog.csdn.net/mingyunxiaohai/article/details/81505149
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

SparkStreaming

chsmy2018 2018-08-08 11:24:53 321
分类专栏: 大数据

很多重要的应用要处理大量在线流式数据, 并返结果,比如社交网络趋势追踪,网站指标统计,广告系统,可以使用Spark Streaming来处理

流计算的处理流程一般包含三个阶段:数据实时采集、数据实时计算、实时查询服务

Spark Streaming可整合多种输入数据源,如Kafka、Flume、HDFS,甚至是普通的TCP套接字。经处理后的数据可存储至文件系统、数据库,或显示在仪表盘里

Spark Streaming的基本原理是将实时输入数据流以时间片(秒级)为单位进行拆分,然后经Spark引擎以类似批处理的方式处理每个时间片数据

Storm和SparkStreaming对比

StormSparkStreaming流式计算框架批处理计算框架可以实现毫秒级响应只能到秒级以record为单位处理数据以RDD为单位处理数据支持micro-batch方式(Trident) , 将批处理转化为流式处理问题支持micro-batch流式处理数据(Spark Streaming),将流式处理转化为批处理问题

目前SparkStreaming用的越来越多,为什么呢?
易用性好

  • 供很多高级算子, 实现复杂运算非常简单
  • 流式API和批处理API很类似, 学习成本低
    平台统一
  • 不需要维护两套系统分别用于批处理和流式处理
  • 可以自由调用Spark的组件如Spark SQL & Mllib
    生态丰富
  • 支持各种数据源和数据格式
  • 社区活跃, 发展迅猛

Spark Streaming的运行步骤

  • Driver端会要求Executor端启动接收器Receiver,接收数据
  • 每个Receiver都会负责一个inputDStream(比如从文件中读取数据的文件流,比如套接字流,或者从Kafka中读取的一个输入流等等)
  • Receiver会把接收到的数据分解生多个block放在内存中,如果设置了多副本它会把数据备份,然后会把blocks的信息高速StreamingContext。
  • 当一个批次的时间到了之后,StreamingContext会告诉SparkContext,让SparkContext把job分发到Eexcutor上去执行

编写Spark Streaming程序的基本步骤

  1. 通过创建输入DStream来定义输入源
  2. 通过对DStream应用转换操作和输出操作来定义流计算
  3. 用streamingContext.start()来开始接收数据和处理流程
  4. 通过streamingContext.awaitTermination()方法来等待处理结束(手动结束或因为错误而结束)
  5. 可以通过streamingContext.stop()来手动结束流计算进程
//创建Spark Streaming的上下文
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
Durations.seconds(1));
//获取数据 这里是从socket中获取
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
args[0], Integer.parseInt(args[1]),
StorageLevels.MEMORY_AND_DISK_SER);
//对获取的数据进行转换
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String,
String>() {
@Override
public Iterator<String> call(String x) {
return Arrays.asList(SPACE.split(x)).iterator();
}
});
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<>(s, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
//输出结果
wordCounts.print();
//启动
ssc.start();
ssc.awaitTermination();

Spark Streaming的输入流的数据来源

以文件为输入流读取数据源
监听一个文件夹下的文件的变化
应用场景: 日志处理

JavaDStream<String> lines = streamingContext.textFileStream("/data/input");

注意: textFileStream每个batch只会探测新产生的文件, 已有文件修改后, 不会
再被处理
以socket连接作为数据源读取数
每隔固定周期从socket上拉取数据, 放入内
存形成RDD

JavaReceiverInputDStream<String> lines =
streamingContext.socketTextStream("master", 9999 )

以RDD队列方式创建数据源

streamingContext.queueStream(queueOfRDD)

kafka作为输入源
在公司的大数据生态系统中,可以把Kafka作为数据交换枢纽,不同类型的分布式系统(关系数据库、NoSQL数据库、流处理系统、批处理系统等),可以统一接入到Kafka,实现和Hadoop各个组件之间的不同类型数据的实时高效交换

安装文件为Kafka_2.11-0.10.2.0.tgz,前面的2.11就是该Kafka所支持的Scala版本号,后面的0.10.2.0是Kafka自身的版本号

对于Spark2.1.0版本,如果要使用Kafka,则需要下载spark-streaming-kafka-0-8_2.11相关jar包,下载地址:
http://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11/2.1.0

Spark Streaming提供了两种访问Kafka的API
Receiver-based Approach:最原始的API,启动若干特殊的Task(Receiver),从Kafka上拉取数据,保存成RDD供处

Direct Approach:新API,无需启动Receiver,每一轮Sparkjob直接从Kafka上读取数据

import java.util.*;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import scala.Tuple2;

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);

Collection<String> topics = Arrays.asList("topicA", "topicB");

JavaInputDStream<ConsumerRecord<String, String>> stream =
  KafkaUtils.createDirectStream(
    streamingContext,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
  );

stream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));

滑动窗口window
事先设定一个滑动窗口的长度(也就是窗口的持续时间)
设定滑动窗口的时间间隔(每隔多长时间执行一次计算),让窗口按照指定时间间隔在源DStream上滑动
每次窗口停放的位置上,都会有一部分Dstream(或者一部分RDD)被框入窗口内,形成一个小段的Dstream
可以启动对这个小段DStream的计算

  • window(windowLength, slideInterval) 基于源DStream产生的窗口化的批数据,计算得到一个新的Dstream
  • countByWindow(windowLength, slideInterval) 返回流中元素的一个滑动窗口数
  • reduceByWindow(func, windowLength, slideInterval) 返回一个单元素流。利用函数func聚集滑动时间间隔的流的元素创建这个单元素流。函数func必须满足结合律,从而可以支持并行计算
  • reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) 应用到一个(K,V)键值对组成的DStream上时,会返回一个由(K,V)键值对组成的新的DStream。每一个key的值均由给定的reduce函数(func函数)进行聚合计算。注意:在默认情况下,这个算子利用了Spark默认的并发任务数去分组。可以通过numTasks参数的设置来指定不同的任务数
  • reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) 更加高效的reduceByKeyAndWindow,每个窗口的reduce值,是基于先前窗口的reduce值进行增量计算得到的;它会对进入滑动窗口的新数据进行reduce操作,并对离开窗口的老数据进行“逆向reduce”操作。但是,只能用于“可逆reduce函数”,即那些reduce函数都有一个对应的“逆向reduce函数”(以InvFunc参数传入)
  • countByValueAndWindow(windowLength, slideInterval, [numTasks]) 当应用到一个(K,V)键值对组成的DStream上,返回一个由(K,V)键值对组成的新的DStream。每个key的值都是它们在滑动窗口中出现的频率

SparkStreaming 一定要做CheckPoint

JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(checkPointDir, new
Function0<JavaStreamingContext>() {
@Override
public JavaStreamingContext call() throws Exception {
return createContext(checkPointDir, host, port);
}
});

updateStateByKey
需要在跨批次之间维护状态时,就必须使用updateStateByKey操作
mapWithState
由Spark Streaming自己维护历史状态信息,而不是借助外部存储系统, 比如redis
tweets.mapWithStates(tweet => updateMood(tweet))

JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>>
stateWordCounts =
wordPairs.mapWithState(StateSpec.function(new Function3<String,
Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> call(String word, Optional<Integer> one,
State<Integer> state) throws Exception {
Option<Integer> stateCount = state.getOption();
Integer sum = one.orElse(0);
if (stateCount.isDefined()) {
sum += stateCount.get();
}
state.update(sum);
return new Tuple2<String, Integer>(word, sum);
}
})); 

transform
将DStream的每个RDD转变为另外一个RDD
– 参数: transformFunc: RDD[T] => RDD[U]

words.transform(new Function<JavaRDD<String>, JavaRDD<String>>() {
@Override
public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception {
return rdd.distinct();
}
});

输出流
将处理过的数据输出到外部系统,跟Spark SQL中的“action”一个概念
- print
- saveAsTextFiles
- saveAsObjectFiles
- saveAsHadoopFiles
- 自定义输出 (用得最多)– foreachRDD


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK