
Checkpoint Alignment Mechanism: A Source Code Analysis

 4 years ago
source link: http://mp.weixin.qq.com/s?__biz=MzU5MTc1NDUyOA%3D%3D&%3Bmid=2247484180&%3Bidx=1&%3Bsn=e45fa67ba4e8adb866b022663a4e68b4

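Checkpointing is the key mechanism behind fault-tolerant state in Flink. Depending on how checkpoints are taken, Flink provides different delivery semantics, namely Exactly-Once and At-Least-Once. The mechanism that makes the difference is checkpoint barrier alignment: with alignment, Flink achieves Exactly-Once semantics; without alignment, it achieves At-Least-Once semantics.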
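To see why alignment decides the guarantee, here is a minimal sketch that is not Flink code; all class and method names are hypothetical. It assumes one operator with two input channels and a single checkpoint, and computes which records are already reflected in the state snapshot with and without alignment. A record that arrives on a channel after that channel's barrier belongs to the next checkpoint, so after a restore the source replays it; if the snapshot already contains it, it is counted twice.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model (not Flink code): one operator, N input channels,
// one checkpoint whose barrier appears exactly once per channel.
final class AlignmentSemanticsDemo {

    static final class Input {
        final int channel;
        final String value; // "BARRIER" marks the checkpoint barrier
        Input(int channel, String value) { this.channel = channel; this.value = value; }
        boolean isBarrier() { return "BARRIER".equals(value); }
    }

    /** Records applied to operator state at the moment the checkpoint is taken. */
    static Set<String> snapshotContents(List<Input> arrivals, int numChannels, boolean aligned) {
        Set<String> state = new LinkedHashSet<>();
        boolean[] barrierSeen = new boolean[numChannels];
        int barriers = 0;
        for (Input in : arrivals) {
            if (in.isBarrier()) {
                barrierSeen[in.channel] = true;
                if (++barriers == numChannels) {
                    return state; // last barrier arrived: the snapshot is taken here
                }
            } else if (aligned && barrierSeen[in.channel]) {
                continue; // aligned: the record is cached and processed only after the snapshot
            } else {
                state.add(in.value);
            }
        }
        return state;
    }

    /** Records that come after their own channel's barrier; a restore replays them. */
    static Set<String> replayedAfterRestore(List<Input> arrivals, int numChannels) {
        Set<String> result = new LinkedHashSet<>();
        boolean[] barrierSeen = new boolean[numChannels];
        for (Input in : arrivals) {
            if (in.isBarrier()) {
                barrierSeen[in.channel] = true;
            } else if (barrierSeen[in.channel]) {
                result.add(in.value);
            }
        }
        return result;
    }

    /** Records counted twice after a restore: already in the snapshot and replayed again. */
    static Set<String> duplicates(List<Input> arrivals, int numChannels, boolean aligned) {
        Set<String> dup = snapshotContents(arrivals, numChannels, aligned);
        dup.retainAll(replayedAfterRestore(arrivals, numChannels));
        return dup;
    }

    static List<Input> sampleArrivals() {
        // Channel 0's barrier arrives early; "a2" follows it before channel 1's barrier.
        return Arrays.asList(
                new Input(0, "a1"), new Input(0, "BARRIER"), new Input(0, "a2"),
                new Input(1, "b1"), new Input(1, "BARRIER"));
    }

    public static void main(String[] args) {
        List<Input> arrivals = sampleArrivals();
        System.out.println("aligned snapshot:       " + snapshotContents(arrivals, 2, true));
        System.out.println("non-aligned snapshot:   " + snapshotContents(arrivals, 2, false));
        System.out.println("aligned duplicates:     " + duplicates(arrivals, 2, true));
        System.out.println("non-aligned duplicates: " + duplicates(arrivals, 2, false));
    }
}
```

With the sample input, the aligned snapshot is `[a1, b1]` with no duplicates, while the non-aligned snapshot is `[a1, a2, b1]`, so `a2` would be applied a second time when it is replayed.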

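The official documentation explains it as follows:

[Figure: screenshot of the official Flink documentation's explanation of barrier alignment (image not preserved)]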

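Alignment usually happens in operators that receive multiple upstream input streams, for example downstream of a keyBy or in a join. The rest of this article analyzes how alignment is implemented, starting from the source code.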

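Checkpoint handling takes place in StreamInputProcessor/StreamTwoInputProcessor. These classes read data from the network (from upstream tasks) and hand it to the StreamOperator for processing. The reading itself is done by a CheckpointBarrierHandler, which is also responsible for alignment, in its getNextNonBlocked method. The CheckpointBarrierHandler interface has two implementations, BarrierBuffer and BarrierTracker: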

```java
// Called from StreamInputProcessor/StreamTwoInputProcessor to create the CheckpointBarrierHandler
public static CheckpointBarrierHandler createCheckpointBarrierHandler(
        StreamTask<?, ?> checkpointedTask,
        CheckpointingMode checkpointMode,
        IOManager ioManager,
        InputGate inputGate,
        Configuration taskManagerConfig) throws IOException {

    CheckpointBarrierHandler barrierHandler;
    if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
        long maxAlign = taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
        if (!(maxAlign == -1 || maxAlign > 0)) {
            throw new IllegalConfigurationException(
                TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
                + " must be positive or -1 (infinite)");
        }

        if (taskManagerConfig.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL)) {
            barrierHandler = new BarrierBuffer(inputGate, new CachedBufferBlocker(inputGate.getPageSize()), maxAlign);
        } else {
            barrierHandler = new BarrierBuffer(inputGate, new BufferSpiller(ioManager, inputGate.getPageSize()), maxAlign);
        }
    } else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
        barrierHandler = new BarrierTracker(inputGate);
    } else {
        throw new IllegalArgumentException("Unrecognized Checkpointing Mode: " + checkpointMode);
    }

    if (checkpointedTask != null) {
        barrierHandler.registerCheckpointEventHandler(checkpointedTask);
    }
    return barrierHandler;
}
```

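So BarrierBuffer implements the aligned mechanism (used for EXACTLY_ONCE), and BarrierTracker implements the non-aligned mechanism (used for AT_LEAST_ONCE).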
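As a minimal sketch, the selection logic above can be mirrored in plain Java so the `maxAlign` check can be tried in isolation. The class, enum and method names are hypothetical, and the function returns a label instead of constructing the real Flink handlers.

```java
// Hypothetical mirror of the selection logic in createCheckpointBarrierHandler; not Flink code.
// Returns a label describing the handler instead of building real Flink objects.
final class BarrierHandlerSelection {

    enum Mode { EXACTLY_ONCE, AT_LEAST_ONCE }

    static String select(Mode mode, long maxAlign, boolean creditBasedNetwork) {
        if (mode == Mode.EXACTLY_ONCE) {
            // Same check as the excerpt: only -1 (no limit) or a positive byte count is valid
            if (!(maxAlign == -1 || maxAlign > 0)) {
                throw new IllegalArgumentException(
                        "alignment bytes limit must be positive or -1 (infinite), got " + maxAlign);
            }
            // Per the excerpt: credit-based network -> CachedBufferBlocker, otherwise BufferSpiller
            return creditBasedNetwork
                    ? "BarrierBuffer(CachedBufferBlocker)"
                    : "BarrierBuffer(BufferSpiller)";
        } else if (mode == Mode.AT_LEAST_ONCE) {
            return "BarrierTracker"; // no alignment, so the limit is never validated here
        }
        throw new IllegalArgumentException("Unrecognized Checkpointing Mode: " + mode);
    }

    public static void main(String[] args) {
        System.out.println(select(Mode.EXACTLY_ONCE, -1, true));
        System.out.println(select(Mode.EXACTLY_ONCE, 1 << 20, false));
        System.out.println(select(Mode.AT_LEAST_ONCE, 0, true));
        try {
            select(Mode.EXACTLY_ONCE, 0, true);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Note that `0` and any value below `-1` are rejected, and that the limit is validated only in the `EXACTLY_ONCE` branch, as in the excerpt.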

Aligned: BarrierBuffer

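BarrierBuffer holds the member variables that alignment relies on: bufferBlocker, of type BufferBlocker, and blockedChannels, a boolean array. The BufferBlocker caches the data that arrives on a channel while that channel is being aligned (the in-memory CachedBufferBlocker implementation keeps it in an ArrayDeque, while BufferSpiller spills it to disk through the IOManager), and blockedChannels records which channels are currently blocked for alignment.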

The alignment flow, in getNextNonBlocked:

```java
@Override
public BufferOrEvent getNextNonBlocked() throws Exception {
    while (true) {
        //.....

        BufferOrEvent bufferOrEvent = next.get();
        if (isBlocked(bufferOrEvent.getChannelIndex())) {
            // The channel this data came from is being aligned:
            // cache the data in the BufferBlocker instead of processing it
            bufferBlocker.add(bufferOrEvent);
            checkSizeLimit();
        }
        else if (bufferOrEvent.isBuffer()) {
            // A regular buffer is returned directly
            return bufferOrEvent;
        }
        else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
            if (!endOfStream) {
                // Handle data of type CheckpointBarrier
                processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
            }
        }
        //.......
    }
}
```
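The order of the checks matters: the blocked-channel test runs before the type test. The hypothetical sketch below (not Flink code; the names are invented) models only that dispatch order as a pure function, and deliberately leaves the branches hidden behind `//.......` unmodeled.

```java
// Hypothetical model of the order of checks in the getNextNonBlocked excerpt; not Flink code.
final class BarrierBufferRouting {

    enum Kind { BUFFER, CHECKPOINT_BARRIER, OTHER_EVENT }

    enum Action { CACHE_IN_BUFFER_BLOCKER, RETURN_TO_OPERATOR, PROCESS_BARRIER, DROP_BARRIER, ELIDED_IN_EXCERPT }

    static Action route(boolean channelBlocked, Kind kind, boolean endOfStream) {
        // The blocked check comes first, so everything from a blocked channel,
        // including events such as a later barrier, is cached rather than handled
        if (channelBlocked) {
            return Action.CACHE_IN_BUFFER_BLOCKER;
        }
        switch (kind) {
            case BUFFER:
                return Action.RETURN_TO_OPERATOR;
            case CHECKPOINT_BARRIER:
                // After end of stream the barrier is skipped and the loop keeps reading
                return endOfStream ? Action.DROP_BARRIER : Action.PROCESS_BARRIER;
            default:
                // Other events are handled in the part marked "//......." in the excerpt
                return Action.ELIDED_IN_EXCERPT;
        }
    }

    public static void main(String[] args) {
        for (Kind kind : Kind.values()) {
            System.out.println(kind + ": blocked -> " + route(true, kind, false)
                    + ", unblocked -> " + route(false, kind, false));
        }
    }
}
```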

The processBarrier method:

```java
private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
    // barrierId is the checkpoint ID this barrier belongs to
    final long barrierId = receivedBarrier.getId();

    // With a single input channel, trigger the checkpoint directly
    if (totalNumberOfInputChannels == 1) {
        if (barrierId > currentCheckpointId) {
            // new checkpoint
            currentCheckpointId = barrierId;
            notifyCheckpoint(receivedBarrier);
        }
        return;
    }

    // Multiple input channels: numBarriersReceived is the number of channels that have
    // delivered the barrier for the current checkpoint ID;
    // numBarriersReceived > 0 means an alignment is in progress
    if (numBarriersReceived > 0) {
        // this is only true if some alignment is already in progress and was not canceled
        if (barrierId == currentCheckpointId) {
            // regular case
            onBarrier(channelIndex);
        }
        else if (barrierId > currentCheckpointId) {
            // The incoming barrierId (checkpoint ID) is newer than the checkpoint being aligned
            // (for example, because the current checkpoint timed out). Abort the current
            // checkpoint, reset blockedChannels and set numBarriersReceived back to 0,
            // then start aligning for checkpoint barrierId
            LOG.warn("{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
                    "Skipping current checkpoint.",
                inputGate.getOwningTaskName(),
                barrierId,
                currentCheckpointId);

            notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId));
            releaseBlocksAndResetBarriers();
            beginNewAlignment(barrierId, channelIndex);
        }
        else {
            // ignore trailing barrier from an earlier checkpoint (obsolete now)
            return;
        }
    }
    else if (barrierId > currentCheckpointId) {
        // numBarriersReceived == 0: start a new checkpoint and
        // mark this channel as blocked (true) in blockedChannels
        beginNewAlignment(barrierId, channelIndex);
    }
    else {
        // either the current checkpoint was canceled (numBarriers == 0) or
        // this barrier is from an old subsumed checkpoint
        return;
    }

    // check if we have all barriers - since canceled checkpoints always have zero barriers
    // this can only happen on a non canceled checkpoint
    if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) {
        // actually trigger checkpoint
        if (LOG.isDebugEnabled()) {
            LOG.debug("{}: Received all barriers, triggering checkpoint {} at {}.",
                inputGate.getOwningTaskName(),
                receivedBarrier.getId(),
                receivedBarrier.getTimestamp());
        }

        // Alignment is complete: hand the cached data (in the BufferBlocker) back so that
        // subsequent reads consume it, then trigger the checkpoint
        releaseBlocksAndResetBarriers();
        notifyCheckpoint(receivedBarrier);
    }
}
```
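The branches of processBarrier can be summarized as a decision table. This hypothetical sketch (not Flink code; the enum and method names are invented) only classifies an incoming barrier from the counters shown in the excerpt; it does not block channels, abort, or trigger anything.

```java
// Hypothetical decision table for the branches of BarrierBuffer.processBarrier; not Flink code.
final class ProcessBarrierDecision {

    enum Decision {
        TRIGGER_SINGLE_CHANNEL, // single input channel: notifyCheckpoint right away
        COUNT_BARRIER,          // same checkpoint as the running alignment: onBarrier
        ABORT_AND_START_NEW,    // newer checkpoint mid-alignment: notifyAbort, release, begin new
        START_NEW_ALIGNMENT,    // no alignment running: beginNewAlignment
        IGNORE                  // barrier from an older or already handled checkpoint
    }

    static Decision decide(int totalChannels, int numBarriersReceived,
                           long currentCheckpointId, long barrierId) {
        if (totalChannels == 1) {
            return barrierId > currentCheckpointId ? Decision.TRIGGER_SINGLE_CHANNEL : Decision.IGNORE;
        }
        if (numBarriersReceived > 0) {
            if (barrierId == currentCheckpointId) {
                return Decision.COUNT_BARRIER;
            }
            return barrierId > currentCheckpointId ? Decision.ABORT_AND_START_NEW : Decision.IGNORE;
        }
        return barrierId > currentCheckpointId ? Decision.START_NEW_ALIGNMENT : Decision.IGNORE;
    }

    /** Evaluated after COUNT_BARRIER, ABORT_AND_START_NEW or START_NEW_ALIGNMENT. */
    static boolean alignmentComplete(int numBarriersReceived, int numClosedChannels, int totalChannels) {
        return numBarriersReceived + numClosedChannels == totalChannels;
    }

    public static void main(String[] args) {
        System.out.println(decide(2, 0, 4, 5)); // START_NEW_ALIGNMENT
        System.out.println(decide(2, 1, 5, 5)); // COUNT_BARRIER
        System.out.println(decide(2, 1, 5, 6)); // ABORT_AND_START_NEW
        System.out.println(decide(2, 0, 5, 3)); // IGNORE
    }
}
```

Closed channels count toward completion, so an alignment can finish with fewer barriers than channels once some inputs have ended.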


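Overall aligned flow: when an operator has multiple upstream inputs and receives a checkpoint barrier on one input channel, it temporarily marks that channel as blocked and caches any data it subsequently reads from that channel. Once the barriers from all channels have arrived, it releases the cached data for consumption and triggers the checkpoint; the released data is then processed after the checkpoint has been triggered.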
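To tie these steps together, here is a heavily simplified, single-threaded simulation of the aligned flow. It is a sketch under stated assumptions, not Flink's implementation: elements are plain strings (`B<id>` marks a barrier), closed channels and the size limit are ignored, and released data is replayed immediately after the checkpoint instead of being read back by later getNextNonBlocked calls. The method names follow the excerpt, but their bodies here are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified model of the aligned (BarrierBuffer-style) flow; not Flink code.
// Elements are records such as "a1" or barriers "B<checkpointId>" such as "B1".
final class AlignedBarrierSimulation {

    private static final class Item {
        final int channel;
        final String element;
        Item(int channel, String element) { this.channel = channel; this.element = element; }
    }

    private final int totalChannels;
    private final boolean[] blockedChannels;
    private final Deque<Item> bufferBlocker = new ArrayDeque<>(); // data cached while aligning
    private int numBarriersReceived;
    private long currentCheckpointId = -1;
    final List<String> log = new ArrayList<>();

    AlignedBarrierSimulation(int totalChannels) {
        this.totalChannels = totalChannels;
        this.blockedChannels = new boolean[totalChannels];
    }

    /** Same order of checks as getNextNonBlocked: blocked channel first. */
    void onInput(int channel, String element) {
        if (blockedChannels[channel]) {
            bufferBlocker.add(new Item(channel, element)); // aligning: cache, do not process
        } else if (element.startsWith("B")) {
            processBarrier(Long.parseLong(element.substring(1)), channel);
        } else {
            log.add("process " + element);
        }
    }

    private void processBarrier(long barrierId, int channel) {
        List<Item> released = new ArrayList<>();
        if (numBarriersReceived > 0 && barrierId == currentCheckpointId) {
            onBarrier(channel);
        } else if (numBarriersReceived > 0 && barrierId > currentCheckpointId) {
            log.add("abort " + currentCheckpointId); // a newer checkpoint overtakes the current one
            released = releaseBlocksAndResetBarriers();
            beginNewAlignment(barrierId, channel);
        } else if (numBarriersReceived == 0 && barrierId > currentCheckpointId) {
            beginNewAlignment(barrierId, channel);
        } else {
            return; // barrier of an older checkpoint: ignore
        }
        if (numBarriersReceived == totalChannels) {
            released.addAll(releaseBlocksAndResetBarriers());
            log.add("checkpoint " + barrierId); // snapshot is taken before cached data is processed
        }
        replay(released);
    }

    private void beginNewAlignment(long checkpointId, int channel) {
        currentCheckpointId = checkpointId;
        onBarrier(channel);
    }

    private void onBarrier(int channel) {
        blockedChannels[channel] = true; // block until every channel has delivered its barrier
        numBarriersReceived++;
    }

    /** Unblocks every channel and hands back the cached data, oldest first. */
    private List<Item> releaseBlocksAndResetBarriers() {
        List<Item> cached = new ArrayList<>(bufferBlocker);
        bufferBlocker.clear();
        Arrays.fill(blockedChannels, false);
        numBarriersReceived = 0;
        return cached;
    }

    /** Released data goes through the normal routing again, like fresh input. */
    private void replay(List<Item> items) {
        for (Item item : items) {
            onInput(item.channel, item.element);
        }
    }

    public static void main(String[] args) {
        AlignedBarrierSimulation sim = new AlignedBarrierSimulation(2);
        sim.onInput(0, "a1");
        sim.onInput(0, "B1"); // channel 0 starts aligning and is blocked
        sim.onInput(0, "a2"); // cached in bufferBlocker
        sim.onInput(1, "b1");
        sim.onInput(1, "B1"); // last barrier: checkpoint 1, then a2 is released
        sim.onInput(1, "b2");
        System.out.println(sim.log);
    }
}
```

Running `main` prints `[process a1, process b1, checkpoint 1, process a2, process b2]`: `a2` arrived after channel 0's barrier, so it is held back and only processed once checkpoint 1 has been triggered.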

Non-aligned: BarrierTracker

The non-aligned mechanism is comparatively simple: no data is cached, and the checkpoint is triggered as soon as the checkpointBarrier has arrived on every channel.

```java
public BufferOrEvent getNextNonBlocked() throws Exception {
    while (true) {
        Optional<BufferOrEvent> next = inputGate.getNextBufferOrEvent();
        if (!next.isPresent()) {
            // buffer or input exhausted
            return null;
        }

        BufferOrEvent bufferOrEvent = next.get();
        if (bufferOrEvent.isBuffer()) {
            return bufferOrEvent;
        }
        else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
            processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
        }
        else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
            processCheckpointAbortBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
        }
        else {
            // some other event
            return bufferOrEvent;
        }
    }
}
```

The processBarrier method:

```java
private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
    final long barrierId = receivedBarrier.getId();

    // With only one input channel, trigger the checkpoint directly
    if (totalNumberOfInputChannels == 1) {
        notifyCheckpoint(barrierId, receivedBarrier.getTimestamp(), receivedBarrier.getCheckpointOptions());
        return;
    }

    // general path for multiple input channels
    if (LOG.isDebugEnabled()) {
        LOG.debug("Received barrier for checkpoint {} from channel {}", barrierId, channelIndex);
    }

    // find the checkpoint barrier in the queue of pending barriers
    CheckpointBarrierCount cbc = null;
    int pos = 0;
    // Look for the entry of the same checkpoint
    for (CheckpointBarrierCount next : pendingCheckpoints) {
        if (next.checkpointId == barrierId) {
            cbc = next;
            break;
        }
        pos++;
    }

    if (cbc != null) {
        // add one to the count to that barrier and check for completion
        int numBarriersNew = cbc.incrementBarrierCount();
        if (numBarriersNew == totalNumberOfInputChannels) {
            // All seven Dragon Balls collected: time to trigger the checkpoint.
            // Remove this checkpoint and every older one queued before it
            for (int i = 0; i <= pos; i++) {
                pendingCheckpoints.pollFirst();
            }

            // notify the listener
            if (!cbc.isAborted()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Received all barriers for checkpoint {}", barrierId);
                }

                notifyCheckpoint(receivedBarrier.getId(), receivedBarrier.getTimestamp(), receivedBarrier.getCheckpointOptions());
            }
        }
    }
    else {
        // First barrier of a new checkpoint
        if (barrierId > latestPendingCheckpointID) {
            latestPendingCheckpointID = barrierId;
            pendingCheckpoints.addLast(new CheckpointBarrierCount(barrierId));

            // make sure we do not track too many checkpoints
            if (pendingCheckpoints.size() > MAX_CHECKPOINTS_TO_TRACK) {
                pendingCheckpoints.pollFirst();
            }
        }
    }
}
```

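Overall non-aligned flow: when an operator has multiple upstream inputs, no data is cached for any checkpoint; records are passed straight on for processing. Checkpoint progress is tracked in a queue of CheckpointBarrierCount objects, each of which records a checkpoint ID and how many input channels have delivered its checkpointBarrier. When the barrier count equals the number of channels, the checkpoint is triggered.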
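A hypothetical sketch of this counting structure in plain Java, modeled on the excerpt but not Flink code. `maxCheckpointsToTrack` is a constructor parameter standing in for `MAX_CHECKPOINTS_TO_TRACK` (whose value the excerpt does not show). Starting each new entry at a count of 1 is an assumption consistent with the excerpt, which creates the entry on the first barrier without incrementing it. Abort handling is left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified model of BarrierTracker-style counting; not Flink code.
// Records are never blocked or cached, so only barriers are modeled.
// Abort handling (CancelCheckpointMarker, isAborted) is left out.
final class BarrierTrackerSimulation {

    private static final class CheckpointBarrierCount {
        final long checkpointId;
        int barrierCount = 1; // created when the first barrier of a checkpoint arrives
        CheckpointBarrierCount(long checkpointId) { this.checkpointId = checkpointId; }
    }

    private final int totalChannels;
    private final int maxCheckpointsToTrack; // stands in for MAX_CHECKPOINTS_TO_TRACK
    private final Deque<CheckpointBarrierCount> pendingCheckpoints = new ArrayDeque<>();
    private long latestPendingCheckpointId = -1;
    final List<Long> triggered = new ArrayList<>();

    BarrierTrackerSimulation(int totalChannels, int maxCheckpointsToTrack) {
        this.totalChannels = totalChannels;
        this.maxCheckpointsToTrack = maxCheckpointsToTrack;
    }

    void processBarrier(long barrierId) {
        if (totalChannels == 1) {
            triggered.add(barrierId); // single input: trigger immediately
            return;
        }
        // Find the pending entry for this checkpoint and its position in the queue
        CheckpointBarrierCount cbc = null;
        int pos = 0;
        for (CheckpointBarrierCount next : pendingCheckpoints) {
            if (next.checkpointId == barrierId) {
                cbc = next;
                break;
            }
            pos++;
        }
        if (cbc != null) {
            if (++cbc.barrierCount == totalChannels) {
                // Drop this entry and every older entry queued in front of it
                for (int i = 0; i <= pos; i++) {
                    pendingCheckpoints.pollFirst();
                }
                triggered.add(barrierId);
            }
        } else if (barrierId > latestPendingCheckpointId) {
            // First barrier of a newer checkpoint: start counting it
            latestPendingCheckpointId = barrierId;
            pendingCheckpoints.addLast(new CheckpointBarrierCount(barrierId));
            if (pendingCheckpoints.size() > maxCheckpointsToTrack) {
                pendingCheckpoints.pollFirst(); // forget the oldest entry to bound memory
            }
        }
        // Otherwise the barrier belongs to a checkpoint that is no longer tracked: ignore it
    }

    int pendingCount() { return pendingCheckpoints.size(); }

    public static void main(String[] args) {
        BarrierTrackerSimulation tracker = new BarrierTrackerSimulation(3, 50);
        tracker.processBarrier(1); // channel 0
        tracker.processBarrier(2); // channel 0
        tracker.processBarrier(2); // channel 1
        tracker.processBarrier(2); // channel 2: checkpoint 2 complete, entry for 1 dropped
        tracker.processBarrier(1); // late barrier for checkpoint 1: ignored
        System.out.println(tracker.triggered); // [2]
    }
}
```

In `main`, checkpoint 2 completes first, which also drops the incomplete entry for checkpoint 1; the late barrier for checkpoint 1 is then ignored because its ID is not newer than `latestPendingCheckpointId`.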
