71

Apache Flink 1.4.0:Standalone集群模式实践

 6 years ago
source link: http://shiyanjun.cn/archives/1759.html?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Apache Flink 1.4.0:Standalone集群模式实践

Apache Flink是一个开源的流处理框架,提供了分布式的、高性能的、高可用的特性,同时能够为流式应用程序提供多种编程语言的API,更多有关Flink的内容,不再累述,请参考官方文档,本文主要以Flink Standalone集群安装配置、编程实践为主。

Flink集群安装配置

首先,选择一主二从共3个节点来安装配置Flink Standalone集群:

Master:ali-bj01-tst-cluster-001.xiweiai.cn
Worker:ali-bj01-tst-cluster-002.xiweiai.cn
Worker:ali-bj01-tst-cluster-003.xiweiai.cn

为了方便安装文件远程拷贝,单独创建一个hadoop用户,并打通从Master节点使用ssh到Worker节点之间的免密码登录。
在Master节点上下载、准备Flink 1.4.0安装文件,执行如下命令:

wget http://mirror.bit.edu.cn/apache/flink/flink-1.4.0/flink-1.4.0-bin-hadoop26-scala_2.11.tgz
tar xvzf flink-1.4.0-bin-hadoop26-scala_2.11.tgz

接着,修改Flink配置文件flink-1.4.0/conf/flink-conf.yaml,修改后的内容如下所示:

env.java.home: /usr/local/java/jdk1.8.0_131/
jobmanager.rpc.address: ali-bj01-tst-cluster-001.xiweiai.cn
# The heap size for the JobManager JVM
jobmanager.heap.mb: 2048
# The heap size for the TaskManager JVM
taskmanager.heap.mb: 2048

通过设置env.java.home属性,指定JAVA_HOME环境变量的值;jobmanager.rpc.address表示Flink集群的JobManager的RPC地址;设置jobmanager.heap.mb和taskmanager.heap.mb分别指定JobManager和TaskManager的堆内存大小,这个需要根据实际节点的资源情况进行配置,这里每个Worker节点都配置为2G大小。
然后,修改conf/slaves文件,增加Worker节点的主机名或IP列表,内容如下所示:

ali-bj01-tst-cluster-002.xiweiai.cn
ali-bj01-tst-cluster-003.xiweiai.cn

最后,将配置完成的Flink安装文件,远程拷贝到各个Worker节点:

scp -r /mnt/bd/data/flink/flink-1.4.0 ali-bj01-tst-cluster-002.xiweiai.cn:/mnt/bd/data/flink/
scp -r /mnt/bd/data/flink/flink-1.4.0 ali-bj01-tst-cluster-003.xiweiai.cn:/mnt/bd/data/flink/

启动集群

启动Flink集群,执行如下命令:

bin/start-cluster.sh

它会在Master节点上启动JobManager进程,然后ssh到Worker节点上启动各个TaskManager进程。
当然可选地,也可以单独启动JobManager和TaskManager,在Master节点上启动JobManager:

bin/jobmanager.sh start

在各个Worker节点上启动TaskManager:

bin/taskmanager.sh start

启动完成后,可以查看对应的日志,验证Flink集群是否启动成功:

tail -100f log/flink-hadoop-jobmanager-0-ali-bj01-tst-cluster-001.xiweiai.cn.log
tail -100f log/flink-hadoop-taskmanager-0-ali-bj01-tst-cluster-002.xiweiai.cn.log
tail -100f log/flink-hadoop-taskmanager-0-ali-bj01-tst-cluster-003.xiweiai.cn.log

或者,查看Flink Web Dashboard UI来确认,通过浏览器打开链接:http://ali-bj01-tst-cluster-001.xiweiai.cn:8081/,如下图所示:

Flink-Web-Dashboard
可见,TaskManager都已经注册到JobManager,整个Flink集群具有了计算资源(Task Slot),集群启动成功。

创建Flink项目

通过Flink官方文档,可以看到如何创建quickstart项目,假设我们使用Scala来编写Flink程序,默认使用sbt来管理依赖和程序构建,可以执行如下命令来创建:

bash <(curl https://flink.apache.org/q/sbt-quickstart.sh)

可以根据向导提示,一步一步进行(示例是我们创建的flink-demo项目):

% Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
Dload  Upload   Total   Spent    Left  Speed
100 11354  100 11354    0     0   5393      0  0:00:02  0:00:02 --:--:--  5474
This script creates a Flink project using Scala and SBT.
Project name (Flink Project): flink-demo
Organization (org.example): org.shirdrn.flink
Version (0.1-SNAPSHOT): 0.0.1-SNAPSHOT
Scala version (2.11.7): 2.11.7
Flink version (1.3.2): 1.4.0
-----------------------------------------------
Project Name: flink-demo
Organization: org.shirdrn.flink
Version: 0.0.1-SNAPSHOT
Scala version: 2.11.7
Flink version: 1.4.0
-----------------------------------------------
Create Project? (Y/n): Y
Creating Flink project under flink-demo

基于该Demo项目,就可以开发我们自己的Flink程序了。
下面,我们分别选择使用Flink提供的API来开发批量数据处理程序、流式数据处理程序、基于窗口(Windowing)的数据处理,来实现通过Flink API进行编程并提交运行。

批量数据处理

我们基于Flink发行包自带的example代码,修改WordCount程序为HdfsWordCount,支持从HDFS读取文件,然后将计算结果写入到HDFS文件,非常简单,代码如下所示:

import org.apache.flink.api.scala._
object HdfsWordCount {
def main(args: Array[String]) {
if(args.length != 2) {
println("Parameter error!")
println("Usage: org.shirdrn.flink.HdfsWordCount <inputFile> <outputFile>")
System.exit(-1)
}
val inputFile = args(0)
val outputFile = args(1)
// set up the execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
// get input data
val text = env.readTextFile(inputFile)
val counts = text.flatMap { _.toLowerCase.split("\\W+") }
.map { (_, 1) }
.groupBy(0)
.sum(1)
// execute and save result
counts
.map(t => Seq(t._1, t._2).mkString("\t"))
.writeAsText(outputFile)
val jobName = getClass.getSimpleName
env.execute(jobName)
}
}

上面代码中,env.readTextFile(inputFile)返回了DataSet[String],表示Flink中对String类型数据元素的数据集的抽象;写入HDFS,调用DataSet的writeAsText方法,传递一个HDFS路径即可。
构建打包程序,执行如下命令:

sbt clean assembly

可以看到,生成了文件target/scala-2.11/flink-demo-assembly-0.0.1-SNAPSHOT.jar,可以将其提交到Flink集群上运行。有两种方式,一种方式是,通过Flink Web Dashboard,在页面上上传上述jar文件,然后配置输入的参数值,不做过多说明;另一种方式是,通过命令行的方式提交,这种方式比较方便,对应的提交运行命令,如下所示:

bin/flink run --class org.shirdrn.flink.HdfsWordCount flink-demo-assembly-0.0.1-SNAPSHOT.jar hdfs://ali-bj01-tst-cluster-001.xiweiai.cn:8020/data/beam/input/Notre-Dame-de-Paris.txt hdfs://ali-bj01-tst-cluster-001.xiweiai.cn:8020/user/hadoop/count_result.txt

输入和输出文件,都要加上HDFS前缀: hdfs://ali-bj01-tst-cluster-001.xiweiai.cn:8020,指明HDFS集群的地址。
提交运行后,可以在Flink Web Dashboard看到对应的Job运行情况,可以通过命令查看输出结果文件的内容:

hdfs dfs -cat /user/hadoop/count_result.txt

流式数据处理

示例中,我们打算从一个具有3节点的Kakfa集群的topic中读取数据,数据来自网站用户浏览的Apache Server的日志,我们需要将日志行中有用的信息抽取出来,然后再写入到一个新的Kakfa topic中,供下游的其它程序处理,比如可以使用的场景有:实时推荐、实时监控,等等。
首先,创建2个Kafka topic,执行如下命令行:

kafka-topics --zookeeper 172.16.117.63:2181,172.16.117.64:2181,172.16.117.65:2181 --partitions 3 --replication-factor 2 --create --topic raw_apache_log_event
kafka-topics --zookeeper 172.16.117.63:2181,172.16.117.64:2181,172.16.117.65:2181 --partitions 3 --replication-factor 2 --create --topic etld_user_behavior_event

raw_apache_log_event存储原始日志记录,etld_user_behavior_event存储经过抽取、过滤后的记录。
然后,基于上面创建的flink-demo项目,在build.sbt中添加Flink Streaming及其Kakfa connector依赖,如下所示:

val flinkDependencies = Seq(
"org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
"org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
"org.apache.flink" %% "flink-connector-kafka-0.9" % flinkVersion)

接着,可以继续开发代码,实现我们计划的逻辑,代码如下所示:

package org.shirdrn.flink
import java.util.regex.Pattern
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer09, FlinkKafkaProducer09}
/**
* Read raw events from a Kafka topic, and then write ETLed events to another Kafka topic.
*/
object KafkaStreamingETL {
private val LINE_PATTERN = Pattern.compile(
"([^\\s]+)\\s\\-\\s\\-\\s\\[([^\\]]+)\\]\\s\"([^\"]+)\"\\s([^\\s]+)\\s\\-\\s\"([^\"]+).*(\\d+)$"
);
private def checkParams(params: ParameterTool) = {
if (params.getNumberOfParameters < 5) {
println("Missing parameters!\n"
+ "Usage: Kafka "
+ "--raw-topic <raw_topic> "
+ "--etled-topic <etled_topic> "
+ "--bootstrap.servers <kafka brokers> "
+ "--zookeeper.connect <zk quorum> "
+ "--group.id <group id>")
System.exit(-1)
}
}
def main(args: Array[String]): Unit = {
// parse & check input arguments
val params = ParameterTool.fromArgs(args)
checkParams(params)
val env = StreamExecutionEnvironment.getExecutionEnvironment
configureEnv(params, env)
// create a Kafka streaming source consumer for Kafka 0.9.x
val kafkaConsumer = new FlinkKafkaConsumer09(
params.getRequired("raw-topic"),
new SimpleStringSchema, params.getProperties
)
// process streaming events
val messageStream = env
.addSource(kafkaConsumer)
.map(parseEvent(_))
.filter(!_.isEmpty)
// create a Kafka producer for Kafka 0.9.x
val kafkaProducer = new FlinkKafkaProducer09(
params.getRequired("etled-topic"),
new SimpleStringSchema, params.getProperties
)
// write data into Kafka
messageStream.addSink(kafkaProducer)
env.execute("KafkaStreamingETL")
}
private def configureEnv(params: ParameterTool, env: StreamExecutionEnvironment) = {
env.getConfig.disableSysoutLogging
val restartAttempts = 3
val delayBetweenAttemptsMills = 30000
env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(
restartAttempts, delayBetweenAttemptsMills))
// create a checkpoint every 5 seconds
val checkpointIntervalMillis = 1000
env.enableCheckpointing(checkpointIntervalMillis)
// make parameters available in the web interface
env.getConfig.setGlobalJobParameters(params)
}
// processed event line example:
// 123.150.182.192    28/Dec/2017:01:05:21    /wp-content/plugins/akismet/_inc/form.js?ver=3.0.1    304    http://myhost.com/archives/30025.html    1
def parseEvent(msg: String): String = {
val m = LINE_PATTERN.matcher(msg)
if(m.find()) {
val ip = m.group(1)
val dt = m.group(2)
val resource = m.group(3)
val responseCode = m.group(4)
val pageUrl = m.group(5)
val timeTaken = m.group(6)
Seq(
ip, dt.split("\\s+")(0),
resource.split("\\s+")(1),
responseCode, pageUrl, timeTaken
).mkString("\t")
} else {
""
}
}
}

上面代码中,env.addSource(kafkaConsumer)返回了DataStream[T],后续map和filter操作都作用在该DataStream[T]上,DataStream[T]是Flink对流式数据集的抽象表示。
构建打包后,提交到Flink集群上运行,执行如下命令:

bin/flink run --parallelism 3 --class org.shirdrn.flink.KafkaStreamingETL flink-demo-assembly-0.0.1-SNAPSHOT.jar \
--raw-topic raw_apache_log_event \
--etled-topic etld_user_behavior_event \
--zookeeper.connect 172.16.117.63:2181,172.16.117.64:2181,172.16.117.65:2181 \
--bootstrap.servers ali-bj01-tst-cluster-002.xiweiai.cn:9092,ali-bj01-tst-cluster-003.xiweiai.cn:9092,ali-bj01-tst-cluster-004.xiweiai.cn:9092 \
--group.id G_XW_WLD_ETL

我们可以在Flink Web Dashboard上查看提交的KafkaStreamingETL的运行状况,如下图所示:

Flink-KafkaStreamingETL
也可以通过Kakfa自带的topic消费脚本工具,确认Streaming程序处理结果:
kafka-console-consumer --zookeeper 172.16.117.63:2181,172.16.117.64:2181,172.16.117.65:2181 --topic raw_apache_log_event --from-beginning

基于窗口操作(Windowing)的数据处理

我们基于前面经过ETL解析处理的日志事件记录,进行实时Windowing操作。从Kafka中读取数据,然后经过对IP进行Windowing和统计,结果输出到另一个新的Kakfa topic中,实现代码,如下所示:

object WindowedUserBrowseringAnalytics {
def checkParams(params: ParameterTool) = {
if (params.getNumberOfParameters < 6) {
println("Missing parameters!\n"
+ "Usage: Kafka "
+ "--etled-topic <etled_topic> "
+ "--windowed-topic <windowed_topic> "
+ "--bootstrap.servers <kafka brokers> "
+ "--zookeeper.connect <zk quorum> "
+ "--group.id <group id> "
+ "--window-size <window_size>")
System.exit(-1)
}
}
def main(args: Array[String]): Unit = {
// parse & check input arguments
val params = ParameterTool.fromArgs(args)
checkParams(params)
val env = StreamExecutionEnvironment.getExecutionEnvironment
configureEnv(params, env)
val windowSize = params.getRequired("window-size").toLong
val kafkaConsumer = new FlinkKafkaConsumer09(
params.getRequired("etled-topic"),
new SimpleStringSchema, params.getProperties
)
val kafkaProducer = new FlinkKafkaProducer09(
params.getRequired("windowed-topic"),
new SimpleStringSchema, params.getProperties
)
val stream: DataStream[String] = env.addSource(kafkaConsumer)
stream
.map(event => (event.split("\t")(0), 1L))
.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.seconds(windowSize)))
.reduce((event1, event2) => (event1._1, event1._2 + event2._2))
.map(r => {
val df = new SimpleDateFormat("yyyy-MM-dd:HH:mm")
Seq(df.format(new Date()), r._1, r._2).mkString("\t")
})
.addSink(kafkaProducer)
env.execute(getClass.getSimpleName)
}
}

构建打包,提交到Flink集群运行:

bin/flink run --parallelism 2 --class org.shirdrn.flink.WindowedUserBrowseringAnalytics flink-demo-assembly-0.0.1-SNAPSHOT.jar \
--etled-topic etld_user_behavior_event \
--windowed-topic windowed_analytics_result \
--zookeeper.connect 172.16.117.63:2181,172.16.117.64:2181,172.16.117.65:2181 \
--bootstrap.servers ali-bj01-tst-cluster-002.xiweiai.cn:9092,ali-bj01-tst-cluster-003.xiweiai.cn:9092,ali-bj01-tst-cluster-004.xiweiai.cn:9092 \
--window-size 60 \
--group.id G_XW_MON_ALTS

可以通过前面类似的方式,查看Job运行状况和结果。

参考链接

本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系

Post navigation

评论(4): “Apache Flink 1.4.0:Standalone集群模式实践”

  1. 博主好!我这两天也在尝试搭建Flink环境,根据官网教程(http://doc.flink-china.org/latest/quickstart/setup_quickstart.html#setup-download-and-start-flink)走的,用examples/streaming/SocketWindowWordCount.jar进行测试时,查看taskmanager的.out文件中有word的临时统计结果,查看jobmanager的.out文件中没有任何输出。。。菜鸟求教博主答疑解惑。。。

  2. 请问一下flink 的集群模式中一定要用到hdfs 或者nfs 这种嚒?不能都不依赖么?

    • 可以不依赖HDFS, 单建一个Standalone模式的Flink集群,但是要保证你选择的文件系统,必须是Flink支持的。

  3. 你好,想问下RunningJobs里的jobName可以设置吗?

发表评论 取消回复

电子邮件地址不会被公开。 必填项已用*标注

姓名 *

电子邮件 *

站点

评论


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK