18

开源 | WPaxos:生产级Paxos算法实现解析

 3 years ago
source link: http://mp.weixin.qq.com/s?__biz=MzI1NDc5MzIxMw%3D%3D&%3Bmid=2247489747&%3Bidx=1&%3Bsn=7ef2eb1f92a0bde54fa615ae5899e3e0
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

开源项目专题系列

1. 开源项目名称:WPaxos

2. github地址:

https://github.com/wuba/WPaxos

3. 简介:

WPaxos是58同城开源的一致性算法Paxos的生产级高性能Java实现,用于解决高并发、高可靠分布式系统中多副本数据状态不一致问题以及分布式共识问题。

背景

本文为WPaxos源码解析系列的第一篇文章,主要介绍核心Paxos算法实现,之后我们还会在源码解析系列其它文章中分别对每个模块的设计与实现进行详细介绍,欢迎大家持续关注,计划文章列表如下:

  • WPaxos:存储模块解析

  • WPaxos:如何实现多副本数据快速对齐

  • WPaxos:master自主选举机制

  • WPaxos:高可靠、高并发场景下应用实践

Paxos是什么

Paxos是1990年Leslie Lamport在论文《The Part-Time Parliament》提出的一种基于消息传递且具有高度容错性的一致性算法,是解决分布式一致性问题最有效的算法之一。在基于异步通信的分布式环境中,机器宕机、网络异常等情况时常发生,引入Paxos算法可以保证发生以上异常情况时,分布式系统仍然可以对某个提案快速达成一致,同时不会破坏系统已有的一致性状态。但论文中提到的Paxos协议比较难于理解,又叫Basic Paxos。

Basic Paxos算法中每次发起的数据同步请求,称为一个提案,每个提案都拥有编号(M)以及提案内容(V),分布式集群中的每个节点都同时具有Proposer(提案的发起者)、Acceptor(提案的接受者)、Learner(提案的学习者)三个角色,每个节点都可以作为Proposer并发发起提案。

为了保证多节点提案能够达成一致,提案的选定有以下两个原则:

P1:每个Acceptor必须批准它接收到的第一个提案;

P2:如果一个提案[M0,V0]被选定后,任何Proposer提出更高编码的提案,最终选定的值都为V0;

基于这两个原则,Paxos算法提案选定过程分为以下两个阶段:

bYN7vir.png!mobile

算法解析

直观的理解Basic Paxos算法可能会有很多疑惑,比方说为什么需要经过Prepare、Accept两个阶段过半节点响应才能提交提案,多个节点并发发起提案发生冲突如何处理等等,《The Part-Time Parliament》文章中有对算法进行详细的数学推理论证,本文不做重复论述,仅结合一些异常场景简单介绍下Basic Paxos算法的鲁棒性。
假如分布式系统中有三个节点,分别有Proposer P1 P2 P3,Acceptor A1 A2 A3,编号相同的Proposer与Acceptor为同一个节点,通过Paxos算法实现三个节点数据强一致性。

Case1:出现网络异常

网络异常包括多种情况,可能为网络出现分区,如图1所示,P3与P1 P2发生网络隔离,或者部分节点之间网络中断,如图2所示,P1与P2、P2与P3访问正常,但P1与P3网络不通。

I3QJjub.png!mobile图1

图1的场景下,P1与P3并发发起提案,P3发起的提案将得不到过半节点响应,提案不会得到批准,当网络恢复时,节点3作为Learner可从另外两个节点学习对齐数据,实现数据最终一致性。

iuIj2me.png!mobile图2

而图2的场景下,P1与P3都能与过半节点通信,但是由于A2节点只能接受编号更高的提案,所以P1与P3发起的提案,只有一个能够最终被确定提交,而另外一个节点,也会通过Learner将已提交的提案学习同步过来,保证数据一致;
其它网络异常的场景还有很多,Paxos算法要求任何提案必须经过过半节点响应,而 N个节点中任意两个N/2+1集合必有交集,所以出现网络异常时,不会出现数据分裂情况。但是是否仅通过过半节点响应机制就可以保证数据强一致性?假如在图2的场景下省去Prepare阶段,每个节点发起Accept请求被过半节点响应后就Commit提交会怎样?如图3所示:

nauumyV.png!mobile图3

A2节点先后收到P1和P3节点发起的提案编号递增的Accept请求,并都接受通过,若P3的Commit v3请求先到达A2被执行,A2之后接收到P1的Commit请求后,就会执行失败,最终会导致节点1和节点2、3存储的数据不一致。因此,多节点并发发起提案时,同时必须要经过两阶段过半节点的交互,才能提交提案。

Case2:提案编号冲突

前面介绍Paxos算法时,以编号的大小标志提案的优先级,编号越大优先级越高。每个Proposer维护自己的提案编号,有可能会出现两个节点同时发起编号相同的提案请求。如图4所示,

QvQRF36.png!mobile图4

P1与P3,同时发起编号为m1的提案,由于Prepare阶段仅能批准编号更大的提案,所以A2节点会只批准P1发起的第一个m1提案请求,而拒绝P3发起的第二个m1提案,同时A2会将它当前承诺的最大提案编号返回给P3节点,P3接收到后,将提案编号更新为已接收到的最大提案编号加1,重新发起提案,但此时仍然有很大概率冲突。虽然提案编号冲突不会导致数据不一致,但是增加了无效的网络交互,有损性能。

因此,提案编号最好能够保持全局唯一递增,常见的解决方案是在提案编号中同时加入节点NodeID信息,如提案编号组成为[ProposeID,NodeID],ProposeID为原有每个节点自己维护的编号,当提案中ProposeID相同时,再比较NodeID大小,或者是不同节点提案编号分段取值,如ProposeID *(node size)+ NodeID。

Case3:异常情况下的提案冲突

在分布式系统并发比较高的情况下,多节点并发发起的提案请求可能以任意序列到达,但最终每个节点提交的提案序列期望是完全一致的,已经通过Prepare、Accept两个阶段交互进入Commit阶段的提案数据,不能再被覆盖或丢失。但节点上已经Accept批准的提案数据可能有两种状态,一种是提案最终通过了过半节点回复,进入Commit阶段,另一种是提案仅被自己接受,最终并未被过半节点批准,如果发起该提案的Proposer出现故障,其它节点是很难区分以上两种状态。为了不出现数据丢失,算法中的Prepare阶段,Acceptor会将已批准尚未提交的值返回给Proposer,由Proposer代为提交。

bM3uem6.png!mobile图5

图5展示了,如果Prepare阶段Acceptor未将已批准的值返回给Proposer的影响。P1、P3并发发起提案,P1发起的提案得到过半响应后,P3节点此时也并发发起了编号更高的提案,A2节点如果接收到P3的Prepare请求未返回已批准的P1提案值,A2优先接收到P3的Commit请求,就会把P1已经批准过进入Commit阶段的数据覆盖。而图6展示的是Acceptor将已批准值返回给Proposer的正常交互过程,最终节点数据状态是一致的。

QRZjMfU.png!mobile图6

在上面场景下,假如P1发起的Accept请求并未得到过半节点响应,如图7所示,之后P3发起提案P[m3,v3],若A1在接收到P3节点广播的Prepare请求时,先于A2节点将已Accept存储但未批准通过的值v1给P3,P3最终将提交的值替换为了v1,而P3原来预提交的值v3,会通过重新发起新的提案进行提交。这种情况下,作为发起提案P[m1,v1]的上层调用方,得到的反馈结果虽然是提案提交失败的,实际却是提交成功的状态。这时Paxos只能保证多节点数据最终状态一致,但保证不了事务执行的准确性。

vERzAbq.png!mobile图7

Case4:算法的活性

前面也提到,在多节点高并发发起提案的过程中,每个节点接收到的请求序列不确定性很大。假如有以下场景,集群中的两个节点P1、P3,并发发起了提案,P1发起P[m1,v1],P3发起P[m3,v3],Acceptor A2相继接收到Prepare(m1)与Prepare(m3)(m3>m1)请求,并做出响应,之后再接收到Accept(m1,v1)的请求,就会拒绝,P1的Accept请求被拒后,提升提案编号,重试发起提案P[m4,v1],A2接收到后会响应通过,而之后接收到P3发起Accept(m3,v3)请求同样会被拒,如图8所示,依次循环,有可能一直无法收敛。

FBrMjiY.png!mobile图8

优化这个问题常见两种方案,一种是在Prepare或Accept请求未被过半节点响应通过时,随机延迟一段时间再发起重试,这样能够降低一定程度冲突概率;还有一种方案是在集群中只允许一个Proposer发起提案,尽量避免冲突。

工程化实现

上面是对Basic Paxos算法的一些简单介绍,由于Basic Paxos每个提案的确定需要经过两阶段与多数派节点交互才能达成共识,多个节点并发发起提案有很大的冲突概率,所以它的性能并不是很高,不太适用于高并发场景。WPaxos组件除了实现基本Basic Paxos算法协议,同时还在算法交互层面做了以下生产级优化调整,如下,

1. 添加Master选举机制,可指定由Master串行发起Propose,但Propose的权限并不严格受限于Master节点,其它节点仍然有发起Propose的权限;

2. 在上一次Propose请求执行成功,并且整个过 程无任何请求执行超时或被拒的情况下,下次Propose可跳过Prepare阶段,网络RTT与写盘次数优化(2,2)->(1,1)。

NFFjyye.png!mobile

Node1节点在一次Prepare阶段通过后,如果Accept阶段也被通过并且没有任何节点拒绝接受,说明其它节点承若过的最大提案编号一直未发生变化,后面的提交过程,可以跳过Prepare阶段。一旦发现有请求被拒,则重新走Prepare阶段。

如果指定只有Master节点可以发起提案,多数情况下Propose过程都可以跳过Prepare阶段,性能有很大提升;但在没有指定Master节点或者Master选举竞争时,会出现多个节点并发发起提案的情况下,可能会遇到多个节点同时具备跳过Prepare阶段的条件,由于这些节点最后一次通过的Prepare阶段被批准的提案编号必不相同,所以在下一次发起提案直接进入Accept阶段时,只有提案编号最大的Accept请求才能被批准通过,不破坏Basic Paxos原有原则。

vYnaYzZ.png!mobile

3. 每次发起的提案编号由两部分组成[ProposeID,NodeID],在ProposeID相等的情况下,比较NodeID,NodeID是由ip、port组成,ProposeID只有在Prepare阶段递增,更新规则为:取已知提案编号的最大值+1,已知编号包括当前节点最新的ProposeID,以及发起Prepare、Accept请求被拒时,其它节点返回的已承诺最大ProposeID。

4. 每个发起的提案,除了提案编号还有一个InstanceID,用来标志当前提交的提案值为第几条数据记录,InstanceID为单调递增,每次确定一个值并提交至状态机执行成功,则InstanceID增加1,一旦一个InstanceID对应的值被确定,之后不会被修改。只有当接收到Paxos提案的InstanceID与当前节点期待写入的InstanceID一致,Acceptor才会对Paxos请求做判断响应处理,可以防止因网络并发或其它异常情况导致Paxos提案请求错乱处理。

Ubeua2q.png!mobile

还有其它一些生产级性能优化,如支持多Paxos分组,批量提交等,本文暂时不做详细介绍。

总结

本文通过结合一些异常场景对Basic Paxos算法进行了分析,并介绍了WPaxos开源项目中关于Paxos算法交互部分的工程化改造,希望能够帮助大家进一步理解Paxos算法,或者有机会将WPaxos应用到自己的分布式系统中。

如果上面有些实现细节介绍不太清楚的,欢迎大家阅读源码深入了解,或在WPaxos开源社区反馈咨询。附录中贴出WPaxos开源项目中的Paxos源码。

附录--源码解析:

1.  Proposer:Propose准备

public int newValue(byte[] value) {
// 初始化新的提案值,若上次提案提交成功,在准备接收新的instance记录前,value清空,反之,提交失败进行重试时,value不更新仍然为原有值;
if(this.proposerState.getValue().length == 0) {
this.proposerState.setValue(value);
}


// canSkipPrepare : 是否可以跳过prepare阶段;首次执行或上次propose执行失败时不可跳过;
// wasRejectBySomeone :上次propose过程是否被其它节点拒绝过;
if(this.canSkipPrepare && !this.wasRejectBySomeone) {
// 其它节点承诺过得提案编号未发生变化,本次可跳过prepare阶段
accept();
} else {
//if not reject by someone, no need to increase ballot
prepare(this.wasRejectBySomeone);
}
return 0;
}

2.   Proposer: 发起Prepare请求

public void prepare(boolean needNewBallot) {


// 退出上轮Accept阶段
exitAccept();
this.isPreparing = true;
this.canSkipPrepare = false;
this.wasRejectBySomeone = false;


// 清空历史prepare阶段,其它节点返回的已accept的提案信息
this.proposerState.resetHighestOtherPreAcceptBallot();
if(needNewBallot) {
// 提升proposeID:条件为首次propose或上次propose过程被其它节点拒绝过
this.proposerState.newPrepare();
}


PaxosMsg paxosMsg = new PaxosMsg();
paxosMsg.setMsgType(PaxosMsgType.paxosPrepare.getValue());
paxosMsg.setInstanceID(getInstanceID());
paxosMsg.setNodeID(this.pConfig.getMyNodeID());
paxosMsg.setProposalID(this.proposerState.getProposalID());


// 初始化新的Prepare阶段统计值
this.msgCounter.startNewRound();


// 添加prepare请求超时定时器,超时后,随机延迟一定时间后重新发起prepare请求
addPrepareTimer(0);


// Proposer同时也为Acceptor
int runSelfFirst = BroadcastMessageType.BroadcastMessage_Type_RunSelf_First.getType();
int sendType = MessageSendType.UDP.getValue();
paxosMsg.setTimestamp(System.currentTimeMillis());
// UDP广播请求到其它节点
broadcastMessage(paxosMsg, runSelfFirst, sendType);
}


public void newPrepare() {
// 取已知提案编号的最大值+1
long maxProposalId = this.proposalID > this.highestOtherProposalID ? this.proposalID : this.highestOtherProposalID;
this.proposalID = maxProposalId + 1;
}

3.   Acceptor:接收到Prepare请求并做出回应

public int onPrepare(PaxosMsg paxosMsg) {
PaxosMsg replyPaxosMsg = new PaxosMsg();
replyPaxosMsg.setInstanceID(getInstanceID());
replyPaxosMsg.setNodeID(this.pConfig.getMyNodeID());
replyPaxosMsg.setProposalID(
paxosMsg.getProposalID());
replyPaxosMsg.setMsgType(
PaxosMsgType.paxosPrepareReply.getValue());


// BallotNumber类用于封装提案编号
BallotNumber ballot = new BallotNumber(paxosMsg.getProposalID(), paxosMsg.getNodeID());
BallotNumber pbn = this.acceptorState.getPromiseBallot();
if(ballot.gt(pbn)) {
// PaxosMsg中的提案编号比当前承诺的提案编号大,承若不再接受比ballot更小的值
int ret = updateAcceptorState4Prepare(replyPaxosMsg, ballot);
if(ret != 0) return ret;
} else {
// PaxosMsg中的提案编号小于等于当前承诺的提案编号,拒绝prepare请求,并返回当前已承诺的最大提案编号
replyPaxosMsg.setRejectByPromiseID(this.acceptorState.getPromiseBallot().getProposalID());
}


long replyNodeId = paxosMsg.getNodeID();
sendMessage(replyNodeId, replyPaxosMsg);
return 0;
}


private int updateAcceptorState4Prepare(PaxosMsg replyPaxosMsg, BallotNumber ballot) {
// 返回已批准但未提交的最大提案编号,未批准过值时为0
replyPaxosMsg.setPreAcceptID(this.acceptorState.getAcceptedBallot().getProposalID());
replyPaxosMsg.setPreAcceptNodeID(this.acceptorState.getAcceptedBallot().getNodeId());


if(this.acceptorState.getAcceptedBallot().getProposalID() > 0) {
// 返回当前已批准但未提交的值
replyPaxosMsg.setValue(this.acceptorState.getAcceptedValue());
}


this.acceptorState.setPromiseBallot(ballot);
// acceptorState同步持久化存储
int ret = this.acceptorState.persist(getInstanceID(), getLastChecksum());
if(ret != 0) {
return -1;
}


return 0;
}

4. Proposer:Prepare返回结果处理

public void onPrepareReply(PaxosMsg paxosMsg) {
// prepare阶段已经退出,说明当前接收到的是一条历史过期prepare回复,不做处理
if(!this.isPreparing) {
return ;
}
// 与当前prepare阶段不一致,不做处理
if(paxosMsg.getProposalID() != this.proposerState.getProposalID()) {
return;
}


this.msgCounter.addReceive(paxosMsg.getNodeID());


if(paxosMsg.getRejectByPromiseID() == 0) {
// prepare请求得到响应
BallotNumber ballot = new BallotNumber(paxosMsg.getPreAcceptID(),paxosMsg.getPreAcceptNodeID());
// 统计已经批准通过的节点
this.msgCounter.addPromiseOrAccept(paxosMsg.getNodeID());
// 更新其它节点已经批准的最大提案值
this.proposerState.addPreAcceptValue(ballot, paxosMsg.getValue());
} else {
// prepare请求被拒
this.msgCounter.addReject(paxosMsg.getNodeID());
this.wasRejectBySomeone = true;
// 更新其它节点已承诺的最大提案编号
this.proposerState.setOtherProposalID(paxosMsg.getRejectByPromiseID());
}


if(this.msgCounter.isPassedOnThisRound()) {
// 过半节点通过则进入accept阶段
int useTimeMs = this.timeStat.point();
this.canSkipPrepare = true;
accept();
} else if(this.msgCounter.isRejectedOnThisRound() || this.msgCounter.isAllReceiveOnThisRound()) {
// 过半节点未通过,随机延迟10-40毫秒后再进行重试,此处随机重试机制在讲Paxos算法活性时提到过,可一定程度上降低提案冲突的概率
addPrepareTimer(OtherUtils.fastRand() % 30 + 10);
}
}

若接收到的其它节点已批准的提案值,则取最大编号的提案值作为Accept阶段预提交的提案值

public void addPreAcceptValue(BallotNumber otherPreAcceptBallot, byte[] otherPreAcceptValue) {
if(otherPreAcceptBallot.isNull()) {
return ;
}
if(otherPreAcceptBallot.gt(this.highestOtherPreAcceptBallot)) {
this.highestOtherPreAcceptBallot = otherPreAcceptBallot;
this.value = otherPreAcceptValue;
}
}

5. Proposer:发起Accept请求

public void accept() {
this.timeStat.point();


// 退出上轮prepare阶段
exitPrepare();
this.isAccepting = true;


PaxosMsg paxosMsg = new PaxosMsg();
paxosMsg.setMsgType(PaxosMsgType.paxosAccept.getValue());
paxosMsg.setInstanceID(getInstanceID());
paxosMsg.setNodeID(this.pConfig.getMyNodeID());
paxosMsg.setProposalID(this.proposerState.getProposalID());
paxosMsg.setValue(this.proposerState.getValue());
paxosMsg.setLastChecksum(getLastChecksum());


// 启动新的Accept阶段统计计数
this.msgCounter.startNewRound();


// 添加Accept请求超时定时器,超时后,重新发起prepare请求,当请求未得到正常响应时,很可能其它节点承诺过得最大提案编号已经发生变化,此时为了减少冲突,不直接进入Accept阶段重试;
addAcceptTimer(0);


int runSelfFirst = BroadcastMessageType.BroadcastMessage_Type_RunSelf_Final.getType();
int sendType = MessageSendType.UDP.getValue();


// 广播Accept请求到所有Acceptor,包括Proposer节点自己
broadcastMessage(paxosMsg, runSelfFirst, sendType);
}

6. Acceptor:接收到Accept请求并做出回应

public void onAccept(PaxosMsg paxosMsg) {
PaxosMsg replyPaxosMsg = new PaxosMsg();
replyPaxosMsg.setInstanceID(getInstanceID());
replyPaxosMsg.setNodeID(this.pConfig.getMyNodeID());
replyPaxosMsg.setProposalID(paxosMsg.getProposalID());
replyPaxosMsg.setMsgType(PaxosMsgType.paxosAcceptReply.getValue());


BallotNumber ballot = new BallotNumber(paxosMsg.getProposalID(), paxosMsg.getNodeID());
BallotNumber promiseBallot = this.acceptorState.getPromiseBallot();
if(ballot.ge(promiseBallot)) {
// 提案编号大于等于已承诺过得最大提案编号,则批准提案
this.acceptorState.setPromiseBallot(ballot);
BallotNumber acceptedBallot = new BallotNumber(ballot.getProposalID(),ballot.getNodeId());
this.acceptorState.setAcceptedBallot(acceptedBallot);
this.acceptorState.setAcceptedValue(paxosMsg.getValue());
// 持久化存储已批准提案状态
updateAcceptorState4Accept(replyPaxosMsg);
} else {
// 提案编号小于已承诺过得最大提案编号,则拒绝请求,并返回已承诺的最大提案编号
replyPaxosMsg.setRejectByPromiseID(this.acceptorState.getPromiseBallot().getProposalID());
}


long replyNodeId = paxosMsg.getNodeID();
sendMessage(replyNodeId, replyPaxosMsg);
}

7. Proposer:Accept返回结果处理

Accept请求返回结果处理同样是统计过半节点返回ack情况,过半节点通过后,广播提案提交请求到所有节点,优先由本节点的Learner执行状态机,若执行成功并且Propose请求未超时,则可以返回给上层逻辑执行数据提交成功。最终提案提交阶段,Proposer不需要等待其它节点执行成功的反馈。

public void onAcceptReply(PaxosMsg paxosMsg) {
if(!this.isAccepting) {
return ;
}

if(paxosMsg.getProposalID() != this.proposerState.getProposalID()) {
return ;
}

this.msgCounter.addReceive(paxosMsg.getNodeID());
if(paxosMsg.getRejectByPromiseID() == 0) {
this.msgCounter.addPromiseOrAccept(paxosMsg.getNodeID());
} else {
this.msgCounter.addReject(paxosMsg.getNodeID());
this.wasRejectBySomeone = true;
this.proposerState.setOtherProposalID(paxosMsg.getRejectByPromiseID());
}

if(this.msgCounter.isPassedOnThisRound()) {
// 向所有节点广播提交提案
int useTimeMs = this.timeStat.point();
exitAccept();
this.learner.proposerSendSuccess(getInstanceID(),this.proposerState.getProposalID());
} else if(this.msgCounter.isRejectedOnThisRound() || this.msgCounter.isAllReceiveOnThisRound()) {
// 同样随机延迟10-40ms后,重新发起prepare请求
addAcceptTimer(OtherUtils.fastRand() % 30 +10);
}
}


public void proposerSendSuccess(long learnInstanceID, long proposalID) {
PaxosMsg msg = new PaxosMsg();
msg.setMsgType(PaxosMsgType.paxosLearnerProposerSendSuccess.getValue());
msg.setInstanceID(learnInstanceID);
msg.setNodeID(this.pConfig.getMyNodeID());
msg.setProposalID(proposalID);
msg.setLastChecksum(getLastChecksum());

// 同样通过UDP广播给所有节点,进行提案提交
broadcastMessage(msg, BroadcastMessageType.
BroadcastMessage_Type_RunSelf_First.getType(),
MessageSendType.UDP.getValue());
}

8.Learner:校验提交的数据并执行状态机

public void onProposerSendSuccess(PaxosMsg paxosMsg) {
if(paxosMsg.getInstanceID() != getInstanceID()) {
//insantceID不一致时,不是期望写入的数据序列
logger.debug("InstanceID not same, skip msg, paxosMsg instanceID {}, now instanceID{}.", paxosMsg.getInstanceID(), getInstanceID());
return ;
}

if(this.acceptor.getAcceptorState().getAcceptedBallot().getProposalID() == 0) {
//尚未批准过任何值
logger.debug("I haven\'t accpeted any proposal");
return ;
}

BallotNumber ballot = new BallotNumber(paxosMsg.getProposalID(), paxosMsg.getNodeID());
BallotNumber thisBallot = this.acceptor.getAcceptorState().getAcceptedBallot();
if(thisBallot.getProposalID() != ballot.getProposalID() || thisBallot.getNodeId() != ballot.getNodeId()) {
// 与批准的提案编号不一致,有可能提案值发生变化
logger.warn("ProposalBallot not same to AcceptedBallot");
return ;
}

// 学习数据并执行状态机
this.learnerState.learnValueWithoutWrite(paxosMsg.getInstanceID(),this.acceptor.getAcceptorState().getAcceptedValue(),this.acceptor.getAcceptorState().getCheckSum());

// 将数据转发给当前节点的follower节点
transmitToFollower();
}

学习并执行状态机:

{
SMCtx smCtx = new SMCtx();
boolean isMyCommit =this.commitCtx.isMycommit(this.learner.getInstanceID(),this.learner.getLearnValue(), smCtx);

// ……
// 执行状态机
if (!smExecute(this.learner.getInstanceID(), this.learner.getLearnValue(), isMyCommit, smCtx)) {
// 状态机执行失败时,设置Commit结果提交失败
this.commitCtx.setResult(PaxosTryCommitRet.PaxosTryCommitRet_ExecuteFail.getRet(),this.learner.getInstanceID(), this.learner.getLearnValue());
// 取消跳过prepare阶段
this.proposer.cancelSkipPrepare();
return -1;
}

// 状态机执行成功时,设置Commit结果提交成功
this.commitCtx.setResult(PaxosTryCommitRet.PaxosTryCommitRet_OK.getRet(),this.learner.getInstanceID(), this.learner.getLearnValue());
// ……


// 更新checksum值
this.lastChecksum = this.learner.getNewChecksum();
// 清空instance状态,准备接收下一条instance数据
newInstance();
}

想了解更多开源项目信息?

与项目成员零距离交流?

扫码加入项目群

一切应有尽有

NjmYFr2.jpg!mobile

RRFj636.png!mobile

可添加“58技术小秘书”微信 : jishu-58

添加小秘书微信后由小秘书拉您进项目交流群

Nb2iMvQ.jpg!mobile


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK