32

Apache Hudi架构设计和基本概念

 4 years ago
source link: http://shiyanjun.cn/archives/2043.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Apache Hudi是一个Data Lakes的开源方案,Hudi是Hadoop Updates and Incrementals的简写,它是由Uber开发并开源的Data Lakes解决方案。Hudi具有如下基本特性/能力:

  • Hudi能够摄入(Ingest)和管理(Manage)基于HDFS之上的大型分析数据集,主要目的是高效的减少入库延时。
  • Hudi基于Spark来对HDFS上的数据进行更新、插入、删除等。
  • Hudi在HDFS数据集上提供如下流原语:插入更新(如何改变数据集);增量拉取(如何获取变更的数据)。
  • Hudi可以对HDFS上的parquet格式数据进行插入/更新操作。
  • Hudi通过自定义InputFormat与Hadoop生态系统(Spark、Hive、Parquet)集成。
  • Hudi通过Savepoint来实现数据恢复。
  • 目前,Hudi支持Spark 2.x版本,建议使用2.4.4+版本的Spark。

基本架构

与Kudu相比,Kudu是一个支持OLTP workload的数据存储系统,而Hudi的设计目标是基于Hadoop兼容的文件系统(如HDFS、S3等),重度依赖Spark的数据处理能力来实现增量处理和丰富的查询能力,Hudi支持Incremental Pulling而Kudu不支持。

Hudi能够整合Batch和Streaming处理的能力,这是通过利用Spark自身支持的基本能力来实现的。一个数据处理Pipeline通常由Source、Processing、Sink三个部分组成,Hudi可以作为Source、Sink,它把数据存储到分布式文件系统(如HDFS)中。

Apache Hudi在大数据应用场景中,所处的位置,如下图所示:

Hudi-Data-Lake-Overview.png

从上图中可见,Hudi能够与Hive、Spark、Presto这类处理引擎一起工作。Hudi有自己的数据表,通过将Hudi的Bundle整合进Hive、Spark、Presto等这类引擎中,使得这些引擎可以查询Hudi表数据,从而具备Hudi所提供的Snapshot Query、Incremental Query、Read Optimized Query的能力。

下面,先从Apache Hudi中提出的几个概念开始,来了解Hudi的设计:

Timeline

Hudi内部对每个表都维护了一个Timeline,这个Timeline是由一组作用在某个表上的Instant对象组成。Instant表示在某个时间点对表进行操作的,从而达到某一个状态的表示,所以Instant包含Instant Action,Instant Time和Instant State这三个内容,它们的含义如下所示:

  • Instant Action:对Hudi表执行的操作类型,目前包括COMMITS、CLEANS、DELTA_COMMIT、COMPACTION、ROLLBACK、SAVEPOINT这6种操作类型。
  • Instant Time:表示一个时间戳,这个时间戳必须是按照Instant Action开始执行的时间顺序单调递增的。
  • Instant State:表示在指定的时间点(Instant Time)对Hudi表执行操作(Instant Action)后,表所处的状态,目前包括REQUESTED(已调度但未初始化)、INFLIGHT(当前正在执行)、COMPLETED(操作执行完成)这3种状态。

下面,根据官网给出的一个例子来理解一下Timeline,如下图所示:

Hudi-Timeline.png

根据上图,说明如下:

  • 例子场景是,在10:00~10.20之间,要对一个Hudi表执行Upsert操作,操作的频率大约是5分钟执行一次。
  • 每次操作执行完成,会看到对应这个Hudi表的Timeline上,有一系列的COMMIT元数据生成。
  • 当满足一定条件时,会在指定的时刻对这些COMMIT进行CLEANS和COMPACTION操作,这两个操作都是在后台完成,其中在10:05之后执行了一次CLEANS操作,10:10之后执行了一次COMPACTION操作。

我们看到,从数据生成到最终到达Hudi系统,可能存在延迟,如图中数据大约在07:00、08:00、09:00时生成,数据到达大约延迟了分别3、2、1小时多,最终生成COMMIT的时间才是Upsert的时间。对于数据到达时间(Arrival Time)和事件时间(Event Time)相关的数据延迟性(Latency)和完整性(Completeness)的权衡,Hudi可以将数据Upsert到更早时间的Buckets或Folders下面。通过使用Timeline来管理,当增量查询10:00之后的最新数据时,可以非常高效的找到10:00之后发生过更新的文件,而不必根据延迟时间再去扫描更早时间的文件,比如这里,就不需要扫描7:00、8:00或9:00这些时刻对应的文件(Buckets)。

文件及索引

Hudi将表组织成HDFS上某个指定目录(basepath)下的目录结构,表被分成多个分区,分区是以目录的形式存在,每个目录下面会存在属于该分区的多个文件,类似Hive表,每个Hudi表分区通过一个分区路径(partitionpath)来唯一标识。在每个分区下面,通过文件分组(File Group)的方式来组织,每个分组对应一个唯一的文件ID。每个文件分组中包含多个文件分片(File Slice),每个文件分片包含一个Base文件(*.parquet),这个文件是在执行COMMIT/COMPACTION操作的时候生成的,同时还生成了几个日志文件(*.log.*),日志文件中包含了从该Base文件生成以后执行的插入/更新操作。

Hudi采用MVCC设计,当执行COMPACTION操作时,会合并日志文件和Base文件,生成新的文件分片。CLEANS操作会清理掉不用的/旧的文件分片,释放存储空间。

Hudi会通过记录Key与分区Path组成Hoodie Key,即Record Key+Partition Path,通过将Hoodie Key映射到前面提到的文件ID,具体其实是映射到file_group/file_id,这就是Hudi的索引。一旦记录的第一个版本被写入文件中,对应的Hoodie Key就不会再改变了。

Hudi表类型

Hudi具有两种类型的表:

  • Copy-On-Write表

使用专门的列式文件格式存储数据,例如Parquet格式。更新时保存多版本,并且在写的过程中通过异步的Merge来实现重写(Rewrite)数据文件。

Copy-On-Write表只包含列式格式的Base文件,每次执行COMMIT操作会生成新版本的Base文件,最终执行COMPACTION操作时还是会生成列式格式的Base文件。所以,Copy-On-Write表存在写放大的问题,因为每次有更新操作都会重写(Rewrite)整个Base文件。

通过官网给出的一个例子,来说明写入Copy-On-Write表,并进行查询操作的基本流程,如下图所示:

Hudi-Copy-On-Write-Table.png

上图中,每次执行INSERT或UPDATE操作,都会在Timeline上生成一个的COMMIT,同时对应着一个文件分片(File Slice)。如果是INSERT操作则生成文件分组的第一个新的文件分片,如果是UPDATE操作则会生成一个新版本的文件分片。

写入过程中可以进行查询,如果查询COMMIT为10:10之前的数据,则会首先查询Timeline上最新的COMMIT,通过过滤掉只会小于10:10的数据查询出来,即把文件ID为1、2、3且版本为10:05的文件分片查询出来。

  • Merge-On-Read表

使用列式和行式文件格式混合的方式来存储数据,列式文件格式比如Parquet,行式文件格式比如Avro。更新时写入到增量(Delta)文件中,之后通过同步或异步的COMPACTION操作,生成新版本的列式格式文件。

Merge-On-Read表存在列式格式的Base文件,也存在行式格式的增量(Delta)文件,新到达的更新都会写到增量日志文件中,根据实际情况进行COMPACTION操作来将增量文件合并到Base文件上。通常,需要有效的控制增量日志文件的大小,来平衡读放大和写放大的影响。

Merge-On-Read表可以支持Snapshot Query和Read Optimized Query,下面的例子展示了Merge-On-Read表读写的基本流程,如下图所示:

Hudi-Merge-On-Read-Table.png

上图中,每个文件分组都对应一个增量日志文件(Delta Log File)。COMPACTION操作在后台定时执行,会把对应的增量日志文件合并到文件分组的Base文件中,生成新版本的Base文件。

对于查询10:10之后的数据的Read Optimized Query,只能查询到10:05及其之前的数据,看不到之后的数据,查询结果只包含版本为10:05、文件ID为1、2、3的文件;但是Snapshot Query是可以查询到10:05之后的数据的。

Hudi查询类型

Hudi支持三种查询类型:

  • Snapshot Query

只能查询到给定COMMIT或COMPACTION后的最新快照数据。对于Copy-On-Write表,Snapshot Query能够查询到,已经存在的列式格式文件(Parquet文件);对于Merge-On-Read表,Snapshot Query能够查询到,通过合并已存在的Base文件和增量日志文件得到的数据。

  • Incremental Query

只能查询到最新写入Hudi表的数据,也就是给定的COMMIT/COMPACTION之后的最新数据。

  • Read Optimized Query

只能查询到给定的COMMIT/COMPACTION之前所限定范围的最新数据。也就是说,只能看到列式格式Base文件中的最新数据。

查询引擎支持能力矩阵

基于Hudi表和Hudi Bundle,外部的其他查询引擎可以非常方便的查询Hudi表,比如Hive、Spark SQL、Presto等。Hudi支持在Copy-On-Write表和Merge-On-Read表两种类型的表,同时支持基于Hudi表的Snapshot Query、Incremental Query、Read Optimized Query的能力。下面是Hudi支持的外部查询引擎支持查询的能力矩阵:

  • Copy-On-Write表

Hudi-Capability-Matrix-COW.jpg

  • Merge-On-Read表

Hudi-Capability-Matrix-MOR.jpg

参考链接

88x31.png

本文基于 署名-非商业性使用-相同方式共享 4.0 许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK