18

流式计算中的分布式快照

 5 years ago
source link: http://xargin.com/distributed-snapshot-in-stream-sys/?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

流式计算系统是近些年发展较快的领域,虽然发展迅速,但实际上直到现在都没有能让所有人都满意的系统出现,哪怕是 flink/blink。

流式计算的理论基石是 leslie lamport 在 1985 年发表的论文《Distributed Snapshots: Determining Global States of a Distributed System》。首先将分布式系统中的进程定义为 process,然后将进程间的连接定义为 channel。当需要计算全局状态时,因为没有能够全局同步的时钟,所以需要有别的办法能够记录整个计算集群的状态值。

uaeUBv3.png!web

process 及连接 process 的 channel 共同组成一个计算的拓补图。因此这里说的全局状态,distributed snapshot,或者叫 global snapshot,其中不只包含各个 process 的状态,还需要包含连接 process 的 channel 的状态。如果只记录 process 的状态会导致那些正在 channel 中传输的信息丢失掉,这样所谓的全局状态是不完整的。而记录 channel 的状态也有讲究,比如我们可以由 channel 两端的 process 都记录其状态,但会导致全局的 event 数多发生计算。所以在 lamport 的论文中,把 channel 状态的记录任务分给了 channel 指向的 process 来做。即每个 process 负责记录自己的状态,以及指向自己的那些 channel 的状态。大概是下面这样:

myy63aN.png!web

在这个模型中,每一个节点都有可能会触发 record global snapshot 的请求。这时候会由该节点记录其自身的状态,并发出一个特殊的 event,论文中称之为 marker。

所有收到 marker 的节点都需要做出一系列的判断和动作,直到 marker 从所有 channel 都经过了一遍 之后,算法便得到终止。这时候就可以通过任意的手段采集全局状态了。

论文中对于 marker sender 和 receiver 阶段的描述如下:

7JnM3u6.png!web

这几句话实在是读了很久都没读明白(汗),查阅了大量的解释说明之后,才终于搞明白了。。翻译版:

Marker 发送规则:
    process p 先记录自己的状态,然后向所有从 p 指向其它节点的 channel 发送 marker。
    在记录状态,发送 marker 这两个步骤中,不能发送任何后续的消息。

这里的发送规则,实际上有两种情况:

  1. 一种是初始化节点,会记录自己的状态,然后向所有出 channel 发送 marker,这种比较简单
  2. 一种是非初始化节点,会在第一次收到 marker 时,向所有出 channel 发送 marker。但是如果在某些 channel 中又收到了相同的 marker 时,虽然会标记对应 channel 的状态,但不会再向出 channel 发送 marker 了,要不然没完没了。
Marker 接收规则:
   当 process q 从 channel c 中接收到一个 marker 消息时:
   if q 还没有记录过自己的状态
       begin 那么马上记录状态
         将接收到 marker 的 channel c 标记为 empty
       end
   else
       q 开始 record 从 channel c 中来的所有消息,并将其记录为 channel c 的状态

这个算法的难懂之处主要还是在于描述,lamport 大佬写论文的时候显然没有考虑易读易理解性 orz。这里的 else 其实是说,如果 q 已经从 c0 中接收到了 marker,需要先将 c0 标记为 empty,然后开始 record 除了 c0 以外的其它 channel (c1,c2...cn)的状态,在没有从其它 channel 中收到 marker 之前,其它 channel 每一个 channel 都对应一个 fifo 队列,其队列中所包含的消息,就是这个 channel 的状态值。所以一个完整的快照是类似下面这种形式的:

p: state of p

q: state of q

r: state of r

cpq:

cpr: <e1, e2, e3>

cqr: cqp:

crp: <e4, e5>

crq:

所有 process 的 state 和 channel 的 state 都记录到,才是完整的 global state。lamport 论文的状态记录法,能够保证任意时刻得到的 global state 都能够保持一致性,这里的一致性,我个人思考的话,应该可以有两种解释:

  1. 如果是转账的场景,可以保证全局的金钱数额不变,即不多也不少。比如初始全局的总额为 rmb 100,没有外部输入的情况下,全局快照中求得的全局总额应该也为 rmb 100。
  2. 如果是发消息的场景,可以保证全局的消息与初始时的消息数目一致,不会发生消息重复,即特定的消息 ex,一定会被记录到某个节点或者某条 channel 的状态中,不会同时出现在多个地方。

lamport 的论文实际上就是在解决不重复记录的问题,并且对其进行了证明,对证明过程感兴趣的同学就自己去读论文吧。。

在 lamport 的论文中有一些假设,如:channel 为 FIFO,不会乱序,process 可以正常工作,不会 crash。但显然这些假设对于实际的分布式系统来说是不现实的。所以后续的研究基本都是在弱化这些假设。毕竟真实世界的分布式系统中,各种失败是不可避免的。

2015 年 flink 团队发表了一篇:《Lightweight Asynchronous Snapshots for Distributed Dataflows》,提出了比 lamport 更高效或者简单的算法。

论文认为之前的算法需要对所有连接 process 的 channel 记录状态,需要记录大量的中间消息,从而导致 snapshot 大小膨胀。所以希望能够解决这个问题。

该论文首先将计算的模型分为了两类:即有向无环图,和有环图。并且分别针对两种情况设计了两套算法。无环图算法较为简单,有环图的算法是在无环图算法的基础上改进得到。先看看无环的情况:

qMj6nmB.png!web

无环图中 不需要对 channel 的 state 进行记录 。flink 快照论文中提到了几个关键点:

  1. channel 可以认为相对可靠,并且遵循 FIFO 运输顺序,并且可以被 block 或 unblock。当 channel 被 block 时,所有消息都会被 buffer 住,并在执行 unblock 之前不进行传输。
  2. 任务能够在其 channel 上执行 block 和 unblock 操作,并且能通过 channel 来发送消息。在所有 output channel 上可以执行消息广播操作。
  3. 消息会从 source 任务进行注入。从 source 节点来看的话,这些输入 channel 是特殊的 "Nil" channel。

中心 coordinator 周期性地将 stage barrier 注入到所有 source 节点。当 source 节点接收到一个 barrier 时,会对其 自身 的状态进行一次快照,然后向所有 output 广播 barrier (Fig.2(a))。当非 source 节点从指向它的某个 input channel 中收到了 barrier 时,会对该 input channel 执行 block 操作,阻塞一直持续到该节点从所有 input channel 均接收到了该 barrier(Fig.2(b)),然后该节点对当前状态记录一次快照并将 barrier 广播到所有的 output(Fig.2(c))。然后该节点对所有 input channel 执行 unblock 操作继续其计算过程(Fig.2(d))。完整的全局快照只包含各个 operator(可以理解为 flink 中的 process)的状态。

完整的算法伪代码:

eArQj2y.png!web

还是比较好理解的。有环的过程会稍微复杂一些:

BvMZzur.png!web

首先需要对用户的计算拓补图进行静态分析(static analysis),然后根据分析结果判断图上是否有环。有环图指向环起点的那条被被称为回边(back-edge)。干掉所有回边之后的图和之前的无环图处理方式基本一致。而回边指向的节点需要做一些特殊处理,具体细节为:

有回边的任务节点在将 barrier 向所有 output channel 广播出去之后,会先创建一个其本身状态的快照(Fig.3 (b))。然后,从这个时间点开始,需要开始 log 所有从回边发来的数据,直到从所有指向本节点的回边收到了 barrier。如 Fig. 3(c)所示,这样可以将所有需要将那些循环中的数据也记录到当前快照中来。最终的快照包含所有节点的状态,以及所有回边上的状态(内容是收到 barrier 之前的数据)。

伪代码:

eI7Bv2z.png!web

ps. 大致研究明白了分布式快照,才明白很多架构师嘴里的最终一致性就是放屁。

参考资料:

  1. https://lamport.azurewebsites.net/pubs/chandy.pdf
  2. https://blog.acolyer.org/2015/04/22/distributed-snapshots-determining-global-states-of-distributed-systems/
  3. https://arxiv.org/abs/1506.08603

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK