8

Flink SQL CDC实践以及一致性分析

 3 years ago
source link: https://mp.weixin.qq.com/s?__biz=MzIwNDkwMjc1OQ%3D%3D&%3Bmid=2247484847&%3Bidx=1&%3Bsn=8f011c282484a13ce38b4b425654e561
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

一. 背景

数据准实时复制(CDC)是目前行内实时数据需求大量使用的技术,随着国产化的需求,我们也逐步考虑基于开源产品进行准实时数据同步工具的相关开发,逐步实现对商业产品的替代。我们评估了几种开源产品,canal,debezium,flink CDC等产品。作了如下的对比:

组件 Canal Debezium Flink 开源方 阿里 redhat flink社区+阿里 开发语言 Java Java Java 支持数据库 MySQL "MongoDB、MySQL、PostgreSQL、SQL Server 、Oracle( 孵化)、DB2( 孵化)、Cassandra( 孵化)" MySQL、PostgreSQL 是否支持bootstrap 否 是 是 是否支持解析DDL同步 是 是 是 是否支持HA 是 基于kafka-connector flink集群高可用 社区活跃(2020.07.20) "release:2019.09.02,star:14.8k,last-commit:2020.03.13" "release:2020.07.16,star:3.4k,last-commit:2020.07.16" release:2020.07.14,star:14k,last-commit:2020.07.17 文档 中文,百度可以解决 英文,官方文档十分详细 官方中文文档,github readme 文档 MQ集成 RocketMQ、Kafka kafka(按照主键分发) kafka(支持轮转+自定义)、ES、PG、Mysql等

二.什么是Flink SQL CDC Connectors

在Flink 1.11引入了CDC机制,CDC的全称是Change Data Capture,用于捕捉数据库表的增删改查操作,是目前非常成熟的同步数据库变更方案。Flink CDC Connectors是Apache Flink的一组源连接器,是可以从MySQL、PostgreSQL数据直接读取全量数据和增量数据的Source Connectors,开源地址:https://github.com/ververica/flink-cdc-connectors。目前(1.11版本)支持的Connectors如下:

Connector Database Database Version Flink Version MySQL CDC MySQL Database: 5.7, 8.0.x
JDBC Driver: 8.0.16 1.11+ Postgres CDC PostgreSQL Database:9.6, 10, 11, 12
JDBC Driver:42.2.12 1.11+

另外支持解析Kafka中debezium-json和canal-json格式的Change Log,通过Flink进行计算或者直接写入到其他外部数据存储系统(比如Elasticsearch),或者将Changelog Json格式的Flink数据写入到Kafka:

Format Supported Connector Flink Version Changelog Json Apache Kafka 1.11+

三. Flink SQL CDC原理介绍

在公开的 CDC 调研报告中,Debezium 和 Canal 是最流行使用的 CDC 工具,这些CDC工具的核心原理是抽取数据库日志获取变更。在经过一系列调研后,我行采用的是Debezium(支持全量、增量同步,同时支持Mysql、PostgreSQL、Oracle等数据库)。

Flink SQL CDC内置了Debezium引擎,利用其抽取日志获取变更的能力,将changelog转换为Flink SQL认识的RowData数据。(以下右侧是Debezium的数据格式,左侧是Flink的RowData数据格式)

RjABjev.png!mobile

RowData 代表了一行的数据,在RowData 上面会有一个元数据的信息RowKind,RowKind 里面包括了插入(+I)、更新前(-U)、更新后(+U)、删除(-D),这样和数据库里面的 binlog 概念十分类似。通过 Debezium 采集的数据,包含了旧数据(before)和新数据行(after)以及原数据信息(source),op 的 u表示是 update 更新操作标识符(op字段的值c,u,d,r分别对应create,update,delete,reade),ts_ms 表示同步的时间戳。

三.三种数据同步方案

3.1 方案一:Debezium+Kafka+计算程序+存储系统

目前我行在生产上采用的就是这个方案,采用Debezium订阅MySQL的Binlog传输到Kafka,后端是由计算程序从Kafka里消费,最后将数据写入到其他存储,架构类似如下:

Q7re2iz.png!mobile

这种方案中利用Kafka消息队列做解耦,Change Log可供任何其他业务系统使用,消费端可采用Kafka Sink Connector或者自定义消费程序,但是由于原生Debezium中的Producer端未采用幂等特性, 因此消息可能存在重复 ,另外Kafka Sink Connector(比如JDBC Sink Connector只能保证At least once)或者自定义消费程序在保证数据的一致性上也有难度。

3.2 方案二:Debezium+Kafka+Flink SQL+存储系统

从第二章节我们知道Flink SQL具备解析Kafka中debezium-json和canal-json格式的Change Log能力,我们可以采用如下同步架构:

QzmEzeN.png!mobile

与方案一的区别就是,采用Flink通过创建Kafka表,指定format格式为debezium-json,然后通过Flink进行计算后或者直接插入到其他外部数据存储系统。方案二和方案一类似,组件多维护繁杂,而前述我们知道Flink 1.11中CDC Connectors内置了Debezium引擎,可以替换Debezium+Kafka方案,因此有了更简化的方案三。

3.3 方案三:Flink SQL CDC + JDBC Connector

将如下架构虚线部分用Flink SQL替换,

Fr6V7jm.png!mobile

我们得到如下改进的同步方案架构: MVvequZ.png!mobile

从官方的描述中,通过Flink CDC connectors替换Debezium+Kafka的数据采集模块,实现Flink SQL采集+计算+传输(ETL)一体化,优点很多:

  • 开箱即用,简单易上手

  • 减少维护的组件,简化实时链路,减轻部署成本

  • 减小端到端延迟

  • Flink 自身支持 Exactly Once 的读取和计算

  • 数据不落地,减少存储成本

  • 支持全量和增量流式读取

  • binlog 采集位点可回溯

四. Flink SQL CDC + JDBC Connector同步方案验证

4.1 测试环境和脚本

测试环境测试场景 使用flink sql CDC 从MySQL数据库同步数据到目标MySQL,KAFKA。

CREATE TABLE sbtest1 (
id INT,
k INT,
c STRING,
pad STRING
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '197.XXX.XXX.XXX',
'port' = '3306',
'username' = 'debezium',
'password' = 'PASSWORD',
'database-name' = 'cdcdb',
'table-name' = 'sbtest1',
'debezium.snapshot.mode' = 'initial'
);

到DB:
create table printSinkTable (
id INT,
k INT,
c STRING,
pad STRING,
primary key (id) NOT ENFORCED
) with (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://197.XXX.XXX.XXX:3306/mydb?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&serverTimezone=UTC',
'username' = 'debezium',
'password' = 'PASSWORD',
'table-name' = 'sbtest',
'driver' = 'com.mysql.cj.jdbc.Driver',
'sink.buffer-flush.interval' = '3s',
'sink.buffer-flush.max-rows' = '1',
'sink.max-retries' = '5');
INSERT INTO printSinkTable SELECT * FROM sbtest1;

到KAFKA:
CREATE TABLE kafka_gmv (
id INT,
k INT,
c STRING,
pad STRING
) WITH (
'connector' = 'kafka-0.11',
'topic' = 'kafka_gmv',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = '197.XXX.XXX.XXX:9092',
'format' = 'changelog-json'
);

INSERT INTO kafka_gmv SELECT * FROM sbtest1;

4.2 测试结论

4.2.1  功能测试

目标场景 初始化操作 插入操作 更新操作 删除操作 数据一致性 目标KAFKA 支持 正常 正常 正常 一致 目标MySQL 支持 正常 正常 正常 一致

4.2.2 异常测试

4.2.2.1 常规功能测试

场景 操作 异常恢复 初始化 kill目标库 恢复同步任务后,目标库存在残留数据,jdbc sink 使用upset方式更新,数据能保障和源库一致 初始化 kill源库 和目标库异常恢复过程相同 同步数据 kill源库、目标库 同步任务恢复后,flink会根据checkpoint位点,继续同步异常点时的GTID位点数据,保障数据不丢失

4.2.2.2 基于DNS的数据库切换测试

测试示意图:DNS.png yI3EnyE.png!mobile

flink需要配置的参数:任务失败后延迟5秒重启,重试10次。restart-strategy: fixed-delay restart-strategy.fixed-delay.attempts: 10 restart-strategy.fixed-delay.delay: 5s

MySQL环境信息:一主库两个从库。

DNS配置:DNS申请了一个域名:XX.XX.cmbc.cn 策略:当前域名指向其中一个从库,探测数据库服务端口,每个2分钟自动探测一次。当前数据库异常后DNS修改指向到第二个从库。

操作系统和JVM缓存配置:JVM缓存配置30秒,操作系统缓存30秒。

测试结果:1,当flink参数未设置上述参数的情况下,kill当前访问数据库,flink 任务报错退出,查看DNS没有访问记录。2,flink配置上述参数后,flink后台尝试访问上述数据库,本地DNS缓存在访问失败的情况下失效,重新请求DNS域名服务器获取新数据库访问信息,任务继续复制。

4.2.2.3 flink高可用测试

在flink高可用测试中,我们使用Standalone 集群高可用性方案进行测试,一个主JobManager,一个从JobManager,当主节点异常之后,备选节点成为新的leader,并接管Flink集群。新JobManager成为新的leader后,集群恢复正常,并可以进行任务的调度,异常的任务恢复运行。这里备选和主节点是一样的,也就是说每个JobManager都可以充当备选和主节点。官网的下图展示了这一过程:

q6R3Ijy.png!mobileimg

异常测试步骤和结果如下:

操作 集群恢复过程 kill JobManger 进程 集群所有任务失败,从JobManager成为新leader后,异常任务恢复。 kill TaskManager进程 JobManager 调度任务到其他slave节点,使用checkpoint中的信息恢复任务。

4.2.3 性能测试

性能测试进行了累计测试,用以检测flink cdc的极限性能,分别测试了kafka和MySQL作为目标的场景。

测试描述

使用sysbench进行压测,插入200余万数据,表结构如下:

 CREATE TABLE `sbtest1` (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`k` int(10) unsigned NOT NULL DEFAULT '0',
`c` char(120) NOT NULL DEFAULT '',
`pad` char(60) NOT NULL DEFAULT '',
PRIMARY KEY (`id`),
KEY `k_1` (`k`)
);

累计性能测试结果:

目标端 性能 目标KAFKA 10万+ 目标MySQL-insert 1万+ 目标MySQL-update 6000+ 目标MySQL-delete 1.2万+

五. Flink SQL CDC + JDBC Connector端到端一致性分析

Flink SQL CDC + JDBC Connector本质上是一个Source和Sink并行度为1的Flink Stream Application,Source和Sink之间无Operator,下面我们逐步分析Flink SQL CDC + JDBC Connector端到端如何保证一致性。

5.1 端到端一致性实现条件

一致性就是业务正确性,在“流系统中间件”这个业务领域,端到端一致性就代表Exacly Once Msg Processing(简称EOMP),即一个消息只被处理一次,造成一次效果。即使机器或软件出现故障,既没有重复数据,也不会丢数据。

幂等就是一个相同的操作,无论重复多少次,造成的效果和只操作一次相等。

流系统端到端链路较长,涉及到上游Source层、中间计算层和下游Sink层三部分,要实现端到端的一致性,需要实现以下条件:

1.上游可以replay,否则中间计算层收到消息后未计算,却发生failure而重启,消息就会丢失。

2.记录消息处理进度,并保证存储计算结果不出现重复,二者是一个原子操作,或者存储计算结果是个幂等操作,否则若先记录处理进度,再存储计算结果时发生failure,计算结果会丢失,或者是记录完计算结果再发生failure,就会replay生成多个计算结果。

3.中间计算结果高可用,应对下游在接到计算结果后发生failure,并未成功处理该结果的场景,可以考虑将中间计算结果放在高可用的DataStore里。

4.下游去重,应对下游处理完消息后发生failure,重复接收消息的场景,这种可通过给消息设置SequcenceId实现去重,或者下游实现幂等。

在Flink SQL CDC + JDBC Connector方案中,上游是数据库系统的日志,是可以replay的,满足条件1“上游可replay”,接下来我们分别分析Flink SQL CDC如何实现条件2和3,JDBCConnector如何实现条件4,最终实现端到端的一致性。以MySQL->MySQL为例,架构图如下(目前Flink SQL是不支持Source/Sink并行度配置的,Flink SQL中各算子并行度默认是根据Source的Partition数或文件数来决定的,而DebeziumSource的并行度是1,因此整个Flink Task的并行度为1):

eYvER3V.png!mobile

5.2 Flink SQL CDC的一致性保证

Flink SQL CDC用于获取数据库变更日志的Source函数是DebeziumSourceFunction,且最终返回的类型是RowData,该函数实现了CheckpointedFunction,即通过Checkpoint机制来保证发生failure时不会丢数,实现exactly once语义,这部分在函数的注释中有明确的解释。

/**
* The {@link DebeziumSourceFunction} is a streaming data source that pulls captured change data
* from databases into Flink.
* 通过Checkpoint机制来保证发生failure时不会丢数,实现exactly once语义
* <p>The source function participates in checkpointing and guarantees that no data is lost
* during a failure, and that the computation processes elements "exactly once".
* 注意:这个Source Function不能同时运行多个实例
* <p>Note: currently, the source function can't run in multiple parallel instances.
*
* <p>Please refer to Debezium's documentation for the available configuration properties:
* https://debezium.io/documentation/reference/1.2/development/engine.html#engine-properties</p>
*/

@PublicEvolving
public class DebeziumSourceFunction<T> extends RichSourceFunction<T> implements
CheckpointedFunction,
ResultTypeQueryable<T>
{

为实现CheckpointedFunction,需要实现以下两个方法:

public interface CheckpointedFunction {
//做快照,把内存中的数据保存在checkpoint状态中
void snapshotState(FunctionSnapshotContext var1) throws Exception;

//程序异常恢复后从checkpoint状态中恢复数据
void initializeState(FunctionInitializationContext var1) throws Exception;
}

接下来我们看看DebeziumSourceFunction中都记录了哪些状态。

/** Accessor for state in the operator state backend. 
offsetState中记录了读取的binlog文件和位移信息等,对应Debezium中的
*/

private transient ListState<byte[]> offsetState;

/**
* State to store the history records, i.e. schema changes.
* historyRecordsState记录了schema的变化等信息
* @see FlinkDatabaseHistory
*/

private transient ListState<String> historyRecordsState;

再回到端到端一致性的条件2和3

2.记录消息处理进度,并保证存储计算结果不出现重复,二者是一个原子操作,或者存储计算结果是个幂等操作,否则若先记录处理进度,再存储计算结果时发生failure,计算结果会丢失,或者是记录完计算结果再发生failure,就会replay生成多个计算结果。
3.中间计算结果高可用,应对下游在接到计算结果后发生failure,并未成功处理该结果的场景,可以考虑将中间计算结果放在高可用的DataStore里。

我们发现在Flink SQL CDC是一个相对简易的场景,没有中间算子,是通过Checkpoint持久化binglog消费位移和schema变化信息的快照,来实现Exactly Once。接下来我们分析Sink端。

5.2.1 JDBC Sink Connector如何保证一致性

我们在官网上发现对于JDBC Sink Connector的幂等性中有如下解释:

如果定义了主键,JDBC 写入时是能够保证 Upsert 语义的, 如果 DB 不支持 Upsert 语法,则会退化成 DELETE + INSERT 语义。 Upsert query 是原子执行的,可以保证幂等性

这个在官方文档中也详细描述了更新失败或者存在故障时候如何做出的处理,下面的表格是不同的 DB 对应不同的 Upsert 语法:

Database Upsert Grammar MySQL INSERT .. ON DUPLICATE KEY UPDATE .. PostgreSQL INSERT .. ON CONFLICT .. DO UPDATE SET ..

因此我们可以通过写入时保证Upsert语义,从而保证下游Sink端的幂等性,再Review一次到端到端一致性实现条件4,下游去重也可以通过实现幂等从而实现下游的Exactly Once语义。

4.下游去重,应对下游处理完消息后发生failure,重复接收消息的场景,这种可通过给消息设置SequcenceId实现去重,或者下游实现幂等。

5.2.2 Flink SQL CDC + JDBC Sink Connector组合后如何保证一致性

在前两小节我们分析了组件各自如何保证一致性,接下来,我们分析组合后在源库异常、Flink作业异常、目标库异常三种异常场景下如何保证端到端一致性

BVrINjM.png!mobile

5.2.2.1 Debezium Source对MySQL进行Snapshot时发生异常

在Flink Task启动后,首先会进行MySQL全表扫描,也就是做Snapshot,这里有个需要注意的地方就是,在Snapshot阶段,在扫描全表数据时,没有可用于恢复的位点,所以无法在全表扫描阶段去执行Checkpoint。为了不执行Checkpoint,MySQL的CDC源表会让执行中的Checkpoint一直等待(通过持有checkpoint锁实现),甚至Checkpoint超时(如果表超级大,扫描耗时非常长)。这块可以从DebeziumChangeConsumer的代码中看到:

@Override
public void handleBatch(
List<ChangeEvent<SourceRecord, SourceRecord>> changeEvents,
DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> committer)
throws InterruptedException
{
try {
for (ChangeEvent<SourceRecord, SourceRecord> event : changeEvents) {
SourceRecord record = event.value();
deserialization.deserialize(record, debeziumCollector);

if (isInDbSnapshotPhase) {
if (!lockHold) {
MemoryUtils.UNSAFE.monitorEnter(checkpointLock);
lockHold = true;
//在snapshot阶段不做checkpoint
LOG.info("Database snapshot phase can't perform checkpoint, acquired Checkpoint lock.");
}
if (!isSnapshotRecord(record)) {
MemoryUtils.UNSAFE.monitorExit(checkpointLock);
isInDbSnapshotPhase = false;
LOG.info("Received record from streaming binlog phase, released checkpoint lock.");
}
}

// emit the actual records. this also updates offset state atomically
emitRecordsUnderCheckpointLock(debeziumCollector.records, record.sourcePartition(), record.sourceOffset());
}
...

在做Snapshot阶段,可能会碰到源库MySQL异常或者Flink任务本身异常,那我们分别分析下异常后如何恢复:

1.若遇到源库MySQL异常,Flink Task发现无法连接数据库异常退出,重新启动Flink Task(或者retry),因为没有做snapshot没做checkpoint,那么会重新再做一次Snapshot,这些全量数据最后发送到目的MySQL,由于下游MySQL实现了写幂等,因此最终保持一致性。

2.若遇到Flink任务异常,重新启动(或者retry),同上面情况一样,重新做一次Snapshot,最终也能保持一致性。

3.若遇到目标库MySQL异常,同场景一一致,Flink Task无法往目标数据库写入异常退出,在需要重新启动或retry后,重新做一次Snapshot,全量数据最后发送到目的MySQL,由于目的下游M有SQL实现了写幂等,最终保持一致性。

5.2.2.2 Snapshot完成后读取binlog时发生异常

在全量数据完成同步后,开始进行增量获取,此时Flink会进行定时Checkpoint,将读取binlog的位移信息和schema信息存入到StateBackend,若此时发生异常,那我们分析下异常后如何恢复:

1.若源MySQL异常,Flink Task发现无法连接数据库异常退出,重新启动Flink Task(或者retry),将会从最近一次Checkpoint的数据进行恢复,由于可以读取到mysql binlog位移信息,实现继续同步,不会丢失数据,最终也能保持一致性。

2.若Flink任务异常,重新启动或retry后,同场景1一致,继续读取binlog,能保持一致性。

3.若目的MySQL异常,jdbc connector无法往目标数据库写入,cdc connector读取到的binlog位移信息也不再更新,两个操作是一个原子性操作,在Flink Task恢复后,从最近一次Checkpoint进行恢复,最终保持一致性。

5.3 总结

分布式系统中端到端一致性需要各个组件参与实现,Flink SQL CDC + JDBC Connector可以通过如下方法保证端到端的一致性:

  • 源端是数据库的binlog日志,全量同步做Snapshot异常后可以再次做Snapshot,增量同步时,Flink SQL CDC中会记录读取的日志位移信息,也可以replay

  • Flink SQL CDC作为Source组件,是通过Flink Checkpoint机制,周期性持久化存储数据库日志文件消费位移和状态等信息(StateBackend将checkpoint持久化),记录消费位移和写入目标库是一个原子操作,保证发生failure时不丢数据,实现Exactly Once

  • JDBC Sink Connecotr是通过写入时保证Upsert语义,从而保证下游的写入幂等性,实现Exactly Once

再来回顾一下端到端保持一致性的条件,发现全都能满足。

1.上游可以replay,否则中间计算层收到消息后未计算,却发生failure而重启,消息就会丢失。

2.记录消息处理进度,并保证存储计算结果不出现重复,二者是一个原子操作,或者存储计算结果是个幂等操作,否则若先记录处理进度,再存储计算结果时发生failure,计算结果会丢失,或者是记录完计算结果再发生failure,就会replay生成多个计算结果。

3.中间计算结果高可用,应对下游在接到计算结果后发生failure,并未成功处理该结果的场景,可以考虑将中间计算结果放在高可用的DataStore里。

4.下游去重,应对下游处理完消息后发生failure,重复接收消息的场景,这种可通过给消息设置SequcenceId实现去重,或者下游实现幂等。

六. Flink SQL CDC目前存在的缺陷

  • 1,使用正则匹配原表后(多个源端表),到目标表无法进行一对一的映射。需要逐个匹配。

  • 2,CDC source端定义时,需要指定所有字段,目前不支持省略字段定义。

  • 3,CDC到KAFKA时无法按照主键进行自动分区分发、无法指定分区键分发数据。到KAFKA的数据格式指定(JSON,AVRO JSON等)。

  • 4,目标端支持需求:DB2、ADB/GreenPlum、Oracle暂不支持。不支持DDL同步,不支持表的创建。

  • 5,任务管理和监控的REST API 不完善。

参考资料:

《端到端一致性,流系统Spark/Flink/Kafka/DataFlow对比总结》https://zhuanlan.zhihu.com/p/77677075

《基于Flink SQL CDC的实时数据同步方案》https://developer.aliyun.com/article/777502

《Flink SQL 1.11新功能与最佳实践》https://developer.aliyun.com/article/771773

《分布式快照算法》https://zhuanlan.zhihu.com/p/53482103

作者介绍:

文乔:2012年硕士毕业后加入民生银行生产运营部系统管理中心,天眼日志平台主要参与人,目前在开源工具组负责Flume、Kafka的源码研究和工具开发等相关工作。

王健:2011年加入民生银行科技部,数据库管理员(负责DB2,Oracle,MySQL等运维工作,对MPP等数据库有很长的维护和实施经验,擅长数据迁移等等),同时负责行内KAFKA集群运维和实施工作,负责行内数据库实时复制等工作。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK