4

RabbitMQ由浅入深入门全总结(二)

 2 years ago
source link: https://segmentfault.com/a/1190000040194135
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

写在最前面

距离上一次发文章已经很久了,其实这段时间一直也没有停笔,只不过在忙着找工作还有学校结课的事情,重新弄了一下博客,后面也会陆陆续续会把文章最近更新出来~

  • 这篇文章有点长,就分了两篇
  • PS:那个Github上Java知识问答的文章也没有停笔,最近也会陆续更新

6. 进阶补充

6.1 过期时间设置(TTL)

过期时间(TTL)就是对消息或者队列设置一个时效,只有在时间范围内才可以被被消费者接收获取,超过过期时间后消息将自动被删除。

注:我们主要讲消息过期,在消息过期的第一种方式中,顺便也就会提到队列过期的设置方式

  1. 通过队列属性设置,队列中所有消息都有相同的过期时间
  2. 对消息进行单独设置,每条消息 TTL可以不同

两种方法同时被使用时,以两者过期时间 TTL 较小的那个数值为准。消息在队列的生存时间一旦超过设置的 TTL 值,就称为 Dead Message 被投递到死信队列,消费者将无法再收到该消息(死信队列是我们下一点要讲的)

6.1.1 应用于全部消息的过期时间

@Configuration
public class RabbitMqConfiguration {

    public static final String TOPIC_EXCHANGE = "topic_order_exchange";
    public static final String TOPIC_QUEUE_NAME_1 = "test_topic_queue_1";
    public static final String TOPIC_ROUTINGKEY_1 = "test.*";

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE);
    }

    @Bean
    public Queue topicQueue1() {
        // 创建参数 Map 容器
        Map<String, Object> args = new HashMap<>();
        // 设置消息过期时间 注意此处是数值 5000 不是字符串
        args.put("x-message-ttl", 5000);
        // 设置队列过期时间
        args.put("x-expires", 8000);
        // 在最后传入额外参数 即这些过期信息
        return new Queue(TOPIC_QUEUE_NAME_1, true, false, false, args);
    }

    @Bean
    public Binding bindingTopic1() {
        return BindingBuilder.bind(topicQueue1())
                .to(topicExchange())
                .with(TOPIC_ROUTINGKEY_1);
    }
}
  1. 创建参数 Map 容器:类型是在 Queue 参数中所要求的,要按照要求来。
  2. 设置消息过期时间:这里设置的消息过期时间,会应用到所有消息中。
  3. 设置队列过期时间
  4. 传入额外参数:将上述配置好的过期时间设置,通过 Queue 传入即可。
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
    /**
     * 注入 RabbitTemplate
     */
    @Autowired

    @Test
    public void testTopicSendMessage() {
        rabbitTemplate.convertAndSend(RabbitMqConfiguration.TOPIC_EXCHANGE, "test.order.insert", "This is a message !");
    }
}

不要配置消费者,然后就可以在 Web 管理器中看到效果了

6.1.2 应用于单独消息的过期时间

  • 配置中保持最初的样子就行了,就不需要配置过期时间了
  • 生产者中配置消息单独的过期时间
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
    /**
     * 注入 RabbitTemplate
     */
    @Autowired

    @Test
    public void testTopicSendMessage2() {
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor(){
            public Message postProcessMessage(Message message){
                // 注意此处是 字符串 “5000”
                message.getMessageProperties().setExpiration("5000");
                message.getMessageProperties().setContentEncoding("UTF-8");
                return message;
            }
        };
        rabbitTemplate.convertAndSend(RabbitMqConfiguration.TOPIC_EXCHANGE, "test.order",
                "This is a message 002 !",messagePostProcessor);
    }
}

6.2 死信队列

死信官方原文为 Dead letter ,它是RabbitMQ中的一种消息机制,当你在消费消息时,如果队列以及队列里的消息出现以下情况,说明当前消息就成为了 “死信”,如果配置了死信队列,这些数据就会传送到其中,如果没有配置就会直接丢弃。

  1. 消息被拒绝
  2. 队列达到最大长度

不过死信队列并不是什么很特殊的存在,我们只需要配置一个交换机,在消费的那个队列中配置,出现死信就重新发送到刚才配置的交换机中去,进而被路由到与交换机绑定的队列中去,这个队列也就是死信队列,所以从创建上看,它和普通的队列没什么区别。

6.2.1 应用场景

比如在一些比较重要的业务队列中,未被正确消费的消息,往往我们并不想丢弃,因为丢弃后如果想恢复这些数据,往往需要运维人员从日志获取到原消息,然后重新投递消息,而配置了死信队列,相当于给了未正确消费消息一个暂存的位置,日后需要恢复的时候,只需要编写对应的代码就可以了。

6.2.2 实现方式

  • 定义一个处理死信的交换机和队列
@Configuration
public class DeadRabbitMqConfiguration{

    @Bean
    public DirectExchange deadDirect(){
        return new DirectExchange("dead_direct_exchange");}

    @Bean
    public Queue deadQueue(){
        return new Queue("dead_direct_queue");}
    @Bean
    public Binding deadBinds(){
        return BindingBuilder.bind(deadQueue()).to(deadDirect()).with("dead");
    }
}
  • 在正常的消费队列中指定死信队列
@Configuration
public class RabbitMqConfiguration {

    public static final String TOPIC_EXCHANGE = "topic_order_exchange";
    public static final String TOPIC_QUEUE_NAME_1 = "test_topic_queue_1";
    public static final String TOPIC_ROUTINGKEY_1 = "test.*";

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE);
    }

    @Bean
    public Queue topicQueue1() {
        // 设置过期时间
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", 5000);
        // 设置死信队列交换器
        args.put("x-dead-letter-exchange","dead_direct_exchange");
        // 设置交换路由的路由key fanout 模式不需要配置此条
        args.put("x-dead-letter-routing-key","dead");
        return new Queue(TOPIC_QUEUE_NAME_1, true, false, false, args);
    }

    @Bean
    public Binding bindingTopic1() {
        return BindingBuilder.bind(topicQueue1())
                .to(topicExchange())
                .with(TOPIC_ROUTINGKEY_1);
    }
}

6.3 内存及磁盘监控

6.3.1 内存告警及控制

为了防止避免服务器因内存不够而崩溃,所以 RabbitMQ 设定了一个阈值,当内存使用量超过阈值的时候,RabbitMQ 会暂时阻塞所有客户端的连接,并且停止继续接受新消息。

有两种方式可以修改这个阈值

  1. 通过命令(二选一即可)

    • 命令的方式会在 Broker 重启后失效
# 通过百分比设置的命令 <fraction> 处代表百分比小数例如 0.6
rabbitmqctl set_vm_memory_high_watermark <fraction>
# 通过绝对值设置的命令 <value> 处代表设置的一个固定值例如 700MB
rabbitmqctl set_vm_memory_high_watermark absolute <value>
  1. 通过修改配置文件 rabbitmq.conf

    • 配置文件每次启动都会加载,属于永久有效
# 百分比设置 默认值为 0.4 推荐 0.4-0.7 之间
vm_memory_high_watermark.relative = 0.5
# 固定值设置
vm_memory_high_watermark.absolute = 2GB

6.3.2 内存换页

在客户端连接和生产者被阻塞之前,它会尝试将队列中的消息换页到磁盘中,这种思想在操作系统中其实非常常见,以最大程度的满足消息的正常处理。

当内存换页发生后,无论持久化还是非持久化的消息,都会被转移到磁盘,而由于持久化的消息本来就在磁盘中有一个持久化的副本,所以会优先移除持久化的消息。

默认情况下,当内存达到阈值的 50 % 的时候,就会进行换页处理。

可以通过设置 vm_memory_high_watermark_paging_ratio 修改

# 值小于 1, 如果大于 1 就没有意义了
vm_memory_high_watermark_paging_ratio = 0.6

6.3.3 磁盘预警

如果无止境的换页,也很有可能会导致耗尽磁盘空间导致服务器崩溃,所以 RabbitMQ 又提供了一个磁盘预警的阈值,当低于这个值的时候就会进行报警,默认是 50MB,可以通过命令的方式修改

# 固定值
rabbitmqctl set_disk_free_limit <disk_limit>
# 百分数
rabbitmqctl set_disk_free_limit memory_limit <fraction>

6.4 消息的可靠传递

生产者向 RabbitMQ 中发送消息的时候,可能会因为网络等种种原因导致发送失败,所以 RabbitMQ 提供了一系列保证消息可靠传递的机制,可以大致分为生产者和消费者两部分的处理

6.4.1 生产者中的机制

生产者作为消息的发送者,需要保证自己的消息发送成功,RabbitMQ 提供了两种方式来保证这一点。-

  1. confirm 确认模式
  2. return 退回模式

6.4.1.1 confirm 确认模式

生产者发送消息后,会异步等待接收一个 ack 应答,收到返回的 ack 确认消息后,根据 ack是 true 还是 false,调用 confirmCallback 接口进行处理

spring:
  rabbitmq:
    # 发送确认
    publisher-confirm-type: correlated
  • 实现 ConfirmCallback 接口的 confirm 方法
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {

    /**
     * @param correlationData 相关配置信息
     * @param ack             exchange交换机 是否成功收到了消息。true 成功,false代表失败
     * @param cause           失败原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            //接收成功
            System.out.println("消息成功发送到交换机");
        } else {
            //接收失败
            System.out.println("消息发送到交换机失败,失败原因: " + cause);
            // TODO 可以处理失败的消息,例如再次发送等等
        }
    }
}
  • 声明队列和交换机
@Configuration
public class RabbitMqConfig {
    @Bean()
    public Queue confirmTestQueue() {
        return new Queue("confirm_test_queue", true, false, false);
    }

    @Bean()
    public FanoutExchange confirmTestExchange() {
        return new FanoutExchange("confirm_test_exchange");
    }

    @Bean
    public Binding confirmTestFanoutExchangeAndQueue() {
        return BindingBuilder.bind(confirmTestQueue()).to(confirmTestExchange());
    }
}
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
    /**
     * 注入 RabbitTemplate
     */
    @Autowired
    
     /**
     * 注入 ConfirmCallbackService
     */
    @Autowired
    private ConfirmCallbackService confirmCallbackService;

    @Test
    public void testConfirm() {
        // 设置确认回调类
        rabbitTemplate.setConfirmCallback(confirmCallbackService);
        // 发送消息
        rabbitTemplate.convertAndSend("confirm_test_exchange", "", "ConfirmCallback !");
    }
}

6.4.1.2 return 退回模式

当 Exchange 发送到 Queue 失败时,会调用一个 returnsCallback,我们可以通过实现这个接口,然后来处理这种失败的情况。

  • 在配置文件中开启发送回调
spring:
  rabbitmq:
    # 发送回调
    publisher-returns: true
  • 实现 ReturnsCallback 的 returnedMessage 方法
//  public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) 已经属于过时方法了
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnsCallback {
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        System.out.println(returned);
    }
}
  • 声明队列和交换机(Direct 模式)
@Configuration
public class RabbitMqConfig {

    @Bean()
    public Queue returnsTestQueue() {
        return new Queue("return_test_queue", true, false, false);
    }

    @Bean()
    public DirectExchange returnsTestExchange() {
        return new DirectExchange("returns_test_exchange");
    }

    @Bean
    public Binding returnsTestDirectExchangeAndQueue() {
        return BindingBuilder.bind(returnsTestQueue()).to(returnsTestExchange()).with("info");
    }
}
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
    /**
     * 注入 RabbitTemplate
     */
    @Autowired
    
    /**
     * 注入 ConfirmCallbackService
     */
    @Autowired
    private ConfirmCallbackService confirmCallbackService;
    
    /**
     * 注入 ReturnCallbackService
     */
    @Autowired
    private ReturnCallbackService returnCallbackService;

    @Test
    public void testReturn() {
        // 确保消息发送失败后可以重新返回到队列中
        rabbitTemplate.setMandatory(true);
        // 消息投递到队列失败回调处理
        rabbitTemplate.setReturnsCallback(returnCallbackService);
        // 消息投递确认模式
        rabbitTemplate.setConfirmCallback(confirmCallbackService);
        // 发送消息
        rabbitTemplate.convertAndSend("returns_test_exchange", "info", "ReturnsCallback !");
    }
}
  • 修改不同的路由key,即可测试出结果。

6.4.2 消费者中的机制

6.4.2.1 ack 确认机制

ack 表示收到消息的确认,默认是自动确认,但是它有三种类型

acknowledge-mode 选项介绍

  • auto:自动确认,为默认选项
  • manual:手动确认(按能力分配就需要设置为手动确认)
  • none:不确认,发送后自动丢弃

其中自动确认是指,当消息一旦被消费者接收到,则自动确认收到,并把这个消息从队列中删除。

但是在实际业务处理中,正确的接收到的消息可能会因为业务上的问题,导致消息没有正确的被处理,但是如果设置了 手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用 channel.basicNack()方法,让其自动重新发送消息。

spring:
  rabbitmq:
    listener:
      simple:
          # 手动确认
        acknowledge-mode: manual 
@Component
@RabbitListener(queues = "confirm_test_queue")
public class TestConsumer {

    @RabbitHandler
    public void processHandler(String msg, Channel channel, Message message) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("消息内容: " + new String(message.getBody()));

            System.out.println("业务出错的位置:");
            int i = 66 / 0;
            
            // 手动签收 deliveryTag标识代表队列可以删除了
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            // 拒绝签收
            channel.basicNack(deliveryTag, true, true);
        }
    }
}

6.5 集群 & 6.6 分布式事务(待更新)

由于这两个点篇幅也不短,实在不愿草草简单写上了事,放到后面单独的文章编写,发布哇。

关于集群的搭建暂时可参考:https://blog.csdn.net/belongh...


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK