163

Apache Hudi与Delta Lake对比

 4 years ago
source link: http://mp.weixin.qq.com/s?__biz=MzIyMzQ0NjA0MQ%3D%3D&%3Bmid=2247484636&%3Bidx=1&%3Bsn=f219238e88aa0236a03c4a8136f558c8
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

1. 引入

在类Hadoop系统上支持ACID有了更大的吸引力,其中Databricks的Delta Lake和Uber开源的Hudi也成为了主要贡献者和竞争对手。两者都通过在“parquet”文件格式中提供不同的抽象以解决主要问题;很难选择一个比另一个更好。此博客将使用一个非常基本的示例来了解这些工具的工作原理,并让读者来比较两者的优缺点。

我们将使用与本系列下一篇文章中相反的方法,后面我们将讨论Hadoop上Data Lake的重要性,以及为什么会出现对诸如Delta/Hudi之类的系统的需求,以及数据工程师在过去如何为Lakes孤立地构建易错的ACID系统。

2. 初始化

2.1 环境

源数据库:AWS RDS MySQL

CDC工具:AWS DMS

Hudi:AWS EMR 5.29.0

Delta:Databricks运行时6.1

对象/文件存储:AWS S3

上面的工具集主要用于演示;也可以使用以下工具替代

源数据库:任何传统/基于云的RDBMS

CDC工具:Attunity,Oracle Golden Gate,Debezium,Fivetran,自定义Binlog解析器

Hudi:开源/企业Hadoop上的Apache Hudi

Delta:开源/企业Hadoop上的Delta Lake

对象/文件存储:ADLS / HDFS

2.2 数据准备步骤

create database demo;
use demo;
create table hudi_delta_test
(
pk_id integer,
name varchar(255),
value integer,
updated_at timestamp default now() on update now(),
created_at timestamp default now(),
constraint pk primary key(pk_id)
);
insert into hudi_delta_test(pk_id,name,value) values(1,’apple’,10);
insert into hudi_delta_test(pk_id,name,value) values(2,’samsung’,20);
insert into hudi_delta_test(pk_id,name,value) values(3,’dell’,30);
insert into hudi_delta_test(pk_id,name,value) values(4,’motorola’,40);

NfuUBjv.png!web

现在使用DMS将数据加载到S3中的某个位置,并使用文件夹名称full_load来标识该位置。为了更贴合标题,我们将跳过DMS的设置和配置。加载到S3后如下图所示。

JNbURnu.jpg!web

接着在MySQL表中执行一些插入/更新/删除操作

insert into hudi_delta_test(pk_id,name,value) values(4,’motorola’,40);
update hudi_delta_test set value = 201 where pk_id=2;
delete from hudi_delta_test where pk_id=3;

QfmMNfR.png!web

继续略过DMS阶段,将CDC数据按以下方式加载到S3,如下图所示

b6vQ7vY.jpg!web

意: DMS将填充一个名为“ Op”的附加字段,表示“操作”,Op取值I/U/D,分别对应插入、更新和删除。 以下图所示显示了CDC数据的内容。

df = spark.read.parquet('s3://development-dl/demo/hudi-delta-demo/raw_data/cdc_load/demo/hudi_delta_test')
df.show()

imAJVzn.jpg!web

完成了数据准备后正式开始比对。DMS将持续将CDC事件传送到S3(供Hudi和Delta Lake使用),此S3为数据源。两种工具的最终状态都旨在获得一致的统一视图,如上图MySQL所示。

3. 使用Apache HUDI

Hudi有两种方式处理UPSERTS [1]

  • 写时复制(CoW):数据以列格式(Parquet)存储,并且在更新时会创建文件的新版本。此存储类型最适合于读繁重的工作负载,因为数据集的最新版本始终在有效的列格式文件中可用。

  • 读时合并(MoR):数据以列(Parquet)和基于行(Avro)的格式存储;更新记录到基于行的“增量文件”中,并在以后进行压缩,以创建列文件的新版本。此存储类型最适合于写繁重的工作负载,因为新提交会以增量文件的形式快速写入,但是读取数据集需要合并列文件与增量文件。

3.1 启动Spark Shell

使用以下命令打开Spark Shell并进行相关导入.

spark-shell — conf “spark.serializer=org.apache.spark.serializer.KryoSerializer” — conf “spark.sql.hive.convertMetastoreParquet=false” — jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._ 
import org.apache.hudi.DataSourceWriteOptions 
import org.apache.hudi.config.HoodieWriteConfig 
import org.apache.hudi.hive.MultiPartKeysValueExtractor

3.2 使用CoW

val inputDataPath = “s3://development-dl/demo/hudi-delta-demo/raw_data/full_load/demo/hudi_delta_test”
val hudiTableName = “hudi_cow”
val hudiTablePath = “s3://development-dl/demo/hudi-delta-demo/hudi_cow”
val hudiOptions = Map[String,String]
 (
 DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> “pk_id”,
 DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> “created_at”, 
 HoodieWriteConfig.TABLE_NAME -> hudiTableName, 
 DataSourceWriteOptions.OPERATION_OPT_KEY ->
 DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, 
 DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY -> “COPY_ON_WRITE”,
 DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> “updated_at”, 
 DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> “true”, 
 DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiTableName, 
 DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> “created_at”, 
 DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY -> “false”,
 DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName
 )
val temp = spark.read.format(“parquet”).load(inputDataPath)
val fullDF = temp.withColumn(“Op”,lit(‘I’))
fullDF.write.format(“org.apache.hudi”).options(hudiOptions).mode(SaveMode.Overwrite).save(hudiTablePath)

由于在Hudi选项中使用了Hive自动同步配置,因此会在Hive中创建一个名为“ hudi_cow”的表。该表使用具有Hoodie格式的Parquet SerDe创建,表结构如下图所示。

IJZfeaf.jpg!web

表数据如下图所示

BFRnIz2.jpg!web

val updateDF = spark.read.parquet(“s3://development-dl/demo/hudi-delta-demo/raw_data/cdc_load/demo/hudi_delta_test”)
updateDF.write.format(“org.apache.hudi”).options(hudiOptions).option(DataSourceWriteOptions.OPERATION_OPT_KEY,DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL).mode(SaveMode.Append).save(hudiTablePath)

进行更新操作,表“hudi_cow”将有最新的更新数据,如下图所示

V73Qf23.jpg!web

如CoW定义中所述,当我们以hudi格式将updateDF写入同一S3位置时,更新的数据在写时被复制,并且快照和增量数据使用同一张表。

3.3 使用MoR

val inputDataPath = “s3://development-dl/demo/hudi-delta-demo/raw_data/full_load/demo/hudi_delta_test”
val hudiTableName = “hudi_mor”
val hudiTablePath = “s3://development-dl/demo/hudi-delta-demo/hudi_mor”
val hudiOptions = Map[String,String]
 (
 DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> “pk_id”,
 DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> “created_at”, 
 HoodieWriteConfig.TABLE_NAME -> hudiTableName, 
 DataSourceWriteOptions.OPERATION_OPT_KEY ->
 DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, 
 DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY -> “MERGE_ON_READ”,
 DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> “updated_at”, 
 DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> “true”, 
 DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiTableName, 
 DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> “created_at”, 
 DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY -> “false”,
 DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName
 )
val temp = spark.read.format(“parquet”).load(inputDataPath)
val fullDF = temp.withColumn(“Op”,lit(‘I’))
fullDF.write.format(“org.apache.hudi”).options(hudiOptions).mode(SaveMode.Overwrite).save(hudiTablePath)

还是开启了Hive自动同步,将在Hive中创建两张名为“hudi_mor”和“ hudi_mor_rt”的表。hudi_mor是经过读优化的表,具有快照数据,而hudi_mor_rt将具有增量和实时合并数据。数据将会以频繁的压缩间隔被压缩,并提供给hudi_mor。hudi_mor_rt利用Avro格式存储增量数据。正如MoR定义所示,通过hudi_mor_rt读取数据时将即时合并。这对于高更新源表很有用,同时还提供一致且非最新的读优化表。

注意:“ hudi_mor”和“ hudi_mor_rt”都指向相同的S3存储桶,只是定义了不同的存储格式。

AZVZVz2.jpg!web

3AbUFj6.jpg!web

可以看到加载后两表内容相同,内容如下所示

FNZ3yuv.jpg!web

val updateDF = spark.read.parquet(“s3://development-dl/demo/hudi-delta-demo/raw_data/cdc_load/demo/hudi_delta_test”)
updateDF.write.format(“org.apache.hudi”).options(hudiOptions).option(DataSourceWriteOptions.OPERATION_OPT_KEY,DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL).mode(SaveMode.Append).save(hudiTablePath)

表hudi_mor在很短时间内就具有相同的内容(因为演示中的数据很小,并且很快会被压缩),只要merge成功,表hudi_mor_rt就会有最新数据。

uqy2qqA.jpg!web

现在看看这些Hudi格式表的S3日志的变化。底层存储格式为parquet,同时通过日志方式管理ACID。通常生成以下类型的文件:

  • hoodie_partition_metadata:这是一个小文件,包含有关给定分区中partitionDepth和最后一次commitTime的信息

  • hoodie.properties:存储表名称、存储类型信息

  • commit和clean:文件统计信息和有关正在写入的新文件的信息,以及诸如numWrites,numDeletes,numUpdateWrites,numInserts和一些其他相关审计字段之类的信息,存储在这些文件中。这些文件在每次提交后生成

以上3个文件对于CoW和MoR类型的表都是通用的。另外对于MoR表,额外有为UPSERTED分区创建的avro格式的日志文件。如下所示的第一个log文件是CoW表中不存在的日志文件。

M3MrAf3.jpg!web

4. 使用Delta Lake

使用下面的代码片段,我们以parquet格式读取完整的数据,并以delta格式将其写入不同的位置

from pyspark.sql.functions import *
inputDataPath = "s3://development-dl/demo/hudi-delta-demo/raw_data/full_load/demo/hudi_delta_test"
deltaTablePath = "s3://development-dl/demo/hudi-delta-demo/delta_table"
fullDF = spark.read.format("parquet").load(inputDataPath)
fullDF = fullDF.withColumn("Op",lit('I'))
fullDF.write.format("delta").mode("overwrite").save(deltaTablePath)

mamY7zm.jpg!web

在Databricks Notebook的SQL界面中使用以下命令可以创建一个Hive外表,“using delta”关键字会包含基础SERDE和FILE格式的定义。

%sql
create table delta_table 
using delta
location 's3://development-dl/demo/hudi-delta-demo/delta_table'

BV7BVjE.png!web

该表的DDL如下所示。

%sql
show create table delta_table

NRNzeqM.png!web

表会包含与完整加载文件相同的所有记录。

%sql
select * from delta_table

AJVbe2F.jpg!web

使用以下命令读取CDC数据并在Hive中注册为临时视图

updateDF = spark.read.parquet("s3://development-dl/demo/hudi-delta-demo/raw_data/cdc_load/demo/hudi_delta_test")
updateDF.createOrReplaceTempView("temp")

biU7rqj.png!web

MERGE命令:下面是执行UPSERT的MERGE SQL,它作为SQL很方便地被执行,也可以在spark.sql()方法调用中执行

%sql
MERGE INTO delta_table target
USING 
(SELECT Op,latest_changes.pk_id,name,value,updated_at,created_at
  FROM temp latest_changes
 INNER JOIN (
   SELECT pk_id,  max(updated_at) AS MaxDate
   FROM temp
   GROUP BY pk_id
) cm ON latest_changes.pk_id = cm.pk_id AND latest_changes.updated_at = cm.MaxDate) as source
ON source.pk_id == target.pk_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED  THEN INSERT *

jEb6r2N.jpg!web

MERGE之后,Hive中delta_table的内容也更新了。

%sql
select * from delta_table

buAjqmn.jpg!web

与Hudi一样,Delta Lake基本文件存储格式也是“parquet”。Delta提供带有日志和版本控制的ACID功能。接着看看S3在装载和CDC合并后的变化。

imAvaeY.jpg!web

增量日志包含JSON格式的日志,文件中包含每次提交后的schema和最新文件的信息。

VFZjqi6.jpg!web

在CDC合并的情况下,由于可以插入/更新或删除多条记录。初始parquet文件的内容分为多个较小的parquet文件,这些较小的文件会被重写。如果对表进行了分区,则仅与更新的分区相对应的CDC数据将受到影响。初始parquet文件仍存在于该文件夹中,但已从新的日志文件中删除。如果我们在此表上运行VACUUM,则可以物理删除该文件。也可以使用OPTIMIZE命令[6]来串联这些较小的文件。

ay6BfaA.jpg!web

Delta日志附加了另一个JSON格式的日志文件,该文件存储schema和指向最新文件的文件指针。

RFFzMru.jpg!web

5. 总结

上述两个示例中都按原样保留了删除的记录,并通过Op ='D'标识删除,这是故意而为以显示DMS的功能,下面的参考资料显示了如何将这种软删除转换为硬删除。

希望这是一个有用的比较,有助于做出合理的选择,选择合适的数据湖框架。

eiuIjey.png!web

yU3MZrm.jpg!web

参考资料

  1. https://aws.amazon.com/blogs/aws/new-insert-update-delete-data-on-s3-with-amazon-emr-and-apache-hudi/

  2. https://databricks.com/blog/2019/07/15/migrating-transactional-data-to-a-delta-lake-using-aws-dms.html

  3. https://hudi.apache.org/

  4. https://docs.delta.io/

  5. https://databricks.com/blog/2019/08/21/diving-into-delta-lake-unpacking-the-transaction-log.html

  6. https://docs.databricks.com/delta/optimizations/index.html


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK