Flink SQL Hudi in Practice

source link: https://blog.51cto.com/u_15318160/5523790

hyunbar777 2022-07-28 20:43:09 © Copyright

Tags: flink, sql, hadoop. Categories: Other, Big Data.

Official site: https://hudi.apache.org

Gitee: https://gitee.com/apache/Hudi

1.1 Architecture

[Figure: Hudi architecture diagram]

1.2 Features

  • Upserts, Deletes with fast, pluggable indexing.
  • Incremental queries, Record level change streams
  • Transactions, Rollbacks, Concurrency Control.
  • SQL Read/Writes from Spark, Presto, Trino, Hive & more
  • Automatic file sizing, data clustering, compactions, cleaning.
  • Streaming ingestion, Built-in CDC sources & tools.
  • Built-in metadata tracking for scalable storage access.
  • Backwards compatible schema evolution and enforcement

2. Start Flink

Set the number of task slots in conf/flink-conf.yaml:

taskmanager.numberOfTaskSlots: 4

Configure the Hadoop environment variable:

export HADOOP_CLASSPATH=`hadoop classpath`
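
As a small sanity check before starting the cluster, the sketch below verifies that the variable exported above is not empty. The function name `check_hadoop_classpath` is a hypothetical helper introduced here for illustration; it is not part of Flink or Hadoop.

```shell
# Hypothetical helper: fail early when HADOOP_CLASSPATH was not exported.
check_hadoop_classpath() {
  if [ -z "${HADOOP_CLASSPATH:-}" ]; then
    echo "HADOOP_CLASSPATH is empty; run: export HADOOP_CLASSPATH=\`hadoop classpath\`"
    return 1
  fi
  echo "HADOOP_CLASSPATH is set"
}
```

Calling `check_hadoop_classpath && ./bin/start-cluster.sh` would then start the cluster only when the variable is present.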

2.1 Start a local cluster (Standalone)

duo@bigdata100:/flink-1.14.4$ ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host bigdata100.
Starting taskexecutor daemon on host bigdata100.

duo@bigdata100:/flink-1.14.4$ jps
18226 Jps
15333 NameNode
16038 ResourceManager
16649 JobHistoryServer
17900 StandaloneSessionClusterEntrypoint
15756 SecondaryNameNode
16381 NodeManager
15534 DataNode

Start the SQL client:

duo@bigdata100:/bigdata/module/flink-1.14.4$ ./bin/sql-client.sh embedded -j /home/duo/hudi-flink-bundle_2.11-0.10.1.jar

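2.2 Start a yarn-session cluster

Because the table data is stored on Hadoop (HDFS), the rest of this walkthrough runs the cluster in YARN session mode.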

duo@bigdata100:/bigdata/module/flink-1.14.4$ ./bin/yarn-session.sh -nm duo -d

duo@bigdata100:/bigdata/module/flink-1.14.4$ jps
15333 NameNode
16038 ResourceManager
25191 YarnSessionClusterEntrypoint
16649 JobHistoryServer
25290 Jps
15756 SecondaryNameNode
16381 NodeManager
15534 DataNode

duo@bigdata100:/bigdata/module/flink-1.14.4$ ./bin/sql-client.sh embedded -s yarn-session -j /home/duo/hudi-flink-bundle_2.11-0.10.1.jar

3.1 SQL statements

-- Create the table
CREATE TABLE student(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///flink/hudi/student',
  'table.type' = 'MERGE_ON_READ'
);

-- Insert data
INSERT INTO student VALUES
  ('id1','Danny',23,TIMESTAMP '2022-07-01 12:12:12','par1'),
  ('id2','Stephen',33,TIMESTAMP '2022-07-01 12:12:02','par1'),
  ('id3','Julian',53,TIMESTAMP '2022-07-01 12:12:03','par2'),
  ('id4','Fabian',31,TIMESTAMP '2022-07-01 12:12:04','par2'),
  ('id5','Sophia',18,TIMESTAMP '2022-07-01 12:12:05','par3'),
  ('id6','Emma',20,TIMESTAMP '2022-07-01 12:12:06','par3'),
  ('id7','Bob',44,TIMESTAMP '2022-07-01 12:12:07','par4'),
  ('id8','Han',56,TIMESTAMP '2022-07-01 12:12:08','par4');

-- Update the row with key 'id1' (an insert with an existing primary key acts as an upsert)
INSERT INTO student VALUES
  ('id1','Danny',27,TIMESTAMP '1970-01-01 00:00:01','par1');

SELECT * FROM student;

3.2 Create the table and insert data

Flink SQL> set execution.result-mode=tableau;
[INFO] Session property has been set.

Flink SQL> CREATE TABLE student(
> uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
> name VARCHAR(10),
> age INT,
> ts TIMESTAMP(3),
> `partition` VARCHAR(20)
> )
> PARTITIONED BY (`partition`)
> WITH (
> 'connector' = 'hudi',
> 'path' = 'hdfs:///flink/hudi/student',
> 'table.type' = 'MERGE_ON_READ'
> );
[INFO] Execute statement succeed.

Flink SQL> INSERT INTO student VALUES
> ('id1','Danny',23,TIMESTAMP '2022-07-01 12:12:12','par1'),
> ('id2','Stephen',33,TIMESTAMP '2022-07-01 12:12:02','par1'),
> ('id3','Julian',53,TIMESTAMP '2022-07-01 12:12:03','par2'),
> ('id4','Fabian',31,TIMESTAMP '2022-07-01 12:12:04','par2'),
> ('id5','Sophia',18,TIMESTAMP '2022-07-01 12:12:05','par3'),
> ('id6','Emma',20,TIMESTAMP '2022-07-01 12:12:06','par3'),
> ('id7','Bob',44,TIMESTAMP '2022-07-01 12:12:07','par4'),
> ('id8','Han',56,TIMESTAMP '2022-07-01 12:12:08','par4');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 2b4962486c1fbcff9e6354ab17801ae1

Flink SQL> SELECT * FROM student;
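
The same statements can also be run non-interactively. The sketch below writes the table definition and one upsert to a script file and passes it to the SQL client with its `-f` option. The file location `/tmp/student.sql` and the `SQL_FILE` variable are assumptions made for this example; the jar path and session name are the ones used in section 2.2. The client is only invoked when `./bin/sql-client.sh` exists in the current directory.

```shell
# Assumption: /tmp/student.sql is a hypothetical location for the script file.
SQL_FILE="${SQL_FILE:-/tmp/student.sql}"

# Quoted heredoc delimiter so the backticks around `partition` are not expanded by the shell.
cat > "$SQL_FILE" <<'EOF'
CREATE TABLE student(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///flink/hudi/student',
  'table.type' = 'MERGE_ON_READ'
);

-- Same uuid as an existing row, so this is written as an upsert
INSERT INTO student VALUES
  ('id1','Danny',27,TIMESTAMP '1970-01-01 00:00:01','par1');
EOF

# Run from the Flink home directory; skipped when the client script is absent.
if [ -x ./bin/sql-client.sh ]; then
  ./bin/sql-client.sh embedded -s yarn-session \
    -j /home/duo/hudi-flink-bundle_2.11-0.10.1.jar -f "$SQL_FILE"
else
  echo "sql-client.sh not found; wrote $SQL_FILE only"
fi
```

Keeping the statements in a file makes the walkthrough repeatable, since the table definition must be re-declared in every new SQL client session.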

3.3 Query data

[Screenshot: query result in the SQL client]

3.4 View the YARN application

[Screenshot: YARN application]

3.5 View the HDFS files

[Screenshot: HDFS files of the table]
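
The files can also be inspected from the command line. The sketch below lists the table path from the CREATE TABLE statement recursively with `hdfs dfs -ls -R`. The function name `list_hudi_table` is a hypothetical helper, and the listing is skipped when the `hdfs` command is not on the PATH.

```shell
# Table path taken from the CREATE TABLE statement in section 3.1.
TABLE_PATH="hdfs:///flink/hudi/student"

# list_hudi_table is a hypothetical helper name, not part of Hadoop or Hudi.
list_hudi_table() {
  if ! command -v hdfs >/dev/null 2>&1; then
    echo "hdfs command not found; skipping listing of $1"
    return 0
  fi
  # Recursive listing of the table directory, including the .hoodie metadata directory.
  hdfs dfs -ls -R "$1"
}

list_hudi_table "$TABLE_PATH"
```

The listing should include a `.hoodie` directory holding the table metadata alongside the data directories for the partitions.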

3.6 View the job in the Flink Web UI

[Screenshots: job view in the Flink Web UI]
