41

Spark SQL 分布式事务处理能力的探索与实践

 5 years ago
source link: https://mp.weixin.qq.com/s/czfXKLjHsjYmOQ4L7Ju-mA?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

云湖湖导读:

在当今大数据时代,随着存储数据量的增长,对数据库插入与读取性能的要求越来越严苛。为了提升访问数据库的性能,已经有越来越多成熟的方案来并发访问数据库,Spark JDBC Datasource就是其中之一。而另一方面,并发访问数据库也会带来各种各样的问题,比如数据的保序、数据倾斜、并发一致性等问题。

本文将会从具体案例入手,通过以下3点来详细描述如何利用Spark来处理分布式事务,解决并发一致性问题。

1、概述事务

2、案例分析

3、解决方案

更多优质内容请关注微信公众号“智能数据湖”

作者:William

责编:云湖湖

1、概述

1.1

数据库事务

事务 可以简单理解为对数据库的一批操作,这批操作要么全部成功,要么全部失败回滚。 事务有4大特性ACID

原子性(Atomicity)

事务作为一个整体被执行,包含在其中的对数据库的操作要么全部被执行,要么都不执行。

一致性(Consistency)

事务应确保数据库的状态从一个一致状态转变为另一个一致状态。一致状态的含义是数据库中的数据应满足完整性约束。

隔离性(Isolation)

多个事务并发执行时,一个事务的执行不应影响其他事务的执行。

持久性(Durability)

已被提交的事务对数据库的修改应该永久保存在数据库中。

7JZJRnF.png!web

以MySQL为例,一个简单的事务操作流程

1.2

分布式数据库事务

分布式事务是数据库事务的子类,在分布式系统与微服务架构盛行的今天,各个不同模块、服务之间进行数据库业务的操作,也是需要保证事务性质的。

现在,已经有很多分布式事务的解决方案,比如常见的2PC、3PC、MySQL的XA事务、SAGA等等。然而,各种方案都有各自的优缺点,面对分布式系统中的所有需求问题,至今是无法找到一个十全十美的解决方案。著名的CAP理论提到,在一个分布式系统中,最多只能满足C、A、P中的两个需求:

C:Consistency 一致性

同一数据的多个副本保持一致。

A:Availability 可用性

系统能够提供正常服务并能在需求的时间内返回结果。

P:Partition tolerance 分区容忍性

由于各种不可预知的问题,系统中必然不能一直使数据的各个副本保持同步,那么系统能够在可用的同时容忍数据不同步的极限称为分区容忍性。

由于至今没有有效的解法来否决CAP理论,因此在实际分布式的解决方案中,又引入了BASE理论来辅助解决分布式中的难题,该理论将会结合下面的实际案例分析来进行介绍。

1.3

Spark JDBC事务处理

Spark JDBC Datasource是Spark SQL用于访问关系型数据库的模块,其中自然运用到了事务逻辑。附录有详细源码分析。

2、案例分析

2.1

某企业需要定期将存放在HBase中的订单信息数据增量同步到PostGre数据库中,以便后续做BI报表展示。流程示意图如下:

RRRJniR.png!web

该企业要求每隔1小时将HBase中新产生的数据导入到PostGre中,在下一个导入周期开始前完成,每个周期的数据量在10~100GB级别左右。其中每张表都有时间戳(Timestamp)字段来表示数据插入的时间。另外,该企业还要求导入过程中有失败重试机制,保证所有订单数据最终导入成功,并且不能在PostGre中产生重复的订单数据。

考虑到10~100GB级别数据需要在一个周期内导入完成,性能还是有一定的需求。选型中确定了使用SparkSQL的分布式并发处理能力来进行数据的导入。

2.2

Spark JDBC中的并发问题

Spark能够并发启动多个task来同时进行数据的导入,解决了性能问题。但是并发也会带来其他的问题。

Spark是具有事务处理的能力,但是每个task都是一个独立的事务。task执行成功之后,数据会永久写入SQL数据库。如果其他的task执行失败,不会影响到已经执行成功的task进行回滚,这时SQL数据库将会产生脏数据。对于用户来讲,实际上只是执行了一条“insert”语句,但是Spark内部启动了并发task,将用户的SQL语句拆分成了多个SQL语句进行处理,而用户是不会感知的。从用户的角度考虑,如果需要事务的能力,要么这一条“insert”语句执行成功,数据正确写入数据库,要么执行失败,SQL语句回滚,没有数据写入数据库中。因此,Spark的并发处理能力无法满足用户对于事务的需求。

FZZRnuB.png!web

图 用户的一条insert语句被拆分成多个insert语句并发执行,当task4写入失败后,其他task写入的数据变成脏数据

3、解决方案

3.1

单并发事务处理

思路

1.2中提到了CAP理论,当C、A、P三者选其二的时候,如果选择CA,那么需要放弃掉数据副本,也就是说系统中只有一份数据,那么就减少了因为同步而带来的一致性问题(实际上考虑到分布式系统的可靠性,数据是需要多副本存放的,一般的分布式系统都会选择P,并在CA中二选一)。

根据这个思想,当并发存在问题的时候,采用单并发从根源上直接抹去分布式事务能力,是可以解决问题的。在Spark中可以设置参数spark.default.parallelism和spark.sql.shuffle.partitions为1来控制task的个数。这样利用Spark JDBC的事务能力,当单task执行失败的时候进行回滚,数据库就不会产生脏数据了。

JzYfAfa.png!web

图 单task成功写入即完成任务

nMVneqE.png!web

图 单task失败事务回滚,数据库不会产生脏数据

优点

完全解决了Spark因并发导致的分布式事务功能缺失问题。

缺点

无法使用Spark的并行计算能力,性能直线下降,无法满足客户需求。

3.2

二阶段式:协调者与执行者实现

1.2中介绍过,业界已经有很多分布式事务的解决方案,在这里我们介绍一下比较简单的2PC如何在Spark中实现,其他的方案也都可以根据其思想结合Spark现有的执行模式来实现。

思路

先来介绍一下2PC的实现,在2PC的系统中需要有2个角色——协调者和参与者,协调者负责任务调度,参与者负责执行任务。2PC活动分为2个阶段——准备阶段和提交阶段。

准备阶段,协调者会发送确认信息询问参与者是否已准备好,具备事务提交能力;当协调者接收到所有参与者回复之后,进入提交阶段,如果所有参与者都准备好事务提交,则提交事务,如果有一个参与者返回无法进行事务提交,则所有参与者进行回滚。

y6rm63f.png!web

图 2PC事务成功提交流程

fUZv2mm.png!web

图 2PC事务失败回滚流程

Spark中的Driver与Executor运行模式,正好填充了协调者与参与者的角色。Driver可以作为协调者,Executor中的每一个task可以作为参与者,这里还需要在Driver与task之间增加一个通信通道。准备阶段,Driver启动监听端口等待接收task反馈,各个task完成insert数据处理之后发送完成准备信息给Driver,并等待Driver下一步指令。提交阶段,Driver接受到所有task的准备信息之后,发送提交或者回滚指令。

6NNBFr7.png!web

图 Spark 2PC改造后实现分布式事务处理流程

在这里还需要打通2个难点。第一个问题是准备阶段,Driver在等待task反馈的时候,当task异常失败后有可能会无法反馈。这就需要Driver的监听与task状态检测机制关联,当遇到task异常退出的时候,Driver需要下发回滚指令,不能无止尽等待,防止Spark作业卡死。第二个问题是task是在队列等待下发的,当executor的资源不足以启动所有task的时候,会有部分task存在队列中,这时候driver等待所有task的反馈,启动中的task在等待driver下发新的指令,队列中未启动的task等待启动中的task完成任务释放资源,三方互相牵制,造成死循环。

YZVvEvq.png!web

图 当集群资源无法满足同时启动所有task的时候,造成死循环

这时需要通过Spark配置项来控制task并发数,确保executor有足够的资源能够一次性将所有并发task都启动。相关的配置项如下表所示。

f2yumqu.jpg!web

表 控制task并发数的相关配置项

优点

既能满足简单的分布式事务需求,还能充分利用Spark的并行计算能力提升性能。

缺点

1. 需要精确控制task并发数,确保所有task能同时执行,否则会造成死循环。

2. 需要在Spark中实现分布式事务的逻辑,具有一定的工作量。

除了以上这些,2PC协议本身的缺陷在结合了Spark中的一些可靠性机制之后会有所减少(比如协调者挂死、参与者挂死等),但并不代表2PC的所有缺陷Spark都能规避。因此,对于一些有复杂分布式事务处理能力的需求,该方案还是不太满足。

3.3

最佳方案:可幂等的insert ignore模式

思路

1.2中我们遗留了一个BASE理论,先来做一下介绍。BASE理论是eBay架构师提出的,它是CAP理论的一个延伸。BASE理论由BA(Basically Available, 基本可用),S(Soft State),E(Eventual Consistency, 最终一致性)三部分组成,核心思想是即使无法做到强一致性,但应用可以采用适合的方式达到最终一致性。

从案例背景中,我们可以看出来,实际上用户需要的只是最终能够完成所有数据的同步,即所谓的最终一致性,因此我们可以结合Spark的重试机制与类似MySQL的insert ignore功能来实现。

先来介绍一下MySQL的insert ignore。当表中存在主键(例如ID),那么在使用insert ignore into语法时,如果遇到插入主键相同的情况下,将会忽略此次插入。Oracle有类似的功能,叫做merge into,有一点区别是MySQL遇到冲突时,忽略插入,而Oracle遇到冲突时是进行更新操作。PostGre没有直接提供类似功能,但是网上有很多关于PostGre如何实现insert ignore的方法,在这里就不做介绍了。而在案例中涉及到的PostGre,在华为云解决方案中采用的是DWS数据仓库服务,除了兼容PostGre还提供了Oracle的merge into语法来实现该功能。

因此,针对本文的案例,具体的方案是先对表设置主键,如果只是静态的订单状态等信息,则可以简单将订单ID作为主键,而有些涉及到订单操作信息的数据(订单ID会多次出现,记录用户每次对该订单的操作,例如创建订单、对订单支付、撤销订单等),则需要新增一个唯一的事件ID作为主键。

jeymM3Z.png!web   表 订单操作信息表示例

ZFB3qqR.png!web

表 增加事件ID作为主键后的订单操作信息表示例

然后在Spark中使用insert ignore的功能,这样如果遇到并发执行时某个task失败的情况,重新执行作业,会对已经插入的数据进行忽略,并不造成数据的重复。

NfAf6vf.png!web

图 使用insert ignore,当task执行失败时部分数据写入,REDO时忽略已写入数据

优点

既解决了一致性问题,也能利用Spark的并行计算能力提升性能。

缺点

该方案使用的是最终一致性的思想,因此如果遇到强一致性的需求是无法满足的。

注释

1、 参考文献: 王能斌. 《数据库系统教程(上册)》. 电子工业出版社

2、 通过分析向数据库插入数据的源码,来介绍一下 Spark 中如何进行事务处理的(代码表可左右滑动):

// @param insertStmt是插入数据库的sql语句模板, 例如"insert into tablexx values(?, ?, ?...)"

// 根据rdd中的数据调用java jdbc batch接口将问号替换掉

// @param batchSize指定了每批次插入sql数据库的记录(行)数

// @param dialect为各种不同SQL数据库提供类型转换器

// @param isolationLevel事务的隔离级别, 同时也是用于判断是否开启事务的标志

def savePartition(

getConnection: () => Connection,

table: String,

iterator: Iterator[Row],

rddSchema: StructType,

insertStmt: String,

batchSize: Int,

dialect: JdbcDialect,

isolationLevel: Int): Iterator[Byte] = {

val conn = getConnection()

var committed = false

// 判断对方数据库是否支持相应的隔离级别

// JDBC接口中隔离级别一共有5种:(网上有很多关于该级别的解释)

// "NONE"

// "READ_UNCOMMITTED"

// "READ_COMMITTED"

// "REPEATABLE_READ"

// "SERIALIZABLE"

//

// isolationLevel为用户指定的隔离级别, spark sql中可以由参数isolationLevel来指定, 默认为"READ_UNCOMMITTED"

// finalIsolationLevel为分析对方数据库的元数据信息后, 选择的最终隔离级别

//

var finalIsolationLevel = Connection.TRANSACTION_NONE

if (isolationLevel != Connection.TRANSACTION_NONE) {

try {

// 获取数据库的元数据信息

val metadata = conn.getMetaData

// 判断是否支持事务

if (metadata.supportsTransactions()) {

// 若数据库支持用户指定的隔离级别,则将最终隔离级别调整为用户指定

// 否则的话使用数据库的默认隔离级别

val defaultIsolation = metadata.getDefaultTransactionIsolation

finalIsolationLevel = defaultIsolation

if (metadata.supportsTransactionIsolationLevel(isolationLevel)) {

finalIsolationLevel = isolationLevel

} else {

logWarning(s"Requested isolation level $isolationLevel is not supported; " +

s"falling back to default isolation level $defaultIsolation")

}

} else {

// 若数据库不支持事务, finalIsolationLevel为'NONE', 即不开启事务

logWarning(s"Requested isolation level $isolationLevel, but transactions are unsupported")

}

} catch {

case NonFatal(e) => logWarning("Exception while detecting transaction support", e)

}

}

// 若最终隔离级别不为'NONE'则开启事务

val supportsTransactions = finalIsolationLevel != Connection.TRANSACTION_NONE

try {

// 若开启事务, 则关闭jdbc接口中的自动提交功能, 并设置隔离级别

if (supportsTransactions) {

conn.setAutoCommit(false) // Everything in the same db transaction.

conn.setTransactionIsolation(finalIsolationLevel)

}

// 根据不同的数据库(mysql, postgre等)构造不同的setters, 用于将insert模板语句中的"?"填充成数据

val stmt = conn.prepareStatement(insertStmt)

val setters = rddSchema.fields.map(f => makeSetter(conn, dialect, f.dataType))

val nullTypes = rddSchema.fields.map(f => getJdbcType(f.dataType, dialect).jdbcNullType)

val numFields = rddSchema.fields.length

try {

var rowCount = 0

while (iterator.hasNext) {

val row = iterator.next()

var i = 0

// 根据字段个数逐行对insert语句进行填充

while (i < numFields) {

if (row.isNullAt(i)) {

stmt.setNull(i + 1, nullTypes(i))

} else {

setters(i).apply(stmt, row, i)

}

i = i + 1

}

stmt.addBatch()

rowCount += 1

// 当插入数据的行数达到batchSize(默认1000)之后, 进行插入

if (rowCount % batchSize == 0) {

stmt.executeBatch()

rowCount = 0

}

}

// 最后全部处理完之后, 将剩余的数据插入数据库

if (rowCount > 0) {

stmt.executeBatch()

}

} finally {

stmt.close()

}

// 若开启了事务, 执行完stmt.executeBatch()之后数据库是不会生效的, 需要执行提交事务

if (supportsTransactions) {

conn.commit()

}

committed = true

Iterator.empty

} catch {

...

} finally {

if (!committed) {

// 提交失败, 如果开启了事务则进行回滚

if (supportsTransactions) {

conn.rollback()

}

conn.close()

} else {

...

}

}

}

▼  每周二18点,不见不散 

延伸阅读

实时流计算和时空数据库助力用户IoT的无限可能

《Serverless计算这么强大》

《Spark SQL在HBase的查询性能优化》

地理时空大数据概论

MNbYRbJ.jpg!web 咦,在看吗?点一下「在看」再走呗:point_down:


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK