

第一个spark应用开发详解(java版)
source link: https://blog.51cto.com/zq2599/5569399
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

第一个spark应用开发详解(java版)
推荐 原创欢迎访问我的GitHub
这里分类和汇总了欣宸的全部原创(含配套源码): https://github.com/zq2599/blog_demos
- WordCount是大数据学习最好的入门demo,今天就一起开发java版本的WordCount,然后提交到Spark2.3.2环境运行;
- 操作系统:CentOS7;
- JDK:1.8.0_191;
- Spark:2.3.3;
- Scala:2.11.12;
- Hadoop:2.7.7;
- Maven:3.5.0;
关于hadoop环境
- 本次实战用到了hadoop的hdfs,关于hadoop的部署,请参考《Linux部署hadoop2.7.7集群》
关于spark环境
- 本次实战用到了spark2.3.3,关于spark集群的部署,请参考《部署spark2.2集群(standalone模式)》
请注意,由于2.3.3版本的spark-core的jar包不支持scala2.12,所以在部署spark的时候,scala版本请使用2.11;
关于本次实战开发的应用
- 本次实战开发的应用是经典的WorkCount,也就是指定一个文本文件,统计其中每个单词出现的次数,再取出现次数最多的10个,打印出来,并保存在hdfs文件中;
本次统计单词数用到的文本
- 本次用到的txt文件,是我在网上找到的pdf版本的《乱世佳人》英文版,然后导出为txt,读者您可以自行选择适合的txt文件来测试;
- 在hdfs服务所在的机器上执行以下命令,创建input文件夹:
~/hadoop-2.7.7/bin/hdfs dfs -mkdir /input
- 在hdfs服务所在的机器上执行以下命令,创建output文件夹:
~/hadoop-2.7.7/bin/hdfs dfs -mkdir /output
- 把本次用到的text文件上传到hdfs服务所在的机器,再执行以下命令将文本文件上传到hdfs的/input文件夹下:
~/hadoop-2.7.7/bin/hdfs dfs -put ~/GoneWiththeWind.txt /input
- 接下来详细讲述应用的编码过程,如果您不想自己写代码,也可以在GitHub下载完整的应用源码,地址和链接信息如下表所示:
名称 | 链接 | 备注 |
---|---|---|
项目主页 | https://github.com/zq2599/blog_demos | 该项目在GitHub上的主页 |
git仓库地址(https) | https://github.com/zq2599/blog_demos.git | 该项目源码的仓库地址,https协议 |
git仓库地址(ssh) | [email protected]:zq2599/blog_demos.git | 该项目源码的仓库地址,ssh协议 |
- 这个git项目中有多个文件夹,本章源码在sparkwordcount这个文件夹下,如下图红框所示:
- 基于maven创建一个java应用sparkwordcount,pom.xml的内容如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.bolingcavalry</groupId>
<artifactId>sparkwordcount</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.3.2</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/java</sourceDirectory>
<testSourceDirectory>src/test/java</testSourceDirectory>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<executions>
<execution>
<goals>
<goal>exec</goal>
</goals>
</execution>
</executions>
<configuration>
<executable>java</executable>
<includeProjectDependencies>false</includeProjectDependencies>
<includePluginDependencies>false</includePluginDependencies>
<classpathScope>compile</classpathScope>
<mainClass>com.bolingcavalry.sparkwordcount.WordCount</mainClass>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
- 创建WrodCount类,关键代码位置都有注释,就不再赘述了:
package com.bolingcavalry.sparkwordcount;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
/**
* @Description: spark的WordCount实战
* @author: willzhao E-mail: [email protected]
* @date: 2019/2/8 17:21
*/
public class WordCount {
public static void main(String[] args) {
String hdfsHost = args[0];
String hdfsPort = args[1];
String textFileName = args[2];
SparkConf sparkConf = new SparkConf().setAppName("Spark WordCount Application (java)");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
String hdfsBasePath = "hdfs://" + hdfsHost + ":" + hdfsPort;
//文本文件的hdfs路径
String inputPath = hdfsBasePath + "/input/" + textFileName;
//输出结果文件的hdfs路径
String outputPath = hdfsBasePath + "/output/"
+ new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
System.out.println("input path : " + inputPath);
System.out.println("output path : " + outputPath);
//导入文件
JavaRDD<String> textFile = javaSparkContext.textFile(inputPath);
JavaPairRDD<String, Integer> counts = textFile
//每一行都分割成单词,返回后组成一个大集合
.flatMap(s -> Arrays.asList(s.split(" ")).iterator())
//key是单词,value是1
.mapToPair(word -> new Tuple2<>(word, 1))
//基于key进行reduce,逻辑是将value累加
.reduceByKey((a, b) -> a + b);
//先将key和value倒过来,再按照key排序
JavaPairRDD<Integer, String> sorts = counts
//key和value颠倒,生成新的map
.mapToPair(tuple2 -> new Tuple2<>(tuple2._2(), tuple2._1()))
//按照key倒排序
.sortByKey(false);
//取前10个
List<Tuple2<Integer, String>> top10 = sorts.take(10);
//打印出来
for(Tuple2<Integer, String> tuple2 : top10){
System.out.println(tuple2._2() + "\t" + tuple2._1());
}
//分区合并成一个,再导出为一个txt保存在hdfs
javaSparkContext.parallelize(top10).coalesce(1).saveAsTextFile(outputPath);
//关闭context
javaSparkContext.close();
}
}
- 在pom.xml目录下执行以下命令,编译构建jar包:
mvn clean package -Dmaven.test.skip=true
- 构建成功后,在target目录下生成文件sparkwordcount-1.0-SNAPSHOT.jar,上传到spark服务器的**~/jars/**目录下;
- 假设spark服务器的IP地址为192.168.119.163,在spark服务器执行以下命令即可提交任务:
~/spark-2.3.2-bin-hadoop2.7/bin/spark-submit \
--master spark://192.168.119.163:7077 \
--class com.bolingcavalry.sparkwordcount.WordCount \
--executor-memory 512m \
--total-executor-cores 2 \
~/jars/sparkwordcount-1.0-SNAPSHOT.jar \
192.168.119.163 \
8020 \
GoneWiththeWind.txt
- 上述命令的最后三个参数,是java的main方法的入参,具体的使用请参照WordCount类的源码;
- 提交成功后立即开始执行任务,看到类似如下信息表示任务完成:
2019-02-08 21:26:04 INFO BlockManagerMaster:54 - BlockManagerMaster stopped
2019-02-08 21:26:04 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:54 - OutputCommitCoordinator stopped!
2019-02-08 21:26:04 INFO SparkContext:54 - Successfully stopped SparkContext
2019-02-08 21:26:04 INFO ShutdownHookManager:54 - Shutdown hook called
2019-02-08 21:26:04 INFO ShutdownHookManager:54 - Deleting directory /tmp/spark-c3e2ea9e-7daf-4cab-a207-26f0a0394017
2019-02-08 21:26:04 INFO ShutdownHookManager:54 - Deleting directory /tmp/spark-d60e4d75-4189-4f33-a5e2-fbe9b06bdae7
- 往前翻滚一下控制台输出的信息,如下所示,可以见到单词统计的前十名已经输出在控制台了:
2019-02-08 21:36:15 INFO DAGScheduler:54 - Job 1 finished: take at WordCount.java:61, took 0.313008 s
the 18264
and 14150
to 10020
of 8615
a 7571
her 7086
she 6217
was 5912
in 5751
had 4502
2019-02-08 21:36:15 INFO deprecation:1173 - mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
2019-02-08 21:36:15 INFO FileOutputCommitter:108 - File Output Committer Algorithm version is 1
- 在hdfs服务器执行查看文件的命令,可见/output下新建了子目录20190208213610:
[hadoop@node0 ~]$ ~/hadoop-2.7.7/bin/hdfs dfs -ls /output
Found 1 items
drwxr-xr-x - hadoop supergroup 0 2019-02-08 21:36 /output/20190208213610
- 查看子目录,发现里面有两个文件:
[hadoop@node0 ~]$ ~/hadoop-2.7.7/bin/hdfs dfs -ls /output/20190208213610
Found 2 items
-rw-r--r-- 3 hadoop supergroup 0 2019-02-08 21:36 /output/20190208213610/_SUCCESS
-rw-r--r-- 3 hadoop supergroup 108 2019-02-08 21:36 /output/20190208213610/part-00000
- 上面看到的**/output/20190208213610/part-00000**就是输出结果,用cat命令查看其内容:
[hadoop@node0 ~]$ ~/hadoop-2.7.7/bin/hdfs dfs -cat /output/20190208213610/part-00000
(18264,the)
(14150,and)
(10020,to)
(8615,of)
(7571,a)
(7086,her)
(6217,she)
(5912,was)
(5751,in)
(4502,had)
- 可见与前面控制台输出的一致;
- 在spark的web页面,可见刚刚执行的任务信息:

- 至此,第一个spark应用的开发和运行就完成了,接下来的文章中,咱们一起来完成更多的spark实战;
欢迎关注51CTO博客:程序员欣宸
- 赞
- 收藏
- 评论
- 分享
- 举报
Recommend
-
98
【Win 10 应用开发】MIDI 音乐合成——乐理篇 针对 MIDI...
-
35
上一篇我们创建了第一个简单的flask应用程序,这一篇我们来看一下,这个最简单的应用程序都做了哪些事第一行代码,导入了flask类fromflaskimportFlask第二步创建了Flask类的实例app=Flask(__name__)这行代码里有一个参数name,这个参数用到告诉flask你的applicatio...
-
7
【spark】sql.functions详解 2019年01月26日 Author: Guofei 文章归类: 1-1-算法平台 ,文章编号: 155 版权声明:本文作者是郭飞。转载随意,但...
-
4
本文目录 一、Apache Spark 二、Spark SQL发展历程 三、Spark SQL底层执行原理 四、Catalyst 的两大优化 完整版传送门:
-
9
Apache Spark RDD and Java Streams A few months ago, I was fortunate enough to participate in a few PoCs (proof-of-concepts) that used Apache Spark. There, I got the chance to use resilient distributed datasets (RDDs for short), t...
-
9
java web开发(编写第一个servlet程序)
-
11
java web开发(第一个jsp工程)
-
6
java web开发(第一个spring程序)
-
8
java web开发(编写第一个spring mvc)
-
3
java web开发(IDEA安装 + 第一个java工程) ...
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK