1

Golang框架实战-KisFlow流式计算框架(8)-KisFlow Action

 2 months ago
source link: https://studygolang.com/articles/36543
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Golang框架实战-KisFlow流式计算框架专栏

Golang框架实战-KisFlow流式计算框架(1)-概述

Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)

Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)

Golang框架实战-KisFlow流式计算框架(4)-数据流

Golang框架实战-KisFlow流式计算框架(5)-Function调度

Golang框架实战-KisFlow流式计算框架(6)-Connector

Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出


7.1 Action Abort(终止流程)

KisFlow Action 是指在执行Function的时候,同时可以控制Flow的调度逻辑,KisFlow提供一些Action动作让开发者做选择,本节先介绍最简单的Action动作,Abort(终止当前Flow)。

我们最终的Abort的使用形式如下:

func AbortFuncHandler(ctx context.Context, flow kis.Flow) error {
    fmt.Println("---> Call AbortFuncHandler ----")

    for _, row := range flow.Input() {
        str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
        fmt.Println(str)
    }

    return flow.Next(kis.ActionAbort)  // 终止Flow
}

AbortFuncHandler()是一个Function 的业务回调方法,是由开发者自定义的,在执行完当前Funciton之后,正常的情况是继续执行下一个Funciton,但是如果传递flow.Next(kis.ActionAbort) 作为当前Funciton的返回值,那么则不会执行到下一个Funciton,而是直接终止当前Flow的调度计算流。

下面我们先来实现KisFlow的 Abort Action动作模式。

7.1.1 Abort接口定义

首先,先对Flow的Abort()接口做定义。

kis-flow/kis/flow.go

type Flow interface {
    // Run 调度Flow,依次调度Flow中的Function并且执行
    Run(ctx context.Context) error
    // Link 将Flow中的Function按照配置文件中的配置进行连接
    Link(fConf *config.KisFuncConfig, fParams config.FParam) error
    // CommitRow 提交Flow数据到即将执行的Function层
    CommitRow(row interface{}) error
    // Input 得到flow当前执行Function的输入源数据
    Input() common.KisRowArr
    // GetName 得到Flow的名称
    GetName() string
    // GetThisFunction 得到当前正在执行的Function
    GetThisFunction() Function
    // GetThisFuncConf 得到当前正在执行的Function的配置
    GetThisFuncConf() *config.KisFuncConfig
    // GetConnector 得到当前正在执行的Function的Connector
    GetConnector() (Connector, error)
    // GetConnConf 得到当前正在执行的Function的Connector的配置
    GetConnConf() (*config.KisConnConfig, error)
    // GetConfig 得到当前Flow的配置
    GetConfig() *config.KisFlowConfig
    // GetFuncConfigByName 得到当前Flow的配置
    GetFuncConfigByName(funcName string) *config.KisFuncConfig

    //  --- KisFlow Action ---
    // Next 当前Flow执行到的Function进入下一层Function所携带的Action动作
    Next(acts ...ActionFunc) error
}

这里面提供一个接口Next(acts ...ActionFunc) error,其中参数是一个可变参数,类型为ActionFunc,这个是我们给KisFlow定义的Action相关的方法。有关Action的定义模块如下:

7.1.2 Action模块定义

Action是用来在Flow执行过程中,通过Function来控制Flow执行特殊动作的行为配置模块,包括上面的Abort行为,Abort也属于其中一个Action。Action的模块定义如下,在kis-flow/kis/下创建action.go文件,实现:

kis-flow/kis/action.go

package kis

// Action KisFlow执行流程Actions
type Action struct {
    // Abort 终止Flow的执行
    Abort bool
}

// ActionFunc KisFlow Functional Option 类型
type ActionFunc func(ops *Action)

// LoadActions 加载Actions,依次执行ActionFunc操作函数
func LoadActions(acts []ActionFunc) Action {
    action := Action{}

    if acts == nil {
        return action
    }

    for _, act := range acts {
        act(&action)
    }

    return action
}

// ActionAbort 终止Flow的执行
func ActionAbort(action *Action) {
    action.Abort = true
}

首先,现在Action只有Abort一个行为,我们用bool类型来表示Abort是否为终止,true则为需要终止flow的调用。 其次,type ActionFunc func(ops *Action)这个函数原型为一个函数类型,函数的形参是传递进来一个Action{} 指针,而 func ActionAbort(action *Action)则是它的一个具体的函数,ActionAbort()的方法的目的就是将Action的Abort成员设置为true。

最后看func LoadActions(acts []ActionFunc) Action方法。这个形参是一个ActionFunc函数数组,LoadActions()则是创建一个新的Action{} ,然后依次执行[]ActionFunc的函数来改变Aciton{}的成员,最终将新的Action{}返回上层。

7.1.3 Next方法实现

接下来,我们需要给KisFlow模块实现这个接口,首先需要给KisFlow添加一个Action{}成员,表示每次执行完Function之后所携带的动作。

kis-flow/flow/kis_flow.go

// KisFlow 用于贯穿整条流式计算的上下文环境
type KisFlow struct {
    // 基础信息
    Id   string                // Flow的分布式实例ID(用于KisFlow内部区分不同实例)
    Name string                // Flow的可读名称
    Conf *config.KisFlowConfig // Flow配置策略

    // Function列表
    Funcs          map[string]kis.Function // 当前flow拥有的全部管理的全部Function对象, key: FunctionName
    FlowHead       kis.Function            // 当前Flow所拥有的Function列表表头
    FlowTail       kis.Function            // 当前Flow所拥有的Function列表表尾
    flock          sync.RWMutex            // 管理链表插入读写的锁
    ThisFunction   kis.Function            // Flow当前正在执行的KisFunction对象
    ThisFunctionId string                  // 当前执行到的Function ID
    PrevFunctionId string                  // 当前执行到的Function 上一层FunctionID

    // Function列表参数
    funcParams map[string]config.FParam // flow在当前Function的自定义固定配置参数,Key:function的实例KisID, value:FParam
    fplock     sync.RWMutex             // 管理funcParams的读写锁

    // 数据
    buffer common.KisRowArr  // 用来临时存放输入字节数据的内部Buf, 一条数据为interface{}, 多条数据为[]interface{} 也就是KisBatch
    data   common.KisDataMap // 流式计算各个层级的数据源
    inPut  common.KisRowArr  // 当前Function的计算输入数据

    // +++++++++++++++++++++

    // KisFlow Action
    action kis.Action        // 当前Flow所携带的Action动作
}

然后实现KisFlow的Next()接口,如下:

kis-flow/flow/kis_flow.go

// Next 当前Flow执行到的Function进入下一层Function所携带的Action动作
func (flow *KisFlow) Next(acts ...kis.ActionFunc) error {

    // 加载Function FaaS 传递的 Action动作
    flow.action = kis.LoadActions(acts)

    return nil
}

每次开发者在执行Function的自定义业务回调中,最后会调用flow.Next()来传递Action,所以Next(acts ...kis.ActionFunc) error就是讲传递的Action属性加载进来并且在flow.action保存。

7.1.4 Abort控制Flow流程

现在有个Abort来控制Flow流,那么我们需要给KisFlow添加一个成员来表示这个状态

kis-flow/flow/kis_flow.go

// KisFlow 用于贯穿整条流式计算的上下文环境
type KisFlow struct {
    // 基础信息
    Id   string                // Flow的分布式实例ID(用于KisFlow内部区分不同实例)
    Name string                // Flow的可读名称
    Conf *config.KisFlowConfig // Flow配置策略

    // Function列表
    Funcs          map[string]kis.Function // 当前flow拥有的全部管理的全部Function对象, key: FunctionName
    FlowHead       kis.Function            // 当前Flow所拥有的Function列表表头
    FlowTail       kis.Function            // 当前Flow所拥有的Function列表表尾
    flock          sync.RWMutex            // 管理链表插入读写的锁
    ThisFunction   kis.Function            // Flow当前正在执行的KisFunction对象
    ThisFunctionId string                  // 当前执行到的Function ID
    PrevFunctionId string                  // 当前执行到的Function 上一层FunctionID

    // Function列表参数
    funcParams map[string]config.FParam // flow在当前Function的自定义固定配置参数,Key:function的实例KisID, value:FParam
    fplock     sync.RWMutex             // 管理funcParams的读写锁

    // 数据
    buffer common.KisRowArr  // 用来临时存放输入字节数据的内部Buf, 一条数据为interface{}, 多条数据为[]interface{} 也就是KisBatch
    data   common.KisDataMap // 流式计算各个层级的数据源
    inPut  common.KisRowArr  // 当前Function的计算输入数据
    action kis.Action        // 当前Flow所携带的Action动作

    // +++++++++
    abort  bool              // 是否中断Flow
}

在每次执行到flow.Run()方法时,需要重置abort变量,并且在循环调度的时候加上对flow.abort的判断。

kis-flow/flow/kis_flow.go

// Run 启动KisFlow的流式计算, 从起始Function开始执行流
func (flow *KisFlow) Run(ctx context.Context) error {

    // +++++++++
    // 重置 abort
    flow.abort = false  //  每次进入调度,要重置abort状态

    // ... ...

    // ... ...

    //流式链式调用
    for fn != nil && flow.abort != true { // ++++ 如果设置abort则不进入下次循环调度

        // ... ...
        // ... ...

        if err := fn.Call(ctx, flow); err != nil {
            //Error
            return err
        } else {
            //Success

            // ... ...

            fn = fn.Next()
        }
    }

    return nil

这样在每次Call()调度到Funciton的自定方法时,如果return flow.Next(ActionAbort)就会对flow的Action状态进行改变,从而就控制了flow的流程终止。最后就是将Action的Abort状态传递给KisFlow的Abort状态。

既然有了Abort状态,那么我们可以通过给Flow执行过程中添加一个设定,如果当前的Function没有提交本层的结果数据,也就是flow.buffer为空,那么将不会进入下一层,在本层直接结束退出Flow的Run()调用。

kis-flow/flow/kis_flow_data.go

//commitCurData 提交Flow当前执行Function的结果数据
func (flow *KisFlow) commitCurData(ctx context.Context) error {

    // 判断本层计算是否有结果数据,如果没有则退出本次Flow Run循环
    if len(flow.buffer) == 0 {
        // ++++++++++++
        flow.abort = true
        return nil
    }

    // ... ...
    // ... ...

    return nil

7.1.5 Action捕获及处理

接下来来实现一个专门处理Action动作的方法,定义在kis-flow/flow/kis_flow_action.go文件中,如下:

kis-flow/flow/kis_flow_action.go

package flow

import (
    "context"
    "errors"
    "fmt"
    "kis-flow/kis"
)

// dealAction  处理Action,决定接下来Flow的流程走向
func (flow *KisFlow) dealAction(ctx context.Context, fn kis.Function) (kis.Function, error) {

    if err := flow.commitCurData(ctx); err != nil {
        return nil, err
    }

    // 更新上一层 FuncitonId 游标
    flow.PrevFunctionId = flow.ThisFunctionId
    fn = fn.Next()

    // Abort Action 强制终止
    if flow.action.Abort {
        flow.abort = true
    }

    // 清空Action
    flow.action = kis.Action{}

    return fn, nil
}

然后稍微改进下KisFlow的Run() 流程,将dealAction()方法嵌入进去。

kis-flow/flow/kis_flow.go

// Run 启动KisFlow的流式计算, 从起始Function开始执行流
func (flow *KisFlow) Run(ctx context.Context) error {

    var fn kis.Function

    fn = flow.FlowHead
    flow.abort = false

    if flow.Conf.Status == int(common.FlowDisable) {
        //flow被配置关闭
        return nil
    }

    // 因为此时还没有执行任何Function, 所以PrevFunctionId为FirstVirtual 因为没有上一层Function
    flow.PrevFunctionId = common.FunctionIdFirstVirtual

    // 提交数据流原始数据
    if err := flow.commitSrcData(ctx); err != nil {
        return err
    }

    //流式链式调用
    for fn != nil && flow.abort == false {

        // flow记录当前执行到的Function 标记
        fid := fn.GetId()
        flow.ThisFunction = fn
        flow.ThisFunctionId = fid

        // 得到当前Function要处理与的源数据
        if inputData, err := flow.getCurData(); err != nil {
            log.Logger().ErrorFX(ctx, "flow.Run(): getCurData err = %s\n", err.Error())
            return err
        } else {
            flow.inPut = inputData
        }

        if err := fn.Call(ctx, flow); err != nil {
            //Error
            return err
        } else {
            //Success

            // +++++++++++++++++++++++++++++++
            fn, err = flow.dealAction(ctx, fn)
            if err != nil {
                return err
            }
            // +++++++++++++++++++++++++++++++
        }
    }

    return nil
}

7.1.6 Action Abort单元测试

首先我们新建一个Function业务,配置文件如下:

kis-flow/test/load_conf/func/func-AbortFunc.yml

kistype: func
fname: abortFunc
fmode: Calculate
source:
  name: 用户订单错误率
  must:
    - order_id
    - user_id

当前的Funciton的名称为abortFunc,然后实现其FaaS函数,如下:

kis-flow/test/faas/faas_abort.go

package faas

import (
    "context"
    "fmt"
    "kis-flow/kis"
)

// type FaaS func(context.Context, Flow) error

func AbortFuncHandler(ctx context.Context, flow kis.Flow) error {
    fmt.Println("---> Call AbortFuncHandler ----")

    for _, row := range flow.Input() {
        str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
        fmt.Println(str)
    }

    return flow.Next(kis.ActionAbort)
}

这个Function就会最终调用flow.Next(kis.ActionAbort)来终止Flow,接下来我们新建一个Flow,将上面的Function作为中间的Function,看检测是否会终止之后的Function被执行。 新建的flow的配置如下:

kis-flow/test/load_conf/flow/flow-FlowName2.yml

kistype: flow
status: 1
flow_name: flowName2
flows:
  - fname: funcName1
  - fname: abortFunc
  - fname: funcName3

当前Flow的名称为flowName2,当前的Flow有三个Function,其中funcNam1 和 funcName2我们之前都已经定义好了,abortFunc是我们新建的,并且在中间。如果abort功能满足,则funcName3将不会被调度。

接下来实现单元测试用例。

kis-flow/test/kis_action_test.go

package test

import (
    "context"
    "kis-flow/common"
    "kis-flow/file"
    "kis-flow/kis"
    "kis-flow/test/caas"
    "kis-flow/test/faas"
    "testing"
)

func TestActionAbort(t *testing.T) {
    ctx := context.Background()

    // 0. 注册Function 回调业务
    kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
    kis.Pool().FaaS("abortFunc", faas.AbortFuncHandler) // 添加abortFunc 业务
    kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)

    // 0. 注册ConnectorInit 和 Connector 回调业务
    kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
    kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)

    // 1. 加载配置文件并构建Flow
    if err := file.ConfigImportYaml("/Users/gopath/src/kis-flow/test/load_conf/"); err != nil {
        panic(err)
    }

    // 2. 获取Flow
    flow1 := kis.Pool().GetFlow("flowName2")

    // 3. 提交原始数据
    _ = flow1.CommitRow("This is Data1 from Test")
    _ = flow1.CommitRow("This is Data2 from Test")
    _ = flow1.CommitRow("This is Data3 from Test")

    // 4. 执行flow1
    if err := flow1.Run(ctx); err != nil {
        panic(err)
    }
}

其中下面的代码是初始化注册的代码,大家也可以写在其他文件中,这样就不需要每次都携带这部分代码了。

    // 0. 注册Function 回调业务
    kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
    kis.Pool().FaaS("abortFunc", faas.AbortFuncHandler) // 添加abortFunc 业务
    kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)

    // 0. 注册ConnectorInit 和 Connector 回调业务
    kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
    kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)

cd 到kis-flow/test/目录下执行如下指令:

go test -test.v -test.paniconexit0 -test.run  TestActionAbort

结果如下:

=== RUN   TestActionAbort
Add KisPool FuncName=funcName1
Add KisPool FuncName=abortFunc
Add KisPool FuncName=funcName3
Add KisPool CaaSInit CName=ConnName1
Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]}
Add FlowRouter FlowName=flowName1

Add FlowRouter FlowName=flowName2

context.Background
====> After CommitSrcData, flow_name = flowName2, flow_id = flow-b6b90eb4b7d7457fbf85b3299b625513
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]

KisFunctionV, flow = &{Id:flow-b6b90eb4b7d7457fbf85b3299b625513 Name:flowName2 Conf:0xc000092cc0 Funcs:map[abortFunc:0xc000094d20 funcName1:0xc000094cc0 funcName3:0xc000094d80] FlowHead:0xc000094cc0 FlowTail:0xc000094d80 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000094cc0 ThisFunctionId:func-c435cf9f8e3346a1851f8c76375fce0f PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-7f5af1521fd64d08839d5bdd26de5254:map[] func-c435cf9f8e3346a1851f8c76375fce0f:map[] func-f0b80593fe2e4018a878f155b9c543b4:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] jumpFunc:NoJump abort:false nextOpt:<nil>}

---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-c435cf9f8e3346a1851f8c76375fce0f, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-c435cf9f8e3346a1851f8c76375fce0f, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-c435cf9f8e3346a1851f8c76375fce0f, row = This is Data3 from Test
context.Background
 ====> After commitCurData, flow_name = flowName2, flow_id = flow-b6b90eb4b7d7457fbf85b3299b625513
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-c435cf9f8e3346a1851f8c76375fce0f:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]

KisFunctionC, flow = &{Id:flow-b6b90eb4b7d7457fbf85b3299b625513 Name:flowName2 Conf:0xc000092cc0 Funcs:map[abortFunc:0xc000094d20 funcName1:0xc000094cc0 funcName3:0xc000094d80] FlowHead:0xc000094cc0 FlowTail:0xc000094d80 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000094d20 ThisFunctionId:func-7f5af1521fd64d08839d5bdd26de5254 PrevFunctionId:func-c435cf9f8e3346a1851f8c76375fce0f funcParams:map[func-7f5af1521fd64d08839d5bdd26de5254:map[] func-c435cf9f8e3346a1851f8c76375fce0f:map[] func-f0b80593fe2e4018a878f155b9c543b4:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-c435cf9f8e3346a1851f8c76375fce0f:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] jumpFunc:NoJump abort:false nextOpt:<nil>}

---> Call AbortFuncHandler ----
In FuncName = abortFunc, FuncId = func-7f5af1521fd64d08839d5bdd26de5254, row = data from funcName[funcName1], index = 0
In FuncName = abortFunc, FuncId = func-7f5af1521fd64d08839d5bdd26de5254, row = data from funcName[funcName1], index = 1
In FuncName = abortFunc, FuncId = func-7f5af1521fd64d08839d5bdd26de5254, row = data from funcName[funcName1], index = 2
--- PASS: TestActionAbort (0.00s)
PASS
ok      kis-flow/test   0.487s

通过结果可以看到,在执行完 AbortFuncHandler 后,没有继续执行,而是退出了Flow的Run()方法。

7.2 Action DataReuse(复用上层数据)

Action DataReuse 为服用上层数据,含义为,当前的执行Function提交到下一层的结果将不被使用,而是直接将当前Function的上一层结果数据,复用到下一层,作为下一层Funciton的数据源。

下面来实现Action DataReuse功能。

7.2.1 DataReuse Action添加

在Action中添加DataReuse成员,是一个bool类型。

kis-flow/kis/action.go

// Action KisFlow执行流程Actions
type Action struct {
    // +++++++++++++
    // DataReuse 是否复用上层Function数据
    DataReuse bool

    // Abort 终止Flow的执行
    Abort bool
}


// ActionDataReuse Next复用上层Function数据Option
func ActionDataReuse(act *Action) {
    act.DataReuse = true
}

然后提供一个ActionFunc,命名为:ActionDataReuse,实现中为改变DataReuse状态为true。

7.2.2 复用上层数据到下层

这里需要再实现一个提交数据的方法,为如何提交复用数据,具体逻辑如下:

kis-flow/flow/kis_flow_data.go

// commitReuseData
func (flow *KisFlow) commitReuseData(ctx context.Context) error {

    // 判断上层是否有结果数据, 如果没有则退出本次Flow Run循环
    if len(flow.data[flow.PrevFunctionId]) == 0 {
        flow.abort = true
        return nil
    }

    // 本层结果数据等于上层结果数据(复用上层结果数据到本层)
    flow.data[flow.ThisFunctionId] = flow.data[flow.PrevFunctionId]

    // 清空缓冲Buf (如果是ReuseData选项,那么提交的全部数据,都将不会携带到下一层)
    flow.buffer = flow.buffer[0:0]

    log.Logger().DebugFX(ctx, " ====> After commitReuseData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)

    return nil
}

逻辑很简单,与commitCurData()不同的是,commitCurData()为将flow.buffer的数据提交到flow.data[flow.ThisFunctionId]中,而commitReuseData()为将上一层的结果数据提交到flow.data[flow.ThisFunctionId]中。

7.2.3 处理DataReuse Action动作

然后在dealAction()方法中添加对Action DataReuse的动作捕获,如下:

kis-flow/flow/kis_flow_action.go

// dealAction  处理Action,决定接下来Flow的流程走向
func (flow *KisFlow) dealAction(ctx context.Context, fn kis.Function) (kis.Function, error) {

    // ++++++++++++++++
    // DataReuse Action
    if flow.action.DataReuse {
        if err := flow.commitReuseData(ctx); err != nil {
            return nil, err
        }
    } else {
        if err := flow.commitCurData(ctx); err != nil {
            return nil, err
        }
    }


    // 更新上一层 FuncitonId 游标
    flow.PrevFunctionId = flow.ThisFunctionId
    fn = fn.Next()

    // Abort Action 强制终止
    if flow.action.Abort {
        flow.abort = true
    }

    // 清空Action
    flow.action = kis.Action{}

    return fn, nil
}

7.2.4 单元测试

下面来针对DataReuse做单元测试,首先创建一个名字为dataReuseFunc 的Funciton,先创建配置文件。

kis-flow/test/load_conf/func/func-dataReuseFunc.yml

kistype: func
fname: dataReuseFunc
fmode: Calculate
source:
  name: 用户订单错误率
  must:
    - order_id
    - user_id

同时新建一个Flow流,名称为flowName3,配置如下:

kis-flow/test/load_conf/flow/func-FlowName3.yml

kistype: flow
status: 1
flow_name: flowName3
flows:
  - fname: funcName1
  - fname: dataReuseFunc
  - fname: funcName3

针对dataReuseFunc的Function的逻辑业务,如下:

kis-flow/test/faas/faas_data_reuse.go

package faas

import (
    "context"
    "fmt"
    "kis-flow/kis"
)

// type FaaS func(context.Context, Flow) error

func DataReuseFuncHandler(ctx context.Context, flow kis.Flow) error {
    fmt.Println("---> Call DataReuseFuncHandler ----")

    for index, row := range flow.Input() {
        str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
        fmt.Println(str)

        // 计算结果数据
        resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index)

        // 提交结果数据
        _ = flow.CommitRow(resultStr)
    }

    return flow.Next(kis.ActionDataReuse)
}

最后实现测试用例,如下:

kis-flow/test/kis_action_test.go

func TestActionDataReuse(t *testing.T) {
    ctx := context.Background()

    // 0. 注册Function 回调业务
    kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
    kis.Pool().FaaS("dataReuseFunc", faas.DataReuseFuncHandler) // 添加dataReuesFunc 业务
    kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)

    // 0. 注册ConnectorInit 和 Connector 回调业务
    kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
    kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)

    // 1. 加载配置文件并构建Flow
    if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil {
        panic(err)
    }

    // 2. 获取Flow
    flow1 := kis.Pool().GetFlow("flowName3")

    // 3. 提交原始数据
    _ = flow1.CommitRow("This is Data1 from Test")
    _ = flow1.CommitRow("This is Data2 from Test")
    _ = flow1.CommitRow("This is Data3 from Test")

    // 4. 执行flow1
    if err := flow1.Run(ctx); err != nil {
        panic(err)
    }
}

cd 到 kis-flow/test/下执行:

go test -test.v -test.paniconexit0 -test.run  TestActionDataReuse
=== RUN   TestActionDataReuse
Add KisPool FuncName=funcName1
Add KisPool FuncName=dataReuseFunc
Add KisPool FuncName=funcName3
Add KisPool CaaSInit CName=ConnName1
Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]}
Add FlowRouter FlowName=flowName5
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2 funcName2]}
Add FlowRouter FlowName=flowName1
Add FlowRouter FlowName=flowName2
Add FlowRouter FlowName=flowName3
Add FlowRouter FlowName=flowName4
context.Background
====> After CommitSrcData, flow_name = flowName3, flow_id = flow-2c1a23d9587842bebaeee490319de81f
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]

KisFunctionV, flow = &{Id:flow-2c1a23d9587842bebaeee490319de81f Name:flowName3 Conf:0xc000092dc0 Funcs:map[dataReuseFunc:0xc000095620 funcName1:0xc0000955c0 funcName3:0xc000095680] FlowHead:0xc0000955c0 FlowTail:0xc000095680 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000955c0 ThisFunctionId:func-7886178381634f05b302841141382e59 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-7886178381634f05b302841141382e59:map[] func-cfe66e39aba54ff989d6764cc4edda20:map[] func-ef567879d0dd45b287ed709e549e9d32:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}

---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-7886178381634f05b302841141382e59, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-7886178381634f05b302841141382e59, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-7886178381634f05b302841141382e59, row = This is Data3 from Test
context.Background
 ====> After commitCurData, flow_name = flowName3, flow_id = flow-2c1a23d9587842bebaeee490319de81f
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-7886178381634f05b302841141382e59:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]

KisFunctionC, flow = &{Id:flow-2c1a23d9587842bebaeee490319de81f Name:flowName3 Conf:0xc000092dc0 Funcs:map[dataReuseFunc:0xc000095620 funcName1:0xc0000955c0 funcName3:0xc000095680] FlowHead:0xc0000955c0 FlowTail:0xc000095680 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000095620 ThisFunctionId:func-ef567879d0dd45b287ed709e549e9d32 PrevFunctionId:func-7886178381634f05b302841141382e59 funcParams:map[func-7886178381634f05b302841141382e59:map[] func-cfe66e39aba54ff989d6764cc4edda20:map[] func-ef567879d0dd45b287ed709e549e9d32:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-7886178381634f05b302841141382e59:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}

---> Call DataReuseFuncHandler ----
In FuncName = dataReuseFunc, FuncId = func-ef567879d0dd45b287ed709e549e9d32, row = data from funcName[funcName1], index = 0
In FuncName = dataReuseFunc, FuncId = func-ef567879d0dd45b287ed709e549e9d32, row = data from funcName[funcName1], index = 1
In FuncName = dataReuseFunc, FuncId = func-ef567879d0dd45b287ed709e549e9d32, row = data from funcName[funcName1], index = 2
context.Background
 ====> After commitReuseData, flow_name = flowName3, flow_id = flow-2c1a23d9587842bebaeee490319de81f
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-7886178381634f05b302841141382e59:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] func-ef567879d0dd45b287ed709e549e9d32:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]

KisFunctionC, flow = &{Id:flow-2c1a23d9587842bebaeee490319de81f Name:flowName3 Conf:0xc000092dc0 Funcs:map[dataReuseFunc:0xc000095620 funcName1:0xc0000955c0 funcName3:0xc000095680] FlowHead:0xc0000955c0 FlowTail:0xc000095680 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000095680 ThisFunctionId:func-cfe66e39aba54ff989d6764cc4edda20 PrevFunctionId:func-ef567879d0dd45b287ed709e549e9d32 funcParams:map[func-7886178381634f05b302841141382e59:map[] func-cfe66e39aba54ff989d6764cc4edda20:map[] func-ef567879d0dd45b287ed709e549e9d32:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-7886178381634f05b302841141382e59:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] func-ef567879d0dd45b287ed709e549e9d32:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}

---> Call funcName3Handler ----
In FuncName = funcName3, FuncId = func-cfe66e39aba54ff989d6764cc4edda20, row = data from funcName[funcName1], index = 0
In FuncName = funcName3, FuncId = func-cfe66e39aba54ff989d6764cc4edda20, row = data from funcName[funcName1], index = 1
In FuncName = funcName3, FuncId = func-cfe66e39aba54ff989d6764cc4edda20, row = data from funcName[funcName1], index = 2
--- PASS: TestActionDataReuse (0.02s)
PASS
ok      kis-flow/test   0.523s

通过结果可以看出,在最后的funcName3Handler中得到的数据是funcName1传递下来的数据,中间的ReuseFunction将上层的数据复用到了下一层,变成了FuncName3的数据源。

7.3 Action ForceEntryNext(强制进入下一层)

7.3.1 ForceEntryNext Action属性

目前的KisFlow为,如果当前的Function没有commit数据(本层的结果数据),那么当前的Function结束后,将不会继续调度下一层Function。 但是有的Flow的流式计算可能需要继续向下执行,哪怕没有数据,所以这里可以通过ForceEntryNext这个动作来触发。 首先我们在Action中新增一个ForceEntryNext 属性。

kis-flow/kis/action.go

// Action KisFlow执行流程Actions
type Action struct {
    // DataReuse 是否复用上层Function数据
    DataReuse bool

    // 默认Next()为如果本层Function计算结果为0条数据,之后Function将不会继续执行
    // ForceEntryNext 为忽略上述默认规则,没有数据强制进入下一层Function
    ForceEntryNext bool

    // Abort 终止Flow的执行
    Abort bool
}

// ActionForceEntryNext 强制进入下一层
func ActionForceEntryNext(act *Action) {
    act.ForceEntryNext = true
}

且提供配置函数ActionForceEntryNext()来修改这个属性状态。

7.3.2 捕获Action

在捕获Action的dealAction()方法中,加上对这个状态的判断,如果被设置,则需要将flow.Abort状态改成false,flow将继续执行下一层。

kis-flow/flow/kis_flow_action.go

// dealAction  处理Action,决定接下来Flow的流程走向
func (flow *KisFlow) dealAction(ctx context.Context, fn kis.Function) (kis.Function, error) {

    // DataReuse Action
    if flow.action.DataReuse {
        if err := flow.commitReuseData(ctx); err != nil {
            return nil, err
        }
    } else {
        if err := flow.commitCurData(ctx); err != nil {
            return nil, err
        }
    }

    // ++++++++++++++++++++++++++++
    // ForceEntryNext Action
    if flow.action.ForceEntryNext {
        if err := flow.commitVoidData(ctx); err != nil {
            return nil, err
        }
        flow.abort = false
    }

    // 更新上一层 FuncitonId 游标
    flow.PrevFunctionId = flow.ThisFunctionId
    fn = fn.Next()

    // Abort Action 强制终止
    if flow.action.Abort {
        flow.abort = true
    }

    // 清空Action
    flow.action = kis.Action{}

    return fn, nil
}

这里有一个细节,我们需要调用一个方法commitVoidData(),即提交空数据,原因是,如果不提交空数据,那么flow.buffer依然为空,那么不会执行数据的提交动作,那么会导致flow.data[flow.ThisFunctionId]这条不存在,也就是key不存在,那么再执行到flow.getCurData()会出现找不到key的异常而panic。所以这里需要提交一个空的数据到flow.data[flow.ThisFunctionId]中。 具体的commitVoidData()实现如下:

kis-flow/flow/kis_flow_data.go

func (flow *KisFlow) commitVoidData(ctx context.Context) error {
    if len(flow.buffer) != 0 {
        return nil
    }

    // 制作空数据
    batch := make(common.KisRowArr, 0)

    // 将本层计算的缓冲数据提交到本层结果数据中
    flow.data[flow.ThisFunctionId] = batch

    log.Logger().DebugFX(ctx, " ====> After commitVoidData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)

    return nil
}

7.3.3 单元测试,不设置ForceEntryNext

首先,创建一个noResultFunc的Function配置,且实现相关的回调业务函数。

kis-flow/test/load_conf/func/func-NoResultFunc.yml

kistype: func
fname: noResultFunc
fmode: Calculate
source:
  name: 用户订单错误率
  must:
    - order_id
    - user_id

kis-flow/test/faas/faas_no_result.go

package faas

import (
    "context"
    "fmt"
    "kis-flow/kis"
)

// type FaaS func(context.Context, Flow) error

func NoResultFuncHandler(ctx context.Context, flow kis.Flow) error {
    fmt.Println("---> Call NoResultFuncHandler ----")

    for _, row := range flow.Input() {
        str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
        fmt.Println(str)
    }

    return flow.Next()
}

这里面在Function的最后,只调用flow.Next() 不传递任何Action动作。 然后新建一个FlowName4,配置如下:

kis-flow/test/load_conf/flow-FlowName4.yml

kistype: flow
status: 1
flow_name: flowName4
flows:
  - fname: funcName1
  - fname: noResultFunc
  - fname: funcName3

最后我们编写单元测试用例代码,将noResultFunc放在中间的部分。

kis-flow/test/kis_action_test.go

func TestActionForceEntry(t *testing.T) {
    ctx := context.Background()

    // 0. 注册Function 回调业务
    kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
    kis.Pool().FaaS("noResultFunc", faas.NoResultFuncHandler) // 添加noResultFunc 业务
    kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)

    // 0. 注册ConnectorInit 和 Connector 回调业务
    kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
    kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)

    // 1. 加载配置文件并构建Flow
    if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil {
        panic(err)
    }

    // 2. 获取Flow
    flow1 := kis.Pool().GetFlow("flowName4")

    // 3. 提交原始数据
    _ = flow1.CommitRow("This is Data1 from Test")
    _ = flow1.CommitRow("This is Data2 from Test")
    _ = flow1.CommitRow("This is Data3 from Test")

    // 4. 执行flow1
    if err := flow1.Run(ctx); err != nil {
        panic(err)
    }
}

cd到kis-flow/test/ 下执行:

go test -test.v -test.paniconexit0 -test.run  TestActionForceEntry

结果如下:

=== RUN   TestActionForceEntry
Add KisPool FuncName=funcName1
Add KisPool FuncName=noResultFunc
Add KisPool FuncName=funcName3
Add KisPool CaaSInit CName=ConnName1
Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]}
Add FlowRouter FlowName=flowName1
Add FlowRouter FlowName=flowName2
Add FlowRouter FlowName=flowName3
Add FlowRouter FlowName=flowName4
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2 funcName2]}
Add FlowRouter FlowName=flowName5
context.Background
====> After CommitSrcData, flow_name = flowName4, flow_id = flow-a496d02c79204e9a803fb5e1307523c9
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]

KisFunctionV, flow = &{Id:flow-a496d02c79204e9a803fb5e1307523c9 Name:flowName4 Conf:0xc000152e40 Funcs:map[funcName1:0xc00011d560 funcName3:0xc00011d620 noResultFunc:0xc00011d5c0] FlowHead:0xc00011d560 FlowTail:0xc00011d620 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00011d560 ThisFunctionId:func-4d113d6a8e744d30a906db310f2d7818 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-47cb6f9ae464484aa779c18284035705:map[] func-4d113d6a8e744d30a906db310f2d7818:map[] func-70011c7ccecf46be91c6993d143639bb:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}

---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-4d113d6a8e744d30a906db310f2d7818, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-4d113d6a8e744d30a906db310f2d7818, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-4d113d6a8e744d30a906db310f2d7818, row = This is Data3 from Test
context.Background
 ====> After commitCurData, flow_name = flowName4, flow_id = flow-a496d02c79204e9a803fb5e1307523c9
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-4d113d6a8e744d30a906db310f2d7818:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]

KisFunctionC, flow = &{Id:flow-a496d02c79204e9a803fb5e1307523c9 Name:flowName4 Conf:0xc000152e40 Funcs:map[funcName1:0xc00011d560 funcName3:0xc00011d620 noResultFunc:0xc00011d5c0] FlowHead:0xc00011d560 FlowTail:0xc00011d620 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00011d5c0 ThisFunctionId:func-47cb6f9ae464484aa779c18284035705 PrevFunctionId:func-4d113d6a8e744d30a906db310f2d7818 funcParams:map[func-47cb6f9ae464484aa779c18284035705:map[] func-4d113d6a8e744d30a906db310f2d7818:map[] func-70011c7ccecf46be91c6993d143639bb:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-4d113d6a8e744d30a906db310f2d7818:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}

---> Call NoResultFuncHandler ----
In FuncName = noResultFunc, FuncId = func-47cb6f9ae464484aa779c18284035705, row = data from funcName[funcName1], index = 0
In FuncName = noResultFunc, FuncId = func-47cb6f9ae464484aa779c18284035705, row = data from funcName[funcName1], index = 1
In FuncName = noResultFunc, FuncId = func-47cb6f9ae464484aa779c18284035705, row = data from funcName[funcName1], index = 2
--- PASS: TestActionForceEntry (0.02s)
PASS
ok      kis-flow/test   0.958s

因为noResultFunc不会生成任何的结果数据,所以下一层Function将不会被执行,最后只执行到

---> Call NoResultFuncHandler ----

7.3.4 单元测试,设置ForceEntryNext

下面我们将Action为ForceEntryNext加上,在NoResultFuncHandler() 中,加上flow.Next(kis.ActionForceEntryNext),如下:

kis-flow/test/faas/faas_no_result.go

package faas

import (
    "context"
    "fmt"
    "kis-flow/kis"
)

// type FaaS func(context.Context, Flow) error

func NoResultFuncHandler(ctx context.Context, flow kis.Flow) error {
    fmt.Println("---> Call NoResultFuncHandler ----")

    for _, row := range flow.Input() {
        str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
        fmt.Println(str)
    }

    return flow.Next(kis.ActionForceEntryNext)
}

cd到kis-flow/test/ 下执行:

go test -test.v -test.paniconexit0 -test.run  TestActionForceEntry

结果如下:

=== RUN   TestActionForceEntry
Add KisPool FuncName=funcName1
Add KisPool FuncName=noResultFunc
Add KisPool FuncName=funcName3
Add KisPool CaaSInit CName=ConnName1
Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]}
Add FlowRouter FlowName=flowName5
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2 funcName2]}
Add FlowRouter FlowName=flowName1
Add FlowRouter FlowName=flowName2
Add FlowRouter FlowName=flowName3
Add FlowRouter FlowName=flowName4
context.Background
====> After CommitSrcData, flow_name = flowName4, flow_id = flow-7fb47f227c9f4b9d8fa69c28177fc7bb
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]

KisFunctionV, flow = &{Id:flow-7fb47f227c9f4b9d8fa69c28177fc7bb Name:flowName4 Conf:0xc000028e80 Funcs:map[funcName1:0xc0000136e0 funcName3:0xc0000137a0 noResultFunc:0xc000013740] FlowHead:0xc0000136e0 FlowTail:0xc0000137a0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000136e0 ThisFunctionId:func-ecddaee7d7d447a9852d07088732f509 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-5729600ae6ea4d6f879eb5832c638e1a:map[] func-c9817c7993894919b8463dea1757544e:map[] func-ecddaee7d7d447a9852d07088732f509:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}

---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-ecddaee7d7d447a9852d07088732f509, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-ecddaee7d7d447a9852d07088732f509, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-ecddaee7d7d447a9852d07088732f509, row = This is Data3 from Test
context.Background
 ====> After commitCurData, flow_name = flowName4, flow_id = flow-7fb47f227c9f4b9d8fa69c28177fc7bb
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-ecddaee7d7d447a9852d07088732f509:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]

KisFunctionC, flow = &{Id:flow-7fb47f227c9f4b9d8fa69c28177fc7bb Name:flowName4 Conf:0xc000028e80 Funcs:map[funcName1:0xc0000136e0 funcName3:0xc0000137a0 noResultFunc:0xc000013740] FlowHead:0xc0000136e0 FlowTail:0xc0000137a0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000013740 ThisFunctionId:func-c9817c7993894919b8463dea1757544e PrevFunctionId:func-ecddaee7d7d447a9852d07088732f509 funcParams:map[func-5729600ae6ea4d6f879eb5832c638e1a:map[] func-c9817c7993894919b8463dea1757544e:map[] func-ecddaee7d7d447a9852d07088732f509:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-ecddaee7d7d447a9852d07088732f509:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}

---> Call NoResultFuncHandler ----
In FuncName = noResultFunc, FuncId = func-c9817c7993894919b8463dea1757544e, row = data from funcName[funcName1], index = 0
In FuncName = noResultFunc, FuncId = func-c9817c7993894919b8463dea1757544e, row = data from funcName[funcName1], index = 1
In FuncName = noResultFunc, FuncId = func-c9817c7993894919b8463dea1757544e, row = data from funcName[funcName1], index = 2
context.Background
 ====> After commitVoidData, flow_name = flowName4, flow_id = flow-7fb47f227c9f4b9d8fa69c28177fc7bb
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-c9817c7993894919b8463dea1757544e:[] func-ecddaee7d7d447a9852d07088732f509:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]

KisFunctionC, flow = &{Id:flow-7fb47f227c9f4b9d8fa69c28177fc7bb Name:flowName4 Conf:0xc000028e80 Funcs:map[funcName1:0xc0000136e0 funcName3:0xc0000137a0 noResultFunc:0xc000013740] FlowHead:0xc0000136e0 FlowTail:0xc0000137a0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000137a0 ThisFunctionId:func-5729600ae6ea4d6f879eb5832c638e1a PrevFunctionId:func-c9817c7993894919b8463dea1757544e funcParams:map[func-5729600ae6ea4d6f879eb5832c638e1a:map[] func-c9817c7993894919b8463dea1757544e:map[] func-ecddaee7d7d447a9852d07088732f509:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-c9817c7993894919b8463dea1757544e:[] func-ecddaee7d7d447a9852d07088732f509:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}

---> Call funcName3Handler ----
--- PASS: TestActionForceEntry (0.01s)
PASS
ok      kis-flow/test   0.348s

会发现,Function第三层funcName3Handler 被执行到,但是没有任何的数据。

7.4 Action JumpFunc(流程跳转)

接下来,来实现JumpFunc Action,JumpFunc是可以在当前Flow中任意跳转到指定的FuncName继续执行(前提是跳转的FuncName当当前Flow中存在)

注意:JumpFunc容易出现无限循环流,所以在业务的设计要慎用。

7.4.1 Action添加JumpFunc

首先在Action添加一个JumpFunc属性,注意,JunpFunc不是一个bool状态,而是一个string字符串,表示具体要跳转的FunctionName名称。

kis-flow/kis/action.go

// Action KisFlow执行流程Actions
type Action struct {
    // DataReuse 是否复用上层Function数据
    DataReuse bool

    // 默认Next()为如果本层Function计算结果为0条数据,之后Function将不会继续执行
    // ForceEntryNext 为忽略上述默认规则,没有数据强制进入下一层Function
    ForceEntryNext bool

    // ++++++++++
    // JumpFunc 跳转到指定Function继续执行
    JumpFunc string

    // Abort 终止Flow的执行
    Abort bool
}


// ActionJumpFunc 会返回一个ActionFunc函数,并且会将funcName赋值给Action.JumpFunc
// (注意:容易出现Flow循环调用,导致死循环)
func ActionJumpFunc(funcName string) ActionFunc {
    return func(act *Action) {
        act.JumpFunc = funcName
    }
}

然后提供一个修改JumpFunc的配置方法ActionJumpFunc(),注意这个方法和之前的方法写法有一些不同,主要是返回一个匿名函数并且执行,目的则是修改Action中的JumpFunc属性。

7.4.2 捕获Action

接下来,我们来捕获JumpFunc的Action动作,判断JumpFunc是否为空字符串即可。

kis-flow/flow/kis_flow_action.go

// dealAction  处理Action,决定接下来Flow的流程走向
func (flow *KisFlow) dealAction(ctx context.Context, fn kis.Function) (kis.Function, error) {

    // DataReuse Action
    if flow.action.DataReuse {
        if err := flow.commitReuseData(ctx); err != nil {
            return nil, err
        }
    } else {
        if err := flow.commitCurData(ctx); err != nil {
            return nil, err
        }
    }

    // ForceEntryNext Action
    if flow.action.ForceEntryNext {
        if err := flow.commitVoidData(ctx); err != nil {
            return nil, err
        }
        flow.abort = false
    }

    // ++++++++++++++++++++++++++++++++
    // JumpFunc Action
    if flow.action.JumpFunc != "" {
        if _, ok := flow.Funcs[flow.action.JumpFunc]; !ok {
            //当前JumpFunc不在flow中
            return nil, errors.New(fmt.Sprintf("Flow Jump -> %s is not in Flow", flow.action.JumpFunc))
        }

        jumpFunction := flow.Funcs[flow.action.JumpFunc]
        // 更新上层Function
        flow.PrevFunctionId = jumpFunction.GetPrevId()
        fn = jumpFunction

        // 如果设置跳跃,强制跳跃
        flow.abort = false
    // ++++++++++++++++++++++++++++++++

    } else {

        // 更新上一层 FuncitonId 游标
        flow.PrevFunctionId = flow.ThisFunctionId
        fn = fn.Next()
    }

    // Abort Action 强制终止
    if flow.action.Abort {
        flow.abort = true
    }

    // 清空Action
    flow.action = kis.Action{}

    return fn, nil
}

如果设置JumpFunc,则需要修改下次执行的fn指针,否则则正常寻址fn.Next()

7.4.3 单元测试

接下来来定义一个跳转Action的Function,配置,如下:

kis-flow/test/load_conf/func/func-jumpFunc.yml

kistype: func
fname: jumpFunc
fmode: Calculate
source:
  name: 用户订单错误率
  must:
    - order_id
    - user_id

并且实现相关的Funciton业务逻辑,如下:

kis-flow/test/faas/faas_jump.go

package faas

import (
    "context"
    "fmt"
    "kis-flow/kis"
)

// type FaaS func(context.Context, Flow) error

func JumpFuncHandler(ctx context.Context, flow kis.Flow) error {
    fmt.Println("---> Call JumpFuncHandler ----")

    for _, row := range flow.Input() {
        str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
        fmt.Println(str)
    }

    return flow.Next(kis.ActionJumpFunc("funcName1"))
}

这里,最后通过flow.Next(kis.ActionJumpFunc("funcName1"))来指定跳转到funcName1的Function。

新建一个Flow,为FlowName5,配置如下:

kis-flow/test/load_conf/flow/flow-FlowName5.yml

kistype: flow
status: 1
flow_name: flowName5
flows:
  - fname: funcName1
  - fname: funcName2
  - fname: jumpFunc

之后,来实现单元测试用例代码,如下:

kis-flow/test/kis_action_test.go

func TestActionJumpFunc(t *testing.T) {
    ctx := context.Background()

    // 0. 注册Function 回调业务
    kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
    kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
    kis.Pool().FaaS("jumpFunc", faas.JumpFuncHandler) // 添加jumpFunc 业务

    // 0. 注册ConnectorInit 和 Connector 回调业务
    kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
    kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)

    // 1. 加载配置文件并构建Flow
    if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil {
        panic(err)
    }

    // 2. 获取Flow
    flow1 := kis.Pool().GetFlow("flowName5")

    // 3. 提交原始数据
    _ = flow1.CommitRow("This is Data1 from Test")
    _ = flow1.CommitRow("This is Data2 from Test")
    _ = flow1.CommitRow("This is Data3 from Test")

    // 4. 执行flow1
    if err := flow1.Run(ctx); err != nil {
        panic(err)
    }
}

cd到kis-flow/test/执行:

go test -test.v -test.paniconexit0 -test.run  TestActionJumpFunc

结果如下:

... 
...

---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-f6ca8010d66744429bf6069c9897a928, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-f6ca8010d66744429bf6069c9897a928, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-f6ca8010d66744429bf6069c9897a928, row = This is Data3 from Test
context.Background
 ====> After commitCurData, flow_name = flowName5, flow_id = flow-5da80af989dc49648a001762fa08b866
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-5800567c4cd842b6b377c2b0c0fd81c2:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f6ca8010d66744429bf6069c9897a928:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]

KisFunctionS, flow = &{Id:flow-5da80af989dc49648a001762fa08b866 Name:flowName5 Conf:0xc000028f80 Funcs:map[funcName1:0xc000013620 funcName2:0xc000013680 jumpFunc:0xc0000136e0] FlowHead:0xc000013620 FlowTail:0xc0000136e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000013680 ThisFunctionId:func-5800567c4cd842b6b377c2b0c0fd81c2 PrevFunctionId:func-f6ca8010d66744429bf6069c9897a928 funcParams:map[func-4faf8f019f4a4a48b84ef27abfad53d1:map[] func-5800567c4cd842b6b377c2b0c0fd81c2:map[] func-f6ca8010d66744429bf6069c9897a928:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-5800567c4cd842b6b377c2b0c0fd81c2:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f6ca8010d66744429bf6069c9897a928:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}

---> Call funcName2Handler ----
In FuncName = funcName2, FuncId = func-5800567c4cd842b6b377c2b0c0fd81c2, row = data from funcName[funcName1], index = 0
===> In CaasDemoHanler1: flowName: flowName5, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 0
In FuncName = funcName2, FuncId = func-5800567c4cd842b6b377c2b0c0fd81c2, row = data from funcName[funcName1], index = 1
===> In CaasDemoHanler1: flowName: flowName5, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 1
In FuncName = funcName2, FuncId = func-5800567c4cd842b6b377c2b0c0fd81c2, row = data from funcName[funcName1], index = 2
===> In CaasDemoHanler1: flowName: flowName5, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 2
context.Background
 ====> After commitCurData, flow_name = flowName5, flow_id = flow-5da80af989dc49648a001762fa08b866
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-5800567c4cd842b6b377c2b0c0fd81c2:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f6ca8010d66744429bf6069c9897a928:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]

KisFunctionC, flow = &{Id:flow-5da80af989dc49648a001762fa08b866 Name:flowName5 Conf:0xc000028f80 Funcs:map[funcName1:0xc000013620 funcName2:0xc000013680 jumpFunc:0xc0000136e0] FlowHead:0xc000013620 FlowTail:0xc0000136e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000136e0 ThisFunctionId:func-4faf8f019f4a4a48b84ef27abfad53d1 PrevFunctionId:func-5800567c4cd842b6b377c2b0c0fd81c2 funcParams:map[func-4faf8f019f4a4a48b84ef27abfad53d1:map[] func-5800567c4cd842b6b377c2b0c0fd81c2:map[] func-f6ca8010d66744429bf6069c9897a928:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-5800567c4cd842b6b377c2b0c0fd81c2:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f6ca8010d66744429bf6069c9897a928:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}

---> Call JumpFuncHandler ----
In FuncName = jumpFunc, FuncId = func-4faf8f019f4a4a48b84ef27abfad53d1, row = data from funcName[funcName2], index = 0
In FuncName = jumpFunc, FuncId = func-4faf8f019f4a4a48b84ef27abfad53d1, row = data from funcName[funcName2], index = 1
In FuncName = jumpFunc, FuncId = func-4faf8f019f4a4a48b84ef27abfad53d1, row = data from funcName[funcName2], index = 2
KisFunctionV, flow = &{Id:flow-5da80af989dc49648a001762fa08b866 Name:flowName5 Conf:0xc000028f80 Funcs:map[funcName1:0xc000013620 funcName2:0xc000013680 jumpFunc:0xc0000136e0] FlowHead:0xc000013620 FlowTail:0xc0000136e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000013620 ThisFunctionId:func-f6ca8010d66744429bf6069c9897a928 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-4faf8f019f4a4a48b84ef27abfad53d1:map[] func-5800567c4cd842b6b377c2b0c0fd81c2:map[] func-f6ca8010d66744429bf6069c9897a928:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-5800567c4cd842b6b377c2b0c0fd81c2:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f6ca8010d66744429bf6069c9897a928:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}

---> Call funcName1Handler ----

 ... 
 ...

发现我们会无限循环的调度Flow,这样说明我们的JumpFunc Action已经生效。

7.5【V0.6】源代码

https://github.com/aceld/kis-flow/releases/tag/v0.6


作者:刘丹冰Aceld github: https://github.com/aceld
KisFlow开源项目地址:https://github.com/aceld/kis-flow

Golang框架实战-KisFlow流式计算框架专栏

Golang框架实战-KisFlow流式计算框架(1)-概述

Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)

Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)

Golang框架实战-KisFlow流式计算框架(4)-数据流

Golang框架实战-KisFlow流式计算框架(5)-Function调度

Golang框架实战-KisFlow流式计算框架(6)-Connector

Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK