

(2)sparkstreaming滚动窗口和滑动窗口演示
source link: https://blog.51cto.com/u_14465598/5651197
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

(2)sparkstreaming滚动窗口和滑动窗口演示
精选 原创一、滚动窗口(Tumbling Windows) 滚动窗口有固定的大小,是一种对数据进行均匀切片的划分方式。窗口之间没有重叠,也不会有间隔,是“首尾相接”的状态。滚动窗口可以基于时间定义,也可以基于数据个数定义;需要的参数只有一个,就是窗口的大小(window size)。

在sparkstreaming中,滚动窗口需要设置窗口大小和滑动间隔,窗口大小和滑动间隔都是StreamingContext的间隔时间的倍数,同时窗口大小和滑动间隔相等,如:
.window(Seconds(10),Seconds(10)) 10秒的窗口大小和10秒的滑动大小,不存在重叠部分
import com.pojo.WaterSensor;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
/**
* Created by lj on 2022-07-12.
*/
public class SparkSql_Socket_Tumble {
private static String appName = "spark.streaming.demo";
private static String master = "local[*]";
private static String host = "localhost";
private static int port = 9999;
public static void main(String[] args) {
//初始化sparkConf
SparkConf sparkConf = new SparkConf().setMaster(master).setAppName(appName);
//获得JavaStreamingContext
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.minutes(1));
/**
* 设置日志的级别: 避免日志重复
*/
ssc.sparkContext().setLogLevel("ERROR");
//从socket源获取数据
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(host, port);
JavaDStream<WaterSensor> mapDStream = lines.map(new Function<String, WaterSensor>() {
private static final long serialVersionUID = 1L;
public WaterSensor call(String s) throws Exception {
String[] cols = s.split(",");
WaterSensor waterSensor = new WaterSensor(cols[0], Long.parseLong(cols[1]), Integer.parseInt(cols[2]));
return waterSensor;
}
}).window(Durations.minutes(3), Durations.minutes(3)); //滚动窗口:需要设置窗口大小和滑动间隔,窗口大小和滑动间隔都是StreamingContext的间隔时间的倍数,同时窗口大小和滑动间隔相等。
mapDStream.foreachRDD(new VoidFunction2<JavaRDD<WaterSensor>, Time>() {
@Override
public void call(JavaRDD<WaterSensor> waterSensorJavaRDD, Time time) throws Exception {
SparkSession spark = JavaSparkSessionSingleton.getInstance(waterSensorJavaRDD.context().getConf());
Dataset<Row> dataFrame = spark.createDataFrame(waterSensorJavaRDD, WaterSensor.class);
// 创建临时表
dataFrame.createOrReplaceTempView("log");
Dataset<Row> result = spark.sql("select * from log");
System.out.println("========= " + time + "=========");
//输出前20条数据
result.show();
}
});
//开始作业
ssc.start();
try {
ssc.awaitTermination();
} catch (Exception e) {
e.printStackTrace();
} finally {
ssc.close();
}
}
}
代码中定义了一个3分钟的时间窗口和3分钟的滑动大小,运行结果可以看出数据没有出现重叠,实现了滚动窗口的效果:

二、滑动窗口(Sliding Windows)与滚动窗口类似,滑动窗口的大小也是固定的。区别在于,窗口之间并不是首尾相接的,而是可以“错开”一定的位置。如果看作一个窗口的运动,那么就像是向前小步“滑动”一样。定义滑动窗口的参数有两个:除去窗口大小(window size)之外,还有一个滑动步长(window slide),代表窗口计算的频率。

在sparkstreaming中,滑动窗口需要设置窗口大小和滑动间隔,窗口大小和滑动间隔都是StreamingContext的间隔时间的倍数,同时窗口大小和滑动间隔不相等,如:
.window(Seconds(10),Seconds(5)) 10秒的窗口大小和5秒的活动大小,存在重叠部分
import com.pojo.WaterSensor;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import java.util.ArrayList;
import java.util.List;
/**
* Created by lj on 2022-07-12.
*/
public class SparkSql_Socket {
private static String appName = "spark.streaming.demo";
private static String master = "local[*]";
private static String host = "localhost";
private static int port = 9999;
public static void main(String[] args) {
//初始化sparkConf
SparkConf sparkConf = new SparkConf().setMaster(master).setAppName(appName);
//获得JavaStreamingContext
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.minutes(1));
/**
* 设置日志的级别: 避免日志重复
*/
ssc.sparkContext().setLogLevel("ERROR");
//从socket源获取数据
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(host, port);
JavaDStream<WaterSensor> mapDStream = lines.map(new Function<String, WaterSensor>() {
private static final long serialVersionUID = 1L;
public WaterSensor call(String s) throws Exception {
String[] cols = s.split(",");
WaterSensor waterSensor = new WaterSensor(cols[0], Long.parseLong(cols[1]), Integer.parseInt(cols[2]));
return waterSensor;
}
}).window(Durations.minutes(4), Durations.minutes(2)); //滑动窗口:指定窗口大小 和 滑动频率 必须是批处理时间的整数倍
mapDStream.foreachRDD(new VoidFunction2<JavaRDD<WaterSensor>, Time>() {
@Override
public void call(JavaRDD<WaterSensor> waterSensorJavaRDD, Time time) throws Exception {
SparkSession spark = JavaSparkSessionSingleton.getInstance(waterSensorJavaRDD.context().getConf());
Dataset<Row> dataFrame = spark.createDataFrame(waterSensorJavaRDD, WaterSensor.class);
// 创建临时表
dataFrame.createOrReplaceTempView("log");
Dataset<Row> result = spark.sql("select * from log");
System.out.println("========= " + time + "=========");
//输出前20条数据
result.show();
}
});
//开始作业
ssc.start();
try {
ssc.awaitTermination();
} catch (Exception e) {
e.printStackTrace();
} finally {
ssc.close();
}
}
}

数据演进过程解释:

Recommend
-
50
-
39
戳蓝字「TopCoder 」关注我们哦! ...
-
24
每日一句英语学习,每天进步一点点: 前言 前一篇「
-
20
你还在为TCP重传、滑动窗口、流量控制、拥塞控制发愁吗?看完图解就不愁了 相信大家都知道 TCP 是一个可靠传输的协议,那如何它是如何保证可靠的呢?今天,将重点介绍 TCP 的重传机制、滑动窗口、流量控制、拥塞控制。...
-
26
本文已收录至 Github《小白学算法》系列: https://github.com/vipstone/algorithm 这是一道比较基础的算法题,涉及到的数据结构也是我们之前讲过的,我这里先买一个关子...
-
14
滑动窗口算法思想 202...
-
13
特殊数据结构:单调队列 👆让天下没有难刷的算法!若 GitBook 访问太慢,可尝试
-
21
滑动窗口算法框架 👆让天下没有难刷的算法!若 GitBook 访问太慢,可尝试
-
25
剑指 Offer 57 - II. 和为s的连续正数序列 输入一个正整数 target ,输出所有和为 target 的连续正整数序列(至少含有两个数)。 序列内的数字由小到大排列,不同序列按照首个数字从小到...
-
16
题目-无重复字符的最长子串 链接: https://leetcode-cn.com/problems/longest-substring-without-repeating-characters/ 给定一个字符串,找出其中不含有重复字符的 最长子串 的长度。
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK