35

实战 | 数据湖中的流式数据摄取之DeltaSteamer

 3 years ago
source link: http://mp.weixin.qq.com/s?__biz=MzI0NTIxNzE1Ng%3D%3D&%3Bmid=2651219114&%3Bidx=1&%3Bsn=dcbd3a711556261dd2974ed67164f029
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

一、首先从 https://github.com/apache/incubator-hudi.git 将hudi clone到自己本地idea 使用clean install -DskipTests -DskipITs -Dcheckstyle.skip=true进行编译 注意: 1、目前hudi使用的是hadoop2.7.3版本,如果使用hadoop3.x版本,请修改pom重新编译

<hadoop.version>3.0.0</hadoop.version>

2、可以跳过集成测试模块

<modules>
    <module>hudi-common</module>
    <module>hudi-cli</module>
    <module>hudi-client</module>
    <module>hudi-hadoop-mr</module>
    <module>hudi-hive-sync</module>
    <module>hudi-spark</module>
    <module>hudi-timeline-service</module>
    <module>hudi-utilities</module>
    <module>packaging/hudi-hadoop-mr-bundle</module>
    <module>packaging/hudi-hive-sync-bundle</module>
    <module>packaging/hudi-spark-bundle</module>
    <module>packaging/hudi-presto-bundle</module>
    <module>packaging/hudi-utilities-bundle</module>
    <module>packaging/hudi-timeline-server-bundle</module>
    <module>docker/hoodie/hadoop</module>
    <!--<module>hudi-integ-test</module>-->  <!--将此注释即可-->
  </modules>

3、编译过后会得到hudi-utilities-bundle_2.11-0.6.0-SNAPSHOT.jar和hudi-hadoop-mr-bundle-0.6.0-SNAPSHOT.jar两个我们所需要的jar包

二、使用hudi自带的DeltaStreamer工具写数据到hudi,开启--enable-hive-sync 即可同步数据到hive表

DeltaStreamer启动命令

spark-submit --master yarn   \      
  --driver-memory 1G \
  --num-executors 2 \
  --executor-memory 1G \
  --executor-cores 4 \
  --deploy-mode cluster \
  --conf spark.yarn.executor.memoryOverhead=512 \
  --conf spark.yarn.driver.memoryOverhead=512 \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls /.../hudi-utilities-bundle_2.11-0.5.2-SNAPSHOT.jar` \
  --props hdfs://../kafka.properties \ '启动delateStreamer所需要的配置文件'
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \  '这里我选择从kafka消费json数据格式'
  --target-base-path hdfs://../business \ 'hudi数据存储地址'
  --op UPSERT \
  --target-table business  \    '这里其实并不是hive表的名称,实际表名是在kafka.properties中配置'
  --enable-hive-sync \          '开启同步至hive'
  --table-type MERGE_ON_READ \
  --source-ordering-field create_time \
  --source-limit 5000000

kafka.properties配置实例

hoodie.upsert.shuffle.parallelism=2
hoodie.insert.shuffle.parallelism=2
hoodie.bulkinsert.shuffle.parallelism=2
hoodie.datasource.write.recordkey.field=uuid
hoodie.datasource.write.partitionpath.field=create_time
hoodie.datasource.write.precombine.field=update_time
hoodie.deltastreamer.schemaprovider.source.schema.file=hdfs://../t_business.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=hdfs://../t3_trip.t_business.avsc
hoodie.deltastreamer.source.kafka.topic=t_business_topic
group.id=t_business_group
bootstrap.servers=localhost
auto.offset.reset=latest
hoodie.parquet.max.file.size=134217728
hoodie.datasource.write.keygenerator.class=org.apache.hudi.utilities.keygen.TimestampBasedKeyGenerator
hoodie.deltastreamer.keygen.timebased.timestamp.type=DATE_STRING
hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss
hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
hoodie.datasource.hive_sync.database=dwd
hoodie.datasource.hive_sync.table=test
hoodie.datasource.hive_sync.username=用户名
hoodie.datasource.hive_sync.password=密码
hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://.....
hoodie.datasource.hive_sync.partition_fields=分区字段

三、查询

Hudi支持以下存储数据的视图

  • 读优化视图 : 在此视图上的查询将查看给定提交或压缩操作中数据集的最新快照。该视图仅将最新parquet文件暴露给查询,所以它有可能看不到最新的数据,并保证与非Hudi列式数据集相比,具有相同的列式查询性能

  • 增量视图 : 对该视图的查询只能看到从某个提交/压缩后写入数据集的新数据。该视图有效地提供了更改流,来支持增量数据管道。

  • 实时视图 : 在此视图上的查询将查看某个增量提交操作中数据集的最新快照。该视图通过动态合并最新的基本文件和增量文件来提供近实时数据集。

MOR模式

如果使用MOR模式写入数据会在hive的dwd库下面生成两张表。分别是test_ro 和 test_rt test_rt表支持:快照视图和增量视图查询 test_ro表支持:读优化视图查询

使用saprk查询

spark-shell --master yarn \
--driver-memory 1G \
--num-executors 1 \
--executor-memory 1G \
--executor-cores 1 \
--jars /home/t3cx/apps/hudi/hudi-spark-bundle_2.11-0.5.2-SNAPSHOT.jar \
--conf spark.sql.hive.convertMetastoreParquet=false   '在进行快照视图查询的时候需要添加此配置'

#快照视图
spark.sql("select count(*) from dwd.test_rt").show()
#读优化视图
spark.sql("select count(*) from dwd.test_ro").show()
#增量视图
spark sql暂不支持

使用hive查询

beeline -u jdbc:hive2://.. -n ..  -p ..  \
  --hiveconf hive.stats.autogather=false \
  
 #读优化查询
 select * from dwd.test_ro;
 #快照查询
 select * from dwd.test_rt;
 #增量查询
 set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
 set hoodie.test.consume.mode=INCREMENTAL;
 set hoodie.test.consume.max.commits=3;
 set hoodie.test.consume.start.timestamp=20200427114546;
 select count(*) from  dwd.test_rt where `_hoodie_commit_time` > '20200427114546';
 
 #注意:
#1、hudi中parquet做了shaded,我在测试中发现(CDH6.3.0)下必须加载hudi-hadoop-mr中的parquet-avro包才行,cloudera用户需要必须要重新安装mr所需要的jar
#2、set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat 最好显示设置,否则有可能在某种情况下无法加载到hive.input.formate,即便在create-table的时候已经指定

COW模式

如果使用COW模式写入数据,会在hive的dwd库下面生成一张表,test test表支持:快照视图和增量视图

使用spark查询

spark-shell --master yarn \
--driver-memory 1G \
--num-executors 1 \
--executor-memory 1G \
--executor-cores 1 \
--jars /home/t3cx/apps/hudi/hudi-spark-bundle_2.11-0.5.2-SNAPSHOT.jar \
--conf spark.sql.hive.convertMetastoreParquet=false

#快照视图
spark.sql("select count(*) from dwd.test").show()
//增量视图 无需遍历全部数据,即可获取时间大于20200426140637的数据
import org.apache.hudi.DataSourceReadOptions
val hoodieIncViewDF = spark.read.format("org.apache.hudi").option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "20200426140637").load("hdfs://..../t3_trip_t_business15")
spark.sql("select count(*) from dwd.test_rt where _hoodie_commit_time>'20200426140637'").show()  

使用hive查询

beeline -u jdbc:hive2://... -n ..  -p ..  \
  --hiveconf hive.stats.autogather=false \
  
  #快照查询
  select count(*) from dwd.test;
  #增量查询
  set hoodie.test.consume.mode=INCREMENTAL;
  set hoodie.test.consume.max.commits=3;
  set hoodie.test.consume.start.timestamp=20200427114546;
  select count(*) from  dwd.test where `_hoodie_commit_time` > '20200427114546';

注意:我们需要把和hudi-hadoop-mr-bundle-0.6.0-SNAPSHOT.jar放在hive辅助路径和hive/lib包下面。 cloud era用户需要在yarn配置界面,配置辅助路径地址(不支持hdfs目录)

AbuMfeA.jpg!web

RnMbmia.jpg!web


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK