0

三分钟学会在 RabbitMQ 中实现发布订阅模式

 1 month ago
source link: https://www.51cto.com/article/784791.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

在这个充满挑战和收获的60天学习之旅中,你将迅速提升成为一名全栈工程师。专注于Spring Boot框架,我们将深入研究高级特性,从项目初始化到微服务架构,再到性能优化和持续集成部署。无论你是初学者还是有一定经验的开发者,这个专题都将带你穿越从零到全面掌握Spring Boot的学习曲线。

Day 32 ~ Springboot3.1.x|3分钟学会在 RabbitMQ 中实现发布订阅模式

实现发布与订阅消息模式

发布-订阅模式是一种消息传递方式,其中发送者(发布者)不会将消息直接发送到特定的接收者(订阅者)。发布者类别定义了哪些订阅者因为订阅者匹配了发布者的类别而接收消息。

以下是使用RabbitMQ实现发布-订阅模式的一种例子,我们将使用RabbitMQ的Fanout Exchange。

Producer

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;

public class EmitLog {

  private static final String EXCHANGE_NAME = "logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String message = "Log message...";

        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
        System.out.println("Sent '" + message + "'");
    }
  }
}

在上述代码的channel.exchangeDeclare(EXCHANGE_NAME, "fanout"),我们声明一个名为"log"的exchange,同时我们定义其类型为"fanout",意味着它会将接收到的所有消息广播给所有它所知道的队列。

Consumer

每一个订阅者都需要拥有一个queue,因此,我们需要在客户端中创建queue。

import com.rabbitmq.client.*;
import java.io.IOException;

public class ReceiveLogs {

  private static final String EXCHANGE_NAME = "logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, EXCHANGE_NAME, "");

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
        System.out.println("Received '" + message + "'");
    };
    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
  }
}

在这个例子中,我们声明一个新的queue,并将其与"logs"的exchange绑定。然后我们定义了消息的接收以及处理方式。

处理消息发送失败的情况

在使用消息中间件的过程中,消息发送失败是无法避免的情况。因此,我们需要对此进行正确的处理以避免因此而导致的系统问题。

对于消息发送失败的处理,有以下几种常用的方案:

  • 重试: 对于有些暂时的问题,比如网络波动,可以通过简单的重试来解决。
  • 消息持久化:将消息存储在某处(例如数据库),只有当消息成功发送后,再删除它。
  • 死信队列:把无法处理的消息放入"死信队列",然后由专门的消费者来进行处理。

RabbitMQ中的消息确认(publisher confirms)和消费者应答(Consumer Acknowledgements)就是为了解决此类问题。

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection()) {
    Channel channel = connection.createChannel();

    String queueName = "test";
    String message = "Hello world";
    try {
        channel.queueDeclare(queueName, false, false, false, null);
        channel.confirmSelect();

        channel.basicPublish("", queueName, null, message.getBytes());
        if (!channel.waitForConfirms()) {
            System.out.println("消息发送失败");
        }
    } catch (Exception e) {
        System.out.println("错误: " + e.getMessage());
    }
}

上述代码中执行channel.confirmSelect();后,当前channel被设置为publisher confirm模式。在此模式下,当消息被RabbitMQ成功接收后,会发送一个确认给生产者。如果RabbitMQ没有发送确认,那么生产者可以认定该消息发送失败。

结论:掌握发布-订阅模式和消息发送失败处理策略,对于掌握消息队列的使用至关重要,可为系统的稳定性和扩展性提供保障。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK