1

80 行代码实现简易 RxJS

 1 year ago
source link: https://www.fly63.com/article/detial/11916
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

RxJS 是一个响应式的库,它接收从事件源发出的一个个事件,经过处理管道的层层处理之后,传入最终的接收者,这个处理管道是由操作符组成的,开发者只需要选择和组合操作符就能完成各种异步逻辑,极大简化了异步编程。除此以外,RxJS 的设计还遵循了函数式、流的理念。

直接讲概念比较难理解,不如我们实现一个简易的 RxJS 再来看这些。

RxJS 的使用

RxJS 会对事件源做一层封装,叫做 Observable,由它发出一个个事件。

比如这样:

const source = new Observable((observer) => {  
    let i = 0;  
    setInterval(() => {  
        observer.next(++i);  
    }, 1000);  
});  

在回调函数里面设置一个定时器,不断通过 next 传入事件。

这些事件会被接受者监听,叫做 Observer。

const subscription = source.subscribe({  
    next: (v) => console.log(v),  
    error: (err) => console.error(err),  
    complete: () => console.log('complete'),  
});  

observer 可以接收 next 传过来的事件,传输过程中可能有 error,也可以在这里处理 error,还可以处理传输完成的事件。

这样的一个监听或者说订阅,叫做 Subscription。

可以订阅当然也可以取消订阅:

subscription.unsubscribe();  

取消订阅时的回调函数是在 Observable 里返回的:

const source = new Observable((observer) => {  
    let i = 0;  
    const timer = setInterval(() => {  
        observer.next(++i);  
    }, 1000);  
    return function unsubscribe() {  
        clearInterval(timer);  
    };  
});  

发送事件、监听事件只是基础,处理事件的过程才是 RxJS 的精髓,它设计了管道的概念,可以用操作符 operator 来组装这个管道:

source.pipe(  
    map((i) => ++i),  
    map((i) => i * 10)  
).subscribe(() => {  
    //...  
})  

事件经过管道之后才会传到 Observer,在传输过程中会经过一个个操作符的处理。

比如这里的处理逻辑是,对传过来的数据加 1,然后再乘以 10。

综上,使用 RxJS 的代码就是这样的:

const source = new Observable((observer) => {  
    let i = 0;  
    const timer = setInterval(() => {  
        observer.next(++i);  
    }, 1000);  
    return function unsubscribe() {  
        clearInterval(timer);  
    };  
});  
const subscription = source.pipe(  
    map((i) => ++i),  
    map((i) => i * 10)  
).subscribe({  
    next: (v) => console.log(v),  
    error: (err) => console.error(err),  
    complete: () => console.log('complete'),  
});  

setTimeout(() => {  
    subscription.unsubscribe();  
}, 4500);  

我们通过 Observable 创建了一个事件源,每秒发出一个事件,这些事件会经过管道的处理再传递给 Observer,管道的组成是两个 map 操作符,对数据做了 + 1 和 * 10 的处理。

Observer 接收到传递过来的数据,做了打印,还对错误和结束时的事件做了处理。此外,Observable 提供了取消订阅时的处理逻辑,当我们在 4.5s 取消订阅时,就可以清除定时器。

使用 RxJS 基本就是这个流程,那它是怎么实现的呢?

80 行代码实现 RxJS

先从事件源开始,实现 Observable:

观察下它的特点:

  1. 它接收一个回调函数,里面可以调用 next 来传输数据。
  2. 它有 subscribe 方法可以用来添加 Observer 的订阅,返回 subscription
  3. 它可以在回调函数里返回 unsbscribe 时的处理逻辑
  4. 它有 pipe 方法可以传入操作符
62d8b7f0ae84b.jpg

我们按照这些特点来实现下:

首先,Observable 的构造函数要接收回调函数 _subscribe,但是不是立刻调用,而是在 subscribe 的时候才调用:

class Observable {  
    constructor(_subscribe) {  
        this._subscribe = _subscribe;  
    }  
    subscribe() {  
        this._subscribe();  
    }  
}  

回调函数的参数是有 next、error、complete 方法的对象,用于传递事件:

class Observable {  
    constructor(_subscribe) {  
        this._subscribe = _subscribe;  
    }  
    subscribe(observer) {  
        const subscriber = new Subscriber(observer);  
        this._subscribe(subscriber);  
    }  
}  

class Subscriber{  
    constructor(observer) {  
        super();  
        this.observer = observer;  
        this.isStopped = false;  
    }  
    next(value) {  
        if (this.observer.next && !this.isStopped) {  
            this.observer.next(value);  
        }  
    }  
    error(value) {  
        this.isStopped = true;  
        if (this.observer.error) {  
            this.observer.error(value);  
        }  
    }  
    complete() {  
        this.isStopped = true;  
        if (this.observer.complete) {  
            this.observer.complete();  
        }  
        if (this.unsubscribe) {  
            this.unsubscribe();  
        }  
    }  
}  

这样,在回调函数里面就可以调用 next、error、complete 方法了:

62d8b7f818015.jpg

此外,回调函数的返回值是 unsbscribe 时的处理逻辑,要收集起来,在取消订阅时调用:

class Subscription {  
    constructor() {  
        this._teardowns = [];  
    }  
    unsubscribe() {  
        this._teardowns.forEach((teardown) => {  
            typeof teardown === 'function' ? teardown() : teardown.unsubscribe()  
        });  
    }  
    add(teardown) {  
        if (teardown) {  
            this._teardowns.push(teardown);  
        }  
    }  
}  

提供 unsubscribe 方法用于取消订阅,_teardowns 用于收集所有的取消订阅时的回调,在 unsubscribe 时调用所有 teardown 回调。

这段逻辑比较通用,可以作为 Subscriber 的父类。

然后,在 Observable 里调用 add 来添加 teardown,并且返回 subscription(它有 unsubscribe 方法):

class Observable {  
    constructor(_subscribe) {  
        this._subscribe = _subscribe;  
    }  
    subscribe(observer) {  
        const subscriber = new Subscriber(observer);  
        subscriber.add(this._subscribe(subscriber));  
        return subscriber;  
    }  
}  
class Subscriber extends Subscription {  
    constructor(observer) {  
        super();  
        this.observer = observer;  
        this.isStopped = false;  
    }  
    next(value) {  
        if (this.observer.next && !this.isStopped) {  
            this.observer.next(value);  
        }  
    }  
    error(value) {  
        this.isStopped = true;  
        if (this.observer.error) {  
            this.observer.error(value);  
        }  
    }  
    complete() {  
        this.isStopped = true;  
        if (this.observer.complete) {  
            this.observer.complete();  
        }  
        if (this.unsubscribe) {  
            this.unsubscribe();  
        }  
    }  
}  

class Subscription {  
    constructor() {  
        this._teardowns = [];  
    }  
    unsubscribe() {  
        this._teardowns.forEach((teardown) => {  
            typeof teardown === 'function' ? teardown() : teardown.unsubscribe()  
        });  
    }  
    add(teardown) {  
        if (teardown) {  
            this._teardowns.push(teardown);  
        }  
    }  
}  

这样,我们就实现了 Observable 和 Observer,只写了 50 行代码。先来测试下:

const source = new Observable((observer) => {  
    let i = 0;  
    const timer = setInterval(() => {  
        observer.next(++i);  
    }, 1000);  
    return function unsubscribe() {  
        clearInterval(timer);  
    };  
});  
const subscription = source.subscribe({  
    next: (v) => console.log(v),  
    error: (err) => console.error(err),  
    complete: () => console.log('complete'),  
});  

setTimeout(() => {  
    subscription.unsubscribe();  
}, 4500);  
62d8b7ffab2fd.jpg

Observer 监听到了 Observable 传递过来的 1、2、3、4 的数据,因为在 4.5s 时取消了订阅,所以后面就不再有数据了。

我们用 50 行实现了基础的 RxJS!

当然,最精髓的 operator 还没有实现,接下来继续完善。

我们给 Observable 添加 pipe 方法,它会调用传入的 operator,并且上个的结果是下个的输入,这样就串起来了,也就是管道的概念:

class Observable {  
    constructor(_subscribe) {  
        //...  
    }  
    subscribe(observer) {  
       //...  
    }  
    pipe(...operations) {  
        return pipeFromArray(operations)(this);  
    }  
}  

function pipeFromArray(fns) {  
    if (fns.length === 0) {  
        return (x) => x;  
    }  
    if (fns.length === 1) {  
        return fns[0];  
    }  
    return (input) => {  
        return fns.reduce((prev, fn) => fn(prev), input);  
    };  
}  

当传入的参数是 0 个的时候,就直接返回之前的 Observable,1 个的时候直接返回,否则就通过 reduce 的方式串联起来,组成管道。

operator 的实现就是监听上一个 Observable,返回一个新的。

比如 map 的实现,就是传入 project 对 value 做处理,把结果用 next 传下去:

function map(project) {  
    return (observable) => new Observable((subscriber) => {  
        const subcription = observable.subscribe({  
            next(value) {  
                return subscriber.next(project(value));  
            },  
            error(err) {  
                subscriber.error(err);  
            },  
            complete() {  
                subscriber.complete();  
            },  
        });  
        return subcription;  
    });  
}  

这样我们就实现了 operator,来测试下:

62d8b805c9ea7.jpg

我们调用了 pipe 方法,使用两个 map 操作符来组织处理流程,对数据做了 +1 和 *10 的处理。

所以,Observable 传递过来的 1、2、3、4 传递给 Observer 的时候就变成了 20、30、40、50。

至此,我们实现了 RxJS 的 Observable、Observer、Subscription、operator 等概念,是一个简易版  RxJS 了。只用了 80 行代码。

再来看最开始的那些理念:

为什么叫做响应式呢?

因为是对事件源做监听和一系列处理的,这种编程模式就叫做响应式。

为什么叫函数式呢?

因为每一步 operator 都是纯函数,返回一个新的 Observable,这符合函数式的不可变,修改后返回一个新的的理念。

为什么叫流呢?

因为一个个事件是动态产生和传递的,这种数据的动态产生和传递就可以叫做流。

完整代码如下:

function pipeFromArray(fns) {  
    if (fns.length === 0) {  
        return (x) => x;  
    }  
    if (fns.length === 1) {  
        return fns[0];  
    }  
    return (input) => {  
        return fns.reduce((prev, fn) => fn(prev), input);  
    };  
}  
class Subscription {  
    constructor() {  
        this._teardowns = [];  
    }  
    unsubscribe() {  
        this._teardowns.forEach((teardown) => {  
            typeof teardown === 'function' ? teardown() : teardown.unsubscribe()  
        });  
    }  
    add(teardown) {  
        if (teardown) {  
            this._teardowns.push(teardown);  
        }  
    }  
}  
class Subscriber extends Subscription {  
    constructor(observer) {  
        super();  
        this.observer = observer;  
        this.isStopped = false;  
    }  
    next(value) {  
        if (this.observer.next && !this.isStopped) {  
            this.observer.next(value);  
        }  
    }  
    error(value) {  
        this.isStopped = true;  
        if (this.observer.error) {  
            this.observer.error(value);  
        }  
    }  
    complete() {  
        this.isStopped = true;  
        if (this.observer.complete) {  
            this.observer.complete();  
        }  
        if (this.unsubscribe) {  
            this.unsubscribe();  
        }  
    }  
}  
class Observable {  
    constructor(_subscribe) {  
        this._subscribe = _subscribe;  
    }  
    subscribe(observer) {  
        const subscriber = new Subscriber(observer);  
        subscriber.add(this._subscribe(subscriber));  
        return subscriber;  
    }  
    pipe(...operations) {  
        return pipeFromArray(operations)(this);  
    }  
}  
function map(project) {  
    return (observable) => new Observable((subscriber) => {  
        const subcription = observable.subscribe({  
            next(value) {  
                return subscriber.next(project(value));  
            },  
            error(err) {  
                subscriber.error(err);  
            },  
            complete() {  
                subscriber.complete();  
            },  
        });  
        return subcription;  
    });  
}  


const source = new Observable((observer) => {  
    let i = 0;  
    const timer = setInterval(() => {  
        observer.next(++i);  
    }, 1000);  
    return function unsubscribe() {  
        clearInterval(timer);  
    };  
});  
const subscription = source.pipe(  
    map((i) => ++i),  
    map((i) => i * 10)  
).subscribe({  
    next: (v) => console.log(v),  
    error: (err) => console.error(err),  
    complete: () => console.log('complete'),  
});  

setTimeout(() => {  
    subscription.unsubscribe();  
}, 4500);  

为了理解 RxJS 的响应式、函数式、流等理念,我们实现了简易版的 RxJS。

我们实现了 Observable、Observer、Subscription 等概念,完成了事件的产生和订阅以及取消订阅。

接着又实现了 operator 和 pipe,每个 operator 返回一个新的 Observable,对数据做层层处理。

写完以后,我们能更清晰的理解响应式、函数式、流等理念在 RxJS 里是怎么体现的。

实现简易版 RxJS,只需要 80 行代码。

以上文章来源于神光的编程秘籍 ,作者神说要有光

链接: https://www.fly63.com/article/detial/11916


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK