11

Rabbitmq实战golang实现

 3 years ago
source link: https://driverzhang.github.io/post/rabbitmq%E5%AE%9E%E6%88%98golang%E5%AE%9E%E7%8E%B0/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

1. Rabbitmq 架构及原理

消息队列,又叫做消息中间件。是指用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息队列模型,可以在分布式环境下扩展进程的通信(维基百科)

MQ 的作用:

  1. 实现异步通信

MQ 带来的问题:

  1. 引入消息队列带来的延迟问题
  2. 增加了系统的复杂度
  3. 可能产生数据不一致的问题

消息丢失和消息重复消费的问题。一旦消息没有被正确地消费,就会带来数据不一致性的问题。

RabbitMQ 是一个流行的开源消息队列系统,是AMQP(高级消息队列协议)标准的实现。

关于AMQP 协议具体文档参考 https://www.amqp.org/sites/amqp.org/files/amqp.pdf

由以高性能、健壮、可伸缩性出名的Erlang语言开发,并继承了这些优点。rabbitmq简单架构如下:

架构示意图
  • Broker(代理/中介): RabbitMQ 用于收发消息的服务,默认是 5672 的端口。

  • Virtual Host(vhost):虚拟主机;标识一批交换机、消息队列和相关对象。

虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在链接时指定,RabbitMQ默认的vhost是 /。

  • Exchange:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

  • Queue:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。

  • Banding:绑定,用于消息队列和交换机之间的关联。

一个绑定就是基于路由键将交换机和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。

  • Channel:信道,多路复用连接中的一条独立的双向数据流通道。

直接创建和释放 TCP 长连接的话,对于 Broker 来说肯定会造成很大的性能损耗,因为 TCP 连接是非常宝贵的资源,创建和释放也要消耗时间。所以在 AMQP 里面引入了 Channel 的概念,它是一个虚拟的连接

  • Connection:无论是生产者发送消息,还是消费者接收消息,都必须要跟 Broker 之间建立一个连接,这个连接是一个 TCP 的长连接。

2. RabbitMQ的六种工作模式:

https://www.rabbitmq.com/getstarted.html 官方网站有详细示意图

  1. simple简单模式

python-one-overall.png

  1. work工作模式

python-two.png

  1. Publish/Subscribe 发布订阅模式(fanout)

bindings.png

  1. Routing 路由模式 (direct)
direct-exchange.png
  1. Topics 主题模式(路由模式的一种)(topic)
python-five.png
python-six.png
  1. Publisher Confirms 发布确认

具体demo参考如下地址:

go操作RabbitMQ


3. 延迟队列实现(基于死信队列转发)

3.1 消息过期时间:

有两种设置方式:

  1. 通过队列属性设置消息过期时间,所有队列中的消息超过时间未被消费时,都会过期。
_, err := r.channel.QueueDeclare(
		queueName,
		true,
		false,
		false, // 队列解锁
		false,
		amqp.Table{
			"x-message-ttl": 4000, // 在队列中声明ttl 超时时间 单位为毫秒级,类型为int
		},
	)
  1. 设置单条消息的过期时间,在发送消息的时候指定消息属性(推荐使用消息超时)。
expiration := "4000" // 4S 4000MS
err = r.channel.Publish(
		"",
		queueName,
		false,
		false,
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(message),
			Expiration:  expiration, // push 时 在消息本体上设置expiration超时时间,单位为毫秒级别 类型为 string
		})

3.2 死信队列:

消息在某些情况下会变成死信(Dead Letter)

队列在创建的时候可以指定一个死信交换机 DLX(Dead Letter Exchange)。

死信交换机绑定的队列被称为死信队列 DLQ(Dead Letter Queue),DLX 实际上也是普通的交换机,DLQ 也是普通的队列。

三种情况会让消息变成死信:

  • 消息被消费者拒绝并且未设置重回队列:(NACK || Reject ) && requeue== false
  • 队列达到最大长度,超过了 Max length(消息数)或者 Max length bytes(字节数),最先入队的消息会被发送到 DLX。 > 关于这个队里的默认长度,官方没有给出,网上找了下有说是没有设置就动态增长不限。也就是根据你机器的配置情况了。

死信队列声明如下:

_, err := r.channel.QueueDeclare(
		queueName, // 这里就是将一个队列声明为如下死信交换机的死信队列
		true,
		false,
		false, 
		false,
		amqp.Table{
			"x-dead-letter-exchange":    dlxExchange, // 声明当前队列绑定的 死信交换机
		},
	)

3.3 延迟队列demo:

延迟队列原理示意图

pusher:

func (r *RabbitMQ) PublishDelayQueue(queue, message, dlxExchange, routing, expiration string) error {
	defer r.CloseMq()
	queueName := queue + "_delay"
	_, err := r.channel.QueueDeclare(
		queueName,
		true,
		false,
		false, // 队列解锁
		false,
		amqp.Table{
			"x-dead-letter-exchange":    dlxExchange, // 声明当前队列绑定的 死信交换机
			"x-dead-letter-routing-key": routing,     // routing 模式路由名
		},
	)
	if err != nil {
		return err
	}

	// 注入消息 注册路由 routingKey
	err = r.channel.Publish(
		"",
		queueName,
		false,
		false,
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(message),
			Expiration:  expiration,
		})
	if err != nil {
		return err
	}

	fmt.Printf("push messag %s\n", message)
	return nil
}

Consumer:

func (r *RabbitMQ) ConsumeDelayQueue(queueName, dlxExchange, routing string, f func(interface{})) error {
	defer r.CloseMq()
	err := r.channel.ExchangeDeclare(
		dlxExchange,
		RoutingKind, // 交换机类型 路由模式接收
		true,
		false,
		false,
		false,
		nil,
	)
	if err != nil {
		return err
	}

	// 声明 死信队列(用于与死信交换机绑定)
	q, err := r.channel.QueueDeclare(
		queueName,
		true,
		false,
		false,
		false,
		nil,
	)
	if err != nil {
		return err
	}

	// 绑定队列到 exchange 中
	err = r.channel.QueueBind(
		q.Name,
		routing,
		dlxExchange,
		false,
		nil)
	if err != nil {
		return err
	}

	// 消费消息
	data, err := r.channel.Consume(
		q.Name,
		"",
		false,
		false,
		false,
		false,
		nil,
	)
	if err != nil {
		return err
	}

	forever := make(chan bool)
	go func() {
		for d := range data {
			fmt.Printf("Received a message: %s\n", d.Body)
			f(d.Body)
		}
	}()
	<-forever
	return nil
}

RabbitMQ工作模型与基本原理

rabbitmq消息队列原理

go操作RabbitMQ



About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK