47

【JavaScript】吃饱了撑的系列之JavaScript模拟多线程并发 - 外婆的彭湖湾

 4 years ago
source link: https://www.cnblogs.com/penghuwan/p/11483291.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

正文

前言
最近,明学是一个火热的话题,而我,却也想当那么一回明学家,那就是,把JavaScript和多线程并发这两个八竿子打不找的东西,给硬凑了起来,还写了一个并发库concurrent-thread-js。尴尬的是,当我发现其中的不合理之处,即这个东东的应用场景究竟是什么时,我发现我已经把代码写完了。
⚠️注意! 本文中的线程指的都是用JS异步函数模拟的“假线程”,不是真正意义上的多线程,请不要误解⚠️

github地址

https://github.com/penghuwan/concurrent-thread.js

本文的目的

事实上,这个库用处很小,但是在写的过程中,我对Promise,Async函数以及event事件流的使用产生了新的认识,同时也逐渐去学习和了解怎么去从零开始去写一个非业务的,通用的npm模块,所以希望拿出来和大家分享一下,这才是本文的真正的目的。
好,我们从一个故事开始。
v2-024c0eb60e96a7178d7eacdfc5553924_b.png

场景一

v2-b539cf84592b7dcdd07e7cbaa52f1a8d_b.png

场景二

github地址

https://github.com/penghuwan/concurrent-thread.js​github.com
注意!倘若不考虑webworker这种解决方案,我们一般都认为JS是单线程的。

concurrent-thread-js功能简介

为单线程的JavaScript实现并发协调的功能,语意,命名和作用性质上参考Java的实现,提sleep/join/interupt等API以及锁和条件变量等内容,并提供线程间通信的功能,依赖ES6语法,基于Promise和Async函数实现,故需要Babel编译才能运行。JavaScrpt本来就是单线程的,所以这只是在API的层面实现了模拟,在下文的介绍中,每条所谓的线程其实就是普通的异步函数,并在此基础上实现不同线程的协调配合。

为什么不选用webworker实现?

没错,一般来说JS中模拟多线程我们也许会选用webworker,但是它必须要求你手动创建额外的webworker脚本文件,并通过new work('work.js')这种方式使用,这并不能达到我项目中想要的API的效果,而且注意:webwork中的环境不是window!很多方法你调用不了的。你只能采取这种方案,也即在主线程完成该功能,这是我没有选择webworker的另一个原因。
说是这样说,但其实在大多数时候还是用webworker就够了

什么时候使用concurrent-thread-js

这个问题真是灵魂拷问,可是既然代码写都写了,我怎么也得编一个理由出来啊!额。。。让我想想哈
它的作用是:当JS工程需要让两个函数在执行上不互相干扰,同时也不希望它们会阻塞主线程,与此同时,还希望这两个函数实现类似并发多线程之间的协调的需求的时候,你可以使用这个并发模拟库,实际上这种应用场景。。。这尼玛有这种应用场景吗?!(扎心了呀)。

API总览

  • submit(function,[namespace]): 接收一个函数,普通函数或Async函数均可,并异步执行"线程"
  • sleep(ms): "线程"休眠,可指定休眠时间ms,以毫秒计算
  • join(threadName): "线程"同步,调用此方法的"线程"函数将在threadName执行结束后继续执行
  • interupt(threadName): "线程"中断,影响"线程"内部调this.isInterrupted()的返回值
  • Lock.lock: 加锁,一个时刻只能有一个"线程"函数进入临界区,其他"线程"函数需要等待,锁是非公平的,也就是说后面排队的线程函数没有先后,以随机的方式进行竞争。
  • Lock.unlock:解除非公平锁
  • Condition.wait:不具备执行条件,"线程"进入waiting状态,等待被唤醒
  • Condition.notify:随机唤醒一个wait的"线程"
  • Condition.notifyAll: 尚未编写,唤醒所有wait的"线程"
  • getState: 还没写完 获取"线程"状态,包括RUNNALE(运行),WAITING(等待),BLOCKED(阻塞),TERMINATED(终止)
三个类:ThreadPool,Lock和Condition
我们的API分别写入三个类中,分别是
  • ThreadPool类:包含submit/sleep/join/interrupt/getState方法
  • Lock类:包含Lock.lock和Lock.unLock方法
  • Condition类:包含Condition.wait和Condition.notify方法
注:以下所说的"线程"都是指JS中模拟的异步函数

A1.submit方法

submit模拟提交线程至线程池
// 备注:为循序渐进介绍,以下为简化代码
// 存储每个线程函数的状态,例如是否中断,以及线程状态等
const threadMap = {};

class ThreadPool {
    // 模拟线程中断
    interrupt(threadName) {   }
    // 模拟线程同步
    join(threadName, targetThread) {   }
    // 模拟线程休眠
    sleep(ms) { }
};
function submit(func, name) {
    if (!func instanceof Function) return;
    // 方式1:传入一个具名函数;方式2:传入第二个参数,即线程命名空间
    const threadName = func.name || name;
    // threadMap负责存储线程状态数据
    threadMap[threadName] = { state: RUNNABLE, isInterrupted: false };
    // 让func异步调用,同时将传入函数的作用域绑定为 ThreadPool原型
    Promise.resolve({
        then: func.bind(ThreadPool.prototype);
   })
}
首先,我们做了三件事情:
  1. 获取线程函数的命名空间,并初始化线程初始数据,不同线程状态由threadMap全局存储
  2. 将提交的函数func作为Promise.resolve方法中的一个thenable对象的then参数,这相当于立即"完成"一个Promise,同时在then方法中执行func,func会以异步而不是同步的方式进行执行,你也可以简单的理解成类似于执行了setTimeOut(func,0);
  3. 将func的作用域绑定为新生成的ThreadPool实例,ThreadPool中定义了我们上面我们介绍到的方法,如sleep/join/interupt等,这有什么好处呢?这意味着我们可以直接在函数中通过调用this.interrupt的方式去调用我们定义的API了,符合我们的使用习惯(注意,class中定义的除箭头函数外的普通函数实际上都存放在原型中)
submit(async function example() {
    this.interrupt();
});
但问题在于:现在因为所有的函数通过this调用的都是ThreadPool原型中的方法,我们要在调用唯一的interrupt方法,需要在异步函数中传入"线程"标识,如线程名。这显然不方便,也不优雅,例如下面的命名为example的线程函数
submit(async function example() {
    this.interrupt('example');
});
使用这个模块用户会感到奇怪:我明明在example函数中,为什么还要给调用方法传example这个名字参数??难道不能在模块内部把这事情干了吗?
对!我们下面做的就是这件事情,我们编写一个delegateThreadPool方法,由它为ThreadPool代理处理不同“线程“函数的函数名
// 返回代理后的ThreadPool
function delegateThreadPool(threadName) { // threadName为待定的线程名,在submit方法调用时候传入
    // 代理后的ThreadPool
    const proxyClass = {};
    // 获取ThreadPool原来的所有的方法,赋给props数组
    var props = Object.getOwnPropertyNames(ThreadPool.prototype);
    for (let prop of props) {
        // 代理ThreadPool,为其所有方法增加threadName这个参数
        let fnName = prop;
        proxyClass[fnName] = (...args) => {
            const fn = baseClass[fnName];
            return fn(threadName, ...args);
        };
    }
    return proxyClass;
}
function submit(func, name) {
    // 省略其他代码 。。。
    const proxyScope = delegateThreadPool(threadName);
    // 让func异步调用,不阻塞主线程,同时实现并发
    Promise.resolve({
        then: function () {
            // 给func绑定this为代理后的ThreadPool对象,以便调用方法
            func.call(proxyScope);
        }
    });
}
// 调用this.sleep方法时,已经无需增加函数命名作为参数了
submit(async function example() {
    this.interrupt();
});
也就是说,我们的线程函数func绑定的已经不是ThreadPool.prototype了,而是delegateThreadPool处理后返回的对象:proxyScope。这时候,我们在“线程”函数体里调用this.interrupt方法时,已经无需增加函数命名作为参数了,因为这个工作,proxyScope对象帮我们做了,其实它的工作很简单——就是它的每个函数,都在一个返回的闭包里面调用ThreadPool的同名函数,并传递线程名作为第一个参数。

A2. sleep方法

作用:线程休眠
sleep方法很简单,无非就是返回一个Promise实例,在Promise的函数里面调setTimeOut,等时间到了执行resolve函数,这段时间里修饰Promise的await语句会阻塞一段时间,resolve后又await语句又继续向下执行了,能满足我们想要的休眠效果
// 模拟“线程”休眠
sleep(ms) {
  return new Promise(function (resolve) {
    setTimeout(resolve, ms);
  })
}
// 提交“线程”
submit(async function example() {
    // 阻塞停留3秒,然后才输出1
    await this.sleep(3000);
    console.log(1);
});

A3. interrupt方法

作用:线程中断,可用于处理线程停止等操作
这里要先介绍一下Java里面的interrupt方法:在JAVA里,你不能通过调用terminate方法停掉一个线程,因为这有可能会因为处理逻辑突然中断而导致数据不一致的问题,所以要通过interrupt方法把一个中断标志位置为true,然后通过isInterrupted方法作为判断条件跳出关键代码。
所以为了模拟,我在JS中处理“线程”中断也是这么去做的,但是我们这样做的根本原因是:我们压根没有可以停掉一个线程函数的方法!(JAVA是有但是不准用,即废弃了而已)
    // 模拟线程中断
    interrupt(threadName) {
        if (!threadName) { throw new Error('Miss function parameters') }
        if (threadMap[threadName]) {
            threadMap[threadName].isInterrupted = true;
        }
    }
    // 获取线程中断状态
    isInterrupted(threadName) {
        if (!threadName) { throw new Error('Miss function parameters') }
        // !!的作用是:将undefined转为false
        return !!threadMap[threadName].isInterrupted;
    }
A4. join方法
join(threadName): "线程"同步,调用此方法的"线程"函数将在threadName执行结束后继续执行
join方法和上面的sleep方法是一样的道理,我们让它返回一个Promise,只要我们不调resolve,那么外部修饰Promise的await语句就会一直暂停,等到join的那个另一个线程执行完了,我们看准时机!把这个Promise给resolve,这时候外部修饰Promise的await语句不就又可以向下执行了吗?
但问题在于:我们如何实现这个“一个函数执行完通知另一个函数的功能呢”?没错!那就是我们JavaScript最喜欢的套路: 事件流! 我们下面使用event-emitter这个前后端通用的模块实现事件流。
我们只要在任何一个函数结束的时候触发结束事件(join-finished),同时传递该线程的函数名作为参数,然后在join方法内部监听该事件,并在响应时候调用resolve方法不就可以了嘛。
首先是在join方法内部监听线程函数的结束事件
import ee from 'event-emitter';
const emitter = ee();
// 模拟线程同步
join(threadName, targetThread) {
  return new Promise((resolve) => {
    // 监听其他线程函数的结束事件
    emitter.on('join-finished', (finishThread) => {
      // 根据结束线程的线程名finishThread做判断
      if (finishThread === targetThread) {
        resolve();
      }
    })
  })
}
同时在线程函数执行结束时触发join-finished事件,传递线程名做参数
import ee from 'event-emitter';
const emitter = ee();
function submit(func, name) {
   // ...
    Promise.resolve({
        then: func().then(() => {
          emitter.emit('join-finished', threadName);
        })
    });
}
使用如下:
submit(async function thread1 () {
  this.join('thread2');
  console.log(1);
});
submit(async function thread2 () {
  this.sleep(3000);
  console.log(2)
})
// 3s后,依次输出 2 1

A5. Lock.lock & Lock.unlock(非公平锁)

我们主要是要编写两个方法:lock和unlock方法。我们需要设置一个Boolean属性isLock
  • lock方法:lock方法首先会判断isLock是否为false,如果是,则代表没有线程占领临界区,那么允许该线程进入临界区,同时把isLock设置为true,不允许其他线程函数进入。其他线程进入时,由于判断isLock为true,会setTimeOut每隔一段时间递归调用判断isLock是否为false,从而以较低性能消耗的方式模拟while死循环。当它们检测到isLock为false时候,则会进入临界区,同时设置isLock为true。因为后面的线程没有先后顺序,所以这是一个非公平锁
  • unLock方法:unlock则是把isLock属性设置为false,解除锁定就可以了
// 这是一个非公平锁
class Lock {
    constructor() {
        this.isLock = false;
    }
    //加锁
    lock() {
        if (this.isLock) {
            const self = this;
            // 循环while死循环,不停测试isLock是否等于false
            return new Promise((resolve) => {
                (function recursion() {
                    if (!self.isLock) {
                        // 占用锁
                        self.isLock = true;
                        // 使外部await语句继续往下执行
                        resolve();
                        return;
                    }
                    setTimeout(recursion, 100);
                })();
            });
        } else {
            this.isLock = true;
            return Promise.resolve();
        }
    }
    // 解锁
    unLock() {
        this.isLock = false;
    }
}
const lockObj = new Lock();
export default lockObj;
运行示例如下:
async function commonCode() {
    await Lock.lock();
    await Executor.sleep(3000);
    Lock.unLock();
}

submit(async function example1() {
    console.log('example1 start')
    await commonCode();
    console.log('example1 end')
});
submit(async function example2() {
    console.log('example2 start')
    await commonCode();
    console.log('example2 end')
});
// 立即输出
example1 start
example2 start
// 3秒后输出
example1 end
// 再3秒后输出
example2 end

A6. Condition.wait & Condition.notify(条件变量)

  • Condition.wait:不具备执行条件,线程进入waiting状态,等待被唤醒
  • Condition.notify: 唤醒线程
对不起!写到这里,我实在是口干舌燥,写不下去了,但是道理和前面是一样的:
无非是:事件监听 + Promise + Async函数组合拳,一套搞定
import ee from 'event-emitter';
const ev = ee();

class Condition {
    constructor() {
        this.n = 0;
        this.list = [];
    }
    // 当不满足条件时,让线程处于等待状态
    wait() {
        return new Promise((resolve) => {
            const eventName = `notify-${this.n}`;
            this.n++;
            const list = this.list;
            list.push(eventName);
            ev.on(eventName, () => {
                // 从列表中删除事件名
                const i = list.indexOf(eventName);
                list.splice(i, 1);
                // 让外部函数恢复执行
                debugger;
                resolve();
            })
        })
    }
    // 选择一个线程唤醒
    notify() {
        const list = this.list;
        let i = Math.random() * (this.list.length - 1);
        i = Math.floor(i);
        ev.emit(list[i])
    }
}
async function testCode() {
    console.log('i will be wait');
    if (true) {
        await Condition.wait();
    };
    console.log('i was notified ');
}

submit(async function example() {
    testCode();
    setTimeout(() => {
        Condition.notify();
    }, 3000);
});
i will be wait
// 3秒后输出
i was notified

最后的大总结

其实说到底,我想和大家分享的不是什么并发啊,什么多线程啦。
其实我想表达的是:事件监听 + Promise + Async函数这套组合拳很好用啊
  • 你想让一段代码停一下?OK!写个返回Promise的函数,用await修饰,它就停啦!
  • 你想控制它(await)不要停了,继续往下走?OK! 把Promise给resolve掉,它就往下走啦
  • 你说你不知道怎么控制它停,因为监听和发射事件的代码分布在两个地方?OK!那就使用事件流
本文完,下面是全部项目代码(刚写了文章才发现有bug,待会改改)

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK