
Flink Series - Real-Time Data Warehouse: Writing to ClickHouse with Flink in Real Time and Building a Live Dashboard (Part 4)

source link: https://mp.weixin.qq.com/s?__biz=MzI0NTIxNzE1Ng%3D%3D&%3Bmid=2651220665&%3Bidx=1&%3Bsn=c1d07b1d29394ad81b224b759f21ac77


Speaker: lbship

Compiled and edited by: 仙子紫霞

Published by: 数据仓库与Python大数据

Main Text

Overall Architecture


Tools

Flink 1.11.2

Scala 2.11

Tableau 2020.2

1. Simulating and Sending Data

Create a class KafkaProducers to simulate purchase data. The code is as follows:

package TopNitems

import java.text.SimpleDateFormat
import java.util.{Date, Properties}

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

import Array._
import scala.util.Random.shuffle

object KafkaProducers {
  def main(args: Array[String]): Unit = {
    SendtoKafka("test")
  }

  def SendtoKafka(topic: String): Unit = {
    val pro = new Properties()
    pro.put("bootstrap.servers", "192.168.226.10:9092")
    pro.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    pro.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](pro)

    // Candidate member ids (1..9) and a small catalog of goods
    val member_id = range(1, 10)
    val goods = Array("Milk", "Bread", "Rice", "Noodles", "Cookies", "Fish", "Meat", "Fruit", "Drink", "Books", "Clothes", "Toys")

    // Every 2 seconds emit one random purchase record, tab separated: member_id \t item \t timestamp
    while (true) {
      val ts = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
      val msg = shuffle(member_id.toList).head + "\t" + shuffle(goods.toList).head + "\t" + ts + "\n"
      print(msg)
      val record = new ProducerRecord[String, String](topic, msg)
      producer.send(record)
      Thread.sleep(2000)
    }
  }
}

1. Start ZooKeeper

./zkServer.sh start

2. Start Kafka

./kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

3. Create the topic

./kafka-topics.sh --create --zookeeper 192.168.226.10:2181 --replication-factor 1 --partitions 1 --topic test

Check whether the topic was created successfully:

./kafka-topics.sh --list --zookeeper 192.168.226.10:2181

4. Run KafkaProducers in IDEA; a purchase record is generated every 2 seconds.


Start a console consumer to listen:

./kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server 192.168.226.10:9092

The test succeeds, confirming that the messages can be consumed.


2. Writing Data to ClickHouse

ClickHouse can act as a Kafka consumer directly (the official documentation describes this, including the supported formats). Consuming directly, however, skips any ETL step, so we consume with Flink instead, which makes further processing easier.

Flink 1.11.0 substantially refactored its JDBC connector, and even the artifact name changed:

Their support for the different ways of writing to a ClickHouse sink in Flink is as follows:

API                 | flink-jdbc     | flink-connector-jdbc
DataStream          | Not supported  | Supported
Table API (Legacy)  | Supported      | Not supported
Table API (DDL)     | Not supported  | Not supported

This article uses Flink 1.11.2, so we write data to ClickHouse with flink-connector-jdbc plus the DataStream API.

First, add the dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.11.2</version>
</dependency>
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.2.4</version>
</dependency>
<!-- Flink Table API related dependencies -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
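One thing the walkthrough leaves implicit is that the target table ChinaDW.testken must already exist in ClickHouse before the sink runs. The following is a minimal sketch that creates it through the clickhouse-jdbc driver declared above; the column types and MergeTree layout are assumptions chosen to match the INSERT statement used later, and the same DDL can equally be run in clickhouse-client.

package TopNitems

import java.sql.DriverManager

// Sketch only: create the target database/table for the JDBC sink.
// Column names follow the INSERT used later; Int32/String/DateTime and the
// MergeTree ORDER BY are assumptions, not taken from the original article.
object CreateCkTable {
  def main(args: Array[String]): Unit = {
    Class.forName("ru.yandex.clickhouse.ClickHouseDriver")
    // Empty password, matching the sink configuration below
    val conn = DriverManager.getConnection("jdbc:clickhouse://XX.XX.XX.XX:8123", "default", "")
    val stmt = conn.createStatement()
    stmt.execute("CREATE DATABASE IF NOT EXISTS ChinaDW")
    stmt.execute(
      """CREATE TABLE IF NOT EXISTS ChinaDW.testken (
        |  userid      Int32,
        |  items       String,
        |  create_date DateTime
        |) ENGINE = MergeTree()
        |ORDER BY (create_date, userid)""".stripMargin)
    stmt.close()
    conn.close()
  }
}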

The code is as follows. It writes via JDBC, flushing a batch of 5 records at a time:

package TopNitems

import java.sql.PreparedStatement
import java.text.SimpleDateFormat
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

// With the current flink-connector-jdbc, calling JdbcSink from the Scala API runs into a lambda
// serialization problem, so the JDBC statement builder has to be a hand-written class that
// implements the interface instead of a lambda.
class CkSinkBuilder extends JdbcStatementBuilder[(Int, String, String)] {
  def accept(ps: PreparedStatement, v: (Int, String, String)): Unit = {
    ps.setInt(1, v._1)
    ps.setString(2, v._2)
    ps.setString(3, v._3)
  }
}

object To_CK {
  def main(args: Array[String]): Unit = {
    // Set up the environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // parallelism 1 keeps console output in order
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // Flink defaults to processing time; switch to event time
    val tEnv = StreamTableEnvironment.create(env) // Table environment (not used further here)

    // Read data from Kafka
    val pros = new Properties()
    pros.setProperty("bootstrap.servers", "192.168.226.10:9092")
    pros.setProperty("group.id", "test")
    pros.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    pros.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    pros.setProperty("auto.offset.reset", "latest")
    val dataSource = env.addSource(new FlinkKafkaConsumer[String]("test", new SimpleStringSchema(), pros))

    val sql = "insert into ChinaDW.testken(userid,items,create_date) values(?,?,?)"

    val result = dataSource.map(line => {
      val x = line.split("\t")
      val member_id = x(0).trim.toLong
      val item = x(1).trim
      val times = x(2).trim
      var time = 0L
      // Parse the datetime string into an epoch timestamp for event time
      try { time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(times).getTime }
      catch { case e: Exception => print(e.getMessage) }
      (member_id.toInt, item, time)
    }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(Int, String, Long)](Time.seconds(2)) {
      override def extractTimestamp(t: (Int, String, Long)): Long = t._3
    }).map(x => (x._1, x._2, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(x._3))) // convert back to a datetime string for ClickHouse

    // Write to ClickHouse via JDBC, flushing a batch every 5 records
    result.addSink(JdbcSink.sink[(Int, String, String)](
      sql,
      new CkSinkBuilder,
      new JdbcExecutionOptions.Builder().withBatchSize(5).build(),
      new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:clickhouse://XX.XX.XX.XX:8123")
        .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
        .withUsername("default")
        .build()
    ))

    env.execute("To_CK")
  }
}

Query ClickHouse and you can see the data has been written successfully.
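If the ClickHouse client is not at hand, the write can also be checked programmatically. Below is a minimal sketch over plain JDBC (same hypothetical connection details as the sink) that prints the row count and the latest few records:

package TopNitems

import java.sql.DriverManager

// Sketch only: verify the sink output by querying ChinaDW.testken over JDBC.
object CheckCkTable {
  def main(args: Array[String]): Unit = {
    Class.forName("ru.yandex.clickhouse.ClickHouseDriver")
    val conn = DriverManager.getConnection("jdbc:clickhouse://XX.XX.XX.XX:8123", "default", "")
    val stmt = conn.createStatement()

    // Total number of rows written so far
    val countRs = stmt.executeQuery("SELECT count() FROM ChinaDW.testken")
    while (countRs.next()) println("rows: " + countRs.getLong(1))

    // The five most recent records
    val rs = stmt.executeQuery(
      "SELECT userid, items, create_date FROM ChinaDW.testken ORDER BY create_date DESC LIMIT 5")
    while (rs.next()) println(rs.getInt(1) + "\t" + rs.getString(2) + "\t" + rs.getString(3))

    stmt.close()
    conn.close()
  }
}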


3. Visualization with Tableau

The visualization part is straightforward. I chose Tableau to connect to ClickHouse because it is quick and convenient; the dashboard below took about two minutes to put together. Note that Tableau must be version 2020 or later, otherwise fields may get truncated when connecting to ClickHouse. First install the ClickHouse ODBC driver (I used clickhouse-odbc-1.1.7-win64.msi), then configure the ODBC connection in the Control Panel, as shown below.


Then configure the ClickHouse ODBC data source in Tableau; for details you can search for how to connect Tableau to ClickHouse.


A bit of drag-and-drop produces the chart below. One question remains: how can Tableau act as a big-screen dashboard that refreshes automatically? Tableau offers two approaches:

Option 1: Publish to Tableau Server and use a browser's built-in page auto-refresh (QQ Browser, for example), appending &:refresh=yes to the URL.

Option 2: Install a Tableau extension. Find the Auto Refresh extension on the official site and drag it onto the dashboard; it works right away, and a refresh countdown appears in the lower-right corner.


At this point, the whole project is complete.

That's all for today's sharing. Thank you.


Author: lbship

Link: see "Read the original" at the end of the post

Copyright belongs to the author; this official account has obtained exclusive authorization. Technical contributors are welcome to submit articles: add WeChat iom1128 with the note "投稿" (submission).


