42

Filecoin – Sector状态管理逻辑

 3 years ago
source link: https://www.lianyi.com/zixun/2850008
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

好久不看go语言的代码了,最近有空换换脑子,看看Sector的状态管理,纯go语言实现。看看大型项目的代码设计,对代码以及项目的设计开发有莫大的好处。Lotus的go语言部分的最新的代码,采用模块化设计。Lotus的代码也是一步步演进的,第一版的代码也是所有的逻辑耦合在一起,后面才逐步的模块化。

Lotus Miner将用户需要存储的数据“打包”成一个个Sector,并对Sector进行处理。本文就讲讲Sector的状态管理。

1. 模块框架

Sector状态管理相关的模块如下:

UjUbUrb.jpg!web

Sector的状态管理基于状态机。Lotus实现了通用的状态机(statemachine),在go-statemachine包中。在StateMachine之上,进一步抽象出了StateGroup,管理多个状态机。StateMachine只是实现状态的转变,具体状态是通过go-statestore进行存储。在StateMachine之上,定义状态转变的规则以及状态对应的处理函数,这些就是在具体的业务。SectorState就是Lotus管理Sector的具体业务。在这些底层模块的基础上,Lotus的相关代码调用就比较简单。先从状态管理的底层模块,StateMachine讲起:

2. StateMachine

StateMachine定义在go-statemachine包中,machine.go:

type StateMachine struct {

planner  Planner

eventsIn chan Event

name      interface{}

st        *statestore.StoredState

stateType reflect.Type

stageDone chan struct{}

closing   chan struct{}

closed    chan struct{}

busy int32

}

其中,planner是抽象出来的状态机的状态转化函数。Planner接收Event,结合当前的状态user,确定下一步的处理。

type Planner func(events []Event, user interface{}) (interface{}, uint64, error)

st是存储的状态。stateType是存储状态的类型。

StateMachine的核心是run函数,分成三部分:接收Event,状态处理,下一步的调用。其中状态处理是关键:

err := fsm.mutateUser(func(user interface{}) (err error) {

nextStep, processed, err = fsm.planner(pendingEvents, user)

ustate = user

if xerrors.Is(err, ErrTerminated) {

terminated = true

return nil

}

return err

})

mutateUser就是查看当前存储的状态(StoredState),并执行planner函数,并将planner处理后的状态存储。planner函数返回下一步的处理,已经处理的Event 的个数,有可能的出错。run函数会启动另外一个go routine执行nextStep。

uENzuuN.jpg!web

mutateUser具体的实现是利用go的反射(reflect),实现抽象和模块化。相关的逻辑感兴趣的小伙伴可以自己查看。

3. StateGroup

往往业务需要很多StateMachine。举个例子,Lotus的Miner存储多个Sector,每个Sector都由一个StateMachine维护独立的状态。StateGroup就是实现多个“同样”的StateMachine,定义在group.go中。

type StateGroup struct {

sts       *statestore.StateStore

hnd       StateHandler

stateType reflect.Type

closing      chan struct{}

initNotifier sync.Once

lk  sync.Mutex

sms map[datastore.Key]*StateMachine

}

其中,sms就是StateMachine的状态数组。StateHandler是StateMachine的状态处理函数接口:

type StateHandler interface {

Plan(events []Event, user interface{}) (interface{}, uint64, error)

}

也就是说,在StateMachine状态机上,需要实现StateHandler接口(Plan函数,实现状态的转换)。

4. StoredState

StoredState是抽象的Key-Value的存储。相对比较简单,Get/Put接口,非常容易理解。

5. SectorState

storage-fsm实现了和Sector状态相关的业务逻辑。也就是状态的定义,状态的转换函数都是在这个包里实现。整个Sector的信息定义在storage-fsm/types.go中:

type SectorInfo struct {

State        SectorState

SectorNumber abi.SectorNumber

Nonce        uint64

SectorType abi.RegisteredProof

// Packing

Pieces []Piece

// PreCommit1

TicketValue   abi.SealRandomness

TicketEpoch   abi.ChainEpoch

PreCommit1Out storage.PreCommit1Out

// PreCommit2

CommD *cid.Cid

CommR *cid.Cid

Proof []byte

PreCommitMessage *cid.Cid

// WaitSeed

SeedValue abi.InteractiveSealRandomness

SeedEpoch abi.ChainEpoch

// Committing

CommitMessage *cid.Cid

InvalidProofs uint64

// Faults

FaultReportMsg *cid.Cid

// Debug

LastErr string

Log []Log

}

SectorInfo包括了Sector状态,Precommit1/2的数据,Committing的数据等等。其中,SectorState描述了Sector的具体状态。Sector的所有的状态定义在sector_state.go文件中:

const (

UndefinedSectorState SectorState = “”

// happy path

Empty          SectorState = “Empty”

Packing        SectorState = “Packing”       // sector not in sealStore, and not on chain

PreCommit1     SectorState = “PreCommit1”    // do PreCommit1

PreCommit2     SectorState = “PreCommit2”    // do PreCommit1

PreCommitting  SectorState = “PreCommitting” // on chain pre-commit

WaitSeed       SectorState = “WaitSeed”      // waiting for seed

Committing     SectorState = “Committing”

CommitWait     SectorState = “CommitWait” // waiting for message to land on chain

FinalizeSector SectorState = “FinalizeSector”

Proving        SectorState = “Proving”

// error modes

FailedUnrecoverable SectorState = “FailedUnrecoverable”

SealFailed          SectorState = “SealFailed”

PreCommitFailed     SectorState = “PreCommitFailed”

ComputeProofFailed  SectorState = “ComputeProofFailed”

CommitFailed        SectorState = “CommitFailed”

PackingFailed       SectorState = “PackingFailed”

Faulty              SectorState = “Faulty”        // sector is corrupted or gone for some reason

FaultReported       SectorState = “FaultReported” // sector has been declared as a fault on chain

FaultedFinal        SectorState = “FaultedFinal”  // fault declared on chain

)

熟悉SDR算法的小伙伴,会发现很多熟悉的字眼:PreCommit1,PreCommit2,Commiting等等。结合状态处理函数,就能很清楚各个状态的含义以及需要处理的内容。

fsm.go中Sealing结构的Plan函数是Sector状态的处理函数:

func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface{}, uint64, error) {

next, err := m.plan(events, user.(*SectorInfo))

if err != nil || next == nil {

return nil, uint64(len(events)), err

}

return func(ctx statemachine.Context, si SectorInfo) error {

err := next(ctx, si)

if err != nil {

log.Errorf(“unhandled sector error (%d): %+v”, si.SectorNumber, err)

return nil

}

return nil

}, uint64(len(events)), nil // TODO: This processed event count is not very correct

}

Plan函数的实现也比较简单,调用plan处理当前的状态,并返回接下来需要处理的函数。状态的处理又可以分成两部分来看:1/ 状态转换的定义 2/ 状态的处理。

针对Sector的状态转换的定义,在fsmPlanners中:

var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *SectorInfo) error{

UndefinedSectorState: planOne(on(SectorStart{}, Packing)),

Packing:              planOne(on(SectorPacked{}, PreCommit1)),

PreCommit1: planOne(

on(SectorPreCommit1{}, PreCommit2),

on(SectorSealPreCommitFailed{}, SealFailed),

on(SectorPackingFailed{}, PackingFailed),

),

PreCommit2: planOne(

on(SectorPreCommit2{}, PreCommitting),

on(SectorSealPreCommitFailed{}, SealFailed),

on(SectorPackingFailed{}, PackingFailed),

),

比如说,在PreCommit1的状态,接收到SectorPreCommit1事件,将进入PreCommit2状态。所有Sector的状态转换如下图:

ZVZjAru.jpg!web

在处理好状态好,就进入相应状态的处理程序。以PreCommit1状态为例,相应的处理函数是handlePreCommit1。

func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo) error {

tok, epoch, err := m.api.ChainHead(ctx.Context())

pc1o, err := m.sealer.SealPreCommit1(ctx.Context(), m.minerSector(sector.SectorNumber), ticketValue, sector.pieceInfos())

if err != nil {

return ctx.Send(SectorSealPreCommitFailed{xerrors.Errorf(“seal pre commit(1) failed: %w”, err)})

}

}

很清楚看出,在PreCommit1的处理函数中,会通过SealPrecommit1调用rust-fil-proofs实现Precommit1的计算。最后总结一下,各个状态的含义:

Empty – 空状态 

Packing – 打包状态,多个Piece填充到一个Sector中 

PreCommit1 – PreCommit1计算 

PreCommit2 – PreCommit2计算 

PreCommitting – 提交Precommit2的结果到链上 

WaitSeed – 等待随机种子(给定10个区块的时间,让随机数种子不可提前预测) 

Committing – 计算Commit1/Commit2,并将证明提交到链上 

CommitWait – 等待链上确认 FinalizeSector – Sector状态确定

总结:

Sector的状态管理基于状态机。通用状态机的实现是通过go-statemachine实现。状态的存储通过go-statestore实现。在这些模块的基础上,storage-fsm实现了Sector的状态定义以及状态处理函数。

根据国家《 关于防范代币发行融资风险的公告 》,大家应警惕代币发行融资与交易的风险隐患。

本文来自 LIANYI 转载,不代表链一财经立场,转载请联系原作者。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK