66

用Go语言实现ReactiveX(三)——链式编程

 5 years ago
source link: https://studygolang.com/articles/15134?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

接上一篇 用Go语言实现ReactiveX(二)——Deliver

我们在上一篇,谈到了数据传递者Deliver。那么还差一个Subscriber没讲,这个实现其实已经没什么好讲的了,可以直接看源码。因为Deliver里面蕴含了对Observable的订阅过程,而Subscriber的主要功能就是这个,相当于去掉被订阅功能的Deliver。

Reactive 编程就是把Observable、Deliver、Subscriber串起来变成一个单向流动的数据管道。所以必须设计一个串起来的方式。

Pipe编程模式

RxJS 6.0 的时候引入了pipe模式。所以我们的实现是基于pipe模式的。

func Pipe(source Observable, cbs ...Deliver) Observable {
    for _, cb := range cbs {
        source = cb(source)
    }
    return source
}

这时候我们可以将使用这个函数来组合所有的Rx对象

Pipe(FromArray(...),Filter(...),...)

这个函数返回的仍然是Observable,所以可以继续使用Pipe

ob1:=Pipe(FromArray(...),Filter(...),...)
Pipe(ob1,Map(...),SwitchMap(...),...)

当然最后必须得有人订阅这个Observable

Subscribe(...)(observable)

这么设计的原因是golang是强类型语言,pipe无法兼容observer类型,除非有泛型。否则Subscriber就可以放到pipe函数参数末尾传入了。

下面我们回到标题说的链式编程的实现

链式编程实现

所谓链式编程,就是一个对象的方法返回值是对象自身,这样可以接着调用对象的其他方法,行程一个链条,Rx早期的实现都是这么做的。

最终我们可以如此调用:

rx.FromArray(...).Filter(...).Subscribe(...)

那么如何实现呢?

package rx
import (
    p "github.com/langhuihui/gorx/pipe"
)
type Observable struct {
    source p.Observable
}

我们所有的Observable和Deliver包括Subscriber以及Pipe函数等定义全部都在 github.com/langhuihui/gorx/pipe 这个包里面

那么我们在外层的rx包里面就定义上面这个Observable,名称是相同的,但在不同包里面。

在pipe包里面,Observable是一个函数,而在rx包里面Observable是一个结构体,目的是实现链式编程。这个结构体只有一个成员就是source,类型是pipe包里面的Observable。魔法就此展开了。

func FromArray(array []interface{}) *Observable {
    return &Observable{p.FromArray(array)}
}

当我们调用 rx.FromArray(...) 的时候,会返回一个rx.Observable 的对象指针,这个对象里面的source属性就是pipe包里面的FromArray函数调用后的Observable

当我们继续调用操作符Filter的时候, rx.FromArray(...).Filter(...) ,就会调用rx.Observable结构体的Filter方法,这时候我们只需要定义这个成员函数即可。

func (observable *Observable) Filter(f func(interface{}) bool) *Observable {
    return &Observable{p.Filter(f)(observable.source)}
}

其他操作符以此类推,我写了一个脚本用来生成一系列这个定义,省去手工抄写的重复劳动。

可以瞬间从源码生成一堆成员方法

//TakeUntil 
func (observable *Observable) TakeUntil(sSrc Observable, delivers ...p.Deliver) *Observable {
    return &Observable{p.TakeUntil(sSrc.source, delivers...)(observable.source)}
}

//TakeLast 
func (observable *Observable) TakeLast(count int) *Observable {
    return &Observable{p.TakeLast(count)(observable.source)}
}

//Skip 
func (observable *Observable) Skip(count int) *Observable {
    return &Observable{p.Skip(count)(observable.source)}
}

//SkipWhile 
func (observable *Observable) SkipWhile(f func(interface{}) bool) *Observable {
    return &Observable{p.SkipWhile(f)(observable.source)}
}

//SkipUntil 
func (observable *Observable) SkipUntil(sSrc Observable, delivers ...p.Deliver) *Observable {
    return &Observable{p.SkipUntil(sSrc.source, delivers...)(observable.source)}
}

链式编程就算大工告成了。下面就是愉快的Rx编程了。

import "github.com/langhuihui/gorx"
rx.Interval(1000).SkipUntil(rx.Of(1).Delay(3000)).Subscribe(func(x interface{}, dispose func()) {
        fmt.Print(x)
    }, nil, nil)

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK