2

Node.js 设计模式笔记 —— Streams 流编程

 1 year ago
source link: https://rollingstarky.github.io/2022/11/12/node-js-design-patterns-streams/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Streams 是 Node.js 的组件和模式中最重要的几个之一。在 Node.js 这类基于 event 的平台上,最高效的实时地处理 I/O 的方式,就是当有输入时就立即接收数据,应用产生输出时就立即发送数据。

Buffering vs streaming

对于输入数据的处理,buffer 模式会将来自资源的所有数据收集到 buffer 中,待操作完成再将数据作为单一的 blob of data 传递给调用者;相反地,streams 允许我们一旦接收到数据就立即对其进行处理。
单从效率上说,streams 在空间(内存使用)和时间(CPU 时钟)的使用上都更加高效。此外 Node.js 中的 streams 还有另一个重要的优势:组合性

使用 buffered API 完成 Gzip 压缩:

import {promises as fs} from 'fs'
import {gzip} from 'zlib'
import {promisify} from 'util'

const gzipPromise = promisify(gzip)
const filename = process.argv[2]

async function main() {
const data = await fs.readFile(filename)
const gzippedData = await gzipPromise(data)
await fs.writeFile(`${filename}.gz`, gzippedData)
console.log('File successfully compressed')
}

main()

node gzip-buffer.js <path to file>

如果我们使用上述代码压缩一个足够大的文件(比如说 8G),我们很有可能会收到一个错误信息,类似文件大小超过了允许的最大 buffer 大小。

RangeError [ERR_FS_FILE_TOO_LARGE]: File size (8130792448) is greater
than possible Buffer: 2147483647 bytes

即便没有超过 V8 的 buffer 大小限制,也有可能出现物理内存不够用的情况。

使用 streams 实现 Gzip 压缩:

import {createReadStream, createWriteStream} from 'fs'
import {createGzip} from 'zlib'

const filename = process.argv[2]

createReadStream(filename)
.pipe(createGzip())
.pipe(createWriteStream(`${filename}.gz`))
.on('finish', () => console.log('File successfully compressed'))

streams 的优势来自于其接口和可组合性,允许我们实现干净、优雅、简洁的代码。对于此处的示例,它可以对任意大小的文件进行压缩,只需要消耗常量的内存。

假设我们需要创建一个应用,能够压缩一个文件并将其上传到一个远程的 HTTP 服务器。而服务器端则负责将接收到的文件解压缩并保存。
如果我们使用 buffer API 实现客户端组件,则只有当整个文件读取和压缩完成之后,上传操作才开始触发。同时在服务器端,也只有当所有数据都接收完毕之后才开始解压缩操作。

更好一些的方案是使用 streams。在客户端,streams 允许我们以 chunk 为单位从文件系统逐个、分段地读取数据,并立即进行压缩和发送。同时在服务器端,每个 chunk 被接收到后会立即进行解压缩。

服务端程序:

import {createServer} from 'http'
import {createWriteStream} from 'fs'
import {createGunzip} from 'zlib'
import {basename, join} from 'path'

const server = createServer((req, res) => {
const filename = basename(req.headers['x-filename'])
const destFilename = join('received_files', filename)
console.log(`File request received: ${filename}`)
req
.pipe(createGunzip())
.pipe(createWriteStream(destFilename))
.on('finish', () => {
res.writeHead(201, {'Content-Type': 'text/plain'})
res.end('OK\n')
console.log(`File saved: ${destFilename}`)
})
})

server.listen(3000, () => console.log('Listening on http://localhost:3000'))

客户端程序:

import {request} from 'http'
import {createGzip} from 'zlib'
import {createReadStream} from 'fs'
import {basename} from 'path'

const filename = process.argv[2]
const serverHost = process.argv[3]

const httpRequestOptions = {
hostname: serverHost,
port: 3000,
path: '/',
method: 'PUT',
headers: {
'Content-Type': 'application/octet-stream',
'Content-Encoding': 'gzip',
'X-Filename': basename(filename)
}
}

const req = request(httpRequestOptions, (res) => {
console.log(`Server response: ${res.statusCode}`)
})

createReadStream(filename)
.pipe(createGzip())
.pipe(req)
.on('finish', () => {
console.log('File successfully sent')
})

mkdir received_files
node gzip-receive.js
node gzip-send.js <path to file> localhost

借助 streams,整套流程的流水线在我们接收到第一个数据块的时候就开始启动了,完全不需要等待整个文件被读取。除此之外,下一个数据块能够被读取时,不需要等到之前的任务完成就能被处理。即另一条流水线被并行地被装配执行,Node.js 可以将这些异步的任务并行化地执行。只需要保证数据块最终的顺序是固定的,而 Node.js 中 streams 的内部实现机制保证了这一点。

借助于 pipe() 方法,不同的 stream 能够被组合在一起。每个处理单元负责各自的单一功能,最终被 pipe() 连接起来。因为 streams 拥有统一的接口,它们彼此之间在 API 层面是互通的。只需要 pipeline 支持前一个 stream 生成的数据类型(可以是二进制、纯文本甚至对象等)。

客户端加密
import {createCipheriv, randomBytes} from 'crypto'
import {request} from 'http'
import {createGzip} from 'zlib'
import {createReadStream} from 'fs'
import {basename} from 'path'

const filename = process.argv[2]
const serverHost = process.argv[3]
const secret = Buffer.from(process.argv[4], 'hex')
const iv = randomBytes(16)

const httpRequestOptions = {
hostname: serverHost,
port: 3000,
path: '/',
method: 'PUT',
headers: {
'Content-Type': 'application/octet-stream',
'Content-Encoding': 'gzip',
'X-Filename': basename(filename),
'X-Initialization-Vector': iv.toString('hex')
}
}

const req = request(httpRequestOptions, (res) => {
console.log(`Server response: ${res.statusCode}`)
})

createReadStream(filename)
.pipe(createGzip())
.pipe(createCipheriv('aes192', secret, iv))
.pipe(req)
.on('finish', () => {
console.log('File successfully sent')
})
服务端加密
import {createServer} from 'http'
import {createWriteStream} from 'fs'
import {createGunzip} from 'zlib'
import {basename, join} from 'path'
import {createDecipheriv, randomBytes} from 'crypto'

const secret = randomBytes(24)
console.log(`Generated secret: ${secret.toString('hex')}`)

const server = createServer((req, res) => {
const filename = basename(req.headers['x-filename'])
const iv = Buffer.from(
req.headers['x-initialization-vector'], 'hex'
)
const destFilename = join('received_files', filename)
console.log(`File request received: ${filename}`)
req
.pipe(createDecipheriv('aes192', secret, iv))
.pipe(createGunzip())
.pipe(createWriteStream(destFilename))
.on('finish', () => {
res.writeHead(201, {'Content-Type': 'text/plain'})
res.end('OK\n')
console.log(`File saved: ${destFilename}`)
})
})

server.listen(3000, () => console.log('Listening on http://localhost:3000'))

Streams 详解

实际上在 Node.js 中的任何地方都可见到 streams。比如核心模块 fs 有 createReadStream() 方法用来读取文件内容,createWriteStream() 方法用来向文件写入数据;HTTP requestresponse 对象本质上也是 stream;zlib 模块允许我们通过流接口压缩和解压缩数据;甚至 crypto 模块也提供了一些有用的流函数比如 createCipherivcreateDecipheriv

streams 的结构

Node.js 中的每一个 stream 对象,都是对以下四种虚拟基类里任意一种的实现,这四个虚拟类都属于 stream 核心模块:

  • Readable
  • Writable
  • Duplex
  • Transform

每一个 stream 类同时也是 EventEmitter 的实例,实际上 Streams 可以生成几种类型的 event。比如当一个 Readable 流读取完毕时触发 end 事件,Writable 流吸入完毕时触发 finish 事件,或者当任意错误发生时抛出 error

Steams 之所以足够灵活,一个重要的原因就是它们不仅仅能够处理 binary data,还支持几乎任意的 JavaScript 值。实际上 streams 有以下两种操作模式:

  • Binary mode:以 chunk 的形式(比如 buffers 或 strings)传输数据
  • Object mode:通过由独立对象(可以包含任意 JavaScript 值)组成的序列传输数据

上述两种模式使得我们不仅仅可以利用 streams 处理 I/O 操作,还能够帮助我们以函数式的方式将多个处理单元优雅地组合起来。

从 Readable streams 读取数据

non-flowing mode

默认模式。readable 事件表示有新的数据可供读取,再通过 read() 方法同步地从内部 buffer 读取数据,返回一个 Buffer 对象。
即从 stream 按需拉取数据。当 stream 以 Binary 模式工作时,我们还可以给 read() 方法指定一个 size 值,以读取特定数量的数据。

process.stdin
.on('readable', () => {
let chunk
console.log('New data available')
while ((chunk = process.stdin.read()) !== null) {
console.log(
`Chunk read (${chunk.length} bytes): "${chunk.toString()}"`
)
}
})
.on('end', () => console.log('End of stream'))
flowing mode

此模式下,数据并不会像之前那样通过 read() 方法拉取,而是一旦有数据可用,就主动推送给 data 事件的 listener。flowing 模式对于数据流的控制,相对而言灵活性较低一些。
由于默认是 non-flowing 模式,为了使用 flowing 模式,需要绑定一个 listener 给 data 事件或者显式地调用 resume() 方法。调用 pause() 方法会导致 stream 暂时停止发送 data 事件,任何传入的数据会先被缓存到内部 buffer。即 stream 又切换回 non-flowing 模式。

process.stdin
.on('readable', () => {
let chunk
console.log('New data available')
while ((chunk = process.stdin.read()) !== null) {
console.log(
`Chunk read (${chunk.length} bytes): "${chunk.toString()}"`
)
}
})
.on('end', () => console.log('End of stream'))
Async iterators

Readable 流同时也是 async iterators。

async function main() {
for await (const chunk of process.stdin) {
console.log('New data available')
console.log(
`Chunk read (${chunk.length} bytes): "${chunk.toString()}"`
)
}
console.log('End of stream')
}

main()

实现 Readable streams

import {Readable} from 'stream'
import Chance from 'chance'

const chance = Chance()

export class RandomStream extends Readable {
constructor(options) {
super(options)
this.emittedBytes = 0
}

_read(size) {
const chunk = chance.string({length: size})
this.push(chunk, 'utf8')
this.emittedBytes += chunk.length
if (chance.bool({likelihood: 5})) {
this.push(null)
}
}
}

const randomStream = new RandomStream()
randomStream
.on('data', (chunk) => {
console.log(`Chunk received (${chunk.length} bytes): ${chunk.toString()}`)
})

为了实现一个自定义的 Readable stream,首先必须创建一个新的类,该类继承自 stream 模块中的 Readable。其次新创建的类中必须包含 _read() 方法的实现。
上面代码中的 _read() 方法做了以下几件事:

  • 借助第三方的 chance 模块,生成一个长度为 size 的随机字符串
  • 通过 push() 方法将字符传推送到内部 buffer
  • 依据 5% 的几率自行终止,终止时推送 null 到内部 buffer,作为 stream 的结束标志

简化版实现

import {Readable} from 'stream'
import Chance from 'chance'

const chance = new Chance()
let emittedBytes = 0

const randomStream = new Readable({
read(size) {
const chunk = chance.string({length: size})
this.push(chunk, 'utf8')
emittedBytes += chunk.length
if (chance.bool({likelihood: 5})) {
this.push(null)
}
}
})

randomStream
.on('data', (chunk) => {
console.log(`Chunk received (${chunk.length} bytes): ${chunk.toString()}`)
})
从可迭代对象创建 Readable streams

Readable.from() 方法支持从数组或者其他可迭代对象(比如 generators, iterators, async iterators)创建 Readable streams。

import {Readable} from 'stream'

const mountains = [
{name: 'Everest', height: 8848},
{name: 'K2', height: 8611},
{name: 'Kangchenjunga', height: 8586},
{name: 'Lhotse', height: 8516},
{name: 'Makalu', height: 8481}
]

const mountainsStream = Readable.from(mountains)
mountainsStream.on('data', (mountain) => {
console.log(`${mountain.name.padStart(14)}\t${mountain.height}m`)
})

Writable streams

向流写入数据

write() 方法可以向 Writable stream 写入数据。
writable.write(chunk, [encoding], [callback])

end() 方法可以向 stream 表明没有更多的数据需要写入。
writable.end([chunk], [encoding], [callback])

callback 回调函数等同于为 finish 事件注册了一个 listener,会在流中写入的所有数据刷新到底层资源中时触发。

import {createServer} from 'http'
import Chance from 'chance'

const chance = new Chance()
const server = createServer((req, res) => {
res.writeHead(200, {'Content-Type': 'text/plain'})
while (chance.bool({likelihood: 95})) {
res.write(`${chance.string()}\n`)
}
res.end('\n\n')
res.on('finish', () => console.log('All data sent'))
})

server.listen(8080, () => {
console.log('listening on http://localhost:8080')
})

上面代码中 HTTP 服务里的 res 对象是一个 http.ServerResponse 对象,实际上也是一个 Writable stream。

实现 Writable stream
import {Writable} from 'stream'
import {promises as fs} from 'fs'

class ToFileStream extends Writable {
constructor(options) {
super({...options, objectMode: true})
}

_write(chunk, encoding, cb) {
fs.writeFile(chunk.path, chunk.content)
.then(() => cb())
.catch(cb)
}
}

const tfs = new ToFileStream()

tfs.write({path: 'file1.txt', content: 'Hello'})
tfs.write({path: 'file2.txt', content: 'Node.js'})
tfs.write({path: 'file3.txt', content: 'streams'})
tfs.end(() => console.log('All files created'))

简化形式

import {Writable} from 'stream'
import {promises as fs} from 'fs'

const tfs = new Writable({
objectMode: true,
write(chunk, encoding, cb) {
fs.writeFile(chunk.path, chunk.content)
.then(() => cb())
.catch(cb)
}
})

tfs.write({path: 'file1.txt', content: 'Hello'})
tfs.write({path: 'file2.txt', content: 'Node.js'})
tfs.write({path: 'file3.txt', content: 'streams'})
tfs.end(() => console.log('All files created'))

Duplex streams

Duplex 流,既 Readable 又 Writable 的流。它的场景在于,有时候我们描述的实体既是数据源,也是数据的接收者,比如网络套接字。
Duplex 流同时继承来着 stream.Readablestream.Writable 的方法。
为了创建一个自定义的 Duplex 流,我们必须同时提供 _read()_write() 的实现。

Transform streams

Transform 流是一种特殊类型的 Duplex 流,主要针对数据的转换。
对于 Duplex 流来说,流入和流出的数据之间并没有直接的联系。比如一个 TCP 套接字,只是从远端接收或者发送数据,套接字本身不知晓输入输出之间的任何关系。

Duplex stream

而 Transform 流则会对收到的每一段数据都应用某种转换操作,从 Writable 端接收数据,进行某种形式地转换后再通过 Readable 端提供给外部。

Transform stream

实现 Transform 流
import {Transform} from 'stream'

class ReplaceStream extends Transform {
constructor(searchStr, replaceStr, options) {
super({...options})
this.searchStr = searchStr
this.replaceStr = replaceStr
this.tail = ''
}

_transform(chunk, encoding, callback) {
const pieces = (this.tail + chunk).split(this.searchStr)
const lastPiece = pieces[pieces.length - 1]
const tailLen = this.searchStr.length - 1
this.tail = lastPiece.slice(-tailLen)
pieces[pieces.length - 1] = lastPiece.slice(0, -tailLen)
this.push(pieces.join(this.replaceStr))
callback()
}

_flush(callback) {
this.push(this.tail)
callback()
}
}


const replaceStream = new ReplaceStream('World', 'Node.js')
replaceStream.on('data', chunk => console.log(chunk.toString()))
replaceStream.write('Hello W')
replaceStream.write('orld')
replaceStream.end()

其中核心的 _transform() 方法,其有着和 Writable 流的 _write() 方法基本一致的签名,但并不会将处理后的数据写入底层资源,而是通过 this.push() 推送给内部 buffer,正如 Readable 流中 _read() 方法的行为。
所以形成了 Transform 流整体上接收、转换、发送的行为。
_flush() 则会在流结束前调用。

简化形式

import {Transform} from 'stream'

const searchStr = 'World'
const replaceStr = 'Node.js'
let tail = ''

const replaceStream = new Transform({
defaultEncoding: 'utf-8',

transform(chunk, encoding, cb) {
const pieces = (tail + chunk).split(searchStr)
const lastPiece = pieces[pieces.length - 1]
const tailLen = searchStr.length - 1
tail = lastPiece.slice(-tailLen)
pieces[pieces.length - 1] = lastPiece.slice(0, -tailLen)
this.push(pieces.join(replaceStr))
cb()
},
flush(cb) {
this.push(tail)
cb()
}
})
replaceStream.on('data', chunk => console.log(chunk.toString()))
replaceStream.write('Hello W')
replaceStream.write('orld')
replaceStream.end()

Transform 流筛选和聚合数据

数据源 data.csv

type,country,profit
Household,Namibia,597290.92
Baby Food,Iceland,808579.10
Meat,Russia,277305.60
Meat,Italy,413270.00
Cereal,Malta,174965.25
Meat,Indonesia,145402.40
Household,Italy,728880.54

package.json:

{
"type": "module",
"main": "index.js",
"dependencies": {
"csv-parse": "^4.10.1"
},
"engines": {
"node": ">=14"
},
"engineStrict": true
}

FilterByCountry Transform 流 filter-by-country.js

import {Transform} from 'stream'

export class FilterByCountry extends Transform {
constructor(country, options = {}) {
options.objectMode = true
super(options)
this.country = country
}

_transform(record, enc, cb) {
if (record.country === this.country) {
this.push(record)
}
cb()
}
}

SumProfit Transform 流 sum-profit.js

import {Transform} from 'stream'

export class SumProfit extends Transform {
constructor(options = {}) {
options.objectMode = true
super(options)
this.total = 0
}

_transform(record, enc, cb) {
this.total += Number.parseFloat(record.profit)
cb()
}

_flush(cb) {
this.push(this.total.toString())
cb()
}
}

index.js

import {createReadStream} from 'fs'
import parse from 'csv-parse'
import {FilterByCountry} from './filter-by-conutry.js'
import {SumProfit} from './sum-profit.js'

const csvParser = parse({columns: true})

createReadStream('data.csv')
.pipe(csvParser)
.pipe(new FilterByCountry('Italy'))
.pipe(new SumProfit())
.pipe(process.stdout)

Node.js Design Patterns: Design and implement production-grade Node.js applications using proven patterns and techniques, 3rd Edition


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK