35

基于 Apache Iceberg 打造 T + 0 实时数仓

 3 years ago
source link: https://mp.weixin.qq.com/s/yOAPzbsAHukRZpfEOoXZ5g
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

导语

大数据处理技术现今已广泛应用于各个行业,为业务解决海量存储和海量分析的需求。但数据量的爆发式增长,对数据处理能力提出了更大的挑战,同时对时效性也提出了更高的要求。业务通常已不再满足滞后的分析结果,希望看到更实时的数据,从而在第一时间做出判断和决策。典型的场景如电商大促和金融风控等,基于延迟数据的分析结果已经失去了价值。

为了同时满足大数据量和高时效性的双重要求,实时数仓和在线交互式(ad-hoc)分析技术,及相应的基础组件应运而生,并快速发展。其中包括通用计算引擎(如Spark和Flink),交互式分析系统(如Presto,Druid和ClickHouse),数据湖框架(如Iceberg,Hudi和Delta Lake),和底层存储(如Ozone)。

本文主要介绍基于Iceberg的特性,通过Spark和Flink,如何打造T+0实时数仓,以及相应功能在Iceberg社区的进展。

离线和实时数仓

传统的离线数仓可以通过Hive加HDFS搭建。借助Hive成熟和稳定的能力,以及丰富的上下游生态,构造数据处理和分析平台。它通常遇到如下痛点:

流批混合的作业难以基于同一套基础组件搭建; 难以保证端到端的”有且仅有一次“和”强一致“的语义; 流批衔接,即流式数据落地,通常环节多,流程长,时效性差; 难以保证ACID事务和读写分离,导致下游出现脏读等错误;如果通过外部逻辑实现ACID事务和强一致性,会进一步加长整个流程; 已写入的数据很难修正,或者只能以数据文件甚至整个分区这种较大的粒度进行操作,费时费力; 数据落地和处理过程难以实现端到端的增量处理等。

针对上述离线数仓的痛点,随着流式计算引擎的发展,越来越多的公司引入实时数仓,或者实时和离线融合的数据分析平台,以求达到秒级的实时响应。

基于Iceberg打造实时数仓

Iceberg最近已经顺利毕业,晋升为Apache顶级项目。它作为新兴的数据湖框架之一,开创性的抽象出”表格式“(table format)这一中间层,既独立于上层的计算引擎(如Spark和Flink)和查询引擎(如Hive和Presto),也和下层的文件格式(如Parquet,ORC和Avro)相互解耦。

ry2q6b3.png!web

同时,Iceberg还提供了许多额外的能力:

ACID事务; 时间旅行(time travel),以访问之前版本的数据; 完备的自定义类型、分区方式和操作的抽象; 列和分区方式可以进化,而且进化对用户无感,即无需重新组织或变更数据文件; 隐式分区,使SQL不用针对分区方式特殊优化; 面向云存储的优化等;

上述的抽象和能力使得Iceberg在流批衔接和实时数仓中可以发挥核心作用。

总体框架

如下图所示:

AnIniiv.png!web

使用Flink流式处理引擎消费数据总线,借助ACID事务的能力强一致的导入Iceberg;读写分离使交互式查询引擎可以第一时间读取正确的数据;Row-level update和delete可以通过Spark对数据进行修正;增量消费使得已落地的数据可以进一步的返回流式处理引擎中,并只处理和向后传递变化的数据;Iceberg中的数据也可以同时被报表系统消费和进一步处理。

ACID事务

Iceberg实现了ACID事务机制,使得边写边读成为可能,从而数据可以更快的被下游消费到。ACID事务机制保证下游只能看到已commit的snapshot所包含的数据,而不用担心读到部分或者未commit的数据。业务因此可以省去大量的用于保证ACID事务和失败恢复的逻辑。

如上图所示,虚线框代表即将被生成的snapshot,其中包含新写入但尚未commit的数据;实线框表示已经被commit的snapshot,下游可以访问最新的snapshot(S3)或者之前的snapshot(如S2等)中的数据。由ACID事务衍生出的row-level update和delete的能力将在后文中介绍。

Flink写入和读取Iceberg

Flink应用从总线中消费流式数据后,可以通过Flink sink汇聚并落地Iceberg,后续也可以通过Flink source继续消费Iceberg中的数据。Flink的checkpoint机制和Iceberg的ACID事务特性是保证端到端的“有且仅有一次“语义的关键。

关于Flink sink写入Iceberg,可以分为相互解耦的两个层次来实现:

相对底层的DataStream实现:实现SinkFunction和checkpoint相关接口。接入DataStream中,即可实现落地Iceberg。 相对高层的Table和SQL实现:按照对insert、delele和update的不同要求,实现StreamTableSink相关接口(AppendStreamTableSource,UpsertStreamTableSource或RetractStreamTableSource),以支持Table和SQL等高级语义和操作。但最终的写入和commit操作还是通过上述的底层来实现。此处还需要处理Flink的schema以及类型到Iceberg的schema以及类型的转换和映射。

关于Flink source读取Iceberg,也可以分为相互解耦的两个层次来实现:

相对底层的DataStream实现:实现SourceFunctin和checkpoint相关接口。接入DataStream中,即可被Flink引擎驱动发送ScanTask来读取Iceberg中的数据。 相对高层的Table和SQL实现:实现StreamTableSource,并辅助以ProjectableTableSource以支持projection从Flink下推到Iceberg,以及FilterableTableSource以支持表达式下推到Iceberg,从而过滤不满足条件的行。

社区正在积极推进 Flink Iceberg sink [1] 的开发和合入。同时Flink sink和source也有 不同的实现 [2] 。但目标都是希望Iceberg能够成为首个在社区官方支持Flink的数据湖框架。

Flink Iceberg sink后续改进主要包括:在现有只能append写入的基础上,增加update和delete语义,使延迟数据可以得到正确的处理。由于整个流程不可避免的出现数据延迟到达的情况,而落地通常使用“事件时间(event time)”来聚合和分区,因此如何把延迟到达的数据合入正确的分区是需要解决的问题。实现此功能后,CDC(Change Data Capture)的场景即可被全面支持了。但这仍然依赖于 row-level delete的实现 [3]

基于Spark进行数据修正

支持对已入库数据的修正,同时保证ACID的事务特性,是现代数仓的基本能力。Iceberg的重要特性之一就是在row-level这一细粒度下的update和delete的能力。

Row-level update和delete通常有Copy-on-Write和Merge-on-Read两种方案。其中Copy-on-Write把生成新数据文件的压力集中于写入的时候,适合对读有较高要求的场景;而Merge-on-Read把合并最终结果的压力放在读取的时候,适合于快速写入的场景。

我们在内部已经实现了基于Copy-on-Write的方式。同时也将Iceberg作为Spark 3.0的V2 Data Source和multi-catalog,和Spark进行了集成,用户可以方便的通过Spark SQL进行update、delete和merge into等DML操作,以及建表删表等DDL操作。

我们作为社区中spark-3分支的维护者,正在积极推进相关功能的开发和合入,让更多的人受益。

增量消费Iceberg中的数据

流式数据落地数仓以后,还可以通过增量消费的方式回到流式处理引擎当中,继续向下游传递,做进一步的处理。而且针对数据延迟到达的情况,增量消费也为下游提供了仅获取变化数据的方式(而非全部数据),提高了信息传递的效率。增量消费,配合Flink sink写入支持update和delete语义,使端到端支持增量处理,可以进一步降低整个流程的延迟。Iceberg可以方便的基于snapshot的历史实现增量消费。

基于Spark,指定开始和结束snapshot-id的 incremental scan [4] 已经合入社区,以此为基础,基于micro-batch的 Spark Structured Streaming Read [5] 也已经实现。

Flink也有类似的增量消费的实现。

数据和元数据的压缩合并

为了提高读取时job planning的效率,小文件的压缩合并(compaction)是数仓日常维护中的重要任务,特别是流式数据直接落地,和基于Merge-on-Read实现row-level update和delete的功能,更加剧了小文件的产生。这里要注意的是压缩合并的对象既包括数据文件,也包括元数据文件。

压缩合并可以分为三个级别:

Minor compaction:仅合并元数据文件(rewrite manifest),不操作数据文件; Major compaction:合并元数据和数据文件。未来还需要处理数据文件和Merge-on-Read产生的delete文件的合并; Optimization:合并元数据和数据文件的同时,清理过期的snapshot以及这些snapshot对应的元数据和数据文件。

对于天生就是小文件的元数据,Iceberg可以自动的通过MergeAppend进行合并;也可以通过RewriteManifests手动发起合并,但它使用起来不是很方便。社区已经开发出 对应的Spark Action [6] ,依靠外部的Spark计算资源,方便的进行元数据的合并。

而对于数据文件的合并,社区也正在积极推进 相应Spark Action [7] 的开发和合入。

总结

随着数据量的持续增大,和业务对时效性的严苛要求,实时数仓的作用愈发的重要。而Iceberg凭借ACID事务、时间旅行和优秀的抽象等特性,以及对Spark和Flink等计算引擎接入的广泛支持,作为实时数仓的核心组件,可以缩短导入流程,方便数据变更,加速数据读取。

参考

[1] Iceberg作为Flink sink:https://github.com/apache/iceberg/pull/856

[2] https://github.com/generic-datalake/iceberg-pro

[3] Row-level delete的里程碑任务分解:https://github.com/apache/iceberg/milestone/4

[4] 基于Spark实现incremental scan:https://github.com/apache/iceberg/pull/829

[5] Spark Structured Streaming读取Iceberg:https://github.com/apache/iceberg/pull/796

[6] 用于合并元数据的Spark action:https://github.com/apache/iceberg/pull/875

[7] 用于合并数据文件的Spark action:https://github.com/apache/iceberg/pull/1083


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK