

Node.js 设计模式笔记 —— 消息中间件及其应用模式(任务分发)
source link: https://rollingstarky.github.io/2023/01/12/node-js-design-patterns-message-broker-pattern-task-distribution/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Node.js 设计模式笔记 —— 消息中间件及其应用模式(任务分发)
将高成本的任务委派给多个工作节点,这种类型的应用并不适合由 Pub/Sub 模式实现。因为我们并不想同一个任务被多个消费者收到,相反我们更需要一种类似负载均衡的消息分发模式。在消息系统术语中,也被称为 competing consumers,fanout distribution 或 ventilator。
与 HTTP 负载均衡器不同的是,任务分发系统中的消费者是一种更活跃的角色。绝大多数时候都是消费者连接到任务队列,请求新的任务。这一点在可扩展系统中非常关键,允许我们在不修改生产者部分的情况下,直接平滑地增加工作节点的数量。
此外,在一个通用的消息系统中,我们没有必要强调生产者和消费者之间的请求/响应通信。多数情况下,更优先的选择是使用单向的异步通信,从而获得更优异的并行能力和扩展性。消息基本上总是沿着一个方向流动,这样的管道允许我们构建复杂的信息处理架构,又不必承受同步通信带来的开销。
ZeroMQ Fanout/Fanin 模式
分布式 hashsum 破解器
需要以下组件实现一个标准的并行管线:
- 一个协调节点负责在多个工作节点间分发任务
- 多个工作节点承担具体的计算任务
- 一个用于收集计算结果的节点
即一个节点负责生成所有可能的字符串组合,并将它们分发给不同的工作节点;工作节点则负责计算接收到的字符串,比较 hash 值;最后一个节点负责收集暴力破解的结果。
实现 producer
为了表示所有可能的字符组合,这里使用 N 维索引树。每个节点包含一个当前位置下可能出现的字母,比如只有 a
、b
两个字母的话,长度为 3 的字符串组合共有图示的以下几种:
indexed-string-variation
包可以帮助我们由索引计算出对应的字符串,这项工作可以在工作节点完成,因此 producer 这里只需要将分好组的索引值分发给工作节点。
generateTasks.js:
export function* generateTasks(searchHash, alphabet,
maxWordLength, batchSize) {
let nVariations = 0
for (let n = 1; n <= maxWordLength; n++) {
nVariations += Math.pow(alphabet.length, n)
}
console.log('Finding the hashsum source string over ' +
`${nVariations} possible variations`)
let batchStart = 1
while (batchStart <= nVariations) {
const batchEnd = Math.min(
batchStart + batchSize - 1, nVariations)
yield {
searchHash,
alphabet: alphabet,
batchStart,
batchEnd
}
batchStart = batchEnd + 1
}
}
producer.js:
import zmq from 'zeromq'
import delay from 'delay'
import { generateTasks } from './generateTasks.js'
const ALPHABET = 'abcdefghijklmnopqrstuvwxyz'
const BATCH_SIZE = 10000
const [, , maxLength, searchHash] = process.argv
async function main() {
const ventilator = new zmq.Push()
await ventilator.bind('tcp://*:5016')
await delay(1000)
const generatorObj = generateTasks(searchHash, ALPHABET, maxLength, BATCH_SIZE)
for (const task of generatorObj) {
await ventilator.send(JSON.stringify(task))
}
}
main().catch(err => console.log(err))
- 创建一个 PUSH socket 并绑定给本地的 5016 端口,工作节点的 PULL socket 会连接到此端口并接收任务
- 将每一个生成的任务字符串化,通过 PUSH socket 的
send()
方法发送给工作节点。工作节点以轮询的方式接收不同的任务
实现 worker
process Task.js:
import isv from 'indexed-string-variation'
import { createHash } from 'crypto'
export function processTask(task) {
const variationGen = isv.generator(task.alphabet)
console.log('processing from ' +
`${variationGen(task.batchStart)} (${task.batchStart})` +
`to ${variationGen(task.batchEnd)} (${task.batchEnd}`)
for (let idx = task.batchStart; idx <= task.batchEnd; idx++) {
const word = variationGen(idx)
const shasum = createHash('sha1')
shasum.update(word)
const digest = shasum.digest('hex')
if (digest === task.searchHash) {
return word
}
}
}
processTask()
遍历给定区间内的所有索引值,对每一个索引生成对应的字符串,再计算其 SHA1 值,与传入的 task
对象中的 searchHash
比较。
worker.js:
import zmq from 'zeromq'
import { processTask } from './processTask.js'
async function main() {
const fromVentilator = new zmq.Pull()
const toSink = new zmq.Push()
fromVentilator.connect('tcp://localhost:5016')
toSink.connect('tcp://localhost:5017')
for await (const rawMessage of fromVentilator) {
const found = processTask(JSON.parse(rawMessage.toString()))
if (found) {
console.log(`Found! => ${found}`)
await toSink.send(`Found: $found`)
}
}
}
main().catch(err => console.error(err))
worker.js
创建了两个 socket。PULL socket 负责连接到任务发布方(Ventilator),接收任务;PUSH socket 负责连接到结果收集方(sink),传递任务执行的结果。
实现 results collector
collector.js:
import zmq from 'zeromq'
async function main() {
const sink = new zmq.Pull()
await sink.bind('tcp://*:5017')
for await (const rawMessage of sink) {
console.log('Message from worker: ', rawMessage.toString())
}
}
main().catch(err => console.error(err))
运行以下命令测试结果:
node worker.js
node worker.js
node collector.js
node producer.js 4 f8e966d1e207d02c44511a58dccff2f5429e9a3b
AMQP 实现 pipeline 和 competing consumers
像前面那样在点对点的模式下,实现 pipeline 是非常直观的。假设我们需要借助 AMQP 这类系统实现任务分配模式,就必须确保每条消息都只会被一个消费者接收到。
可以直接将任务发布到目标 queue,不经过 exchange。避免了 exchange 有可能绑定了多个 queue 的情况。之后,多个消费者同时监听这一个 queue,消息即会以 fanout 的方式均匀地分发给所有的消费者。
hashsum 破解器的 AMQP 实现
producer-amqp.js:
import amqp from 'amqplib'
import { generateTasks } from './generateTasks.js'
const ALPHABET = 'abcdefghijklmnopqrstuvwxyz'
const BATCH_SIZE = 10000
const [, , maxLength, searchHash] = process.argv
async function main() {
const connection = await amqp.connect('amqp://localhost')
const channel = await connection.createConfirmChannel()
await channel.assertQueue('tasks_queue')
const generatorObj = generateTasks(searchHash, ALPHABET,
maxLength, BATCH_SIZE)
for (const task of generatorObj) {
channel.sendToQueue('tasks_queue', Buffer.from(JSON.stringify(task)))
}
await channel.waitForConfirms()
channel.close()
connection.close()
}
main().catch(err => console.error(err))
- 此处创建的是一个
confirmChannel
,它提供了一个waitForConfirms()
函数,可以在 broker 确认收到消息前等待,确保应用不会过早地关闭到 broker 的连接 channel.sendToQueue()
负责将一条消息直接发送给某个 queue,跳过任何 exchange 或者路由
worker-amqp.js:
import amqp from 'amqplib'
import { processTask } from './processTask.js'
async function main() {
const connection = await amqp.connect('amqp://localhost')
const channel = await connection.createChannel()
const { queue } = await channel.assertQueue('tasks_queue')
channel.consume(queue, async (rawMessage) => {
const found = processTask(
JSON.parse(rawMessage.content.toString()))
if (found) {
console.log(`Found! => ${found}`)
await channel.sendToQueue('results_queue',
Buffer.from(`Found: ${found}`))
}
await channel.ack(rawMessage)
})
}
main().catch(err => console.error(err))
collector-amqp.js:
import amqp from 'amqplib'
async function main() {
const connection = await amqp.connect('amqp://localhost')
const channel = await connection.createChannel()
const { queue } = await channel.assertQueue('results_queue')
channel.consume(queue, msg => {
console.log(`Message from worker: ${msg.content.toString()}`)
})
}
main().catch(err => console.error(err))
运行如下命令测试效果:
node worker-amqp.js
node worker-amqp.js
node collector-amqp.js
node producer-amqp.js 4 f8e966d1e207d02c44511a58dccff2f5429e9a3b
通过 Redis Streams 实现任务分发
Redis Stream 可以借助一种叫做 consumer groups 的特性实现任务分发模式。Consumer group 是一个有状态的实体,由一组名称标识的消费者组成,组中的消费者会以 round-robin 的方式接收记录。
每条记录都必须被显式地确认,否则该记录会一直处于 pending 状态。每个消费者都只能访问它自己的 pending 记录,假如消费者突然崩溃,在其回到线上后会先尝试获取其 pending 的记录。
Consumer group 也会记录其读取的上一条消息的 ID,因而在连续的读取操作中,consumer group 知道下一条要读取的记录时是哪个。
producer-redis.js:
import Redis from 'ioredis'
import { generateTasks } from './generateTasks.js'
const ALPHABET = 'abcdefghijklmnopqrstuvwxyz'
const BATCH_SIZE = 10000
const redisClient = new Redis()
const [, , maxLength, searchHash] = process.argv
async function main() {
const generatorObj = generateTasks(searchHash, ALPHABET,
maxLength, BATCH_SIZE)
for (const task of generatorObj) {
await redisClient.xadd('tasks_stream', '*',
'task', JSON.stringify(task))
}
redisClient.disconnect()
}
main().catch(err => console.error(err))
worker-redis.js:
import Redis from 'ioredis'
import { processTask } from './processTask.js'
const redisClient = new Redis()
const [, , consumerName] = process.argv
async function main() {
await redisClient.xgroup('CREATE', 'tasks_stream',
'workers_group', '$', 'MKSTREAM')
.catch(() => console.log('Consumer group already exists'))
const [[, records]] = await redisClient.xreadgroup(
'GROUP', 'workers_group', consumerName, 'STREAMS',
'tasks_stream', '0')
for (const [recordId, [, rawTask]] of records) {
await processAndAck(recordId, rawTask)
}
while (true) {
const [[, records]] = await redisClient.xreadgroup(
'GROUP', 'workers_group', consumerName, 'BLOCK', '0',
'COUNT', '1', 'STREAMS', 'tasks_stream', '>')
for (const [recordId, [, rawTask]] of records) {
await processAndAck(recordId, rawTask)
}
}
}
async function processAndAck(recordId, rawTask) {
const found = processTask(JSON.parse(rawTask))
if (found) {
console.log(`Found! => ${found}`)
await redisClient.xadd('results_stream', '*', 'result',
`Found: ${found}`)
}
await redisClient.xack('tasks_stream', 'workers_group', recordId)
}
main().catch(err => console.error(err))
xgroup
命令用来确保 consumer group 存在。CREATE
表示我们希望创建一个 consumer grouptasks_stream
表示我们想要读取的 stream 的名字workers_group
是 consumer group 的名字- 第四个参数表示 consumer group 开始读取的记录的位置。
$
表示当前 stream 中最后一条记录的 ID MKSTREAM
表示如果 stream 不存在则创建它
- 通过
xreadgroup
命令读取属于当前 consumer 的所有 pending 的记录。'GROUP'
、'workers_group'
、consumerName
用来指代 consumer group 和 consumer 的名字STREAMS
和tasks_stream
用来指代我们想要读取的 stream 的名字0
用来表示我们想要开始读取的记录的位置。这里表示从属于当前 consumer 的第一条记录开始,读取所有 pending 的消息
- 通过另外一条
xreadgroup
命令读取 stream 里新增加的记录。'BLOCK'
和'0'
两个参数表示如果没有新的消息,就一直阻塞等待。'0'
具体表示一直等待永不超时'COUNT'
和'1'
表示一次请求只获取一条记录- 特殊 ID
>
表示只获取还没有被当前的 consumer group 处理过的消息
processAndAck()
函数负责当xreadgroup()
返回的记录被处理完成时,调用xack
命令进行确认,将该记录从当前 consumer 的 pending 列表里移除
collector-redis.js:
import Redis from 'ioredis'
const redisClient = new Redis()
async function main() {
let lastRecordId = '$'
while (true) {
const data = await redisClient.xread(
'BLOCK', '0', 'STREAMS', 'results_stream', lastRecordId)
for (const [, logs] of data) {
for (const [recordId, [, message]] of logs) {
console.log(`Message from worker: ${message}`)
lastRecordId = recordId
}
}
}
}
main().catch(err => console.error(err))
运行程序测试效果:
node worker-redis.js workerA
node worker-redis.js workerB
node collector-redis.js
node producer-redis.js 4 f8e966d1e207d02c44511a58dccff2f5429e9a3b
Recommend
-
9
为什么需要分布式事务 我们知道ACID(原子性Atomicity、一致性Consistency、隔离性Isolation、持久性Durability)定义了单个数据库操作的事务性,这样我们就能放心的使用数据库,而不用担心数据的一致性,操作的原子性等等。由于数据库同时可以并发的...
-
39
高性能消息中间件 NSQ 解析-应用实践
-
7
桥接(Bridge)模式是指将抽象部分与实现部分相分离,使它们都可以独立的发生变化。 一、桥接模式介绍# ...
-
6
享元(Flyweight)模式:顾名思义就是被共享的单元。意图是复用对象,节省内存,提升系统的访问效率。比如在红白机冒险岛游戏中的背景花、草、树木等对象,实际上是可以多次被不同场景所复用共享,也是为什么以前的游戏占用那么小的内存,却让...
-
9
备忘录模式(Memento Design Pattern),也叫快照(Snapshot)模式。指在不违背封装原则前提下,捕获一个对象的内部状态,并在该对象之外保存这个状态,以便之后恢复对象为先前的状态。 备忘录模式在日常中很常见,比如Word中的回退,MySQL中的und...
-
5
访问者模式(Visitor Pattern)指将作用域某种数据结构中的各元素的操作分离出来封装成独立的类,使其在不改变数据结构的前提下可以添加作用于这些元素的新的操作。借用《Java设计模式》中的例子说明:在医院医生开具药单后,划价人员拿到药单后会根据药单上的...
-
6
消息队列中间件学习笔记 2016-06-02 工具
-
4
消息中间件系列介绍-传输与消费模式 作者:移动Labs 2022-10-28 13:33:05 开发 虽然Push模式在语义上更符合事件驱动架构风格,但在当...
-
4
计算存储分离在京东云消息中间件JCQ上的应用 精选 原创 京东云官方 2022-12-19 11...
-
10
主要有两类技术可以用来整合分布式应用:一类是通过共享存储作为一个中心化的协调者,跟踪和保存所有需要共享的信息;另一类则是通过消息中间件,向系统中的所有节点散布数据、事件和命令等。消息存在于软件系统的各个层级。我们通过互联网交换消息完成通信;...
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK