9

Golang 实现 RabbitMQ 的延迟队列

 2 years ago
source link: https://segmentfault.com/a/1190000041375408
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

读本文之前,你应该已经了解 RabbitMQ 的一些概念,如队列、交换机之类。

延迟队列简介

一个队列中的消息在延迟一段时间后才被消费者消费,这样的队列可以称之为延迟队列。

延迟队列的应用场景十分广泛,如:下单后30分钟内未付款则取消订单;在某个时间下发一条通知等。

通过死信实现延迟队列

通过Golang 实现 RabbitMQ 的死信队列的介绍,我们可以很容易的实现一个延迟队列。

  1. 将正常队列的消费者取消;
  2. 发消息时设置TTL;

通过上面两点,正常队列的消息始终不会被消费,而是等待消息TTL到期,进入死信队列,让死信消费者进行消费,从而达到延迟队列的效果。

上面看上去似乎没什么问题,实测一下就会发现消息不会“如期死亡”

当先生产一个TTL为60s的消息,再生产一个TTL为5s的消息,第二个消息并不会再5s后过期进入死信队列,而是需要等到第一个消息TTL到期后,与第一个消息一同进入死信队列。这是因为RabbitMQ 只会判断队列中的第一个消息是否过期。

通过插件实现延迟队列

对于上文的问题,自然有解决方法,那就是通过 RabbitMQ 的 rabbitmq_delayed_message_exchange 插件来解决。本文不赘述 RabbitMQ和插件的安装,你可以参考此文安装或使用Docker来安装。

此插件的原理是将消息在交换机处暂存储在mnesia(一个分布式数据系统)表中,延迟投递到队列中,等到消息到期再投递到队列当中。

简单了解了插件的原理,我们便可以如此设计延迟队列。

生产者实现的关键点:

1.在声明交换机时不在是direct类型,而是x-delayed-message类型,这是由插件提供的类型;

2.交换机要增加"x-delayed-type": "direct"参数设置;

3.发布消息时,要在 Headers 中设置x-delay参数,来控制消息从交换机过期时间;

err = mqCh.Publish(constant.Exchange1, constant.RoutingKey1, false, false, amqp.Publishing{
    ContentType: "text/plain",
    Body:        []byte(message),
    //Expiration: "10000", // 消息过期时间(消息级别),毫秒
    Headers: map[string]interface{}{
        "x-delay": "5000", // 消息从交换机过期时间,毫秒(x-dead-message插件提供)
    },
})

生产者完整代码:

// producter.go
package main

import (
    "fmt"
    "github.com/streadway/amqp"
    "learn_gin/go/rabbitmq/delayletter/constant"
    "learn_gin/go/rabbitmq/util"
    "strconv"
    "time"
)

func main() {
    // # ========== 1.创建连接 ==========
    mq := util.NewRabbitMQ()
    defer mq.Close()
    mqCh := mq.Channel

    // # ========== 2.设置队列(队列、交换机、绑定) ==========
    // 声明队列
    var err error
    _, err = mqCh.QueueDeclare(constant.Queue1, true, false, false, false, amqp.Table{
        // "x-message-ttl": 60000, // 消息过期时间(队列级别),毫秒
    })
    util.FailOnError(err, "创建队列失败")

    // 声明交换机
    //err = mqCh.ExchangeDeclare(Exchange1, amqp.ExchangeDirect, true, false, false, false, nil)
    err = mqCh.ExchangeDeclare(constant.Exchange1, "x-delayed-message", true, false, false, false, amqp.Table{
        "x-delayed-type": "direct", 
    })
    util.FailOnError(err, "创建交换机失败")

    // 队列绑定(将队列、routing-key、交换机三者绑定到一起)
    err = mqCh.QueueBind(constant.Queue1, constant.RoutingKey1, constant.Exchange1, false, nil)
    util.FailOnError(err, "队列、交换机、routing-key 绑定失败")

    // # ========== 4.发布消息 ==========
    message := "msg" + strconv.Itoa(int(time.Now().Unix()))
    fmt.Println(message)
    // 发布消息
    err = mqCh.Publish(constant.Exchange1, constant.RoutingKey1, false, false, amqp.Publishing{
        ContentType: "text/plain",
        Body:        []byte(message),
        //Expiration: "10000", // 消息过期时间(消息级别),毫秒
        Headers: map[string]interface{}{
            "x-delay": "5000", // 消息从交换机过期时间,毫秒(x-dead-message插件提供)
        },
    })
    util.FailOnError(err, "消息发布失败")
}

由于在生产者端建立队列和交换机,所以消费者并不需要特殊的设置,直接附代码。

消费者完整代码:

// consumer.go
package main

import (
    "learn_gin/go/rabbitmq/delayletter/constant"
    "learn_gin/go/rabbitmq/util"
    "log"
)

func main() {
    // # ========== 1.创建连接 ==========
    mq := util.NewRabbitMQ()
    defer mq.Close()
    mqCh := mq.Channel

    // # ========== 2.消费消息 ==========
    msgsCh, err := mqCh.Consume(constant.Queue1, "", false, false, false, false, nil)
    util.FailOnError(err, "消费队列失败")

    forever := make(chan bool)
    go func() {
        for d := range msgsCh {
            // 要实现的逻辑
            log.Printf("接收的消息: %s", d.Body)

            // 手动应答
            d.Ack(false)
            //d.Reject(true)
        }
    }()
    log.Printf("[*] Waiting for message, To exit press CTRL+C")
    <-forever
}

源码Mr-houzi/go-demo


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK