22

Apahce Hudi: 建立在Hadoop之上剑指数据湖的增量处理框架

 4 years ago
source link: http://mp.weixin.qq.com/s?__biz=MzI0NTIxNzE1Ng%3D%3D&%3Bmid=2651218378&%3Bidx=1&%3Bsn=aacb67162933d05ecdbdead99bb1c79a
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

随着Apache Parquet和Apache ORC等存储格式以及Presto和Apache Impala等查询引擎的发展,Hadoop生态系统有潜力作为面向分钟级延时场景的通用统一服务层。然而,为了实现这一点,这需要在HDFS中实现高效且低延迟的数据摄取及数据准备。

为了解决这个问题,优步开发了Hudi项目,这是一个增量处理框架,高效和低延迟地为所有业务关键数据链路提供有力支持。事实上,Uber已经将Hudi开源 - https://github.com/uber/hudi。在深入的了解Hudi之前,我们首先讨论一下为什么将Hadoop作为统一的服务层是一个不错的想法。

FBvUZzI.jpg!web

专注于分享Apache Hudi相关的 技术及源码解读。

动机

Lambda架构是一种常见的数据处理体系结构,它的数据的处理依赖流式计算层(Streaming Layer)和批处理计算层(Batch Layer)的双重计算。每隔几个小时,批处理过程被启动以计算精确的业务状态,并将批量更新加载到服务层(Serving Layer)。同时,为了消除上述几个小时的等待时间我们会在流式计算层对这个业务数据进行实时的状态更新。然而,这个流计算的状态只是一个最终结果的近似值,最终需要被批处理的计算结果所覆盖。由于两种模式提供的状态差异,我们需要为批处理和流处理提供不同的服务层,并在这个上面再做合并抽象,或者设计应用一个相当复杂的服务系统(如Druid),用于同时在行级更新和批量加载中提供优异表现。

n2Ub2ie.jpg!web

Lambda架构需要双重计算和双重服务

对于是否需要一个额外单独的批处理层,Kappa架构认为一个单独的流式计算层足以成为数据处理的通用解决方案。广义上,所有数据计算都可以描述为生产者生产一个数据流,而消费者不断的逐条迭代消费这个流中的记录,如火山模型(Volcano Iterator model)。这就意味着流式计算层可以依靠堆资源以增加并行能力的方式来对业务状态进行重算更新。这类系统可以依靠有效的检查点(checkpoint)和大量的状态管理来让流式处理的结果不再只是一个近似值。这个模型被应用于很多的数据摄取任务。尽管如此,虽然批处理层在这个模型中被去掉了,但是在服务层仍然存在两个问题。

如今很多流式处理引擎都支持行级的数据处理,这就要求我们的服务层也需要能够支持行级更新的能力。通常,这类系统并不能对分析类的查询扫描优化到这个地步,除非我们在内存中缓存大量记录(如Memsql)或者有强大的索引支持(如ElasticSearch)。这些系统为了获得数据摄取和扫描的性能往往需要增加成本和牺牲服务的可扩展性。出于这个原因,这类服务系统的数据驻留的能力往往是有限的,从时间上可能30~90天,从总量上来说几个TB的数据就是他们的极限了。对于历史数据的分析又会被重新定向到时延要求不那么高的HDFS上。

v6NZnma.jpg!web

Kappa架构统一了处理层,但服务复杂性仍然存在

对于数据摄取延时、扫描性能和计算资源和操作复杂性的权衡是无法避免的。但是如果我们的业务场景对时延的要求并不是那么的高,比如能接受10分钟左右的延迟,在我们如果可以在HDFS上快速的进行数据摄取和数据准备的基础上,服务层中的Speed Serving就不必要了。这么做可以统一服务层,大大降低系统整体的复杂度和资源消耗。

要将HDFS用作统一的服务层,我们不但需要使它支持存储变更日志(或者叫日志记录系统),而且需要支持根据实际业务维度来分区、压缩、去重的业务状态管理。这类统一服务层需具备如下几个特性:

  • 大型HDFS数据集的快速变更能力

  • 数据存储需要针对分析类扫描进行优化(列存)

  • 有效的连接和将更新传播到上层建模数据集的能力

被压缩的业务状态变更是无法避免的,即使我们以事件时间(Event time)作为业务分区字段。由于迟到数据和事件时间和处理时间(Processing time)的不一致,在数据摄取场景中我们依然需要对老的分区进行必要的更新操作。最后就算我们把处理时间作为分区字段,依然存在一些需要进行更新的场景,比如由于安全、审计方面的原因对原数据进行校正的需求。

Hudi简介: Hi, Hudi

作为一个增量处理框架,我们的Hudi支持前面章节中所述的所有需求。一言以蔽之,Hudi是一种针对分析型业务的、扫描优化的数据存储抽象,它能够使HDFS数据集在分钟级的时延内支持变更,也支持下游系统对这个数据集的增量处理。

Hudi数据集通过自定义的 InputFormat 兼容当前Hadoop生态系统,包括Apache Hive,Apache Parquet,Presto和Apache Spark,使得终端用户可以无缝的对接。

FZZZRvj.jpg!web

基于Hudi简化的服务架构,分钟级延时

该数据流模型通过时延和数据完整性保证两个维度去权衡以构建数据管道。下图所示的是Uber Engineering如何根据这两个维度进行处理方式的划分。

a6fiyqq.jpg!web

Uber在不同延迟和完整性级别上的用例分布

对于很少一些需要真正做到约1分钟的延时的用例及简单业务指标的展示应用,我们基于行级的流式处理。对于传统的机器学习和实验有效性分析用例,我们选择更加擅长较重计算的批处理。对于包含复杂连接或者重要数据处理的近实时场景,我们基于Hudi以及它的增量处理原语来获得两全其美的结果。想要了解Uber使用Hudi的更多用例和场景,可以去Apache文档(https://hudi.apache.org/use_cases.html)里面看一下。

Hudi数据集的存储

Hudi数据集的组织目录结构与Hive表示非常相似,一份数据集对应这一个根目录。数据集被打散为多个分区,分区字段以文件夹形式存在,该文件夹包含该分区的所有文件。在根目录下,每个分区都有唯一的分区路径。每个分区记录分布于多个文件中。每个文件都有惟一的 fileId 和生成文件的 commit 所标识。如果发生更新操作时,多个文件共享相同的fileId,但会有不同的 commit

每条记录由记录的key值进行标识并映射到一个fileId。一条记录的key与fileId之间的映射一旦在第一个版本写入该文件时就是永久确定的。换言之,一个fileId标识的是一组文件,每个文件包含一组特定的记录,不同文件之间的相同记录通过版本号区分。

Hudi Storage由三个不同部分组成:

  1. Metadata- 以时间轴(timeline)的形式将数据集上的各项操作元数据维护起来,以支持数据集的瞬态视图,这部分元数据存储于根目录下的元数据目录。一共有三种类型的元数据:

  • Commits - 一个单独的commit包含对数据集之上一批数据的一次原子写入操作的相关信息。我们用单调递增的时间戳来标识commits,标定的是一次写入操作的开始。

  • Cleans - 用于清除数据集中不再被查询所用到的旧版本文件的后台活动。

  • Compactions - 用于协调Hudi内部的数据结构差异的后台活动。例如,将更新操作由基于行存的日志文件归集到列存数据上。

Index- Hudi维护着一个索引,以支持在记录key存在情况下,将新记录的key快速映射到对应的fileId。索引的实现是插件式的,

  • Bloom filter - 存储于数据文件页脚。默认选项,不依赖外部系统实现。数据和索引始终保持一致。

  • Apache HBase - 可高效查找一小批key。在索引标记期间,此选项可能快几秒钟。

Data- Hudi以两种不同的存储格式存储所有摄取的数据。这块的设计也是插件式的,用户可选择满足下列条件的任意数据格式:

  • 读优化的列存格式(ROFormat)。缺省值为Apache Parquet

  • 写优化的行存格式(WOFormat)。缺省值为Apache Avro

rYzIBrf.jpg!web

Hudi存储内核

写Hudi文件

Compaction

Hudi对HDFS的使用模式进行了优化。Compaction是将数据从写优化格式转换为读优化格式的关键操作。Compaction操作的基本并行单位是对一个fileID的重写,Hudi保证所有的数据文件的大小和HDFS的块大小对齐,这样可以使Compaction操作的并行度、查询的并行度和HDFS文件总数间取得平衡。Compaction操作也是插件式的,可以扩展为合并不频繁更新的老的数据文件以进一步减少文件总数。

写入方式

Hudi是一个Spark的第三方库,以Spark Streaming的方式运行数据摄取作业,这些作业一般建议以1~2分钟左右的微批(micro-batch)进行处理。当然,在权衡自己业务在时延要求和资源层面的前提下,我们也可以用Apache Oozie或者Apache Airflow来进行离线作业周期性调度。

在默认配置下,Hudi使用以下写入路径:

  1. Hudi从相关的分区下的parquet文件中加载BloomFilter索引,并通过传入key值映射到对应的文件来标记是更新还是插入。此处的连接操作可能由于输入数据的大小,分区的分布或者单个分区下的文件数问题导致数据倾斜。通过对连接字段进行范围分区以及新建子分区的方式处理,以避免Spark某些低版本中处理Shuffle文件时的2GB限制的问题 - https://issues.apache.org/jira/browse/SPARK-6190。

  2. Hudi按分区对 insert 进行分组,分配一个fileId,然后对相应的日志文件进行append操作,知道文件大小达到HDSF块大小。然后,新的fileId生成,重复上述过程,直到所有的数据都被插入。

  • 一个有时间限制compaction操作会被后台以几分钟为周期调度起来,生成一个compactions的优先级列表,并压缩一个fileId包含的所有avro文件以生成进行当前parquet文件的下一个版本。

  • Compaction操作是异步的,锁定几个特定的日志版本进行压缩,并以新的日志记录更新到对应fileId中。锁维护在Zookeeper中。

  • Compaction操作的优先级顺序由被压缩的日志数据大小决定,并基于一个Compaction策略可配置。每一轮压缩迭代过程中,大文件优先被压缩,因为重写parquet文件的开销并不会根据文件的更新次数进行分摊。

Hudi在针对一个fileId进行更新操作时,如果对应的日志文件存在则append,反之,会新建日志文件。

如果数据摄取作业成功,一个 commit 记录会在Hudi的元数据时间轴中记录,即将inflight文件重命名为commit文件,并将分区和所创建fileId版本的详细信息记录下来。

HDFS块对齐

如上所述,Hudi会努力将文件大小和HDFS底层块大小对齐。取决于一个分区下数据的总量和列存的压缩效果,compaction操作依然能够创建parquet小文件。因为对分区的插入操作会是以对现有小文件的更新来进行的,所有这些小文件的问题最终会被一次次的迭代不断修正。最终,文件大小会不断增长直到与HDFS块大小一致。

故障恢复

首先,Spark的本身的重试机制会cover一些间歇性的异常,当然如果超过了重试次数的阈值,我们的整个作业都会失败。下一次的迭代作业会在同一批次数据上进行重试。以下列出两个重要的区别:

  1. 摄取失败可能在日志文件中生成包含部分数据的avro块 - 这个问题通过在 commit 元数据中存储对应数据块的起始偏移量和日志文件版本来解决。当读取日志文件时,偶尔发生的部分写入的数据块会被跳过,且会从正确的位置开始读取avro文件。

  2. Compaction过程失败会生产包含部分数据的parquet文件 - 这个问题在查询阶段被解决,通过 commit 元数据进行文件版本的过滤。查询阶段只会读取最新的完成的compaction后的文件。这些失败的compaction文件会在下一个compaction周期被回滚。

读取Hudi文件

commit 时间轴元数据可以让我们在同一份HDFS数据上同时享有读取优化的视图和实时视图。客户端可以基于延迟要求和查询性能决定使用哪种视图。Hudi以自定义的 InputFormat 和一个Hive注册模块来提供这两种视图,后者可以将这两种视图注册为Hive Metastore表。这两种输入格式都可以识别fileId和 commit 时间,可以筛选并读取最新提交的文件。然后,Hudi会基于这些数据文件生成输入分片供查询使用。

InputFormat 的具体信息如下:

  • HoodieReadOptimizedInputFormat - 提供扫描优化的视图,筛选所有的日志文件并获取最新版本的parquet压缩文件

  • HoodieRealtimeInputFormat - 提供一个实时的视图,除了会获取最新的parquet压缩文件之外,还提供一个 RecordReader 以合并与parquet文件相关的日志文件。

这两类 InputFormat 都扩展了 MapredParquetInputFormatVectorizedParquetRecordReader ,因此所有针对parquet文件的优化依然被保留。依赖于 hoodie-hadoop-mr 类库,Presto和Spark SQL可以对Hudi格式的Hive Metastore表做到开箱即用。

B7RbymZ.jpg!web

Hudi筛选出最新版本,在提供记录之前将他们与日志文件合并

增量处理

前面提到过,数据模型表需要在HDFS中处理和提供,才能使得HDFS算得上是一个统一的服务层。构建低延时的数据模型表需要能够链接HDFS数据集进行增量处理。由于Hudi在元数据中维护了每次提交的提交时间以及对应的文件版本,使得我们可以基于起始时间戳和结束时间戳从特定的Hudi数据集中提取增量的变更数据集。

这个过程基本上与普通的查询大致相同,只是选取特定时间范围内的文件版本进行读取而不是选最新的,提交时间会最为过滤条件被谓词下推到文件扫描阶段。这个增量结果集也收到文件自动清理的影响,如果某些时间范围内的文件被自动清理掉了,那自然也是不能被访问到了。

这样我们就可以基于watermark做双流join和流与静态数据的join以对存储在HDFS中的数据模型表计算和 upsert

RzeAvqV.jpg!web

基于Hudi增量计算的建模过程

参考文献

[1]https://eng.uber.com/hoodie/

[2]https://whatis.techtarget.com/definition/data-ingestion

[3]https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101

[4]https://github.com/uber/hudi

6fiyQvn.jpg!web


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK