GitHub - morris/mongomq2: A general-purpose message and event queuing library fo...

 3 years ago
source link: https://github.com/morris/mongomq2

MongoMQ2

MongoMQ2 is a lightweight Node.js library that turns MongoDB collections into general-purpose message queues or event logs, without additional server components.

At a slight expense of throughput compared to specialized message queues and brokers like SQS, SNS, RabbitMQ or Kafka, you get:

  • Persistent message/event logs in MongoDB collections.
  • Real-time, fan-out, at-most-once delivery to subscribers.
  • Isolated, acknowledged, at-least-once delivery to queue consumers.
    • Effectively exactly-once if consumer workloads are idempotent.
  • All the capabilities of regular MongoDB collections, e.g.
    • search indexes,
    • unique indexes for message/event deduplication,
    • aggregations,
    • capped collections,
    • sharding,
    • and TTL indexes.
  • No chaining of queues required because subscribers and consumers can read from the same queue.
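Because a queue is a regular collection, retention and deduplication can be set up with ordinary indexes. The following is a minimal sketch, not part of MongoMQ2: the field names `createdAt` and `dedupeKey`, the seven-day retention, and the helper `ensureQueueIndexes` are assumptions made for illustration. It relies only on the driver's `createIndex(keys, options)` call, described here through a small structural type.

```typescript
// Structural type for the single driver method used here. The `mongodb`
// package's Collection#createIndex accepts the same arguments.
interface IndexableCollection {
  createIndex(
    keys: Record<string, 1 | -1>,
    options?: { unique?: boolean; sparse?: boolean; expireAfterSeconds?: number }
  ): Promise<string>;
}

const SEVEN_DAYS_IN_SECONDS = 7 * 24 * 60 * 60;

// Hypothetical field names: `createdAt` (a Date set when publishing) and
// `dedupeKey` (an application-defined identifier).
async function ensureQueueIndexes(collection: IndexableCollection): Promise<string[]> {
  return Promise.all([
    // TTL index: MongoDB removes documents some time after `createdAt` + 7 days.
    collection.createIndex({ createdAt: 1 }, { expireAfterSeconds: SEVEN_DAYS_IN_SECONDS }),
    // Unique index: inserting a second document with the same `dedupeKey` fails.
    // `sparse` lets documents without a `dedupeKey` coexist.
    collection.createIndex({ dedupeKey: 1 }, { unique: true, sparse: true }),
  ]);
}
```

With these indexes in place, publishing a duplicate `dedupeKey` would surface as an insertion error, and old messages would expire without a separate cleanup job.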

There's more:

  • Configurable number of retries
  • Configurable visibility timeouts
  • Configurable visibility delays
  • Multiple isolated consumer groups on one queue
  • Batch publishing of messages/events
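To make the terms above concrete, here is an in-memory sketch of how visibility timeouts, visibility delays and retry limits commonly interact in queueing systems. It is not MongoMQ2's implementation, and every name in it (`VisibilityQueue`, `claim`, `ack`, `maxAttempts`) is hypothetical. Time is passed in explicitly so the behavior is easy to follow.

```typescript
// In-memory illustration only; not how MongoMQ2 stores or claims messages.
interface QueuedMessage {
  id: number;
  body: string;
  visibleAt: number; // timestamp (ms) from which the message may be claimed
  attempts: number; // how many times the message has been claimed so far
}

class VisibilityQueue {
  private messages: QueuedMessage[] = [];
  private nextId = 1;
  private readonly visibilityTimeoutMs: number;
  private readonly visibilityDelayMs: number;
  private readonly maxAttempts: number;

  constructor(visibilityTimeoutMs: number, visibilityDelayMs: number, maxAttempts: number) {
    this.visibilityTimeoutMs = visibilityTimeoutMs;
    this.visibilityDelayMs = visibilityDelayMs;
    this.maxAttempts = maxAttempts;
  }

  // A new message becomes claimable only after the visibility delay.
  publish(body: string, now: number): number {
    const id = this.nextId++;
    this.messages.push({ id, body, visibleAt: now + this.visibilityDelayMs, attempts: 0 });
    return id;
  }

  // Claim one visible message and hide it for the visibility timeout.
  claim(now: number): QueuedMessage | undefined {
    const message = this.messages.find(
      (m) => m.visibleAt <= now && m.attempts < this.maxAttempts
    );
    if (!message) return undefined;
    message.attempts += 1;
    message.visibleAt = now + this.visibilityTimeoutMs;
    return message;
  }

  // Acknowledge successful processing; the message is never claimed again.
  ack(id: number): void {
    this.messages = this.messages.filter((m) => m.id !== id);
  }
}
```

In this sketch, a message that is claimed but never acknowledged reappears once the timeout has passed and is retried until `maxAttempts` is reached, which corresponds to `maxAttempts - 1` retries.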

MongoMQ2 can be an effective and flexible building block for message- and event-driven architectures, especially if you're already on MongoDB and don't want to introduce additional system components.

Installation

npm install mongomq2 mongodb

Quick Start

import { MongoClient, ObjectId } from "mongodb";
import { Consumer, Publisher, Subscriber } from "mongomq2";

const mongoClient = new MongoClient("mongodb://localhost:27017");
await mongoClient.connect();

interface MyMessage {
  _id?: ObjectId;
  type: "hello" | "world";
}

const messagesCollection = mongoClient.db().collection<MyMessage>("messages");

// Subscribe to (future) messages of type "hello"
const subscriber = new Subscriber(messagesCollection);

subscriber.subscribe((message) => console.log("Received a hello!"), {
  filter: { type: "hello" },
});

// Consume messages (even past ones) of type "world"
const consumer = new Consumer(
  messagesCollection,
  (message) => console.log("Saved a world!"),
  { filter: { type: "world" } }
);

consumer.start();

// Publish some messages
const publisher = new Publisher(messagesCollection);

await publisher.publish({ type: "hello" });
await publisher.publish({ type: "world" });

// > Received a hello! (per active subscriber)
// > Saved a world! (consumed by one consumer per group, at least once)

Synopsis

Publisher

const publisher = new Publisher(collection);

await publisher.publish({ type: "hello" });
  • Publishes the given message to the database immediately.
  • Message insertion is acknowledged, or an error is thrown.

Use Cases

  • Critical messages and events
  • Job ingestion
  • Commands

BatchPublisher

const publisher = new BatchPublisher(collection);

publisher.publish({ type: "hello" });
  • Queues the given message for publication in memory.
  • Bulk inserts batched messages after a configurable delay.
  • By default, publishes messages with best effort (majority write concern, retries).
  • Can be set to "fire & forget" mode by passing bestEffort: false (no write concern, no retries).
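The batching behavior described above can be pictured with a small in-memory sketch. This is an illustration of the general technique rather than MongoMQ2's code: `BatchBuffer`, its `insertMany` callback and the injectable `schedule` function are hypothetical names, and write concerns and retries are left out.

```typescript
// In-memory sketch of delayed batch publishing; not MongoMQ2's implementation.
type Scheduler = (callback: () => void, delayMs: number) => void;

class BatchBuffer<T> {
  private pending: T[] = [];
  private flushScheduled = false;
  private readonly insertMany: (batch: T[]) => void;
  private readonly delayMs: number;
  private readonly schedule: Scheduler;

  constructor(
    insertMany: (batch: T[]) => void,
    delayMs: number,
    schedule: Scheduler = (callback, ms) => {
      setTimeout(callback, ms);
    }
  ) {
    this.insertMany = insertMany;
    this.delayMs = delayMs;
    this.schedule = schedule;
  }

  // Returns immediately; the message reaches the sink on the next flush.
  publish(message: T): void {
    this.pending.push(message);
    if (this.flushScheduled) return;
    this.flushScheduled = true;
    this.schedule(() => this.flush(), this.delayMs);
  }

  // Hand all buffered messages to the sink in one bulk call.
  flush(): void {
    this.flushScheduled = false;
    if (this.pending.length === 0) return;
    const batch = this.pending;
    this.pending = [];
    this.insertMany(batch);
  }
}
```

The sketch also shows why this style suits the use cases below: anything still in the buffer is lost if the process exits before the next flush.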

Use Cases

  • Non-critical messages
  • Non-critical notifications

Subscriber

const subscriber = new Subscriber(collection, {
  filter: {
    /* optional global filter applied on change stream */
  },
});

subscriber.subscribe((message) => console.log(message), {
  filter: {
    /* optional local filter applied in memory */
  },
});
  • Subscribes to matching messages in the future.
  • All active subscribers will receive all future matching messages.
  • Messages are delivered at most once.
  • Messages are delivered in database insertion order.
  • Past messages are ignored.
  • Each Subscriber instance creates one MongoDB change stream.
    • Change streams occupy one connection,
    • so you'll usually want only one Subscriber instance,
    • and multiple .subscribe(...) calls with local filters.
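The "one stream, many local filters" pattern can be sketched in memory as follows. This is a simplified illustration, not the library's code: `LocalDispatcher` and `matchesFilter` are hypothetical names, and the filter matching only handles top-level equality, whereas MongoMQ2's actual filter semantics may be richer.

```typescript
// Sketch of one shared event stream fanning out to several local filters.
type QueueMessage = Record<string, unknown>;
type LocalFilter = Record<string, unknown>;
type MessageListener = (message: QueueMessage) => void;

// Simplified matching: every filter key must equal the message's value.
function matchesFilter(message: QueueMessage, filter: LocalFilter): boolean {
  return Object.entries(filter).every(([key, value]) => message[key] === value);
}

class LocalDispatcher {
  private subscriptions: Array<{ filter: LocalFilter; listener: MessageListener }> = [];

  // Register a listener; an empty filter matches every message.
  subscribe(listener: MessageListener, filter: LocalFilter = {}): void {
    this.subscriptions.push({ filter, listener });
  }

  // Called once per event arriving on the single shared stream.
  dispatch(message: QueueMessage): void {
    for (const { filter, listener } of this.subscriptions) {
      if (matchesFilter(message, filter)) listener(message);
    }
  }
}
```

Each event is read from the database once and then checked against every local filter, so adding subscriptions costs CPU time in the process but no extra connections.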

Use Cases

  • Real-time notifications
  • Cache invalidation

Consumer

const consumer = new Consumer(collection, (message) => console.log(message), {
  // consumer group identifier, defaults to collection name
  group: "myConsumerGroup",
  filter: {
    /* optional filter */
  },
});

consumer.start();
  • Consumes future and past matching messages.
  • Order of message consumption is not guaranteed.
  • Per group, each matching message is consumed by at most one consumer.
  • Messages are consumed at-least-once per group.
  • Configurable visibility timeout, visibility delay, maximum number of retries, etc.
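Since delivery is at-least-once, a handler may see the same message again after a failure or timeout. One common way to cope is to make the handler idempotent. The sketch below is one possible approach under stated assumptions: `makeIdempotent` is a hypothetical helper, messages are assumed to carry a string `id`, and the in-memory `Set` stands in for a durable record (for example a unique index), which a real deployment would need because memory is lost on restart.

```typescript
// Sketch: skip messages whose side effect has already been applied.
interface IdentifiedMessage {
  id: string;
}

function makeIdempotent<T extends IdentifiedMessage>(
  handler: (message: T) => void
): (message: T) => void {
  const processed = new Set<string>();
  return (message: T): void => {
    if (processed.has(message.id)) return; // redelivery: nothing to do
    handler(message); // may throw, in which case the ID is not recorded
    processed.add(message.id); // record only after the handler succeeded
  };
}
```

A failed attempt leaves no record, so the retry runs the handler again, while a redelivery after success is ignored. That combination is what makes at-least-once delivery behave like exactly-once from the application's point of view.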

Use Cases

  • Message queues
  • Job queues
  • Event processing
  • Command processing

Notes

  • All MongoMQ2 clients are EventEmitters.
  • Always attach .on('error', (err) => /* report error */) to monitor errors.
    • err.mq2 will contain the message being processed, if any.
  • Always .close() MongoMQ2 clients on shutdown (before closing the MongoClient).
    • MongoMQ2 will try to finish open tasks with best effort.
  • MongoDB change streams are only supported on replica sets and sharded clusters, not on standalone servers.
    • To start a one-node replica set locally, e.g. for testing, see docker-compose.yml in the source repository.
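The error-monitoring and shutdown advice above can be combined into two small helpers. This is a sketch under assumptions: `monitorErrors` and `shutdown` are hypothetical helpers, the structural types only describe the `.on('error', ...)` and `.close()` calls mentioned in these notes, and `.close()` is assumed to possibly return a promise.

```typescript
// Structural types for the documented calls used in this sketch.
interface QueueClient {
  on(event: "error", listener: (err: unknown) => void): unknown;
  close(): Promise<void> | void;
}

interface ClosableConnection {
  close(): Promise<void> | void;
}

// Attach an error reporter to every client.
function monitorErrors(
  clients: QueueClient[],
  report: (err: unknown, message: unknown) => void
): void {
  for (const client of clients) {
    client.on("error", (err) => {
      // `err.mq2` holds the message being processed, if any.
      const message = (err as { mq2?: unknown } | null | undefined)?.mq2;
      report(err, message);
    });
  }
}

// Close all queue clients first, then the underlying MongoDB connection.
async function shutdown(
  clients: QueueClient[],
  connection: ClosableConnection
): Promise<void> {
  await Promise.all(clients.map((client) => client.close()));
  await connection.close();
}
```

In an application, `shutdown` would typically be called from a signal handler such as `process.on("SIGTERM", ...)`, passing the publishers, subscribers and consumers in use along with the `MongoClient`.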
