5

Checkpoint barrier 对齐源码解析

 3 years ago
source link: https://mp.weixin.qq.com/s?__biz=MzkxOTE3MDU5MQ%3D%3D&%3Bmid=2247484510&%3Bidx=1&%3Bsn=a13c24792ca3def70f9bd729dbc51092
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

众所周知,Checkpoint 机制是 Flink 吊打其他流计算引擎的一个重要特性。如果 Checkpoint 语义是 Exactly Once,那么 Checkpoint 过程中需要进行 barrier 对齐。

本文会分析 Checkpoint barrier 对齐的部分源码,主要包括以下几部分:

  • barrier 对齐简介

  • barrier 对齐时,barrier 之后的数据缓存到哪里了?

  • BufferStorage 实现原理

  • 文末福利:渣渣瑞关于阅读源码的思考,讲解看源码的正确姿势。

1、 barrier 对齐简介

假设当前 TaskB 上游的 TaskA 有 3 个并行度。分别是:subtaskA0、A1、A2。如果当前 TaskB 已经处理了 subtaskA0 和 A1 的 barrier,subtask A2 的 barrier 还没有来。为了保障数据的一致性,此时 subtaskA0 和 A1 barrier 之后的数据不能被处理,需要缓存起来。直到 TaskB 接收到上游 TaskA 所有 subtask 的 barrier 以后,TaskB 才能处理 barrier 之后的数据。

详细介绍参考官网:https://ci.apache.org/projects/flink/flink-docs-release-1.10/internals/stream_checkpointing.html#barriers

对应官网图示:

eqiIvuF.png!mobile

从 barrier 对齐原理,我们能发现:所有 barrier 到达之前,barrier 之后的数据不能被处理,这些数据需要缓存起来,那这些数据到底缓存到哪里了呢?我们来看下一部分。

2、 barrier 对齐时,barrier 之后的数据缓存到哪里了?

对应到源码中,barrier 之后的数据缓存到 BufferStorage 中。

所有的源码只是对原理的一种具体实现而已,所以这里先不用关注 BufferStorage 的具体实现。先从 Checkpoint barrier 对齐的实现原理出发,BufferStorage 需要达到哪些功能?

2.1 源码背景:

正常数据处理有多个来源:

1、如果是 Source,则从外部组件读取数据到 Flink 中,然后进行处理

2、 如果是非 Source,则处理从上游  InputGate 发来的数据

3、 Checkpoint 时,需要将 barrier 之后的数据缓存到 BufferStorage 中,等 barrier 对齐以后,需要处理 BufferStorage 中缓存的数据。

2.2 BufferStorage 需要支持的接口

BufferStorage 作为 buffer 的存储介质最起码要有两个方法,一个插入,一个读取。对应到源码中就是 add 和 pollNext 方法。

如果仅仅有 add 和 pollNext,那么跟普通的 Queue 没有什么区别。请问 Java 的 Queue 能满足需求吗?

显然不能,有一个重要功能 Queue 是不支持的。即:如果 barrier 还没有全到达,此时 pollNext 方法允许读到数据吗?

答:当所有 barrier 来之前,不应该从 BufferStorage 中读取到数据。当所有 barrier 到达以后,才允许从 barrier 中读取到数据。

所以 BufferStorage 应该如何设计呢?

简单的思路:add 方法往 BufferStorage 中插入数据,但插入的数据并不能立即被 pollNext 方法读取到。而是要调用一个类似于数据滚动(roll)的接口。即:所有 barrier 到达以后调用 roll 方法将数据滚动以后, pollNext 方法才能消费到数据。

好巧,BufferStorage 就是这么设计的,BufferStorage 另外设计了一个重要的接口:rollOver。BufferStorage 的 rollOver 方法与 roll 方法的语义相同。

从需求的角度出发,我们并不关注 BufferStorage 内部实现。只需要 BufferStorage 对外提供三个接口:

  • add 方法:将 barrier 后的数据加入到 BufferStorage 中,此时外界消费不到

  • rollOver 方法:将添加到 BufferStorage 中数据滚动,使外界可以消费到。换言之:不调用 rollOver ,pollNext 拿不到数据。

  • pollNext  方法:消费 BufferStorage 中的数据(必须调用 add 方法添加了数据,且调用了 rollOver 对数据进行了滚动才能消费到)

把需求梳理明白了,很容易看懂 BufferStorage 的源码。

3、 BufferStorage 实现详解

对应到源码中,barrier 之后的数据缓存到 BufferStorage 中。

如果严谨一些,上面这句话描述是错误的。BufferStorage 仅仅持有一个引用,真正的数据被封装成 MemorySegment,占用的是 NetworkBuffer 的内存。

当然我们这里重点关注 BufferStorage 的实现,看 BufferStorage 是如何管理好这些引用的。

BufferStorage 是个接口,有多个实现类。实现类主要关注 CachedBufferStorage,其他实现类都是一些封装或者空的处理。

3.1 CachedBufferStorage add 方法实现

add 方法作用:Checkpoint 时,在所有 barrier 来之前,barrier 之后的数据通过调用 add 方法缓存在 BufferStorage 中。

add 方法实现:将 BufferOrEvent 加入到队列中。并记录当前 CachedBufferStorage 中的数据量大小。

// 缓存数据的队列
private ArrayDeque<BufferOrEvent> cachedBuffers;

@Override
public void add(BufferOrEvent boe) {
// 记录数据量大小
bytesBlocked += pageSize;
// 加入到队列中
cachedBuffers.add(boe);
}

3.2 CachedBufferStorage rollOver 方法实现

rollOver 方法作用:将 cachedBuffers 队列中的 BufferOrEvent,滚动成 BufferOrEventSequence。

rollOver 方法实现:

重点在于将 cachedBuffers 队列传递给 BufferOrEventSequence,BufferOrEventSequence 即可读取到 cachedBuffers 队列了。

BufferOrEventSequence 代码:

private static class BufferOrEventSequence {

/** 重点是这里,缓存的数据 */
private final ArrayDeque<BufferOrEvent> queuedBuffers;

/** The total size of the cached data. */
private final long size;
}

3.3 CachedBufferStorage pollNext 方法实现

pollNext 方法作用:消费 BufferOrEventSequence 中的数据。

pollNext 方法实现:

pollNext 会从 BufferOrEventSequence 中读数据。只有调用 rollOver 时,BufferOrEventSequence 中才能读取到数据。

pollNext 会调用 BufferOrEventSequence 的 getNext,BufferOrEventSequence 的 getNext 会从队列中拿数据。代码很简单自行查看。

3.4 代码总结

总结三个方法的作用:

  • add 方法只会将 BufferOrEvent 放到一个队列中。

  • rollOver 方法将队列中的 BufferOrEvent 滚动成 BufferOrEventSequence。

  • pollNext 方法从 BufferOrEventSequence 中拿 BufferOrEvent。

分析完 BufferStorage 的实现,很容易想到:什么时候调用 rollOver 呢?答:当然是所有 barrier 到了就会调用 rollOver 方法,使上层可以消费到缓存的数据。

当然源码中会看到很多调用 rollOver 的地方,大多数都是异常情况处理,例如:1、 Checkpoint 取消了 2、 buffer 放满了,任务要挂掉 3、 当前上游数据结束了,上游 partition 没数据了

4、 关于看源码的思考

看源码一时爽,一直看源码一直爽。

所有的源码只是对原理的一种具体实现而已,看源码之间最好先把原理搞懂,有目的的去看源码。

如果不懂 barrier 对齐的原理,直接看 BufferStorage 会导致一脸懵逼,为什么要设计一个 rollOver 接口?干嘛用的?宝宝看完表示一脸懵逼。。。

看源码的正确姿势:从原理到源码,深入了解细节。千万不能从源码到原理,你会一脸懵逼的。

为什么会写这篇文章呢?是因为昨天在我的大数据交流群里,有个群友提问:BufferStorage 中 rollOver 接口 是干嘛的?

渣渣瑞二话不说就开始撸源码,渣渣瑞虽然从事 Flink 引擎开发,但并不是阅读了 Flink 所有源码。代码量巨大,不可能全部阅读完,一般是有需求才会对某些模块详细阅读,当然很多核心代码渣渣瑞都是阅读过的。

看到群里提问,我刚开始看代码时也是有点蒙蔽,不理解 rollOver 接口是干嘛的?为什么要将数据滚动一波才对外可见呢?然后看代码注释和外层调用,才明白了 BufferStorage 是用来缓存 barrier 之后的数据。再一想:barrier 后的数据确实不应该马上读取到,应该是所有 barrier 到达后,外层才能读取到,瞬间明白了 rollOver 接口的作用。

这里需要对 barrier 对齐有深入的理解才能立即联想起来,整个过程不到 10 分钟,群友直呼:惊叹大佬的学习能力。

好的,吹牛逼结束。再次建议大家,看源码的正确姿势:从原理到源码,深入了解细节。千万不能从源码到原理,你会一脸懵逼的。除非你对原理非常精通,否则不建议二话不说就撸源码,只会越撸越不自信,直到劝退。

顺便补一句:想加群的小伙伴在公众号菜单栏中找二维码即可加渣渣瑞微信,渣渣瑞拉你进活跃的大数据技术交流群。

6rqaai7.png!mobile

uYrERna.png!mobile


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK