7

Node.js Backpressuring(背压)

 3 years ago
source link: https://blog.yourtion.com/nodejs-backpressuring.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Node.js Backpressuring(背压)

Yourtion 创作于:2021-03-02     全文约 7325 字, 预计阅读时间为 21 分钟

通常在数据处理的时候我们会遇到一个普遍的问题:背压,意思是在数据传输过程中有一大堆数据在缓存之后积压着。每次当数据到达结尾又遇到复杂的运算,又或者无论什么原因它比预期的慢,这样累积下来,从源头来的数据就会变得很庞大,像一个塞子一样堵塞住。

为解决这个问题,必须存在一种适当的代理机制,确保流从一个源流入另外一个的时候是平滑顺畅的。不同的社区组织针对他们各自的问题单独做了解决,好例子比如 Unix 的管道和 TCP 的 Socket。此问题经常与流控制相关。在 Node.js 中,流已经是被采纳的解决方案。

此文的目的在于详细深入介绍什么是背压,并从代码角度详细解释在 Node.js 中,流是如何针对此问题进行处理的。本文的第二部分将给予你实现流的功能时最佳实践,以确保你的程序既安全又精简。

我们假定你对 Node.js 中的背压,BufferEventEmitterStream 的基本概念有一点了解。如果你尚未完整阅读过 API 文档,那么最好是先看一下相关 API 说明,它也会有助于你扩展理解本文的主旨。

处理数据中遇到的问题

我们使用 .pipe() 从一个数据源终端到另外一个终端,不过没有使用任何出错处理机制。如果一大堆数据出错了但是又要被接收, ReadStreamgzip 流不会被销毁。

pump(pump 是一个工具类,如果有某个流发生错误或者关闭,它会自动销毁相关所有的流)对于 Node.js 8.x 以及先前版本是必须的。但对于 10.x 和之后的版本而言,我们引入了 pipeline 来取而代之。这是一个模块化函数,用于对接不同的数据流,可以处理异常错误并善后清理释放资源。它同时也提供了一个回调函数——当整个 pipeline 任务完成时将触发。

这里给出一个例子,告诉你如何使用 pipeline

const { pipeline } = require("stream");
const fs = require("fs");
const zlib = require("zlib");

// Use the pipeline API to easily pipe a series of streams
// together and get notified when the pipeline is fully done.
// A pipeline to gzip a potentially huge video file efficiently:

pipeline(
  fs.createReadStream("The.Matrix.1080p.mkv"),
  zlib.createGzip(),
  fs.createWriteStream("The.Matrix.1080p.mkv.gz"),
  (err) => {
    if (err) {
      console.error("Pipeline failed", err);
    } else {
      console.log("Pipeline succeeded");
    }
  }
);

// async
const util = require("util");
const pipeline = util.promisify(stream.pipeline);

async function run() {
  try {
    await pipeline(
      fs.createReadStream("The.Matrix.1080p.mkv"),
      zlib.createGzip(),
      fs.createWriteStream("The.Matrix.1080p.mkv.gz")
    );
    console.log("Pipeline succeeded");
  } catch (err) {
    console.error("Pipeline failed", err);
  }
}
run().then();

数据太多,速度太快

有太多的例子证明有时 Readable 传输给 Writable 的速度远大于它接受和处理的速度!

如果发生了这种情况,消费者开始为后面的消费而将数据列队形式积压起来。写入队列的时间越来越长,也正因为如此,更多的数据不得不保存在内存中知道整个流程全部处理完毕。

写入磁盘的速度远比从磁盘读取数据慢得多,因此,当我们试图压缩一个文件并写入磁盘时,背压的问题也就出现了。因为写磁盘的速度不能跟上读磁盘的速度。

这就是为什么说背压机制很重要——如果背压机制不存在,进程将用完你全部的系统内存,从而对其它进程产生显著影响,它独占系统大量资源直到任务完成为止。

这最终导致一些问题:

  • 明显使得其它进程处理变慢
  • 太多繁重的垃圾回收

没有合适的流来处理背压,同样的进程处理就会产生一个内存占用数量级的差异!

背压是怎么处理这些问题的?

我们有不同的函数将数据从一个进程传入另外一个进程。在 Node.js 中,有一个内置函数称为 .pipe(),同样地,你们也可以使用其它工具包。最终,在这个进程的基本层面上我们有二个互不相关的组件:数据的 source 和 consumer。

.pipe() 被源调用之后,它通知消费者有数据需要传输。管道函数为事件触发建立了合适的背压封装。

在 Node.js 中,源头是一个 Readable 的流,消费者是 Writable 流(它们两者可能通过 DuplexTransform 进行交互)。

当背压被触发的一刹那,它可以被缩小到 Writable.write() 函数的返回值。这是根据一些条件所决定的。

在数据缓存超出了 highWaterMark 或者写入的列队处于繁忙状态,.write() 会返回 false

false 返回之后,背压系统介入了。它将暂停从任何发送数据的数据流中进入的 Readable。一旦数据流清空了,'drain' 事件将被触发,消耗进来的数据流。

一旦队列全部处理完毕,背压机制将允许允许数据再次发送。在使用中的内存空间将自我释放,同时准备接收下一次的批量数据。

这个有效的举措允许一大堆锁住的内存可以为 .pipe() 函数随时使用而并没有内存泄露、无限扩大的内存缓冲。同时垃圾回收器仅需要处理一处地方。

所以,背压既然如此重要,为什么还有理由说你没有听说过呢?显然答案很明显:Node.js 为你做了一切。

注意:对于大部分机器,存在着一个字节的大小用以决定一个缓存是否已经满了(不同机器此值有变化)。Node.js 将允许你设置你自己的 highWaterMark。但是通常来说,默认是设置为 16kb(16384,对于对象模型流而言是 16)。在某些实例中你或许想提高那个值,尽管去提高吧,但是也要小心使用!

.pipe() 的生命周期

为了对背压有一个更好的理解,这里有一副 Readable 流正通过 piped 流入 Writable 流的整个生命周期图:

                                                     +===================+
                         x-->  Piping functions   +-->   src.pipe(dest)  |
                         x     are set up during     |===================|
                         x     the .pipe method.     |  Event callbacks  |
  +===============+      x                           |-------------------|
  |   Your Data   |      x     They exist outside    | .on('close', cb)  |
  +=======+=======+      x     the data flow, but    | .on('data', cb)   |
          |              x     importantly attach    | .on('drain', cb)  |
          |              x     events, and their     | .on('unpipe', cb) |
+---------v---------+    x     respective callbacks. | .on('error', cb)  |
|  Readable Stream  +----+                           | .on('finish', cb) |
+-^-------^-------^-+    |                           | .on('end', cb)    |
  ^       |       ^      |                           +-------------------+
  |       |       |      |
  |       ^       |      |
  ^       ^       ^      |    +-------------------+         +=================+
  ^       |       ^      +---->  Writable Stream  +--------->  .write(chunk)  |
  |       |       |           +-------------------+         +=======+=========+
  |       |       |                                                 |
  |       ^       |                              +------------------v---------+
  ^       |       +-> if (!chunk)                |    Is this chunk too big?  |
  ^       |       |     emit .end();             |    Is the queue busy?      |
  |       |       +-> else                       +-------+----------------+---+
  |       ^       |     emit .write();                   |                |
  |       ^       ^                                   +--v---+        +---v---+
  |       |       ^-----------------------------------<  No  |        |  Yes  |
  ^       |                                           +------+        +---v---+
  ^       |                                                               |
  |       ^               emit .pause();          +=================+     |
  |       ^---------------^-----------------------+  return false;  <-----+---+
  |                                               +=================+         |
  |                                                                           |
  ^            when queue is empty     +============+                         |
  ^------------^-----------------------<  Buffering |                         |
               |                       |============|                         |
               +> emit .drain();       |  ^Buffer^  |                         |
               +> emit .resume();      +------------+                         |
                                       |  ^Buffer^  |                         |
                                       +------------+   add chunk to queue    |
                                       |            <---^---------------------<
                                       +============+

注意:如果你创建一些管道准备把一些流串联起来从而操纵数据,你应该实现 Transform 流。

在这种情况下,从 Readable 流中的输出进入 Transform,并且会被管道输送进入 Writable

Readable.pipe(Transformable).pipe(Writable);

背压将被自动应用,但是同时请注意输入和输出 TransformhighWaterMark 可以手动控制,并且会影响到背压系统。

实现用户自定义流须知

流的黄金法则是总是接受背压。作为最佳实践的构成是不矛盾的实践。只要你小心避免与内部背压支持冲突的行为,你可以确信你在遵循良好的实践。

  1. 没有特殊要求下,绝对不要用 .push()
  2. 在流返回 false 后不要调用 .write() 方法,而是等待 'drain'
  3. 流在不同的 Node.js 版本和库中是有变化的,小心使用并进行相应的测试

对于可读流的规则

因为 Node.js 的功能,数据从 Readable 流到 Writable 流。但是正如我们在数据流传输过程中我们观察到,source 和 Readable 目标一样重要, Readable 流对于背压是如何处理的至关重要。

这两个过程相互依赖地进行有效沟通,如果 Readable 流在 Writable 流需要它停止发送数据的时候忽略了,那就会跟 .write() 的返回值不正确时一样,产生相应的问题。

所以,除了谨慎对待 .write() 方法的返回值,我们同样要小心在 ._read() 方法中使用 .push() 方法的返回值。如果 .push() 方法返回一个 false,流就会停止从源读数据。否则,它就不会停止而继续读下去。

关于可写流的规则

重新调用 .write() 方法根据一些条件可能返回 true 或者 false。幸运地是,当我们构建属于自己的 Writable 流的时候,流状态机(stream state machine)会处理我们的回调,并且决定什么时候处理背压并且为我们简化数据流。

但是当我们需要直接使用 Writable 流时,我们必须考虑 .write() 方法返回的值,并且注意到以下一些情况:

  • 如果写队列确实繁忙,.write() 方法将返回 false
  • 如果数据块太大,.write() 方法将返回 false(限定通过 highWaterMark 决定)

流经常作为一个模块用于 Node.js 中,对于内部的系统结构而言非常重要。对于开发者而言,可以通过 Node.js 扩展连接应答系统。

现在我们希望你有能力进行故障排除,记住了是如何为你的 WritableReadable 流编写背压处理的。并且你还可以把这些知识分享给你的同事和朋友们。

在此之后请仔细阅读更多的有关 Stream 其它 API 函数,这样有助于当你在构建 Node.js 的应用程序之时更好地理解关于流的能力。

  • https://nodejs.org/en/docs/guides/backpressuring-in-streams/
  • https://nodejs.org/zh-cn/docs/guides/backpressuring-in-streams/

原文链接:https://blog.yourtion.com/nodejs-backpressuring.html


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK