

GitHub - morris/mongomq2: A general-purpose message and event queuing library fo...
source link: https://github.com/morris/mongomq2
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

MongoMQ2
MongoMQ2 is a light-weight Node.js library that turns MongoDB collections into general-purpose message queues or event logs, without additional server components.
At a slight expense of throughput compared to specialized message queues and brokers like SQS, SNS, RabbitMQ or Kafka, you get:
- Persistent message/event logs in MongoDB collections.
- Real-time, fan-out, at-most-once delivery to subscribers.
- Isolated, acknowledged, at-least-once delivery to queue consumers.
- Effectively exactly-once if consumer workloads are idempotent.
- All the capabilities of regular MongoDB collections, e.g.
- search indexes,
- unique indexes for message/event deduplication,
- aggregations,
- capped collections,
- sharding,
- and TTL indexes.
- No chaining of queues required because subscribers and consumers can read from the same queue.
There's more:
- Configurable number of retries
- Configurable visibility timeouts
- Configurable visibility delays
- Multiple isolated consumer groups on one queue
- Batch publishing of messages/events
MongoMQ2 can be an effective and flexible building block for message- and event-driven architectures, especially if you're already on MongoDB and don't want to introduce additional system components.
Installation
npm install mongomq2 mongodb
Quick Start
import { MongoClient } from "mongodb"; import { Consumer, Publisher, Subscriber } from "mongomq2"; const mongoClient = new MongoClient("mongodb://localhost:27017"); await mongoClient.connect(); interface MyMessage { _id?: ObjectId; type: "hello" | "world"; } const messagesCollection = mongoClient.db().collection<MyMessage>("messages"); // Subscribe to (future) messages of type "hello" const subscriber = new Subscriber(messagesCollection); subscriber.subscribe((message) => console.log("Received a hello!"), { filter: { type: "hello" }, }); // Consume messages (even past ones) of type "world" const consumer = new Consumer( messagesCollection, (message) => console.log("Saved a world!"), { filter: { type: "world" } } ); consumer.start(); // Publish some messages const publisher = new Publisher(messagesCollection); await publisher.publish({ type: "hello" }); await publisher.publish({ type: "world" }); // > Received a hello! (per active subscriber) // > Saved a world! (consumed exactly once by one consumer)
Synopsis
Publisher
const publisher = new Publisher(collection); await publisher.publish({ type: "hello" });
- Publishes the given message to the database immediately.
- Message insertion is acknowledged, or an error is thrown.
Use Cases
- Critical messages and events
- Job ingestion
- Commands
BatchPublisher
const publisher = new BatchPublisher(collection); publisher.publish({ type: "hello" });
- Queues the given message for publication in memory.
- Bulk inserts batched messages after a configurable delay.
- By default publishes messages with best effort (
majority
write concern, retries) - Can be set to "fire & forget" mode by passing
bestEffort: false
(no write concern, no retries)
Use Cases
- Uncritical messages
- Uncritical notifications
Subscriber
const subscriber = new Subscriber(collection, { filter: { /* optional global filter applied on change stream */ }, }); subscriber.subscribe((message) => console.log(message), { filter: { /* optional local filter applied in memory */ }, });
- Subscribes to matching messages in the future.
- All active subscribers will receive all future matching messages.
- Messages are delivered at most once.
- Messages are delivered in database insertion order.
- Past messages are ignored.
- Each
Subscriber
instance creates one MongoDB change stream.- Change streams occupy one connection,
- so you'll usually want only one
Subscriber
instance, - and multiple
.subscribe(...)
calls with local filters.
Use Cases
- Real-time notifications
- Cache invalidation
Consumer
const consumer = new Consumer(collection, (message) => console.log(message), { // consumer group identifier, defaults to collection name group: "myConsumerGroup", filter: { /* optional filter */ }, }); consumer.start();
- Consumes future and past matching messages.
- Order of message consumption is not guaranteed.
- Per
group
, each matching message is consumed by at most one consumer. - Messages are consumed at-least-once per
group
. - Configurable visibility timeout, visibility delay, maximum number of retries, etc.
Use Cases
- Message queues
- Job queues
- Event processing
- Command processing
Notes
- All MongoMQ2 clients are
EventEmitters
. - Always attach
.on('error', (err) => /* report error */)
to monitor errors.err.mq2
will contain the message being processed, if any.
- Always
.close()
MongoMQ2 clients on shutdown (before closing the MongoClient).- MongoMQ2 will try to finish open tasks with best effort.
- MongoDB change streams are only supported for MongoDB replica set.
- To start a one-node replica set locally e.g. for testing, see
docker-compose.yml
- To start a one-node replica set locally e.g. for testing, see
Recommend
-
295
Chroma — A general purpose syntax highlighter in pure Go NOTE: As Chroma has just been released, its API is still in flux. That said, the high-level interface should not change significantly.
-
101
README.md A Crash Course to Radamsa Radamsa is a test case generator for robustness testing, a.k.a. a fuzzer. It is typically used to test how well a program can withstand malformed and potenti...
-
63
A simple, light-weight and modern task runner for general purpose. - zaaack/foy
-
50
README.md Joe Bot A general-purpose bot library inspired by Hubot but written in Go.
-
11
Artiry/globol: General-purpose time library for Javascript/Typescript README.md ...
-
47
An Ultimate Guide to Message Queuing Telemetry Transport (MQTT) Message Queuing Telemetry Transport (MQTT) is a lightweight open messaging protocol. IoT trends provide resource-constrained network clients with a simple...
-
7
Files Permalink Latest commit message Commit time
-
10
Light Language
-
8
<?xml encoding="utf-8" ??>Introduction Message queuing is a micro-service architecture that allows you to move data between different applications for further processing. In this model, end-user...
-
8
Message Queuing Service
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK